Coverage for silkaj/wot/status.py: 92%
72 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-22 12:04 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-22 12:04 +0000
1# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
2#
3# Silkaj is free software: you can redistribute it and/or modify
4# it under the terms of the GNU Affero General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# Silkaj is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Affero General Public License for more details.
12#
13# You should have received a copy of the GNU Affero General Public License
14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
16import pendulum
17import rich_click as click
18from duniterpy.api.bma import blockchain, wot
20from silkaj.blockchain.tools import get_blockchain_parameters
21from silkaj.constants import DATE
22from silkaj.network import client_instance
23from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
24from silkaj.tui import Table
25from silkaj.wot import tools as wt
@click.command(
    "status",
    help="Check received and sent certifications and "
    "consult the membership status of any given identity",
)
@click.argument("uid_pubkey")
def status(uid_pubkey: str) -> None:
    """Print the certifications and membership status of one identity.

    Steps performed:

    1. Resolve ``uid_pubkey`` to an identity through ``wt.choose_identity``
       (a value accepted by ``is_pubkey_and_check`` is normalised first).
    2. Collect the received certifications: those listed both in the lookup
       result and in the node's requirements (marked ``✔``), followed by the
       pending ones (marked ``✘``), each with its expiration date.
    3. Collect the sent certifications through ``get_sent_certifications``.
    4. Print a summary header, the certification table and its legend, then
       hand over to ``membership_status`` for the membership details.

    :param uid_pubkey: user identifier or public key given on the command line
    """
    client = client_instance()
    # The time of block #1 is the origin used to approximate block dates.
    first_block = client(blockchain.block, 1)
    time_first_block = first_block["time"]

    checked_pubkey = is_pubkey_and_check(uid_pubkey)
    if checked_pubkey:
        uid_pubkey = str(checked_pubkey)

    identity, pubkey, signed = wt.choose_identity(uid_pubkey)
    params = get_blockchain_parameters()
    req = (client(wot.requirements, search=pubkey, pubkey=True))["identities"][0]

    # Index the written certifications by issuer once instead of rescanning
    # the whole list for every lookup entry. ``setdefault`` keeps the first
    # entry per issuer, which is the one a linear scan would have matched.
    written_by_issuer: dict = {}
    for req_cert in req["certifications"]:
        written_by_issuer.setdefault(req_cert["from"], req_cert)

    received: list[str] = []
    received_expire: list[str] = []
    for lookup_cert in identity["others"]:
        req_cert = written_by_issuer.get(lookup_cert["pubkey"])
        if req_cert is None:
            # Present in the lookup but not in the requirements: not listed.
            continue
        received_expire.append(
            pendulum.now().add(seconds=req_cert["expiresIn"]).format(DATE),
        )
        received.append(f"{lookup_cert['uids'][0]} ✔")

    for pending_cert in req["pendingCerts"]:
        received.append(
            f"{(wt.identity_of(pending_cert['from']))['uid']} ✘",
        )
        received_expire.append(
            pendulum.from_timestamp(pending_cert["expires_on"], tz="local").format(
                DATE
            ),
        )

    sent, sent_expire = get_sent_certifications(
        signed,
        time_first_block,
        params,
    )

    # Keys are inserted in the same order as before; presumably
    # ``fill_from_dict`` lays the columns out in key order (to be confirmed).
    certifications: dict = {
        "received_expire": received_expire,
        "received": received,
        "sent": sent,
        "sent_expire": sent_expire,
    }
    nbr_sent_certs = len(sent)

    table = Table(style="columns").set_cols_align(["r", "r", "r", "r"])
    table.fill_from_dict(certifications)

    print(
        f"{identity['uid']} ({gen_pubkey_checksum(pubkey, True)}) "
        f"from block #{identity['meta']['timestamp'][:15]}…\n"
        f"received {len(received)} and sent "
        f"{nbr_sent_certs}/{params['sigStock']} certifications:\n"
        f"{table.draw()}\n"
        "✔: Certification written in the blockchain\n"
        "✘: Pending certification, deadline treatment\n",
    )
    membership_status(certifications, pubkey, req)
