Coverage for silkaj/money/tools.py: 98%

57 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-22 12:04 +0000

1# Copyright 2016-2025 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16import functools 

17from typing import Union 

18 

19from duniterpy.api import bma 

20from duniterpy.documents.transaction import InputSource, OutputSource 

21 

22from silkaj.constants import CENT_MULT_TO_UNIT 

23from silkaj.network import client_instance 

24from silkaj.public_key import gen_pubkey_checksum 

25from silkaj.wot import tools as wt 

26 

27 

28def display_amount( 

29 tx: list, 

30 message: str, 

31 amount: float, 

32 ud_value: float, 

33 currency_symbol: str, 

34) -> None: 

35 """ 

36 Displays an amount in unit and relative reference. 

37 """ 

38 UD_amount = str(round((amount / ud_value), 2)) 

39 unit_amount = str(amount / CENT_MULT_TO_UNIT) 

40 tx.append( 

41 [ 

42 f"{message} (unit|relative)", 

43 f"{unit_amount} {currency_symbol} | {UD_amount} UD {currency_symbol}", 

44 ], 

45 ) 

46 

47 

48def display_pubkey(tx: list, message: str, pubkey: str) -> None: 

49 """ 

50 Displays a pubkey and the eventually associated identity 

51 """ 

52 tx.append([f"{message} (pubkey:checksum)", gen_pubkey_checksum(pubkey)]) 

53 idty = wt.is_member(pubkey) 

54 if idty: 

55 tx.append([f"{message} (id)", idty["uid"]]) 

56 

57 

58def get_amount_from_pubkey(pubkey: str) -> list[int]: 

59 listinput, amount = get_sources(pubkey) 

60 

61 totalAmountInput = 0 

62 for _input in listinput: 

63 totalAmountInput += amount_in_current_base(_input) 

64 return [totalAmountInput, amount] 

65 

66 

67def get_sources(pubkey: str) -> tuple[list[InputSource], int]: 

68 client = client_instance() 

69 # Sources written into the blockchain 

70 sources = client(bma.tx.sources, pubkey) 

71 

72 listinput = [] 

73 amount = 0 

74 for source in sources["sources"]: 

75 if source["conditions"] == f"SIG({pubkey})": 

76 listinput.append( 

77 InputSource( 

78 amount=source["amount"], 

79 base=source["base"], 

80 source=source["type"], 

81 origin_id=source["identifier"], 

82 index=source["noffset"], 

83 ), 

84 ) 

85 amount += amount_in_current_base(listinput[-1]) 

86 

87 # pending source 

88 history = (client(bma.tx.pending, pubkey))["history"] 

89 pendings = history["sending"] + history["pending"] 

90 

91 # add pending output 

92 pending_sources = [] 

93 for pending in pendings: 

94 for i, output in enumerate(pending["outputs"]): 

95 # duniterpy#80 

96 outputsplited = output.split(":") 

97 if outputsplited[2] == f"SIG({pubkey})": 

98 inputgenerated = InputSource( 

99 amount=int(outputsplited[0]), 

100 base=int(outputsplited[1]), 

101 source="T", 

102 origin_id=pending["hash"], 

103 index=i, 

104 ) 

105 if inputgenerated not in listinput: 

106 # add pendings before blockchain sources for change txs 

107 listinput.insert(0, inputgenerated) 

108 

109 for _input in pending["inputs"]: 

110 pending_sources.append(InputSource.from_inline(_input)) 

111 

112 # remove input already used 

113 for _input in pending_sources: 

114 if _input in listinput: 

115 listinput.remove(_input) 

116 

117 return listinput, amount 

118 

119 

120@functools.lru_cache(maxsize=1) 

121def get_ud_value() -> int: 

122 client = client_instance() 

123 blockswithud = client(bma.blockchain.ud) 

124 NBlastUDblock = blockswithud["result"]["blocks"][-1] 

125 lastUDblock = client(bma.blockchain.block, NBlastUDblock) 

126 return lastUDblock["dividend"] * 10 ** lastUDblock["unitbase"] 

127 

128 

129def amount_in_current_base(source: Union[InputSource, OutputSource]) -> int: 

130 """ 

131 Get amount in current base from input or output source 

132 """ 

133 return source.amount * 10**source.base