Coverage for silkaj/money/transfer.py: 93%

210 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-20 12:29 +0000

1# Copyright 2016-2025 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16import math 

17import re 

18import shlex 

19import time 

20from pathlib import Path 

21from typing import Optional 

22 

23import rich_click as click 

24from duniterpy.api.bma.tx import process 

25from duniterpy.documents import ( 

26 BlockID, 

27 InputSource, 

28 OutputSource, 

29 SIGParameter, 

30 Transaction, 

31 Unlock, 

32) 

33from duniterpy.key import SigningKey 

34 

35from silkaj import auth, network, public_key, tools, tui 

36from silkaj.blockchain import tools as bc_tools 

37from silkaj.constants import ( 

38 BMA_SLEEP, 

39 CENT_MULT_TO_UNIT, 

40 MINIMAL_ABSOLUTE_TX_AMOUNT, 

41 MINIMAL_RELATIVE_TX_AMOUNT, 

42) 

43from silkaj.money import tools as m_tools 

44from silkaj.public_key import gen_pubkey_checksum 

45 

46MAX_COMMENT_LENGTH = 255 

47 

48 

49# max size for tx doc is 100 lines. 

50# Formula for accepted field numbers is: 

51# (2 * IU + 2 * IS + OUT) <= ( MAX_LINES_IN_TX_DOC - FIX_LINES) 

52# with IU = inputs/unlocks ; IS = Issuers/Signatures ; OUT = Outpouts. 

53MAX_LINES_IN_TX_DOC = 100 

54# 2 lines are necessary, and we block 1 more for the comment 

55FIX_LINES = 3 

56# assuming there is only 1 issuer and 2 outputs, max inputs is 46 

57MAX_INPUTS_PER_TX = 46 

58# assuming there is 1 issuer and 1 input, max outputs is 93. 

59MAX_OUTPUTS = 93 

60# for now, silkaj handles txs for one issuer only 

61NBR_ISSUERS = 1 

62 

63 

64@click.command("transfer", help="Transfer money") 

65@click.option( 

66 "amounts", 

67 "--amount", 

68 "-a", 

69 multiple=True, 

70 type=click.FloatRange(MINIMAL_ABSOLUTE_TX_AMOUNT), 

71 help=f"Quantitative amount(s):\n-a <amount>\nMinimum amount is {MINIMAL_ABSOLUTE_TX_AMOUNT}.", 

72 cls=tools.MutuallyExclusiveOption, 

73 mutually_exclusive=["amountsud", "allsources", "file_path"], 

74) 

75@click.option( 

76 "amountsud", 

77 "--amountUD", 

78 "-d", 

79 multiple=True, 

80 type=click.FloatRange(MINIMAL_RELATIVE_TX_AMOUNT), 

81 help=f"Relative amount(s):\n-d <amount_UD>\nMinimum amount is {MINIMAL_RELATIVE_TX_AMOUNT}", 

82 cls=tools.MutuallyExclusiveOption, 

83 mutually_exclusive=["amounts", "allsources", "file_path"], 

84) 

85@click.option( 

86 "--allSources", 

87 is_flag=True, 

88 help="Send all sources to one recipient", 

89 cls=tools.MutuallyExclusiveOption, 

90 mutually_exclusive=["amounts", "amountsud", "file_path"], 

91) 

92@click.option( 

93 "recipients", 

94 "--recipient", 

95 "-r", 

96 multiple=True, 

97 help="Pubkey(s)` recipients + optional checksum:\n-r <pubkey>[:checksum]\n\ 

98Sending to many recipients is possible:\n\ 

99* With one amount, all will receive the amount\n\ 

100* With many amounts (one per recipient)", 

101 cls=tools.MutuallyExclusiveOption, 

102 mutually_exclusive=["file_path"], 

103) 