def membership_status(certifications: dict, pubkey: str, req: dict) -> None:
    """Print the membership details of an identity.

    Printed, in order:

    * the certification-driven membership expiration, when at least
      ``sigQty`` certifications were received;
    * whether the identity is currently a member;
    * the revocation flag and date, when ``req["revoked"]`` is set;
    * for a member, the membership document expiration and the sentry flag;
      for a former member, the ``expired`` and ``wasMember`` fields;
    * the ``outdistanced`` field.

    :param certifications: dict holding the ``received`` and
        ``received_expire`` lists
    :param pubkey: public key of the identity
    :param req: requirements entry of the identity as returned by the node
    """
    params = get_blockchain_parameters()
    received_count = len(certifications["received"])
    required_count = params["sigQty"]
    if received_count >= required_count:
        # Presumably ``received_expire`` is in ascending order, so this entry
        # is the moment fewer than ``sigQty`` certifications remain (confirm).
        limiting_expiry = certifications["received_expire"][
            received_count - required_count
        ]
        print(
            f"Membership expiration due to certification expirations: {limiting_expiry}"
        )

    is_member = bool(wt.is_member(pubkey))
    print("member:", is_member)

    if req["revoked"]:
        revoked_at = pendulum.from_timestamp(req["revoked_on"], tz="local")
        print(f"revoked: {req['revoked']}\nrevoked on: {revoked_at.format(DATE)}")

    if is_member:
        document_expiry = pendulum.now().add(seconds=req["membershipExpiresIn"])
        print(f"Membership document expiration: {document_expiry.format(DATE)}")
        print("Sentry:", req["isSentry"])
    elif req["wasMember"]:
        print("expired:", req["expired"], "\nwasMember:", req["wasMember"])

    print("outdistanced:", req["outdistanced"])
def get_sent_certifications(
    signed: list,
    time_first_block: int,
    params: dict,
) -> tuple[list[str], list[str]]:
    """Return the uids certified by an identity and the matching expirations.

    :param signed: certifications issued by the identity; each entry provides
        ``uid`` and ``cert_time["block"]``. A falsy value (``None`` or an
        empty list) yields two empty lists.
    :param time_first_block: timestamp of block #1, origin of the date
        approximation
    :param params: blockchain parameters forwarded to
        ``expiration_date_from_block_id``
    :return: ``(sent, expire)``, two lists of equal length where
        ``expire[i]`` is the formatted expiration date of ``sent[i]``
    """
    certs = signed or []
    sent = [cert["uid"] for cert in certs]
    expire = [
        expiration_date_from_block_id(
            cert["cert_time"]["block"],
            time_first_block,
            params,
        )
        for cert in certs
    ]
    return sent, expire
def expiration_date_from_block_id(
    block_id: int,
    time_first_block: int,
    params: dict,
) -> str:
    """Return the formatted expiration date of a certification.

    The date of the block is approximated with ``date_approximation`` and the
    certification validity period ``params["sigValidity"]`` is added to it.

    :param block_id: number of the block the certification refers to. It is
        used in arithmetic by ``date_approximation``, hence a number and not
        a string.
    :param time_first_block: timestamp of block #1
    :param params: blockchain parameters; ``avgGenTime`` and ``sigValidity``
        are read
    :return: expiration date in the local timezone, formatted with ``DATE``
    """
    written_timestamp = date_approximation(
        block_id,
        time_first_block,
        params["avgGenTime"],
    )
    expir_timestamp = written_timestamp + params["sigValidity"]
    return pendulum.from_timestamp(expir_timestamp, tz="local").format(DATE)
def date_approximation(block_id: int, time_first_block: int, avgentime: int) -> int:
    """Approximate the timestamp of a block from its number.

    The estimate is linear: ``block_id`` blocks, each taking ``avgentime``,
    counted from ``time_first_block``.

    :param block_id: block number
    :param time_first_block: timestamp used as the origin of the estimate
    :param avgentime: average generation time of one block, in the same unit
        as ``time_first_block``
    :return: approximated timestamp of the block
    """
    elapsed = block_id * avgentime
    return time_first_block + elapsed