aboutsummaryrefslogtreecommitdiff
path: root/projects
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2020-05-25 19:33:47 -0400
committerRob Austein <sra@hactrn.net>2020-05-25 19:33:47 -0400
commitf948d674351a2ca87d33c0e0d8558cbbb2f59682 (patch)
treebf3e5c562e24929da786d8834d3ffe455d974e67 /projects
parent52f72e1e5dc5d3b646b54363f811ee2fd7958c19 (diff)
Untested conversion to support Python 3
Diffstat (limited to 'projects')
-rwxr-xr-xprojects/cli-test/filetransfer18
-rwxr-xr-xprojects/hsm/cryptech_probe21
-rwxr-xr-xprojects/hsm/cryptech_upload65
3 files changed, 51 insertions, 53 deletions
diff --git a/projects/cli-test/filetransfer b/projects/cli-test/filetransfer
index f704e31..147f081 100755
--- a/projects/cli-test/filetransfer
+++ b/projects/cli-test/filetransfer
@@ -37,7 +37,7 @@ import struct
import serial
import argparse
-from binascii import crc32
+from binascii import crc32, hexlify
CHUNK_SIZE = 256
DFU_CHUNK_SIZE = 256
@@ -79,19 +79,19 @@ def parse_args():
def _write(dst, data):
dst.write(data)
if len(data) == 4:
- print("Wrote 0x{!s}".format(data.encode('hex')))
+ print(("Wrote 0x{!s}".format(hexlify(data))))
else:
- print("Wrote {!r}".format(data))
+ print(("Wrote {!r}".format(data)))
def _read(dst):
- res = ''
+ res = b''
while True:
x = dst.read(1)
if not x:
break
res += x
- print ("Read {!r}".format(res))
+ print(("Read {!r}".format(res)))
return res
@@ -142,19 +142,19 @@ def send_file(filename, args, dst):
if not data:
break
dst.write(data)
- print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size)))
+ print(("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size))))
# read ACK (a counter of number of 4k chunks received)
while True:
ack_bytes = dst.read(4)
if len(ack_bytes) == 4:
break
- print('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes))
+ print(('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes)))
dst.write('\r') # eventually get back to the CLI prompt
ack = struct.unpack('<I', ack_bytes)[0]
if ack != counter + 1:
- print('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter))
+ print(('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter)))
flush = dst.read(100)
- print('FLUSH data: {!r}'.format(flush))
+ print(('FLUSH data: {!r}'.format(flush)))
return False
counter += 1
diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe
index ccee40a..3d14484 100755
--- a/projects/hsm/cryptech_probe
+++ b/projects/hsm/cryptech_probe
@@ -38,6 +38,7 @@ goes to stderr.
import sys
import time
import argparse
+import binascii
import serial.tools.list_ports_posix
class positive_integer(int):
@@ -52,16 +53,16 @@ parser.add_argument("--no-cleanup", action = "store_true", help = "don't
parser.add_argument("--read-buffer-size", type = positive_integer, help = "size of read buffer", default = 1024)
args = parser.parse_args()
-SLIP_END = chr(0300) # Indicates end of SLIP packet
-SLIP_ESC = chr(0333) # Indicates byte stuffing
-SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte
-SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte
+SLIP_END = b"\300" # Indicates end of SLIP packet
+SLIP_ESC = b"\333" # Indicates byte stuffing
+SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte
+SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte
-Control_U = chr(0025) # Console: clear line
-Control_M = chr(0015) # Console: end of line
+Control_U = b"\025" # Console: clear line
+Control_M = b"\015" # Console: end of line
-RPC_query = chr(0) * 8 # client_handle = 0, function code = RPC_FUNC_GET_VERSION
-RPC_reply = chr(0) * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK
+RPC_query = b"\0" * 8 # client_handle = 0, function code = RPC_FUNC_GET_VERSION
+RPC_reply = b"\0" * 12 # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK
# This is the query string we send to each USB port we find. It's
# intended to be relatively harmless, at least for either of the HSM
@@ -101,11 +102,11 @@ for port in ports:
response = tty.read(args.read_buffer_size)
if args.debug:
- sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join("{:02x}".format(ord(c)) for c in response)))
+ sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join(binascii.hexlify(c) for c in response)))
# Check whether we got a known console prompt.
- is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>"))
+ is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>"))
# Check whether we got something that looks like the response to an RPC version query.
# We skip over the version value itself, as it might change, but we check that it's
diff --git a/projects/hsm/cryptech_upload b/projects/hsm/cryptech_upload
index b6e02bd..0c18f25 100755
--- a/projects/hsm/cryptech_upload
+++ b/projects/hsm/cryptech_upload
@@ -142,18 +142,15 @@ class ManagementPortAbstract(object):
self.args = args
def write(self, data):
- numeric = isinstance(data, (int, long))
+ numeric = isinstance(data, int)
if numeric:
data = struct.pack("<I", data)
self.send(data)
if self.args.debug:
- if numeric:
- print("Wrote 0x{!s}".format(data.encode("hex")))
- else:
- print("Wrote {!r}".format(data))
+ print("Wrote {!r}".format(data))
def read(self):
- res = ""
+ res = b""
x = self.recv()
while not x:
x = self.recv()
@@ -161,26 +158,26 @@ class ManagementPortAbstract(object):
res += x
x = self.recv()
if self.args.debug:
- print ("Read {!r}".format(res))
+ print("Read {!r}".format(res))
return res
def execute(self, cmd):
- self.write("\r")
+ self.write(b"\r")
prompt = self.read()
#if prompt.endswith("This is the bootloader speaking..."):
# prompt = self.read()
if prompt.endswith("Username: "):
- self.write(self.args.username + "\r")
+ self.write(self.args.username.encode("ascii") + b"\r")
prompt = self.read()
- if prompt.endswith("Password: "):
+ if prompt.endswith(b"Password: "):
if not self.args.pin or self.args.separate_pins:
self.args.pin = getpass.getpass("{} PIN: ".format(self.args.username))
- self.write(self.args.pin + "\r")
+ self.write(self.args.pin.encode("ascii") + b"\r")
prompt = self.read()
- if not prompt.endswith(("> ", "# ")):
+ if not prompt.endswith((b"> ", b"# ")):
print("Device does not seem to be ready for a file transfer (got {!r})".format(prompt))
return prompt
- self.write(cmd + "\r")
+ self.write(cmd.encode("ascii") + b"\r")
response = self.read()
return response
@@ -227,7 +224,7 @@ class ManagementPortSocket(ManagementPortAbstract):
try:
return self.socket.recv(1)
except socket.timeout:
- return ""
+ return b""
def set_timeout(self, timeout):
self.socket.settimeout(timeout)
@@ -244,19 +241,19 @@ def send_file(src, size, args, dst):
if args.fpga:
chunk_size = FPGA_CHUNK_SIZE
- response = dst.execute("fpga bitstream upload")
+ response = dst.execute(b"fpga bitstream upload")
elif args.firmware:
chunk_size = FIRMWARE_CHUNK_SIZE
- response = dst.execute("firmware upload")
- if "Rebooting" in response:
- response = dst.execute("firmware upload")
+ response = dst.execute(b"firmware upload")
+ if b"Rebooting" in response:
+ response = dst.execute(b"firmware upload")
elif args.bootloader:
chunk_size = FIRMWARE_CHUNK_SIZE
- response = dst.execute("bootloader upload")
- if "Access denied" in response:
- print "Access denied"
+ response = dst.execute(b"bootloader upload")
+ if b"Access denied" in response:
+ print("Access denied")
return False
- if not "OK" in response:
+ if not b"OK" in response:
print("Device did not accept the upload command (got {!r})".format(response))
return False
@@ -266,19 +263,19 @@ def send_file(src, size, args, dst):
# 1. Write size of file (4 bytes)
dst.write(struct.pack("<I", size))
response = dst.read()
- if not response.startswith("Send "):
- print response
+ if not response.startswith(b"Send "):
+ print(response)
return False
# 2. Write file contents while calculating CRC-32
chunks = int((size + chunk_size - 1) / chunk_size)
- for counter in xrange(chunks):
+ for counter in range(chunks):
data = src.read(chunk_size)
dst.write(data)
if not args.quiet:
print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter + 1, chunks))
# read ACK (a counter of number of 4k chunks received)
- ack_bytes = ""
+ ack_bytes = b""
while len(ack_bytes) < 4:
ack_bytes += dst.read()
ack = struct.unpack("<I", ack_bytes[:4])[0]
@@ -293,16 +290,16 @@ def send_file(src, size, args, dst):
dst.write(struct.pack("<I", crc))
response = dst.read()
if not args.quiet:
- print response
+ print(response)
src.close()
if args.fpga:
# tell the fpga to read its new configuration
- dst.execute("fpga reset")
+ dst.execute(b"fpga reset")
# log out of the CLI
# (firmware/bootloader upgrades reboot, don't need an exit)
- dst.execute("exit")
+ dst.execute(b"exit")
return True
@@ -332,7 +329,7 @@ def main():
if args.bootloader:
if not args.simon_says_whack_my_bootloader:
sys.exit("You didn't say \"Simon says\"")
- print dire_bootloader_warning
+ print(dire_bootloader_warning)
args.pin = None
if args.explicit_image is None and args.firmware_tarball is None:
@@ -344,12 +341,12 @@ def main():
if size == 0: # Flashing from stdin won't work, sorry
sys.exit("Can't flash from a pipe or zero-length file")
if not args.quiet:
- print "Uploading from explicitly-specified file {}".format(args.explicit_image.name)
+ print("Uploading from explicitly-specified file {}".format(args.explicit_image.name))
else:
tar = tarfile.open(fileobj = args.firmware_tarball)
if not args.quiet:
- print "Firmware tarball {} content:".format(args.firmware_tarball.name)
+ print("Firmware tarball {} content:".format(args.firmware_tarball.name))
tar.list(True)
if args.fpga:
name = "alpha_fmc.bit"
@@ -366,10 +363,10 @@ def main():
sys.exit("Expected component {} missing from firmware tarball {}".format(name, args.firmware_tarball.name))
src = tar.extractfile(name)
if not args.quiet:
- print "Uploading {} from {}".format(name, args.firmware_tarball.name)
+ print("Uploading {} from {}".format(name, args.firmware_tarball.name))
if not args.quiet:
- print "Initializing management port and synchronizing with HSM, this may take a few seconds"
+ print("Initializing management port and synchronizing with HSM, this may take a few seconds")
try:
dst = ManagementPortSocket(args, timeout = 1)
except socket.error as e: