Coverage for silkaj/wot/status.py: 92%

72 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-20 12:29 +0000

1# Copyright 2016-2025 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16import arrow 

17import rich_click as click 

18from duniterpy.api.bma import blockchain, wot 

19 

20from silkaj.blockchain.tools import get_blockchain_parameters 

21from silkaj.constants import DATE 

22from silkaj.network import client_instance 

23from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check 

24from silkaj.tui import Table 

25from silkaj.wot import tools as wt 

26 

27 

28@click.command( 

29 "status", 

30 help="Check received and sent certifications and \ 

31consult the membership status of any given identity", 

32) 

33@click.argument("uid_pubkey") 

34def status(uid_pubkey: str) -> None: 

35 """ 

36 get searched id 

37 get id of received and sent certifications 

38 display in a table the result with the numbers 

39 """ 

40 client = client_instance() 

41 first_block = client(blockchain.block, 1) 

42 time_first_block = first_block["time"] 

43 

44 checked_pubkey = is_pubkey_and_check(uid_pubkey) 

45 if checked_pubkey: 

46 uid_pubkey = str(checked_pubkey) 

47 

48 identity, pubkey, signed = wt.choose_identity(uid_pubkey) 

49 certifications = {} # type: dict 

50 params = get_blockchain_parameters() 

51 

52 req = (client(wot.requirements, search=pubkey, pubkey=True))["identities"][0] 

53 certifications["received_expire"] = [] 

54 certifications["received"] = [] 

55 certifications["sent"] = [] 

56 certifications["sent_expire"] = [] 

57 for lookup_cert in identity["others"]: 

58 for req_cert in req["certifications"]: 

59 if req_cert["from"] == lookup_cert["pubkey"]: 

60 certifications["received_expire"].append( 

61 arrow.now().shift(seconds=req_cert["expiresIn"]).format(DATE), 

62 ) 

63 certifications["received"].append(f'{lookup_cert["uids"][0]}') 

64 break 

65 for pending_cert in req["pendingCerts"]: 

66 certifications["received"].append( 

67 f'{(wt.identity_of(pending_cert["from"]))["uid"]}', 

68 ) 

69 certifications["received_expire"].append( 

70 arrow.get(pending_cert["expires_on"]).to("local").format(DATE), 

71 ) 

72 certifications["sent"], certifications["sent_expire"] = get_sent_certifications( 

73 signed, 

74 time_first_block, 

75 params, 

76 ) 

77 

78 nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0 

79 

80 table = Table(style="columns").set_cols_align(["r", "r", "r", "r"]) 

81 table.fill_from_dict(certifications) 

82 

83 print( 

84 f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)}) \ 

85from block #{identity["meta"]["timestamp"][:15]}…\n\ 

86received {len(certifications["received"])} and sent \ 

87{nbr_sent_certs}/{params["sigStock"]} certifications:\n\ 

88{table.draw()}\n\ 

89✔: Certification written in the blockchain\n\ 

90✘: Pending certification, deadline treatment\n', 

91 ) 

92 membership_status(certifications, pubkey, req) 

93 

94 

95def membership_status(certifications: dict, pubkey: str, req: dict) -> None: 

96 params = get_blockchain_parameters() 

97 if len(certifications["received"]) >= params["sigQty"]: 

98 date = certifications["received_expire"][ 

99 len(certifications["received"]) - params["sigQty"] 

100 ] 

101 print(f"Membership expiration due to certification expirations: {date}") 

102 member_lookup = wt.is_member(pubkey) 

103 is_member = bool(member_lookup) 

104 print("member:", is_member) 

105 if req["revoked"]: 

106 revoke_date = arrow.get(req["revoked_on"]).to("local").format(DATE) 

107 print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}") 

108 if not is_member and req["wasMember"]: 

109 print("expired:", req["expired"], "\nwasMember:", req["wasMember"]) 

110 elif is_member: 

111 expiration_date = ( 

112 arrow.now().shift(seconds=req["membershipExpiresIn"]).format(DATE) 

113 ) 

114 print(f"Membership document expiration: {expiration_date}") 

115 print("Sentry:", req["isSentry"]) 

116 print("outdistanced:", req["outdistanced"]) 

117 

118 

119def get_sent_certifications( 

120 signed: list, 

121 time_first_block: int, 

122 params: dict, 

123) -> tuple[list[str], list[str]]: 

124 sent = [] 

125 expire = [] 

126 if signed: 

127 for cert in signed: 

128 sent.append(cert["uid"]) 

129 expire.append( 

130 expiration_date_from_block_id( 

131 cert["cert_time"]["block"], 

132 time_first_block, 

133 params, 

134 ), 

135 ) 

136 return sent, expire 

137 

138 

139def expiration_date_from_block_id( 

140 block_id: str, 

141 time_first_block: int, 

142 params: dict, 

143) -> str: 

144 expir_timestamp = ( 

145 date_approximation(block_id, time_first_block, params["avgGenTime"]) 

146 + params["sigValidity"] 

147 ) 

148 return arrow.get(expir_timestamp).to("local").format(DATE) 

149 

150 

151def date_approximation(block_id, time_first_block, avgentime): 

152 return time_first_block + block_id * avgentime