104@click.option( 

105 "file_path", 

106 "--file", 

107 "-f", 

108 type=click.Path(exists=True, dir_okay=False, path_type=Path), 

109 help="File`s path containing a list of amounts in absolute or \ 

110relative reference and recipients` pubkeys", 

111 cls=tools.MutuallyExclusiveOption, 

112 mutually_exclusive=["recipients", "amounts", "amountsUD", "allsources"], 

113) 

114@click.option("--comment", "-c", default="", help="Comment") 

115@click.option( 

116 "--outputBackChange", 

117 help="Pubkey recipient to send the rest of the transaction: <pubkey[:checksum]>", 

118) 

119@click.option( 

120 "--yes", 

121 "-y", 

122 is_flag=True, 

123 help="Assume yes. Do not prompt confirmation", 

124) 

125def transfer_money( 

126 amounts: list[float], 

127 amountsud: list[float], 

128 allsources: bool, 

129 recipients: list[str], 

130 file_path: Path, 

131 comment: str, 

132 outputbackchange: str, 

133 yes: bool, 

134) -> None: 

135 if file_path: 

136 tx_amounts, recipients = parse_file_containing_amounts_recipients(file_path) 

137 else: 

138 if not (amounts or amountsud or allsources): 

139 tools.message_exit("Error: amount, amountUD or allSources is not set.") 

140 if not recipients: 

141 tools.message_exit("Error: A recipient should be passed") 

142 if allsources and len(recipients) > 1: 

143 tools.message_exit( 

144 "Error: the --allSources option can only be used with one recipient.", 

145 ) 

146 # compute amounts and amountsud 

147 if not allsources: 

148 tx_amounts = transaction_amount(amounts, amountsud, recipients) 

149 

150 key = auth.auth_method() 

151 issuer_pubkey = key.pubkey 

152 

153 pubkey_amount = m_tools.get_amount_from_pubkey(issuer_pubkey) 

154 if allsources: 

155 if pubkey_amount[0] <= 0: 

156 tools.message_exit( 

157 f"Error: Issuer pubkey {gen_pubkey_checksum(issuer_pubkey)} is empty. \ 

158No transaction sent.", 

159 ) 

160 

161 tx_amounts = [pubkey_amount[0]] 

162 

163 recipients = list(recipients) 

164 outputbackchange = check_transaction_values( 

165 comment, 

166 recipients, 

167 outputbackchange, 

168 pubkey_amount[0] < sum(tx_amounts), 

169 issuer_pubkey, 

170 ) 

171 

172 if not yes: 

173 table = tui.Table() 

174 table.fill_rows( 

175 gen_confirmation_table( 

176 issuer_pubkey, 

177 pubkey_amount[0], 

178 tx_amounts, 

179 recipients, 

180 outputbackchange, 

181 comment, 

182 ), 

183 ) 

184 confirmation_table = table.draw() 

185 

186 if yes or click.confirm( 

187 f"{confirmation_table}\nDo you confirm sending this transaction?", 

188 ): 

189 handle_intermediaries_transactions( 

190 key, 

191 issuer_pubkey, 

192 tx_amounts, 

193 recipients, 

194 comment, 

195 outputbackchange, 

196 ) 

197 

198 

199def parse_file_containing_amounts_recipients( 

200 file_path: Path, 

201) -> tuple[list[int], list[str]]: 

202 """ 

203 Parse file in a specific format 

204 Comments are ignored 

205 Format should be: 

206 ```txt 

207 [ABSOLUTE/RELATIVE] 

208 

209 # comment1 

210 amount1 recipient1`s pubkey 

211 # comment2 

212 amount2 recipient2`s pubkey 

213 ``` 

214 """ 

215 reference = "" 

216 amounts, recipients = [], [] 

217 with file_path.open(encoding="utf-8") as file: 

218 for n, raw_line in enumerate(file): 

219 line = shlex.split(raw_line, True) 

220 if line: 

221 if n == 0: 

222 reference = line[0] 

223 else: 

224 try: 

225 amounts.append(float(line[0])) 

226 recipients.append(line[1]) 

227 except (ValueError, IndexError): 

228 tools.message_exit(f"Syntax error at line {n + 1}") 

229 

230 if not reference or reference not in ("ABSOLUTE", "RELATIVE"): 

231 tools.message_exit( 

232 f"{file_path} must contain at first line 'ABSOLUTE' or 'RELATIVE' header", 

233 ) 

234 

235 if not amounts or not recipients: 

236 tools.message_exit("No amounts or recipients specified") 

237 

238 # Compute amount depending on the reference 

239 reference_mult = ( 

240 CENT_MULT_TO_UNIT if reference == "ABSOLUTE" else m_tools.get_ud_value() 

241 ) 

242 tx_amounts = compute_amounts(amounts, reference_mult) 

243 

244 return tx_amounts, recipients 

245 

246 

247def transaction_amount( 

248 amounts: list[float], 

249 UDs_amounts: list[float], 

250 outputAddresses: list[str], 

251) -> list[int]: 

252 """ 

253 Check that the number of passed amounts(UD) and recipients are the same 

254 Returns a list of amounts. 

255 """ 

256 # Create amounts list 

257 if amounts: 

258 amounts_list = compute_amounts(amounts, CENT_MULT_TO_UNIT) 

259 elif UDs_amounts: 

260 UD_value = m_tools.get_ud_value() 

261 amounts_list = compute_amounts(UDs_amounts, UD_value) 

262 if len(amounts_list) != len(outputAddresses) and len(amounts_list) != 1: 

263 tools.message_exit( 

264 "Error: The number of passed recipients is not the same as the passed amounts.", 

265 ) 

266 # In case one amount is passed with multiple recipients 

267 # generate list containing multiple time the same amount 

268 if len(amounts_list) == 1 and len(outputAddresses) > 1: 

269 return [amounts_list[0]] * len(outputAddresses) 

270 return amounts_list 

271 

272 

273def compute_amounts(amounts: list[float], multiplicator: float) -> list[int]: 

274 """ 

275 Computes the amounts(UD) and returns a list. 

276 Multiplicator should be either CENT_MULT_TO_UNIT or UD_Value. 

277 If relative amount, check that amount is superior to minimal amount. 

278 """ 

279 # Create amounts list 

280 amounts_list = [] 

281 for amount in amounts: 

282 computed_amount = amount * multiplicator 

283 # check if relative amounts are high enough 

284 if (multiplicator != CENT_MULT_TO_UNIT) and ( 

285 computed_amount < (MINIMAL_ABSOLUTE_TX_AMOUNT * CENT_MULT_TO_UNIT) 

286 ): 

287 tools.message_exit(f"Error: amount {amount} is too low.") 

288 amounts_list.append(round(computed_amount)) 

289 return amounts_list 

290 

291 

292def check_transaction_values( 

293 comment: str, 

294 outputAddresses: list[str], 

295 outputBackChange: str, 

296 enough_source: bool, 

297 issuer_pubkey: str, 

298) -> str: 

299 """ 

300 Check the comment format 

301 Check the pubkeys and the checksums of the recipients and the outputbackchange 

302 In case of a valid checksum, assign and return the pubkey without the checksum 

303 Check the balance is big enough for the transaction 

304 """ 

305 checkComment(comment) 

306 # we check output numbers and leave one line for the backchange. 

307 if len(outputAddresses) > (MAX_OUTPUTS - 1): 

308 tools.message_exit( 

309 f"Error : there should be less than {MAX_OUTPUTS - 1} outputs.", 

310 ) 

311 for i, outputAddress in enumerate(outputAddresses): 

312 if public_key.check_pubkey_format(outputAddress): 

313 outputAddresses[i] = public_key.validate_checksum(outputAddress) 

314 if outputBackChange and public_key.check_pubkey_format(outputBackChange): 

315 outputBackChange = public_key.validate_checksum(outputBackChange) 

316 if enough_source: 

317 pubkey = gen_pubkey_checksum(issuer_pubkey) 

318 tools.message_exit( 

319 f"{pubkey} pubkey doesn`t have enough money for this transaction.", 

320 ) 

