From ed4c22473f5fb07006e773137ed047950e25a4d8 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Tue, 26 May 2020 15:18:19 -0400 Subject: Wow, python-version-independent hexadecimal is painful --- cryptech/libhal.py | 12 ++++++++++-- cryptech_backup | 8 +++++--- cryptech_muxd | 15 +++++++++++---- tests/parallel-signatures.py | 15 +++++++++++---- tests/test-ecdsa.py | 3 ++- tests/test-rsa.py | 3 ++- unit-tests.py | 2 +- utils/last_gasp_default_pin | 8 ++++++-- 8 files changed, 48 insertions(+), 18 deletions(-) diff --git a/cryptech/libhal.py b/cryptech/libhal.py index d7bc328..56712cb 100644 --- a/cryptech/libhal.py +++ b/cryptech/libhal.py @@ -39,6 +39,7 @@ A Python interface to the Cryptech libhal RPC API. # not likely to want to use the full ONC RPC mechanism. import os +import sys import uuid import xdrlib import socket @@ -47,6 +48,13 @@ import binascii logger = logging.getLogger(__name__) +if sys.version_info.major == 2: + def colon_hex(raw): + return ":".join("{:02x}".format(ord(b)) for b in raw) +else: + def colon_hex(raw): + return ":".join("{:02x}".format(b) for b in raw) + SLIP_END = b"\300" # indicates end of packet SLIP_ESC = b"\333" # indicates byte stuffing @@ -466,7 +474,7 @@ class HSM(object): def _send(self, msg): # Expects an xdrlib.Packer msg = slip_encode(msg.get_buffer()) if self.debug_io: - logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in msg)) + logger.debug("send: %s", colon_hex(msg)) self.socket.sendall(msg) def _recv(self, code): # Returns a ContextManagedUnpacker @@ -478,7 +486,7 @@ class HSM(object): raise HAL_ERROR_RPC_TRANSPORT() msg.append(self.sockfile.read(1)) if self.debug_io: - logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in msg)) + logger.debug("recv: %s", colon_hex(msg)) msg = slip_decode(b"".join(msg)) if not msg: continue diff --git a/cryptech_backup b/cryptech_backup index be70f8b..1f6f3d1 100755 --- a/cryptech_backup +++ b/cryptech_backup @@ -52,6 +52,7 @@ import base64 import struct import getpass import argparse +import binascii from cryptech.libhal import * @@ -361,14 +362,15 @@ class AESKeyWrapWithPadding(object): R[0], R[i] = self._decrypt(R[0], R[i]) magic, m = struct.unpack(">LL", R[0]) if magic != 0xa65959a6: - raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}" - .format(magic)) + raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:08x}" + .format(magic)) if m <= 8 * (n - 1) or m > 8 * n: raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n)) R = b"".join(R[1:]) assert len(R) == 8 * n if any(r != b"\x00" for r in R[m:]): - raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex"))) + raise self.UnwrapError("Nonzero trailing bytes {}".format( + binascii.hexlify(R[m:]).decode("ascii"))) return R[:m] diff --git a/cryptech_muxd b/cryptech_muxd index 6c21f7e..7a85a06 100755 --- a/cryptech_muxd +++ b/cryptech_muxd @@ -45,7 +45,6 @@ import atexit import weakref import logging import argparse -import binascii import logging.handlers import serial @@ -65,6 +64,14 @@ from cryptech.libhal import HAL_OK, RPC_FUNC_GET_VERSION, RPC_FUNC_LOGOUT, RPC_F logger = logging.getLogger("cryptech_muxd") +if sys.version_info.major == 2: + def colon_hex(raw): + return ":".join("{:02x}".format(ord(b)) for b in raw) +else: + def colon_hex(raw): + return ":".join("{:02x}".format(b) for b in raw) + + SLIP_END = b"\300" # Indicates end of SLIP packet SLIP_ESC = b"\333" # Indicates byte stuffing SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte @@ -174,7 +181,7 @@ class RPCIOStream(SerialIOStream): @tornado.gen.coroutine def rpc_input(self, query, handle = 0, queue = None): "Send a query to the HSM." - logger.debug("RPC send: %s", ":".join(binascii.hexlify(c) for c in query)) + logger.debug("RPC send: %s", colon_hex(query)) if queue is not None: self.queues[handle] = queue with (yield self.rpc_input_lock.acquire()): @@ -193,7 +200,7 @@ class RPCIOStream(SerialIOStream): for q in self.queues.values(): q.put_nowait(None) return - logger.debug("RPC recv: %s", ":".join(binascii.hexlify(c) for c in reply)) + logger.debug("RPC recv: %s", colon_hex(reply)) if reply == SLIP_END: continue try: @@ -371,7 +378,7 @@ class ProbeIOStream(SerialIOStream): yield tornado.gen.sleep(0.5) response = yield self.read_bytes(self.read_chunk_size, partial = True) - logger.debug("Probing %s: %r %s", self.serial_device, response, ":".join(binascii.hexlify(c) for c in response)) + logger.debug("Probing %s: %r %s", self.serial_device, response, colon_hex(response)) is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>")) diff --git a/tests/parallel-signatures.py b/tests/parallel-signatures.py index 7cb7132..87bac0a 100755 --- a/tests/parallel-signatures.py +++ b/tests/parallel-signatures.py @@ -44,7 +44,6 @@ import uuid import xdrlib import socket import logging -import binascii import datetime import collections @@ -81,6 +80,14 @@ globals().update((name, getattr(cryptech.libhal, name)) for prefix in ("HAL", "RPC", "SLIP"))) +if sys.version_info.major == 2: + def colon_hex(raw): + return ":".join("{:02x}".format(ord(b)) for b in raw) +else: + def colon_hex(raw): + return ":".join("{:02x}".format(b) for b in raw) + + class PKey(cryptech.libhal.Handle): def __init__(self, hsm, handle, uuid): @@ -136,7 +143,7 @@ class HSM(cryptech.libhal.HSM): self._pack_args(packer, args) packer = cryptech.libhal.slip_encode(packer.get_buffer()) if self.debug_io: - logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in packer)) + logger.debug("send: %s", colon_hex(packer)) yield self.iostream.write(packer) while True: try: @@ -144,7 +151,7 @@ class HSM(cryptech.libhal.HSM): except StreamClosedError: raise HAL_ERROR_RPC_TRANSPORT() if self.debug_io: - logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in unpacker)) + logger.debug("recv: %s", colon_hex(unpacker)) unpacker = cryptech.libhal.slip_decode(unpacker) if not unpacker: continue @@ -210,7 +217,7 @@ def client(args, k, p, q, r, m, v, h): t0 = datetime.datetime.now() s = yield p.sign(data = m) t1 = datetime.datetime.now() - logger.debug("Signature %s: %s", n, ":".join(binascii.hexlify(b) for b in s)) + logger.debug("Signature %s: %s", n, colon_hex(s)) if args.verify and not v.verify(h, s): raise RuntimeError("RSA verification failed") r.add(t0, t1) diff --git a/tests/test-ecdsa.py b/tests/test-ecdsa.py index cf21019..4c14d9f 100644 --- a/tests/test-ecdsa.py +++ b/tests/test-ecdsa.py @@ -126,9 +126,10 @@ for curve in curves: if isinstance(value, int): value = long_to_bytes(value, order) if value is not None: + value = hexlify(value).decode("ascii") print() print("static const uint8_t {}[] = {{ /* {:d} bytes */".format(name, len(value))) - print(wrapper.fill(", ".join("0x" + hexlify(v) for v in value))) + print(wrapper.fill(", ".join("0x" + value[i : i + 2] for i in range(0, len(value), 2)))) print("};") print() diff --git a/tests/test-rsa.py b/tests/test-rsa.py index 57c554d..d0538ed 100644 --- a/tests/test-rsa.py +++ b/tests/test-rsa.py @@ -78,8 +78,9 @@ def trailing_comma(item, sequence): return "" if item == sequence[-1] else "," def print_hex(name, value, comment): + value = hexlify(value).decode("ascii") printlines("static const uint8_t {name}[] = {{ /* {comment}, {length:d} bytes */", - wrapper.fill(", ".join("0x" + hexlify(v) for v in value)), + wrapper.fill(", ".join("0x" + value[i : i + 2] for i in range(0, len(value), 2))) "}};", "", name = name, comment = comment, length = len(value)) diff --git a/unit-tests.py b/unit-tests.py index 2bb80a8..77ca4cb 100644 --- a/unit-tests.py +++ b/unit-tests.py @@ -1727,7 +1727,7 @@ class AESKeyWrapWithPadding(object): R = b"".join(R[1:]) assert len(R) == 8 * n if any(r != b"\x00" for r in R[m:]): - raise self.UnwrapError("Nonzero trailing bytes {}".format(binascii.hexlify(R[m:]))) + raise self.UnwrapError("Nonzero trailing bytes 0x{}".format(binascii.hexlify(R[m:]).decode("ascii"))) return R[:m] diff --git a/utils/last_gasp_default_pin b/utils/last_gasp_default_pin index 4dd1d54..1f340d0 100755 --- a/utils/last_gasp_default_pin +++ b/utils/last_gasp_default_pin @@ -66,6 +66,10 @@ args = parser.parse_args() def HMAC_SHA256(pin, salt): return HMAC.new(pin, salt, SHA256).digest() +def uint8(value): + value = hexlify(value).decode("ascii") + return ", ".join("0x" + value[i : i + 2] for i in range(0, len(value), 2)) + salt = urandom(16) pin = PBKDF2(password = args.pin, @@ -84,5 +88,5 @@ static const hal_ks_pin_t hal_last_gasp_pin = {{ {{{pin}}}, {{{salt}}} }};'''.format(iterations = args.iterations, - pin = ", ".join(hexlify(v) for v in pin), - salt = ", ".join(hexlify(v) for v in salt))) + pin = uint8(pin), + salt = uint8(salt))) -- cgit v1.2.3