Source code for bushel.directory.network_status

import collections
import datetime
import re

from bushel.directory.document import DirectoryDocument
from bushel.directory.document import expect_arguments
from bushel.directory.document import parse_timestamp


[docs]class NetworkStatusConsensusDirectorySignature(collections.namedtuple( "NetworkStatusConsensusDirectorySignature", ['algorithm', 'identity', 'signing_key_digest', 'signature'])): """ Directory signatures as found in :class:`NetworkStatusConsensus`, defined in the Tor directory protocol version 3 ([dir-spec]_ ยง3.4.1). For the signature, we take the hash through the _space_ after ``directory-signature``, not the newline: this ensures that all authorities sign the same thing. :var str algorithm: one of "sha1" or "sha256", or *None* if this was not present :var str identity: hex-encoded digest of the authority identity key of the signing authority :var str signing_key_digest: hex-encoded digest of the current authority signing key of the signing authority :var bytes signature: signature of the status document, with the initial item "network-status-version", and the signature item "directory-signature", using the signing key """
[docs]class NetworkStatusConsensus(DirectoryDocument): def __init__(self, raw_content): super().__init__(raw_content) self.PARSE_FUNCTIONS = { "network-status-version": self.parse_network_status_version, "vote-status": self.parse_vote_status, "consensus-method": self.parse_consensus_method, "valid-after": self.parse_valid_after, "fresh-until": self.parse_fresh_until, "valid-until": self.parse_valid_until, "voting-delay": self.parse_voting_delay, "client-versions": self.parse_client_versions, "server-versions": self.parse_server_versions, "recommended-client-protocols": self.parse_recommended_client_protocols, "recommended-relay-protocols": self.parse_recommended_relay_protocols, } @expect_arguments(1, 1, True) def parse_network_status_version(self, item): self.network_status_version = item.arguments[0] @expect_arguments(1, 1, True) def parse_vote_status(self, item): self.vote_status = item.arguments[0] @expect_arguments(1, 1, True) def parse_consensus_method(self, item): self.consensus_method = item.arguments[0] @expect_arguments(2, 2, True) def parse_valid_after(self, item): self.valid_after = parse_timestamp(item) @expect_arguments(2, 2, True) def parse_fresh_until(self, item): self.fresh_until = parse_timestamp(item) @expect_arguments(2, 2, True) def parse_valid_until(self, item): self.valid_until = parse_timestamp(item) @expect_arguments(2, 2, True) def parse_voting_delay(self, item): self.vote_seconds, self.dist_seconds = item.arguments @expect_arguments(1, 1, True) def parse_client_versions(self, item): self.client_versions = item.arguments[0].split(",") @expect_arguments(1, 1, True) def parse_server_versions(self, item): self.server_versions = item.arguments[0].split(",") @expect_arguments(1, 12, False) def parse_known_flags(self, item): self.known_flags = item.arguments @expect_arguments(1, 9, False) def parse_recommended_client_protocols(self, item): self.recommended_client_protocols = {x[0]: x[1] for x in [y.split("=") for y in item.arguments]} @expect_arguments(1, 9, False) def parse_recommended_relay_protocols(self, item): self.recommended_relay_protocols = {x[0]: x[1] for x in [y.split("=") for y in item.arguments]}