321 return outputBackChange 

322 

323 

324def gen_confirmation_table( 

325 issuer_pubkey: str, 

326 pubkey_amount: int, 

327 tx_amounts: list[int], 

328 outputAddresses: list[str], 

329 outputBackChange: str, 

330 comment: str, 

331) -> list[list]: 

332 """ 

333 Generate transaction confirmation 

334 """ 

335 

336 currency_symbol = tools.get_currency_symbol() 

337 ud_value = m_tools.get_ud_value() 

338 total_tx_amount = sum(tx_amounts) 

339 tx = [] # type: list[list[str]] 

340 # display account situation 

341 m_tools.display_amount( 

342 tx, 

343 "Initial balance", 

344 pubkey_amount, 

345 ud_value, 

346 currency_symbol, 

347 ) 

348 m_tools.display_amount( 

349 tx, 

350 "Total transaction amount", 

351 total_tx_amount, 

352 ud_value, 

353 currency_symbol, 

354 ) 

355 m_tools.display_amount( 

356 tx, 

357 "Balance after transaction", 

358 (pubkey_amount - total_tx_amount), 

359 ud_value, 

360 currency_symbol, 

361 ) 

362 m_tools.display_pubkey(tx, "From", issuer_pubkey) 

363 # display outputs and amounts 

364 for outputAddress, tx_amount in zip(outputAddresses, tx_amounts): 

365 m_tools.display_pubkey(tx, "To", outputAddress) 

