mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-04 11:00:47 -05:00
62b1ce8539
The CI appears to use the latest version of isort, which is a problem when isort gets a major version bump. Rather than try to pin the version, I've done the necessary to make isort5 happy with synapse.
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
|
|
import dns.resolver
|
|
import urllib2
|
|
from signedjson.key import decode_verify_key_bytes, write_signing_keys
|
|
from signedjson.sign import verify_signed_json
|
|
from unpaddedbase64 import decode_base64
|
|
|
|
|
|
def get_targets(server_name):
|
|
if ":" in server_name:
|
|
target, port = server_name.split(":")
|
|
yield (target, int(port))
|
|
return
|
|
try:
|
|
answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV")
|
|
for srv in answers:
|
|
yield (srv.target, srv.port)
|
|
except dns.resolver.NXDOMAIN:
|
|
yield (server_name, 8448)
|
|
|
|
|
|
def get_server_keys(server_name, target, port):
|
|
url = "https://%s:%i/_matrix/key/v1" % (target, port)
|
|
keys = json.load(urllib2.urlopen(url))
|
|
verify_keys = {}
|
|
for key_id, key_base64 in keys["verify_keys"].items():
|
|
verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64))
|
|
verify_signed_json(keys, server_name, verify_key)
|
|
verify_keys[key_id] = verify_key
|
|
return verify_keys
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("signature_name")
|
|
parser.add_argument(
|
|
"input_json", nargs="?", type=argparse.FileType("r"), default=sys.stdin
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
logging.basicConfig()
|
|
|
|
server_name = args.signature_name
|
|
keys = {}
|
|
for target, port in get_targets(server_name):
|
|
try:
|
|
keys = get_server_keys(server_name, target, port)
|
|
print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port))
|
|
write_signing_keys(sys.stdout, keys.values())
|
|
break
|
|
except Exception:
|
|
logging.exception("Error talking to %s:%s", target, port)
|
|
|
|
json_to_check = json.load(args.input_json)
|
|
print("Checking JSON:")
|
|
for key_id in json_to_check["signatures"][args.signature_name]:
|
|
try:
|
|
key = keys[key_id]
|
|
verify_signed_json(json_to_check, args.signature_name, key)
|
|
print("PASS %s" % (key_id,))
|
|
except Exception:
|
|
logging.exception("Check for key %s failed" % (key_id,))
|
|
print("FAIL %s" % (key_id,))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|