diff options
author | Rob Austein <sra@hactrn.net> | 2020-05-25 19:33:47 -0400 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2020-05-25 19:33:47 -0400 |
commit | f948d674351a2ca87d33c0e0d8558cbbb2f59682 (patch) | |
tree | bf3e5c562e24929da786d8834d3ffe455d974e67 /projects | |
parent | 52f72e1e5dc5d3b646b54363f811ee2fd7958c19 (diff) |
Untested conversion to support Python 3
Diffstat (limited to 'projects')
-rwxr-xr-x | projects/cli-test/filetransfer | 18 | ||||
-rwxr-xr-x | projects/hsm/cryptech_probe | 21 | ||||
-rwxr-xr-x | projects/hsm/cryptech_upload | 65 |
3 files changed, 51 insertions, 53 deletions
diff --git a/projects/cli-test/filetransfer b/projects/cli-test/filetransfer index f704e31..147f081 100755 --- a/projects/cli-test/filetransfer +++ b/projects/cli-test/filetransfer @@ -37,7 +37,7 @@ import struct import serial import argparse -from binascii import crc32 +from binascii import crc32, hexlify CHUNK_SIZE = 256 DFU_CHUNK_SIZE = 256 @@ -79,19 +79,19 @@ def parse_args(): def _write(dst, data): dst.write(data) if len(data) == 4: - print("Wrote 0x{!s}".format(data.encode('hex'))) + print(("Wrote 0x{!s}".format(hexlify(data)))) else: - print("Wrote {!r}".format(data)) + print(("Wrote {!r}".format(data))) def _read(dst): - res = '' + res = b'' while True: x = dst.read(1) if not x: break res += x - print ("Read {!r}".format(res)) + print(("Read {!r}".format(res))) return res @@ -142,19 +142,19 @@ def send_file(filename, args, dst): if not data: break dst.write(data) - print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size))) + print(("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size)))) # read ACK (a counter of number of 4k chunks received) while True: ack_bytes = dst.read(4) if len(ack_bytes) == 4: break - print('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes)) + print(('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes))) dst.write('\r') # eventually get back to the CLI prompt ack = struct.unpack('<I', ack_bytes)[0] if ack != counter + 1: - print('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter)) + print(('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter))) flush = dst.read(100) - print('FLUSH data: {!r}'.format(flush)) + print(('FLUSH data: {!r}'.format(flush))) return False counter += 1 diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe index ccee40a..3d14484 100755 --- a/projects/hsm/cryptech_probe +++ b/projects/hsm/cryptech_probe @@ -38,6 +38,7 @@ goes to stderr. import sys import time import argparse +import binascii import serial.tools.list_ports_posix class positive_integer(int): @@ -52,16 +53,16 @@ parser.add_argument("--no-cleanup", action = "store_true", help = "don't parser.add_argument("--read-buffer-size", type = positive_integer, help = "size of read buffer", default = 1024) args = parser.parse_args() -SLIP_END = chr(0300) # Indicates end of SLIP packet -SLIP_ESC = chr(0333) # Indicates byte stuffing -SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte -SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte +SLIP_END = b"\300" # Indicates end of SLIP packet +SLIP_ESC = b"\333" # Indicates byte stuffing +SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte +SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte -Control_U = chr(0025) # Console: clear line -Control_M = chr(0015) # Console: end of line +Control_U = b"\025" # Console: clear line +Control_M = b"\015" # Console: end of line -RPC_query = chr(0) * 8 # client_handle = 0, function code = RPC_FUNC_GET_VERSION -RPC_reply = chr(0) * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK +RPC_query = b"\0" * 8 # client_handle = 0, function code = RPC_FUNC_GET_VERSION +RPC_reply = b"\0" * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK # This is the query string we send to each USB port we find. It's # intended to be relatively harmless, at least for either of the HSM @@ -101,11 +102,11 @@ for port in ports: response = tty.read(args.read_buffer_size) if args.debug: - sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join("{:02x}".format(ord(c)) for c in response))) + sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join(binascii.hexlify(c) for c in response))) # Check whether we got a known console prompt. - is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>")) + is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>")) # Check whether we got something that looks like the response to an RPC version query. # We skip over the version value itself, as it might change, but we check that it's diff --git a/projects/hsm/cryptech_upload b/projects/hsm/cryptech_upload index b6e02bd..0c18f25 100755 --- a/projects/hsm/cryptech_upload +++ b/projects/hsm/cryptech_upload @@ -142,18 +142,15 @@ class ManagementPortAbstract(object): self.args = args def write(self, data): - numeric = isinstance(data, (int, long)) + numeric = isinstance(data, int) if numeric: data = struct.pack("<I", data) self.send(data) if self.args.debug: - if numeric: - print("Wrote 0x{!s}".format(data.encode("hex"))) - else: - print("Wrote {!r}".format(data)) + print("Wrote {!r}".format(data)) def read(self): - res = "" + res = b"" x = self.recv() while not x: x = self.recv() @@ -161,26 +158,26 @@ class ManagementPortAbstract(object): res += x x = self.recv() if self.args.debug: - print ("Read {!r}".format(res)) + print("Read {!r}".format(res)) return res def execute(self, cmd): - self.write("\r") + self.write(b"\r") prompt = self.read() #if prompt.endswith("This is the bootloader speaking..."): # prompt = self.read() if prompt.endswith("Username: "): - self.write(self.args.username + "\r") + self.write(self.args.username.encode("ascii") + b"\r") prompt = self.read() - if prompt.endswith("Password: "): + if prompt.endswith(b"Password: "): if not self.args.pin or self.args.separate_pins: self.args.pin = getpass.getpass("{} PIN: ".format(self.args.username)) - self.write(self.args.pin + "\r") + self.write(self.args.pin.encode("ascii") + b"\r") prompt = self.read() - if not prompt.endswith(("> ", "# ")): + if not prompt.endswith((b"> ", b"# ")): print("Device does not seem to be ready for a file transfer (got {!r})".format(prompt)) return prompt - self.write(cmd + "\r") + self.write(cmd.encode("ascii") + b"\r") response = self.read() return response @@ -227,7 +224,7 @@ class ManagementPortSocket(ManagementPortAbstract): try: return self.socket.recv(1) except socket.timeout: - return "" + return b"" def set_timeout(self, timeout): self.socket.settimeout(timeout) @@ -244,19 +241,19 @@ def send_file(src, size, args, dst): if args.fpga: chunk_size = FPGA_CHUNK_SIZE - response = dst.execute("fpga bitstream upload") + response = dst.execute(b"fpga bitstream upload") elif args.firmware: chunk_size = FIRMWARE_CHUNK_SIZE - response = dst.execute("firmware upload") - if "Rebooting" in response: - response = dst.execute("firmware upload") + response = dst.execute(b"firmware upload") + if b"Rebooting" in response: + response = dst.execute(b"firmware upload") elif args.bootloader: chunk_size = FIRMWARE_CHUNK_SIZE - response = dst.execute("bootloader upload") - if "Access denied" in response: - print "Access denied" + response = dst.execute(b"bootloader upload") + if b"Access denied" in response: + print("Access denied") return False - if not "OK" in response: + if not b"OK" in response: print("Device did not accept the upload command (got {!r})".format(response)) return False @@ -266,19 +263,19 @@ def send_file(src, size, args, dst): # 1. Write size of file (4 bytes) dst.write(struct.pack("<I", size)) response = dst.read() - if not response.startswith("Send "): - print response + if not response.startswith(b"Send "): + print(response) return False # 2. Write file contents while calculating CRC-32 chunks = int((size + chunk_size - 1) / chunk_size) - for counter in xrange(chunks): + for counter in range(chunks): data = src.read(chunk_size) dst.write(data) if not args.quiet: print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter + 1, chunks)) # read ACK (a counter of number of 4k chunks received) - ack_bytes = "" + ack_bytes = b"" while len(ack_bytes) < 4: ack_bytes += dst.read() ack = struct.unpack("<I", ack_bytes[:4])[0] @@ -293,16 +290,16 @@ def send_file(src, size, args, dst): dst.write(struct.pack("<I", crc)) response = dst.read() if not args.quiet: - print response + print(response) src.close() if args.fpga: # tell the fpga to read its new configuration - dst.execute("fpga reset") + dst.execute(b"fpga reset") # log out of the CLI # (firmware/bootloader upgrades reboot, don't need an exit) - dst.execute("exit") + dst.execute(b"exit") return True @@ -332,7 +329,7 @@ def main(): if args.bootloader: if not args.simon_says_whack_my_bootloader: sys.exit("You didn't say \"Simon says\"") - print dire_bootloader_warning + print(dire_bootloader_warning) args.pin = None if args.explicit_image is None and args.firmware_tarball is None: @@ -344,12 +341,12 @@ def main(): if size == 0: # Flashing from stdin won't work, sorry sys.exit("Can't flash from a pipe or zero-length file") if not args.quiet: - print "Uploading from explicitly-specified file {}".format(args.explicit_image.name) + print("Uploading from explicitly-specified file {}".format(args.explicit_image.name)) else: tar = tarfile.open(fileobj = args.firmware_tarball) if not args.quiet: - print "Firmware tarball {} content:".format(args.firmware_tarball.name) + print("Firmware tarball {} content:".format(args.firmware_tarball.name)) tar.list(True) if args.fpga: name = "alpha_fmc.bit" @@ -366,10 +363,10 @@ def main(): sys.exit("Expected component {} missing from firmware tarball {}".format(name, args.firmware_tarball.name)) src = tar.extractfile(name) if not args.quiet: - print "Uploading {} from {}".format(name, args.firmware_tarball.name) + print("Uploading {} from {}".format(name, args.firmware_tarball.name)) if not args.quiet: - print "Initializing management port and synchronizing with HSM, this may take a few seconds" + print("Initializing management port and synchronizing with HSM, this may take a few seconds") try: dst = ManagementPortSocket(args, timeout = 1) except socket.error as e: |