366 time.sleep(BMA_SLEEP) 

367 m_tools.display_amount(tx, "Amount", tx_amount, ud_value, currency_symbol) 

368 # display last informations 

369 if outputBackChange: 

370 m_tools.display_pubkey(tx, "Backchange", outputBackChange) 

371 tx.append(["Comment", comment]) 

372 return tx 

373 

374 

375def get_list_input_for_transaction( 

376 pubkey: str, 

377 TXamount: int, 

378 outputs_number: int, 

379) -> tuple[list[InputSource], int, bool]: 

380 listinput = m_tools.get_sources(pubkey)[0] 

381 maxInputsNumber = max_inputs_number(outputs_number, NBR_ISSUERS) 

382 # generate final list source 

383 listinputfinal = [] 

384 totalAmountInput = 0 

385 intermediatetransaction = False 

386 for nbr_inputs, _input in enumerate(listinput, start=1): 

387 listinputfinal.append(_input) 

388 totalAmountInput += m_tools.amount_in_current_base(_input) 

389 TXamount -= m_tools.amount_in_current_base(_input) 

390 # if too much sources, it's an intermediate transaction. 

391 amount_not_reached_and_max_doc_size_reached = ( 

392 TXamount > 0 and nbr_inputs >= MAX_INPUTS_PER_TX 

393 ) 

394 amount_reached_too_much_inputs = TXamount <= 0 and maxInputsNumber < nbr_inputs 

395 if ( 

396 amount_not_reached_and_max_doc_size_reached 

397 or amount_reached_too_much_inputs 

398 ): 

399 intermediatetransaction = True 

400 # if we reach the MAX_INPUTX_PER_TX limit, we send the interm.tx 

401 # if we gather the good amount, we send the tx : 

402 # - either this is no int.tx, and the tx is sent to the receiver, 

403 # - or the int.tx it is sent to the issuer before sent to the receiver. 

404 if nbr_inputs >= MAX_INPUTS_PER_TX or TXamount <= 0: 

405 break 

406 if TXamount > 0 and not intermediatetransaction: 

407 tools.message_exit("Error: you don't have enough money") 

408 return listinputfinal, totalAmountInput, intermediatetransaction 

409 

410 

411def handle_intermediaries_transactions( 

412 key: SigningKey, 

413 issuers: str, 

414 tx_amounts: list[int], 

415 outputAddresses: list[str], 

416 Comment: str = "", 

417 OutputbackChange: Optional[str] = None, 

418) -> None: 

419 while True: 

420 # consider there is always one backchange output, hence +1 

421 listinput_and_amount = get_list_input_for_transaction( 

422 issuers, 

423 sum(tx_amounts), 

424 len(outputAddresses) + 1, 

425 ) 

426 intermediatetransaction = listinput_and_amount[2] 

427 

428 if intermediatetransaction: 

429 totalAmountInput = listinput_and_amount[1] 

430 generate_and_send_transaction( 

431 key, 

432 issuers, 

433 [totalAmountInput], 

434 listinput_and_amount, 

435 [issuers], 

436 "Change operation", 

437 ) 

438 else: 

439 generate_and_send_transaction( 

440 key, 

441 issuers, 

442 tx_amounts, 

443 listinput_and_amount, 

444 outputAddresses, 

445 Comment, 

446 OutputbackChange, 

447 ) 

448 break 

449 

450 

451def max_inputs_number(outputs_number: int, issuers_number: int) -> int: 

452 """ 

453 returns the maximum number of inputs. 

454 This function does not take care of backchange line. 

455 formula is IU <= (MAX_LINES_IN_TX_DOC - FIX_LINES - O - 2*IS)/2 

456 """ 

457 return int( 

458 (MAX_LINES_IN_TX_DOC - FIX_LINES - (2 * issuers_number) - outputs_number) / 2, 

459 ) 

460 

461 

462def generate_and_send_transaction( 

463 key: SigningKey, 

464 issuers: str, 

465 tx_amounts: list[int], 

466 listinput_and_amount: tuple[list[InputSource], int, bool], 

467 outputAddresses: list[str], 

468 Comment: str, 

469 OutputbackChange: Optional[str] = None, 

470) -> None: 

471 """ 

472 Display sent transaction 

473 Generate, sign, and send transaction document 

474 """ 

475 intermediate_tx = listinput_and_amount[2] 

476 if intermediate_tx: 

477 print("Generate Change Transaction") 

478 else: 

479 print("Generate Transaction:") 

