Coverage for silkaj/wot/status.py: 92%

72 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-22 12:04 +0000

1# Copyright 2016-2025 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16import pendulum 

17import rich_click as click 

18from duniterpy.api.bma import blockchain, wot 

19 

20from silkaj.blockchain.tools import get_blockchain_parameters 

21from silkaj.constants import DATE 

22from silkaj.network import client_instance 

23from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check 

24from silkaj.tui import Table 

25from silkaj.wot import tools as wt 

26 

27 

28@click.command( 

29 "status", 

30 help="Check received and sent certifications and \ 

31consult the membership status of any given identity", 

32) 

33@click.argument("uid_pubkey") 

34def status(uid_pubkey: str) -> None: 

35 """ 

36 get searched id 

37 get id of received and sent certifications 

38 display in a table the result with the numbers 

39 """ 

40 client = client_instance() 

41 first_block = client(blockchain.block, 1) 

42 time_first_block = first_block["time"] 

43 

44 checked_pubkey = is_pubkey_and_check(uid_pubkey) 

45 if checked_pubkey: 

46 uid_pubkey = str(checked_pubkey) 

47 

48 identity, pubkey, signed = wt.choose_identity(uid_pubkey) 

49 certifications = {} # type: dict 

50 params = get_blockchain_parameters() 

51 

52 req = (client(wot.requirements, search=pubkey, pubkey=True))["identities"][0] 

53 certifications["received_expire"] = [] 

54 certifications["received"] = [] 

55 certifications["sent"] = [] 

56 certifications["sent_expire"] = [] 

57 for lookup_cert in identity["others"]: 

58 for req_cert in req["certifications"]: 

59 if req_cert["from"] == lookup_cert["pubkey"]: 

60 certifications["received_expire"].append( 

61 pendulum.now().add(seconds=req_cert["expiresIn"]).format(DATE), 

62 ) 

63 certifications["received"].append(f"{lookup_cert['uids'][0]}") 

64 break 

65 for pending_cert in req["pendingCerts"]: 

66 certifications["received"].append( 

67 f"{(wt.identity_of(pending_cert['from']))['uid']}", 

68 ) 

69 certifications["received_expire"].append( 

70 pendulum.from_timestamp(pending_cert["expires_on"], tz="local").format( 

71 DATE 

72 ), 

73 ) 

74 certifications["sent"], certifications["sent_expire"] = get_sent_certifications( 

75 signed, 

76 time_first_block, 

77 params, 

78 ) 

79 

80 nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0 

81 

82 table = Table(style="columns").set_cols_align(["r", "r", "r", "r"]) 

83 table.fill_from_dict(certifications) 

84 

85 print( 

86 f"{identity['uid']} ({gen_pubkey_checksum(pubkey, True)}) \ 

87from block #{identity['meta']['timestamp'][:15]}…\n\ 

88received {len(certifications['received'])} and sent \ 

89{nbr_sent_certs}/{params['sigStock']} certifications:\n\ 

90{table.draw()}\n\ 

91✔: Certification written in the blockchain\n\ 

92✘: Pending certification, deadline treatment\n", 

93 ) 

94 membership_status(certifications, pubkey, req) 

95 

96 

97def membership_status(certifications: dict, pubkey: str, req: dict) -> None: 

98 params = get_blockchain_parameters() 

99 if len(certifications["received"]) >= params["sigQty"]: 

100 date = certifications["received_expire"][ 

101 len(certifications["received"]) - params["sigQty"] 

102 ] 

103 print(f"Membership expiration due to certification expirations: {date}") 

104 member_lookup = wt.is_member(pubkey) 

105 is_member = bool(member_lookup) 

106 print("member:", is_member) 

107 if req["revoked"]: 

108 revoke_date = pendulum.from_timestamp(req["revoked_on"], tz="local").format( 

109 DATE 

110 ) 

111 print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}") 

112 if not is_member and req["wasMember"]: 

113 print("expired:", req["expired"], "\nwasMember:", req["wasMember"]) 

114 elif is_member: 

115 expiration_date = ( 

116 pendulum.now().add(seconds=req["membershipExpiresIn"]).format(DATE) 

117 ) 

118 print(f"Membership document expiration: {expiration_date}") 

119 print("Sentry:", req["isSentry"]) 

120 print("outdistanced:", req["outdistanced"]) 

121 

122 

123def get_sent_certifications( 

124 signed: list, 

125 time_first_block: int, 

126 params: dict, 

127) -> tuple[list[str], list[str]]: 

128 sent = [] 

129 expire = [] 

130 if signed: 

131 for cert in signed: 

132 sent.append(cert["uid"]) 

133 expire.append( 

134 expiration_date_from_block_id( 

135 cert["cert_time"]["block"], 

136 time_first_block, 

137 params, 

138 ), 

139 ) 

140 return sent, expire 

141 

142 

143def expiration_date_from_block_id( 

144 block_id: str, 

145 time_first_block: int, 

146 params: dict, 

147) -> str: 

148 expir_timestamp = ( 

149 date_approximation(block_id, time_first_block, params["avgGenTime"]) 

150 + params["sigValidity"] 

151 ) 

152 return pendulum.from_timestamp(expir_timestamp, tz="local").format(DATE) 

153 

154 

155def date_approximation(block_id, time_first_block, avgentime): 

156 return time_first_block + block_id * avgentime