Coverage for silkaj/wot/status.py: 92%
72 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-20 12:29 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-20 12:29 +0000
1# Copyright 2016-2025 Maël Azimi <m.a@moul.re>
2#
3# Silkaj is free software: you can redistribute it and/or modify
4# it under the terms of the GNU Affero General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# Silkaj is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Affero General Public License for more details.
12#
13# You should have received a copy of the GNU Affero General Public License
14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
16import arrow
17import rich_click as click
18from duniterpy.api.bma import blockchain, wot
20from silkaj.blockchain.tools import get_blockchain_parameters
21from silkaj.constants import DATE
22from silkaj.network import client_instance
23from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
24from silkaj.tui import Table
25from silkaj.wot import tools as wt
28@click.command(
29 "status",
30 help="Check received and sent certifications and \
31consult the membership status of any given identity",
32)
33@click.argument("uid_pubkey")
34def status(uid_pubkey: str) -> None:
35 """
36 get searched id
37 get id of received and sent certifications
38 display in a table the result with the numbers
39 """
40 client = client_instance()
41 first_block = client(blockchain.block, 1)
42 time_first_block = first_block["time"]
44 checked_pubkey = is_pubkey_and_check(uid_pubkey)
45 if checked_pubkey:
46 uid_pubkey = str(checked_pubkey)
48 identity, pubkey, signed = wt.choose_identity(uid_pubkey)
49 certifications = {} # type: dict
50 params = get_blockchain_parameters()
52 req = (client(wot.requirements, search=pubkey, pubkey=True))["identities"][0]
53 certifications["received_expire"] = []
54 certifications["received"] = []
55 certifications["sent"] = []
56 certifications["sent_expire"] = []
57 for lookup_cert in identity["others"]:
58 for req_cert in req["certifications"]:
59 if req_cert["from"] == lookup_cert["pubkey"]:
60 certifications["received_expire"].append(
61 arrow.now().shift(seconds=req_cert["expiresIn"]).format(DATE),
62 )
63 certifications["received"].append(f'{lookup_cert["uids"][0]} ✔')
64 break
65 for pending_cert in req["pendingCerts"]:
66 certifications["received"].append(
67 f'{(wt.identity_of(pending_cert["from"]))["uid"]} ✘',
68 )
69 certifications["received_expire"].append(
70 arrow.get(pending_cert["expires_on"]).to("local").format(DATE),
71 )
72 certifications["sent"], certifications["sent_expire"] = get_sent_certifications(
73 signed,
74 time_first_block,
75 params,
76 )
78 nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0
80 table = Table(style="columns").set_cols_align(["r", "r", "r", "r"])
81 table.fill_from_dict(certifications)
83 print(
84 f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)}) \
85from block #{identity["meta"]["timestamp"][:15]}…\n\
86received {len(certifications["received"])} and sent \
87{nbr_sent_certs}/{params["sigStock"]} certifications:\n\
88{table.draw()}\n\
89✔: Certification written in the blockchain\n\
90✘: Pending certification, deadline treatment\n',
91 )
92 membership_status(certifications, pubkey, req)
95def membership_status(certifications: dict, pubkey: str, req: dict) -> None:
96 params = get_blockchain_parameters()
97 if len(certifications["received"]) >= params["sigQty"]:
98 date = certifications["received_expire"][
99 len(certifications["received"]) - params["sigQty"]
100 ]
101 print(f"Membership expiration due to certification expirations: {date}")
102 member_lookup = wt.is_member(pubkey)
103 is_member = bool(member_lookup)
104 print("member:", is_member)
105 if req["revoked"]:
106 revoke_date = arrow.get(req["revoked_on"]).to("local").format(DATE)
107 print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}")
108 if not is_member and req["wasMember"]:
109 print("expired:", req["expired"], "\nwasMember:", req["wasMember"])
110 elif is_member:
111 expiration_date = (
112 arrow.now().shift(seconds=req["membershipExpiresIn"]).format(DATE)
113 )
114 print(f"Membership document expiration: {expiration_date}")
115 print("Sentry:", req["isSentry"])
116 print("outdistanced:", req["outdistanced"])
119def get_sent_certifications(
120 signed: list,
121 time_first_block: int,
122 params: dict,
123) -> tuple[list[str], list[str]]:
124 sent = []
125 expire = []
126 if signed:
127 for cert in signed:
128 sent.append(cert["uid"])
129 expire.append(
130 expiration_date_from_block_id(
131 cert["cert_time"]["block"],
132 time_first_block,
133 params,
134 ),
135 )
136 return sent, expire
139def expiration_date_from_block_id(
140 block_id: str,
141 time_first_block: int,
142 params: dict,
143) -> str:
144 expir_timestamp = (
145 date_approximation(block_id, time_first_block, params["avgGenTime"])
146 + params["sigValidity"]
147 )
148 return arrow.get(expir_timestamp).to("local").format(DATE)
151def date_approximation(block_id, time_first_block, avgentime):
152 return time_first_block + block_id * avgentime