diff options
author | Rob Austein <sra@hactrn.net> | 2016-07-07 15:05:37 -0400 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-07-07 15:05:37 -0400 |
commit | 40e903c8e0a925df1c67eec6fbd1be6ffccd0f09 (patch) | |
tree | 66e90ad17f8f16b38ab20fd461cfc1c4f94f020a /projects/hsm | |
parent | bc43cb1ce59ce5a628025a91d36868b0bf4791a4 (diff) |
Rewrite and add cleanup sequences to avoid confusing the RPC server.
Diffstat (limited to 'projects/hsm')
-rwxr-xr-x | projects/hsm/cryptech_probe | 88 |
1 files changed, 45 insertions, 43 deletions
diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe index 16a2a8f..bc798bc 100755 --- a/projects/hsm/cryptech_probe +++ b/projects/hsm/cryptech_probe @@ -38,8 +38,15 @@ import time import argparse import serial.tools.list_ports_posix +class positive_integer(int): + def __init__(self, value): + if self <= 0: + raise ValueError + parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument("-v", "--verbose", action = "store_true", help = "blather about what we're doing") +parser.add_argument("-v", "--verbose", action = "store_true", help = "blather about what we're doing") +parser.add_argument("--no-cleanup", action = "store_true", help = "don't send cleanup sequences after probing") +parser.add_argument("--read-size", type = positive_integer, help = "size of read buffer", default = 1024) args = parser.parse_args() SLIP_END = chr(0300) # Indicates end of SLIP packet @@ -65,30 +72,8 @@ RPC_reply = chr(0) * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = probe_string = SLIP_END + Control_U + SLIP_END + RPC_query + SLIP_END + Control_U + Control_M -def looks_like_console(response): - # Check whether we got a known console prompt. - return any(prompt in response for prompt in ("Username:", "Password:", "cryptech>")) - - -def looks_like_rpc(response): - # Check whether we got something that looks like the response to an RPC version query. - # We skip over the version value itself, as it might change, but we check that it's - # terminated properly. This is fragile, and will need to handle SLIP decoding if - # we ever bump one of the version fields up into the range where the SLIP control - # characters live, but it will do for the moment. - try: - return response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END - except ValueError: - return False - except IndexError: - return False - - -rpc_hints = None - -ports = dict((port, None) - for port, desc, hwid in serial.tools.list_ports_posix.comports() - if "VID:PID=0403:6014" in hwid) +ports = [port for port, desc, hwid in serial.tools.list_ports_posix.comports() + if "VID:PID=0403:6014" in hwid] if not ports: sys.exit("Couldn't find any likely USB ports") @@ -97,33 +82,50 @@ if args.verbose: print "Candidate USB ports:", ", ".join(ports) for port in ports: + while True: try: - ports[port] = serial.Serial(port, 921600, timeout=0.1) + tty = serial.Serial(port, 921600, timeout=0.1) break except serial.SerialException: time.sleep(0.2) -for port in ports: - # Do we really need to dole out characters one at a time here? - # Dunno, but this works well enough. for c in probe_string: - ports[port].write(c) + tty.write(c) time.sleep(0.1) -time.sleep(1) - -for port in ports: - s = "" - while True: - c = ports[port].read(1) - if len(c) > 0: - s += c - else: - break + response = tty.read(args.read_size) if args.verbose: - print "Received from {}: {!r} ({})".format(port, s, ":".join("{:02x}".format(ord(c)) for c in s)) - if looks_like_console(s): + print "Received from {}: {!r} ({})".format(port, response, ":".join("{:02x}".format(ord(c)) for c in response)) + + # Check whether we got a known console prompt. + + is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>")) + + # Check whether we got something that looks like the response to an RPC version query. + # We skip over the version value itself, as it might change, but we check that it's + # terminated properly. This is fragile, and will need to handle SLIP decoding if + # we ever bump one of the version fields up into the range where the SLIP control + # characters live, but it will do for the moment. + + try: + is_hsm = response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END + except ValueError: + is_hsm = False + except IndexError: + is_hsm = False + + if is_cty: print "{} looks like the Cryptech HSM console port".format(port) - if looks_like_rpc(s): + if is_hsm: print "{} looks like the Cryptech HSM RPC port".format(port) + + if (is_cty or is_hsm) and not args.no_cleanup: + if is_cty: + tty.write(Control_U) + if is_hsm: + tty.write(SLIP_END) + while tty.read(args.read_size): + pass + + tty.close() |