aboutsummaryrefslogtreecommitdiff
path: root/projects/hsm/cryptech_probe
diff options
context:
space:
mode:
Diffstat (limited to 'projects/hsm/cryptech_probe')
-rwxr-xr-xprojects/hsm/cryptech_probe158
1 files changed, 158 insertions, 0 deletions
diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe
new file mode 100755
index 0000000..356931a
--- /dev/null
+++ b/projects/hsm/cryptech_probe
@@ -0,0 +1,158 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2016, NORDUnet A/S All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+# - Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# - Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# - Neither the name of the NORDUnet nor the names of its contributors may
+# be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""
+Utility to probe USB serial port(s) trying to figure out which one(s)
+we have plugged in today. stdout is environment variable settings,
+suitable for use in bash with "eval `cryptech_probe`"; all other output
+goes to stderr.
+"""
+
+import sys
+import time
+import argparse
+import serial.tools.list_ports_posix
+
+if sys.version_info.major == 2:
+ def colon_hex(raw):
+ return ":".join("{:02x}".format(ord(b)) for b in raw)
+else:
+ def colon_hex(raw):
+ return ":".join("{:02x}".format(b) for b in raw)
+
+class positive_integer(int):
+ def __init__(self, value):
+ if self <= 0:
+ raise ValueError
+
+parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument("-v", "--verbose", action = "store_true", help = "produce human-readable output")
+parser.add_argument("-d", "--debug", action = "store_true", help = "blather about what we're doing")
+parser.add_argument("--no-cleanup", action = "store_true", help = "don't send cleanup sequences after probing")
+parser.add_argument("--read-buffer-size", type = positive_integer, help = "size of read buffer", default = 1024)
+args = parser.parse_args()
+
+SLIP_END = b"\300" # Indicates end of SLIP packet
+SLIP_ESC = b"\333" # Indicates byte stuffing
+SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte
+SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte
+
+Control_U = b"\025" # Console: clear line
+Control_M = b"\015" # Console: end of line
+
+RPC_query = b"\0" * 8 # client_handle = 0, function code = RPC_FUNC_GET_VERSION
+RPC_reply = b"\0" * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK
+
+# This is the query string we send to each USB port we find. It's
+# intended to be relatively harmless, at least for either of the HSM
+# ports: the final Control-U should prevent the console from trying to
+# interpret the RPC command, and the SLIP_END markers should cause
+# the RPC server to treat the ASCII control characters as noise.
+#
+# Yes, this is a total kludge. Useful identifiers for the USB ports
+# are are on the wish list for a future revision of the hardware, but
+# for the moment, we do what we can with what we have.
+
+probe_string = SLIP_END + Control_U + SLIP_END + RPC_query + SLIP_END + Control_U + Control_M
+
+ports = [port for port, desc, hwid in serial.tools.list_ports_posix.comports()
+ if "VID:PID=0403:6014" in hwid]
+
+if not ports:
+ sys.exit("Couldn't find any likely USB ports")
+
+if args.debug:
+ sys.stderr.write("Candidate USB ports: {}\n".format(", ".join(ports)))
+
+env = {}
+
+for port in ports:
+
+ while True:
+ try:
+ tty = serial.Serial(port, 921600, timeout=0.1)
+ break
+ except serial.SerialException:
+ time.sleep(0.2)
+
+ # Not sure we really need to dribble the probe string out this slowly anymore,
+ # but once upon a time we did this for a reason and it's not like this program
+ # is a performance bottleneck, so stick with the safe version.
+
+ for i in range(len(probe_string)):
+ tty.write(probe_string[i:i+1])
+ time.sleep(0.1)
+
+ response = tty.read(args.read_buffer_size)
+ if args.debug:
+ sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, colon_hex(response)))
+
+ # Check whether we got a known console prompt.
+
+ is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>"))
+
+ # Check whether we got something that looks like the response to an RPC version query.
+ # We skip over the version value itself, as it might change, but we check that it's
+ # terminated properly. This is fragile, and will need to handle SLIP decoding if
+ # we ever bump one of the version fields up into the range where the SLIP control
+ # characters live, but it will do for the moment.
+
+ try:
+ is_hsm = response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END[0]
+ except ValueError:
+ is_hsm = False
+ except IndexError:
+ is_hsm = False
+
+ if is_cty and args.verbose:
+ sys.stderr.write("{} looks like the Cryptech HSM console port\n".format(port))
+
+ if is_hsm and args.verbose:
+ sys.stderr.write("{} looks like the Cryptech HSM RPC port\n".format(port))
+
+ if is_cty:
+ env.update(CRYPTECH_CTY_CLIENT_SERIAL_DEVICE = port)
+
+ if is_hsm:
+ env.update(CRYPTECH_RPC_CLIENT_SERIAL_DEVICE = port)
+
+ if (is_cty or is_hsm) and not args.no_cleanup:
+ if is_cty:
+ tty.write(Control_U)
+ if is_hsm:
+ tty.write(SLIP_END)
+ while tty.read(args.read_buffer_size):
+ pass
+
+ tty.close()
+
+if env:
+ sys.stdout.write("export {}\n".format(
+ " ".join("{}='{}'".format(var, env[var]) for var in sorted(env))))