Script pour obtenir une animation de la toile de confiance duniter. [dépôt archivé] le code a été intégré à DataJune (https://git.42l.fr/HugoTrentesaux/DataJune.jl)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.
 
 
 

128 lines
5.5 KiB

# These utility functions browse a blockchain archive in JSON format
# and extract the relevant information into text files.
import os
import json
from tqdm import tqdm
# Validity of a certification, in seconds: two years of 365 days each.
CERTVALIDITY = 2 * 365 * 24 * 60 * 60
class Storage():
    """In-memory store for the events extracted from the blockchain archive.

    The attributes are plain containers filled in by the parsing functions of
    this module; :meth:`save` dumps them as text files.
    """

    def __init__(self):
        self.mapping = {}  # dict mapping pubkey -> pseudo
        self.pubkeys = {}  # dict mapping pubkey -> position
        self.cert_in = []  # list of entering certs: (time, (from, to))
        self.cert_out = []  # list of expiring certs: (time, (from, to))
        self.cert_exp = {}  # dict mapping an edge to all cert expiry dates: (from, to) -> [time...]
        self.joiners = []  # list of joiners: (time, pubkey) (reasons: enough certs, expired membership renewal)
        self.leavers = []  # list of leavers: (time, pubkey) (reasons: not enough cert, revocation, membership expiry)

    def save(self, folder):
        """Write the stored data as text files inside ``folder``.

        The folder is created when it does not exist yet. Six files are
        written, one record per line:

        * ``mapping``:  ``<pubkey> <pseudo>``
        * ``pseudo``:   ``<pseudo>``
        * ``cert_in``:  ``<time> <source position> <target position>``
        * ``cert_out``: ``<time> <source position> <target position>``
        * ``joiners``:  ``<time> <position>``
        * ``leavers``:  ``<time> <position>``

        Positions are looked up in ``self.pubkeys``.

        Raises:
            KeyError: if an event refers to a pubkey missing from
                ``self.pubkeys``.
        """
        # Do not fail with FileNotFoundError on a fresh destination.
        os.makedirs(folder, exist_ok=True)
        # write pubkey to pseudo mapping
        self._write_lines(
            folder, "mapping", (f"{k} {v}\n" for (k, v) in self.mapping.items())
        )
        # write one pseudo per line
        self._write_lines(
            folder, "pseudo", (f"{v}\n" for v in self.mapping.values())
        )
        # write certification entering and certification expiry
        self._write_lines(folder, "cert_in", self._cert_lines(self.cert_in))
        self._write_lines(folder, "cert_out", self._cert_lines(self.cert_out))
        # write joiners and leavers
        self._write_lines(folder, "joiners", self._member_lines(self.joiners))
        self._write_lines(folder, "leavers", self._member_lines(self.leavers))

    @staticmethod
    def _write_lines(folder, name, lines):
        """Write the already newline-terminated ``lines`` to ``folder/name``."""
        with open(os.path.join(folder, name), "w", encoding="utf-8") as f:
            f.writelines(lines)

    def _cert_lines(self, certs):
        """Yield one ``time source target`` line per ``(time, (from, to))`` event."""
        positions = self.pubkeys
        return (
            f"{t} {positions[source]} {positions[target]}\n"
            for (t, (source, target)) in certs
        )

    def _member_lines(self, events):
        """Yield one ``time position`` line per ``(time, pubkey)`` event."""
        positions = self.pubkeys
        return (f"{t} {positions[pubkey]}\n" for (t, pubkey) in events)
def get_joiners(storage, block):
    """Record the joiners listed in ``block``.

    Every entry of ``block["joiners"]`` is a colon-separated string whose
    first field is taken as the public key and whose fifth field is taken as
    the pseudo. The pseudo is stored in ``storage.mapping`` (replacing any
    previous one for that key) and a ``(time, pubkey)`` event, stamped with
    ``block["time"]``, is appended to ``storage.joiners``.
    """
    block_time = block["time"]
    for entry in block["joiners"]:
        fields = entry.split(":")
        pubkey, pseudo = fields[0], fields[4]
        storage.mapping[pubkey] = pseudo
        storage.joiners.append((block_time, pubkey))
def get_leavers(storage, block):
    """Record the members excluded in ``block``.

    Appends one ``(time, pubkey)`` event to ``storage.leavers`` for every
    entry of ``block["excluded"]`` (excluded because of distance rule,
    missing certifications or revocation).

    Only the ``excluded`` field is read. ``block["leavers"]`` (expired
    membership, not necessarily excluded) and ``block["revoked"]`` (revoked
    accounts, which will appear in ``excluded``) are deliberately ignored.
    """
    when = block["time"]
    excluded = block["excluded"]
    if not excluded:
        return
    storage.leavers.extend((when, pubkey) for pubkey in excluded)
def get_new_certs(storage, block):
    """Record the certifications listed in ``block``.

    Every entry of ``block["certifications"]`` is a colon-separated string
    whose first two fields are the source and the target of the
    certification. Each certification is planned to expire ``CERTVALIDITY``
    seconds after ``block["time"]``.

    For each ``(source, target)`` edge:

    * never seen before: an entering event is added to ``storage.cert_in``
      and a first expiry date is planned in ``storage.cert_exp``;
    * seen before and already expired: an entering event is added again and
      a new expiry date is appended;
    * seen before and still valid (a renewal): only the last planned expiry
      date is moved forward.

    Note: at the end, ``storage.cert_exp`` maps each edge to the list of
    dates at which the certification expired.
    """
    now = block["time"]
    expires_at = now + CERTVALIDITY
    certifications = block["certifications"]
    if not certifications:
        return
    for raw in certifications:
        fields = raw.split(":")
        edge = (fields[0], fields[1])
        planned = storage.cert_exp.get(edge)
        if planned is None:
            # new certification: add it and plan its expiration
            storage.cert_in.append((now, edge))
            storage.cert_exp[edge] = [expires_at]
        elif planned[-1] < now:
            # certification already expired: add it again, plan new expiration
            storage.cert_in.append((now, edge))
            planned.append(expires_at)
        else:
            # renewal: update the last expiration date for this certification
            planned[-1] = expires_at
def parse_blocks(storage, folder):
    """Parse every chunk file of ``folder`` and fill ``storage``.

    Chunk files are named ``chunk_<n>-250.json`` and are read in increasing
    ``n`` starting from 0. Each block found under the ``"blocks"`` key is
    passed to ``get_joiners``, ``get_leavers`` and ``get_new_certs``.

    Once all blocks are parsed:

    * ``storage.pubkeys`` maps every key of ``storage.mapping`` to its
      1-based position;
    * ``storage.cert_out`` receives one ``(time, edge)`` event per expiry
      date of ``storage.cert_exp`` and is sorted by time.

    Raises:
        FileNotFoundError: if chunk numbers are not contiguous from 0.
    """
    # Count the chunk files only: any other entry in the folder would inflate
    # the count and make the loop look for a chunk that does not exist.
    chunks = sum(
        1
        for name in os.listdir(folder)
        if name.startswith("chunk_") and name.endswith("-250.json")
    )
    for chunknumber in tqdm(range(chunks)):
        chunkfile = os.path.join(folder, f"chunk_{chunknumber}-250.json")
        with open(chunkfile, encoding="utf-8") as f:
            # old format (duniter): the blocks are under the "blocks" key
            # new format (dex): the parsed document itself is the block list
            blocks = json.load(f)["blocks"]
        for block in blocks:
            get_joiners(storage, block)
            get_leavers(storage, block)
            get_new_certs(storage, block)
    # fills in pubkey storage (julia starts counting from 1)
    for position, pubkey in enumerate(storage.mapping, start=1):
        storage.pubkeys[pubkey] = position
    # fills in cert out from cert expiry
    storage.cert_out.extend(
        (t, edge) for (edge, dates) in storage.cert_exp.items() for t in dates
    )
    # sort it by time
    storage.cert_out.sort(key=lambda event: event[0])
def main(source, dest):
    """Parse the blockchain archive in ``source`` and save the result in ``dest``.

    ``source`` is the folder holding the JSON chunk files; ``dest`` is the
    folder receiving the text files. Progress messages are printed before
    each of the two steps.
    """
    store = Storage()

    print("parsing blocks...")
    parse_blocks(store, source)

    print("saving files...")
    store.save(dest)