480 print(" - From: " + gen_pubkey_checksum(issuers)) 

481 for tx_amount, outputAddress in zip(tx_amounts, outputAddresses): 

482 display_sent_tx(outputAddress, tx_amount) 

483 print(" - Total: " + str(sum(tx_amounts) / 100)) 

484 

485 transaction = generate_transaction_document( 

486 issuers, 

487 tx_amounts, 

488 listinput_and_amount, 

489 outputAddresses, 

490 Comment, 

491 OutputbackChange, 

492 ) 

493 transaction.sign(key) 

494 network.send_document(process, transaction) 

495 

496 

497def display_sent_tx(outputAddress: str, amount: int) -> None: 

498 print( 

499 " - To: ", 

500 gen_pubkey_checksum(outputAddress), 

501 "\n - Amount: ", 

502 amount / 100, 

503 ) 

504 

505 

506def generate_transaction_document( 

507 issuers: str, 

508 tx_amounts: list[int], 

509 listinput_and_amount: tuple[list[InputSource], int, bool], 

510 outputAddresses: list[str], 

511 Comment: str = "", 

512 OutputbackChange: Optional[str] = None, 

513) -> Transaction: 

514 listinput = listinput_and_amount[0] 

515 totalAmountInput = listinput_and_amount[1] 

516 total_tx_amount = sum(tx_amounts) 

517 

518 head_block = bc_tools.get_head_block() 

519 

520 if not OutputbackChange: 

521 OutputbackChange = issuers 

522 

523 # If it's not a foreign exchange transaction, 

524 # we remove units after two digits after the decimal point 

525 if issuers not in outputAddresses: 

526 total_tx_amount = ( 

527 total_tx_amount // 10 ** head_block["unitbase"] 

528 ) * 10 ** head_block["unitbase"] 

529 

530 # Generate output 

531 ################ 

532 listoutput = [] # type: list[OutputSource] 

533 for tx_amount, outputAddress in zip(tx_amounts, outputAddresses): 

534 generate_output(listoutput, head_block["unitbase"], tx_amount, outputAddress) 

535 

536 # Outputs to himself 

537 rest = totalAmountInput - total_tx_amount 

538 generate_output(listoutput, head_block["unitbase"], rest, OutputbackChange) 

539 

540 # Unlocks 

541 unlocks = generate_unlocks(listinput) 

542 

543 # Generate transaction document 

544 ############################## 

545 

546 return Transaction( 

547 block_id=BlockID(head_block["number"], head_block["hash"]), 

548 locktime=0, 

549 issuers=[issuers], 

550 inputs=listinput, 

551 unlocks=unlocks, 

552 outputs=listoutput, 

553 comment=Comment, 

554 currency=head_block["currency"], 

555 ) 

556 

557 

558def generate_unlocks(listinput: list[InputSource]) -> list[Unlock]: 

559 unlocks = [] 

560 for i in range(len(listinput)): 

561 unlocks.append(Unlock(index=i, parameters=[SIGParameter(0)])) 

562 return unlocks 

563 

564 

565def generate_output( 

566 listoutput: list[OutputSource], 

567 unitbase: int, 

568 rest: int, 

569 recipient_address: str, 

570) -> None: 

571 while rest > 0: 

572 outputAmount = truncBase(rest, unitbase) 

573 rest -= outputAmount 

574 if outputAmount > 0: 

575 outputAmount = int(outputAmount / math.pow(10, unitbase)) 

576 listoutput.append( 

577 OutputSource( 

578 amount=outputAmount, 

579 base=unitbase, 

580 condition=f"SIG({recipient_address})", 

581 ), 

582 ) 

583 unitbase = unitbase - 1 

584 

585 

586def checkComment(comment: str) -> None: 

587 if len(comment) > MAX_COMMENT_LENGTH: 

588 tools.message_exit("Error: Comment is too long") 

589 regex = re.compile( 

590 "^[0-9a-zA-Z\\ \\-\\_\\:\\/\\;\\*\\[\\]\\(\\)\\?\ 

591\\!\\^\\+\\=\\@\\&\\~\\#\\{\\}\\|\\\\<\\>\\%\\.]*$", 

592 ) 

593 if not re.search(regex, comment): 

594 tools.message_exit("Error: the format of the comment is invalid") 

595 

596 

597def truncBase(amount: int, base: int) -> int: 

598 _pow = int(math.pow(10, base)) 

599 if amount < _pow: 

600 return 0 

601 return math.trunc(amount / _pow) * _pow