aboutsummaryrefslogtreecommitdiff
path: root/projects
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2016-07-07 15:05:37 -0400
committerRob Austein <sra@hactrn.net>2016-07-07 15:05:37 -0400
commit40e903c8e0a925df1c67eec6fbd1be6ffccd0f09 (patch)
tree66e90ad17f8f16b38ab20fd461cfc1c4f94f020a /projects
parentbc43cb1ce59ce5a628025a91d36868b0bf4791a4 (diff)
Rewrite and add cleanup sequences to avoid confusing the RPC server.
Diffstat (limited to 'projects')
-rwxr-xr-xprojects/hsm/cryptech_probe88
1 files changed, 45 insertions, 43 deletions
diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe
index 16a2a8f..bc798bc 100755
--- a/projects/hsm/cryptech_probe
+++ b/projects/hsm/cryptech_probe
@@ -38,8 +38,15 @@ import time
import argparse
import serial.tools.list_ports_posix
+class positive_integer(int):
+ def __init__(self, value):
+ if self <= 0:
+ raise ValueError
+
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-parser.add_argument("-v", "--verbose", action = "store_true", help = "blather about what we're doing")
+parser.add_argument("-v", "--verbose", action = "store_true", help = "blather about what we're doing")
+parser.add_argument("--no-cleanup", action = "store_true", help = "don't send cleanup sequences after probing")
+parser.add_argument("--read-size", type = positive_integer, help = "size of read buffer", default = 1024)
args = parser.parse_args()
SLIP_END = chr(0300) # Indicates end of SLIP packet
@@ -65,30 +72,8 @@ RPC_reply = chr(0) * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle =
probe_string = SLIP_END + Control_U + SLIP_END + RPC_query + SLIP_END + Control_U + Control_M
-def looks_like_console(response):
- # Check whether we got a known console prompt.
- return any(prompt in response for prompt in ("Username:", "Password:", "cryptech>"))
-
-
-def looks_like_rpc(response):
- # Check whether we got something that looks like the response to an RPC version query.
- # We skip over the version value itself, as it might change, but we check that it's
- # terminated properly. This is fragile, and will need to handle SLIP decoding if
- # we ever bump one of the version fields up into the range where the SLIP control
- # characters live, but it will do for the moment.
- try:
- return response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END
- except ValueError:
- return False
- except IndexError:
- return False
-
-
-rpc_hints = None
-
-ports = dict((port, None)
- for port, desc, hwid in serial.tools.list_ports_posix.comports()
- if "VID:PID=0403:6014" in hwid)
+ports = [port for port, desc, hwid in serial.tools.list_ports_posix.comports()
+ if "VID:PID=0403:6014" in hwid]
if not ports:
sys.exit("Couldn't find any likely USB ports")
@@ -97,33 +82,50 @@ if args.verbose:
print "Candidate USB ports:", ", ".join(ports)
for port in ports:
+
while True:
try:
- ports[port] = serial.Serial(port, 921600, timeout=0.1)
+ tty = serial.Serial(port, 921600, timeout=0.1)
break
except serial.SerialException:
time.sleep(0.2)
-for port in ports:
- # Do we really need to dole out characters one at a time here?
- # Dunno, but this works well enough.
for c in probe_string:
- ports[port].write(c)
+ tty.write(c)
time.sleep(0.1)
-time.sleep(1)
-
-for port in ports:
- s = ""
- while True:
- c = ports[port].read(1)
- if len(c) > 0:
- s += c
- else:
- break
+ response = tty.read(args.read_size)
if args.verbose:
- print "Received from {}: {!r} ({})".format(port, s, ":".join("{:02x}".format(ord(c)) for c in s))
- if looks_like_console(s):
+ print "Received from {}: {!r} ({})".format(port, response, ":".join("{:02x}".format(ord(c)) for c in response))
+
+ # Check whether we got a known console prompt.
+
+ is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>"))
+
+ # Check whether we got something that looks like the response to an RPC version query.
+ # We skip over the version value itself, as it might change, but we check that it's
+ # terminated properly. This is fragile, and will need to handle SLIP decoding if
+ # we ever bump one of the version fields up into the range where the SLIP control
+ # characters live, but it will do for the moment.
+
+ try:
+ is_hsm = response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END
+ except ValueError:
+ is_hsm = False
+ except IndexError:
+ is_hsm = False
+
+ if is_cty:
print "{} looks like the Cryptech HSM console port".format(port)
- if looks_like_rpc(s):
+ if is_hsm:
print "{} looks like the Cryptech HSM RPC port".format(port)
+
+ if (is_cty or is_hsm) and not args.no_cleanup:
+ if is_cty:
+ tty.write(Control_U)
+ if is_hsm:
+ tty.write(SLIP_END)
+ while tty.read(args.read_size):
+ pass
+
+ tty.close()