From 1cd42f6d3332e1edf78b06bd7dcf51f5a1a7bb23 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Mon, 25 May 2020 19:33:38 -0400 Subject: Untested conversion to support Python 3 --- cryptech/libhal.py | 78 ++++++++++++---------- cryptech_backup | 41 ++++++------ cryptech_console | 4 +- cryptech_muxd | 31 ++++----- pkcs8.py | 124 +++++++++++++++++------------------ tests/parallel-signatures.py | 15 +++-- tests/test-ecdsa.py | 53 +++++++-------- tests/test-rsa.py | 43 ++++++------ tests/time-keygen.py | 4 +- unit-tests.py | 153 ++++++++++++++++++++++--------------------- utils/last_gasp_default_pin | 10 ++- 11 files changed, 284 insertions(+), 272 deletions(-) diff --git a/cryptech/libhal.py b/cryptech/libhal.py index 647dbd6..d7bc328 100644 --- a/cryptech/libhal.py +++ b/cryptech/libhal.py @@ -43,14 +43,15 @@ import uuid import xdrlib import socket import logging +import binascii logger = logging.getLogger(__name__) -SLIP_END = chr(0300) # indicates end of packet -SLIP_ESC = chr(0333) # indicates byte stuffing -SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte -SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte +SLIP_END = b"\300" # indicates end of packet +SLIP_ESC = b"\333" # indicates byte stuffing +SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte +SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte def slip_encode(buffer): @@ -70,7 +71,7 @@ class HALError(Exception): @classmethod def define(cls, **kw): assert len(kw) == 1 - name, text = kw.items()[0] + name, text = list(kw.items())[0] e = type(name, (cls,), dict(__doc__ = text)) cls.table.append(e) globals()[name] = e @@ -122,7 +123,10 @@ HALError.define(HAL_ERROR_RPC_PROTOCOL_ERROR = "RPC protocol error") HALError.define(HAL_ERROR_NOT_IMPLEMENTED = "Not implemented") -class Enum(int): +# When we finally drop Python 2 support, this class should probably be +# replaced by enum.IntEnum from the Python 3 standard library. + +class CEnum(int): def __new__(cls, name, value): self = int.__new__(cls, value) @@ -134,14 +138,14 @@ class Enum(int): return self._name def __repr__(self): - return "".format(self) + return "".format(self) _counter = 0 @classmethod def define(cls, names): symbols = [] - for name in names.translate(None, "{}").split(","): + for name in names.split(","): if "=" in name: name, sep, expr = name.partition("=") cls._counter = eval(expr.strip()) @@ -156,7 +160,7 @@ class Enum(int): packer.pack_uint(self) -class RPCFunc(Enum): pass +class RPCFunc(CEnum): pass RPCFunc.define(''' RPC_FUNC_GET_VERSION, @@ -193,7 +197,7 @@ RPCFunc.define(''' RPC_FUNC_PKEY_GENERATE_HASHSIG, ''') -class HALDigestAlgorithm(Enum): pass +class HALDigestAlgorithm(CEnum): pass HALDigestAlgorithm.define(''' HAL_DIGEST_ALGORITHM_NONE, @@ -206,7 +210,7 @@ HALDigestAlgorithm.define(''' HAL_DIGEST_ALGORITHM_SHA512 ''') -class HALKeyType(Enum): pass +class HALKeyType(CEnum): pass HALKeyType.define(''' HAL_KEY_TYPE_NONE, @@ -220,7 +224,7 @@ HALKeyType.define(''' HAL_KEY_TYPE_HASHSIG_LMOTS ''') -class HALCurve(Enum): pass +class HALCurve(CEnum): pass HALCurve.define(''' HAL_CURVE_NONE, @@ -229,7 +233,7 @@ HALCurve.define(''' HAL_CURVE_P521 ''') -class HALUser(Enum): pass +class HALUser(CEnum): pass HALUser.define(''' HAL_USER_NONE, @@ -238,7 +242,7 @@ HALUser.define(''' HAL_USER_WHEEL ''') -class HALLmotsAlgorithm(Enum): pass +class HALLmotsAlgorithm(CEnum): pass HALLmotsAlgorithm.define(''' HAL_LMOTS_RESERVED = 0, @@ -248,7 +252,7 @@ HALLmotsAlgorithm.define(''' HAL_LMOTS_SHA256_N32_W8 = 4 ''') -class HALLmsAlgorithm(Enum): pass +class HALLmsAlgorithm(CEnum): pass HALLmsAlgorithm.define(''' HAL_LMS_RESERVED = 0, @@ -336,6 +340,7 @@ class LocalDigest(object): def __init__(self, hsm, handle, algorithm, key): from Crypto.Hash import HMAC, SHA, SHA224, SHA256, SHA384, SHA512 + from struct import pack self.hsm = hsm self.handle = handle self.algorithm = algorithm @@ -351,7 +356,7 @@ class LocalDigest(object): } h = self._algorithms[algorithm] self.digest_length = h.digest_size - self.algorithm_id = chr(0x30) + chr(2 + len(h.oid)) + h.oid + self.algorithm_id = pack("BB", 0x30, 2 + len(h.oid)) + h.oid self._context = HMAC.HMAC(key = key, digestmod = h) if key else h() def update(self, data): @@ -411,10 +416,10 @@ class PKey(Handle): def public_key(self): return self.hsm.pkey_get_public_key(self, self.public_key_len) - def sign(self, hash = 0, data = "", length = 1024): + def sign(self, hash = 0, data = b"", length = 1024): return self.hsm.pkey_sign(self, hash = hash, data = data, length = length) - def verify(self, hash = 0, data = "", signature = None): + def verify(self, hash = 0, data = b"", signature = None): self.hsm.pkey_verify(self, hash = hash, data = data, signature = signature) def set_attributes(self, attributes = None, **kwargs): @@ -423,9 +428,9 @@ class PKey(Handle): def get_attributes(self, attributes): attrs = self.hsm.pkey_get_attributes(self, attributes, 0) - attrs = dict((k, v) for k, v in attrs.iteritems() if v != HAL_PKEY_ATTRIBUTE_NIL) + attrs = dict((k, v) for k, v in attrs.items() if v != HAL_PKEY_ATTRIBUTE_NIL) result = dict((a, None) for a in attributes) - result.update(self.hsm.pkey_get_attributes(self, attrs.iterkeys(), sum(attrs.itervalues()))) + result.update(self.hsm.pkey_get_attributes(self, iter(attrs.keys()), sum(attrs.values()))) return result def export_pkey(self, pkey): @@ -461,7 +466,7 @@ class HSM(object): def _send(self, msg): # Expects an xdrlib.Packer msg = slip_encode(msg.get_buffer()) if self.debug_io: - logger.debug("send: %s", ":".join("{:02x}".format(ord(c)) for c in msg)) + logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in msg)) self.socket.sendall(msg) def _recv(self, code): # Returns a ContextManagedUnpacker @@ -469,24 +474,29 @@ class HSM(object): while True: msg = [self.sockfile.read(1)] while msg[-1] != SLIP_END: - if msg[-1] == "": + if msg[-1] == b"": raise HAL_ERROR_RPC_TRANSPORT() msg.append(self.sockfile.read(1)) if self.debug_io: - logger.debug("recv: %s", ":".join("{:02x}".format(ord(c)) for c in msg)) - msg = slip_decode("".join(msg)) + logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in msg)) + msg = slip_decode(b"".join(msg)) if not msg: continue - msg = ContextManagedUnpacker("".join(msg)) + msg = ContextManagedUnpacker(b"".join(msg)) if msg.unpack_uint() != code: continue return msg - _pack_builtin = (((int, long), "_pack_uint"), + _pack_builtin = ((int, "_pack_uint"), (str, "_pack_bytes"), ((list, tuple, set), "_pack_array"), (dict, "_pack_items")) + try: + _pack_builtin += (long, "_pack_uint") + except NameError: # "long" merged with "int" in Python 3 + pass + def _pack_arg(self, packer, arg): if hasattr(arg, "xdr_packer"): return arg.xdr_packer(packer) @@ -511,7 +521,7 @@ class HSM(object): def _pack_items(self, packer, arg): packer.pack_uint(len(arg)) - for name, value in arg.iteritems(): + for name, value in arg.items(): self._pack_arg(packer, name) self._pack_arg(packer, HAL_PKEY_ATTRIBUTE_NIL if value is None else value) @@ -569,7 +579,7 @@ class HSM(object): def hash_initialize(self, alg, key = None, client = 0, session = 0, mixed_mode = None): if key is None: - key = "" + key = b"" if mixed_mode is None: mixed_mode = self.mixed_mode if mixed_mode: @@ -600,7 +610,7 @@ class HSM(object): logger.debug("Opened pkey %s", pkey.uuid) return pkey - def pkey_generate_rsa(self, keylen, flags = 0, exponent = "\x01\x00\x01", client = 0, session = 0): + def pkey_generate_rsa(self, keylen, flags = 0, exponent = b"\x01\x00\x01", client = 0, session = 0): with self.rpc(RPC_FUNC_PKEY_GENERATE_RSA, session, keylen, exponent, flags, client = client) as r: pkey = PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes())) logger.debug("Generated RSA pkey %s", pkey.uuid) @@ -656,14 +666,14 @@ class HSM(object): with self.rpc(RPC_FUNC_PKEY_GET_PUBLIC_KEY, pkey, length) as r: return r.unpack_bytes() - def pkey_sign(self, pkey, hash = 0, data = "", length = 1024): + def pkey_sign(self, pkey, hash = 0, data = b"", length = 1024): assert not hash or not data if isinstance(hash, LocalDigest): hash, data = 0, hash.finalize_padded(pkey) with self.rpc(RPC_FUNC_PKEY_SIGN, pkey, hash, data, length) as r: return r.unpack_bytes() - def pkey_verify(self, pkey, hash = 0, data = "", signature = None): + def pkey_verify(self, pkey, hash = 0, data = b"", signature = None): assert not hash or not data if isinstance(hash, LocalDigest): hash, data = 0, hash.finalize_padded(pkey) @@ -680,7 +690,7 @@ class HSM(object): attributes, s, length, u, client = client) as r: s = r.unpack_uint() n = r.unpack_uint() - for i in xrange(n): + for i in range(n): u = UUID(bytes = r.unpack_bytes()) yield u @@ -695,9 +705,9 @@ class HSM(object): if n != len(attributes): raise HAL_ERROR_RPC_PROTOCOL_ERROR if attributes_buffer_len > 0: - return dict((r.unpack_uint(), r.unpack_bytes()) for i in xrange(n)) + return dict((r.unpack_uint(), r.unpack_bytes()) for i in range(n)) else: - return dict((r.unpack_uint(), r.unpack_uint()) for i in xrange(n)) + return dict((r.unpack_uint(), r.unpack_uint()) for i in range(n)) def pkey_export(self, pkey, kekek, pkcs8_max = 2560, kek_max = 512): with self.rpc(RPC_FUNC_PKEY_EXPORT, pkey, kekek, pkcs8_max, kek_max) as r: diff --git a/cryptech_backup b/cryptech_backup index 76cdfbb..be70f8b 100755 --- a/cryptech_backup +++ b/cryptech_backup @@ -48,6 +48,8 @@ import sys import json import uuid import atexit +import base64 +import struct import getpass import argparse @@ -145,11 +147,11 @@ def defcmd(subparsers, func): return subparser -def b64(bytes): - return bytes.encode("base64").splitlines() +def b64(der): + return base64.b64encode(der).splitlines() def b64join(lines): - return "".join(lines).decode("base64") + return base64.b64decode("".join(lines)) def cmd_setup(args, hsm): @@ -209,7 +211,7 @@ def key_flag_names(flags): token = HAL_KEY_FLAG_TOKEN, public = HAL_KEY_FLAG_PUBLIC, exportable = HAL_KEY_FLAG_EXPORTABLE) - return ", ".join(sorted(k for k, v in names.iteritems() if (flags & v) != 0)) + return ", ".join(sorted(k for k, v in names.items() if (flags & v) != 0)) def cmd_export(args, hsm): @@ -284,10 +286,10 @@ def cmd_import(args, hsm): flags = k.get("flags", 0) if pkcs8 and kek: with kekek.import_pkey(pkcs8 = pkcs8, kek = kek, flags = flags) as pkey: - print "Imported {} as {}".format(k["uuid"], pkey.uuid) + print("Imported {} as {}".format(k["uuid"], pkey.uuid)) elif spki: with hsm.pkey_load(der = spki, flags = flags) as pkey: - print "Loaded {} as {}".format(k["uuid"], pkey.uuid) + print("Loaded {} as {}".format(k["uuid"], pkey.uuid)) if soft_key: kekek.delete() @@ -316,22 +318,20 @@ class AESKeyWrapWithPadding(object): @staticmethod def _start_stop(start, stop): # Syntactic sugar step = -1 if start > stop else 1 - return xrange(start, stop + step, step) + return range(start, stop + step, step) @staticmethod def _xor(R0, t): - from struct import pack, unpack - return pack(">Q", unpack(">Q", R0)[0] ^ t) + return struct.pack(">Q", struct.unpack(">Q", R0)[0] ^ t) def wrap(self, Q): "RFC 5649 section 4.1." - from struct import pack m = len(Q) # Plaintext length if m % 8 != 0: # Pad Q if needed - Q += "\x00" * (8 - (m % 8)) - R = [pack(">LL", 0xa65959a6, m)] # Magic MSB(32,A), build LSB(32,A) + Q += b"\x00" * (8 - (m % 8)) + R = [struct.pack(">LL", 0xa65959a6, m)] # Magic MSB(32,A), build LSB(32,A) R.extend(Q[i : i + 8] # Append Q - for i in xrange(0, len(Q), 8)) + for i in range(0, len(Q), 8)) n = len(R) - 1 if n == 1: R[0], R[1] = self._encrypt(R[0], R[1]) @@ -342,16 +342,15 @@ class AESKeyWrapWithPadding(object): R[0], R[i] = self._encrypt(R[0], R[i]) R[0] = self._xor(R[0], n * j + i) assert len(R) == (n + 1) and all(len(r) == 8 for r in R) - return "".join(R) + return b"".join(R) def unwrap(self, C): "RFC 5649 section 4.2." - from struct import unpack if len(C) % 8 != 0: raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks" .format(len(C))) n = (len(C) / 8) - 1 - R = [C[i : i + 8] for i in xrange(0, len(C), 8)] + R = [C[i : i + 8] for i in range(0, len(C), 8)] if n == 1: R[0], R[1] = self._decrypt(R[0], R[1]) else: @@ -360,15 +359,15 @@ class AESKeyWrapWithPadding(object): for i in self._start_stop(n, 1): R[0] = self._xor(R[0], n * j + i) R[0], R[i] = self._decrypt(R[0], R[i]) - magic, m = unpack(">LL", R[0]) + magic, m = struct.unpack(">LL", R[0]) if magic != 0xa65959a6: raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}" .format(magic)) if m <= 8 * (n - 1) or m > 8 * n: raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n)) - R = "".join(R[1:]) + R = b"".join(R[1:]) assert len(R) == 8 * n - if any(r != "\x00" for r in R[m:]): + if any(r != b"\x00" for r in R[m:]): raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex"))) return R[:m] @@ -379,7 +378,7 @@ class SoftKEKEK(object): Requires PyCrypto on about every other line. """ - oid_aesKeyWrap = "\x60\x86\x48\x01\x65\x03\x04\x01\x30" + oid_aesKeyWrap = b"\x60\x86\x48\x01\x65\x03\x04\x01\x30" def parse_EncryptedPrivateKeyInfo(self, der): from Crypto.Util.asn1 import DerObject, DerSequence, DerOctetString, DerObjectId @@ -399,7 +398,7 @@ class SoftKEKEK(object): from Crypto.Util.asn1 import DerSequence, DerOctetString return DerSequence([ DerSequence([ - chr(0x06) + chr(len(self.oid_aesKeyWrap)) + self.oid_aesKeyWrap + struct.pack("BB", 0x06, len(self.oid_aesKeyWrap)) + self.oid_aesKeyWrap ]).encode(), DerOctetString(der).encode() ]).encode() diff --git a/cryptech_console b/cryptech_console index 5ac12ba..e88bf70 100755 --- a/cryptech_console +++ b/cryptech_console @@ -88,10 +88,10 @@ class FemtoTerm(object): self.close() def stdin_loop(self): - return self.copy_loop(self.stdin_stream, self.socket_stream, "\n", "\r") + return self.copy_loop(self.stdin_stream, self.socket_stream, b"\n", b"\r") def stdout_loop(self): - return self.copy_loop(self.socket_stream, self.stdout_stream, "\r\n", "\n") + return self.copy_loop(self.socket_stream, self.stdout_stream, b"\r\n", b"\n") def main(): parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter) diff --git a/cryptech_muxd b/cryptech_muxd index ff40048..6c21f7e 100755 --- a/cryptech_muxd +++ b/cryptech_muxd @@ -45,6 +45,7 @@ import atexit import weakref import logging import argparse +import binascii import logging.handlers import serial @@ -64,13 +65,13 @@ from cryptech.libhal import HAL_OK, RPC_FUNC_GET_VERSION, RPC_FUNC_LOGOUT, RPC_F logger = logging.getLogger("cryptech_muxd") -SLIP_END = chr(0300) # Indicates end of SLIP packet -SLIP_ESC = chr(0333) # Indicates byte stuffing -SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte -SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte +SLIP_END = b"\300" # Indicates end of SLIP packet +SLIP_ESC = b"\333" # Indicates byte stuffing +SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte +SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte -Control_U = chr(0025) # Console: clear line -Control_M = chr(0015) # Console: end of line +Control_U = b"\025" # Console: clear line +Control_M = b"\015" # Console: end of line def slip_encode(buffer): @@ -146,7 +147,7 @@ class PFUnixServer(tornado.tcpserver.TCPServer): (aka PF_LOCAL) socket instead of a TCP socket. """ - def __init__(self, serial_stream, socket_filename, mode = 0600): + def __init__(self, serial_stream, socket_filename, mode = 0o600): super(PFUnixServer, self).__init__() self.serial = serial_stream self.socket_filename = socket_filename @@ -173,7 +174,7 @@ class RPCIOStream(SerialIOStream): @tornado.gen.coroutine def rpc_input(self, query, handle = 0, queue = None): "Send a query to the HSM." - logger.debug("RPC send: %s", ":".join("{:02x}".format(ord(c)) for c in query)) + logger.debug("RPC send: %s", ":".join(binascii.hexlify(c) for c in query)) if queue is not None: self.queues[handle] = queue with (yield self.rpc_input_lock.acquire()): @@ -189,10 +190,10 @@ class RPCIOStream(SerialIOStream): reply = yield self.read_until(SLIP_END) except tornado.iostream.StreamClosedError: logger.info("RPC UART closed") - for q in self.queues.itervalues(): + for q in self.queues.values(): q.put_nowait(None) return - logger.debug("RPC recv: %s", ":".join("{:02x}".format(ord(c)) for c in reply)) + logger.debug("RPC recv: %s", ":".join(binascii.hexlify(c) for c in reply)) if reply == SLIP_END: continue try: @@ -348,7 +349,7 @@ class ProbeIOStream(SerialIOStream): results = yield dict((dev, ProbeIOStream(dev).run_probe()) for dev in devs) - for dev, result in results.iteritems(): + for dev, result in results.items(): if result == "cty" and args.cty_device is None: logger.info("Selecting %s as CTY device", dev) @@ -370,9 +371,9 @@ class ProbeIOStream(SerialIOStream): yield tornado.gen.sleep(0.5) response = yield self.read_bytes(self.read_chunk_size, partial = True) - logger.debug("Probing %s: %r %s", self.serial_device, response, ":".join("{:02x}".format(ord(c)) for c in response)) + logger.debug("Probing %s: %r %s", self.serial_device, response, ":".join(binascii.hexlify(c) for c in response)) - is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>")) + is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>")) try: is_rpc = response[response.index(SLIP_END + RPC_reply) + len(SLIP_END + RPC_reply) + 4] == SLIP_END @@ -429,7 +430,7 @@ def main(): parser.add_argument("--rpc-socket-mode", help = "permission bits for RPC socket inode", - default = 0600, type = lambda s: int(s, 8)) + default = 0o600, type = lambda s: int(s, 8)) parser.add_argument("--cty-device", help = "CTY serial device name", @@ -442,7 +443,7 @@ def main(): parser.add_argument("--cty-socket-mode", help = "permission bits for CTY socket inode", - default = 0600, type = lambda s: int(s, 8)) + default = 0o600, type = lambda s: int(s, 8)) args = parser.parse_args() diff --git a/pkcs8.py b/pkcs8.py index cd45ff6..295fbb4 100644 --- a/pkcs8.py +++ b/pkcs8.py @@ -177,80 +177,80 @@ if __name__ == "__main__": rsa_pkcs8 = DER_Decode(der_test_keys["rsa_pkcs8"], PrivateKeyInfo())[0] rsa_pkcs8_privateKey = DER_Decode(str(rsa_pkcs8["privateKey"]), RSAPrivateKey() )[0] - print - print "EC RFC 5915" - print ec_rfc5915.prettyPrint() + print() + print("EC RFC 5915") + print(ec_rfc5915.prettyPrint()) if show_manual_decode: - print + print() compressed, Qx, Qy = decode_ecpoint(ec_rfc5915["publicKey"].asOctets()) - print "version: ", ec_rfc5915["version"] - print "privateKey:", str(ec_rfc5915["privateKey"]).encode("hex") - print "parameters:", ec_rfc5915["parameters"] - print "publicKey: ", compressed - print " Qx: ", Qx.encode("hex") - print " Qy: ", Qy.encode("hex") + print("version: ", ec_rfc5915["version"]) + print("privateKey:", str(ec_rfc5915["privateKey"]).encode("hex")) + print("parameters:", ec_rfc5915["parameters"]) + print("publicKey: ", compressed) + print(" Qx: ", Qx.encode("hex")) + print(" Qy: ", Qy.encode("hex")) # This works, and lets .prettyPrint() display the ANY content properly, # but it breaks some of the key hackery we do after all this display stuff. #ec_pkcs8["privateKeyAlgorithm"]["parameters"] = DER_Decode(ec_pkcs8["privateKeyAlgorithm"]["parameters"])[0] - print - print "EC PKCS #8" - print ec_pkcs8.prettyPrint() - print ec_pkcs8_privateKey.prettyPrint() + print() + print("EC PKCS #8") + print(ec_pkcs8.prettyPrint()) + print(ec_pkcs8_privateKey.prettyPrint()) if show_manual_decode: - print + print() compressed, Qx, Qy = decode_ecpoint(ec_pkcs8_privateKey["publicKey"].asOctets()) - print "version: ", ec_pkcs8["version"] - print "privateKeyAlgorithm:", ec_pkcs8["privateKeyAlgorithm"][0] - print " ", DER_Decode(ec_pkcs8["privateKeyAlgorithm"]["parameters"])[0] - print "privateKey:" - print " version: ", ec_pkcs8_privateKey["version"] - print " privateKey:", str(ec_pkcs8_privateKey["privateKey"]).encode("hex") - print " parameters:", ec_pkcs8_privateKey["parameters"] - print " publicKey: ", compressed - print " Qx: ", Qx.encode("hex") - print " Qy: ", Qy.encode("hex") - - print - print "RSA RFC 2313" - print rsa_rfc2313.prettyPrint() + print("version: ", ec_pkcs8["version"]) + print("privateKeyAlgorithm:", ec_pkcs8["privateKeyAlgorithm"][0]) + print(" ", DER_Decode(ec_pkcs8["privateKeyAlgorithm"]["parameters"])[0]) + print("privateKey:") + print(" version: ", ec_pkcs8_privateKey["version"]) + print(" privateKey:", str(ec_pkcs8_privateKey["privateKey"]).encode("hex")) + print(" parameters:", ec_pkcs8_privateKey["parameters"]) + print(" publicKey: ", compressed) + print(" Qx: ", Qx.encode("hex")) + print(" Qy: ", Qy.encode("hex")) + + print() + print("RSA RFC 2313") + print(rsa_rfc2313.prettyPrint()) if show_manual_decode: - print - print "version:", rsa_rfc2313["version"] - print " n:", rsa_rfc2313["n"] - print " e:", rsa_rfc2313["e"] - print " d:", rsa_rfc2313["d"] - print " p:", rsa_rfc2313["p"] - print " q:", rsa_rfc2313["q"] - print " dP:", rsa_rfc2313["dP"] - print " dQ:", rsa_rfc2313["dQ"] - print " u:", rsa_rfc2313["u"] + print() + print("version:", rsa_rfc2313["version"]) + print(" n:", rsa_rfc2313["n"]) + print(" e:", rsa_rfc2313["e"]) + print(" d:", rsa_rfc2313["d"]) + print(" p:", rsa_rfc2313["p"]) + print(" q:", rsa_rfc2313["q"]) + print(" dP:", rsa_rfc2313["dP"]) + print(" dQ:", rsa_rfc2313["dQ"]) + print(" u:", rsa_rfc2313["u"]) #rsa_pkcs8["privateKeyAlgorithm"]["parameters"] = DER_Decode(rsa_pkcs8["privateKeyAlgorithm"]["parameters"])[0] - print - print "RSA PKCS #8" - print rsa_pkcs8.prettyPrint() - print rsa_pkcs8_privateKey.prettyPrint() + print() + print("RSA PKCS #8") + print(rsa_pkcs8.prettyPrint()) + print(rsa_pkcs8_privateKey.prettyPrint()) if show_manual_decode: - print - print "version: ", rsa_pkcs8["version"] - print "privateKeyAlgorithm:", rsa_pkcs8["privateKeyAlgorithm"][0] - print "privateKey:" - print " version:", rsa_pkcs8_privateKey["version"] - print " n:", rsa_pkcs8_privateKey["n"] - print " e:", rsa_pkcs8_privateKey["e"] - print " d:", rsa_pkcs8_privateKey["d"] - print " p:", rsa_pkcs8_privateKey["p"] - print " q:", rsa_pkcs8_privateKey["q"] - print " dP:", rsa_pkcs8_privateKey["dP"] - print " dQ:", rsa_pkcs8_privateKey["dQ"] - print " u:", rsa_pkcs8_privateKey["u"] + print() + print("version: ", rsa_pkcs8["version"]) + print("privateKeyAlgorithm:", rsa_pkcs8["privateKeyAlgorithm"][0]) + print("privateKey:") + print(" version:", rsa_pkcs8_privateKey["version"]) + print(" n:", rsa_pkcs8_privateKey["n"]) + print(" e:", rsa_pkcs8_privateKey["e"]) + print(" d:", rsa_pkcs8_privateKey["d"]) + print(" p:", rsa_pkcs8_privateKey["p"]) + print(" q:", rsa_pkcs8_privateKey["q"]) + print(" dP:", rsa_pkcs8_privateKey["dP"]) + print(" dQ:", rsa_pkcs8_privateKey["dQ"]) + print(" u:", rsa_pkcs8_privateKey["u"]) # Generate PKCS #8 from ECPrivateKey and check against static data p8 = PrivateKeyInfo() @@ -266,7 +266,7 @@ if __name__ == "__main__": der = DER_Encode(p8) #print; dumpasn1(der) #print; dumpasn1(der_test_keys["ec_pkcs8"]) - print; print "Reencoded PKCS #8 {} static data".format("matches" if der == der_test_keys["ec_pkcs8"] else "doesn't match") + print(); print("Reencoded PKCS #8 {} static data".format("matches" if der == der_test_keys["ec_pkcs8"] else "doesn't match")) # Try doing same thing with ecdsa package ASN.1 utilities. sk = SigningKey.from_der(der_test_keys["ec_rfc5915"]) @@ -278,7 +278,7 @@ if __name__ == "__main__": ECDSA_DER.encode_sequence(encoded_oid_ecPublicKey, sk.curve.encoded_oid), ECDSA_DER.encode_octet_string(ec)) - print; print "ECDSA-library PKCS #8 encoding {} pyasn1 PKCS #8 encoding".format("matches" if p8 == der_test_keys["ec_pkcs8"] else "doesn't match") + print(); print("ECDSA-library PKCS #8 encoding {} pyasn1 PKCS #8 encoding".format("matches" if p8 == der_test_keys["ec_pkcs8"] else "doesn't match")) # Generate ECPrivateKey from PKCS #8 and check against static data ec = ECPrivateKey() @@ -289,11 +289,11 @@ if __name__ == "__main__": der = DER_Encode(ec) #print; dumpasn1(der) #print; dumpasn1(der_test_keys["ec_rfc5915"]) - print; print "Reencoded PKCS #8 {} static data".format("matches" if der == der_test_keys["ec_rfc5915"] else "doesn't match") + print(); print("Reencoded PKCS #8 {} static data".format("matches" if der == der_test_keys["ec_rfc5915"] else "doesn't match")) # Paranoia: Make sure we really can load the RFC 5915 we just generated. sk = SigningKey.from_der(der) - print; print "ECDSA Python library parse of reencoded PKCS #8 data: {!r}".format(sk) + print(); print("ECDSA Python library parse of reencoded PKCS #8 data: {!r}".format(sk)) # Same thing with ecdsa package ASN.1 utilities. car, cdr = ECDSA_DER.remove_sequence(der_test_keys["ec_pkcs8"]) @@ -320,5 +320,5 @@ if __name__ == "__main__": assert cdr == "" assert pubkey[:2] == "\x00\x04" sk = SigningKey.from_string(privkey, curve) - print; print "ECDSA-library PKCS #8 decoding {} pyasn1 PKCS #8 decoding".format( - "matches" if der == sk.to_der() else "doesn't match") + print(); print("ECDSA-library PKCS #8 decoding {} pyasn1 PKCS #8 decoding".format( + "matches" if der == sk.to_der() else "doesn't match")) diff --git a/tests/parallel-signatures.py b/tests/parallel-signatures.py index 980f759..7cb7132 100755 --- a/tests/parallel-signatures.py +++ b/tests/parallel-signatures.py @@ -44,6 +44,7 @@ import uuid import xdrlib import socket import logging +import binascii import datetime import collections @@ -103,7 +104,7 @@ class PKey(cryptech.libhal.Handle): raise Return(r) @coroutine - def verify(self, data = "", signature = None): + def verify(self, data = b"", signature = None): yield self.hsm.pkey_verify(self, data = data, signature = signature) @@ -135,7 +136,7 @@ class HSM(cryptech.libhal.HSM): self._pack_args(packer, args) packer = cryptech.libhal.slip_encode(packer.get_buffer()) if self.debug_io: - logger.debug("send: %s", ":".join("{:02x}".format(ord(c)) for c in packer)) + logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in packer)) yield self.iostream.write(packer) while True: try: @@ -143,11 +144,11 @@ class HSM(cryptech.libhal.HSM): except StreamClosedError: raise HAL_ERROR_RPC_TRANSPORT() if self.debug_io: - logger.debug("recv: %s", ":".join("{:02x}".format(ord(c)) for c in unpacker)) + logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in unpacker)) unpacker = cryptech.libhal.slip_decode(unpacker) if not unpacker: continue - unpacker = ContextManagedUnpacker("".join(unpacker)) + unpacker = ContextManagedUnpacker(b"".join(unpacker)) if unpacker.unpack_uint() == code: break client = unpacker.unpack_uint() @@ -209,7 +210,7 @@ def client(args, k, p, q, r, m, v, h): t0 = datetime.datetime.now() s = yield p.sign(data = m) t1 = datetime.datetime.now() - logger.debug("Signature %s: %s", n, ":".join("{:02x}".format(ord(b)) for b in s)) + logger.debug("Signature %s: %s", n, ":".join(binascii.hexlify(b) for b in s)) if args.verify and not v.verify(h, s): raise RuntimeError("RSA verification failed") r.add(t0, t1) @@ -236,11 +237,11 @@ def main(): d = k.exportKey(format = "DER", pkcs = 8) h = SHA256(args.text) v = PKCS115_SigScheme(k) - q = range(args.iterations) + q = list(range(args.iterations)) m = pkcs1_hash_and_pad(args.text) r = Result(args, args.key) - hsms = [HSM() for i in xrange(args.clients)] + hsms = [HSM() for i in range(args.clients)] for hsm in hsms: yield hsm.login(HAL_USER_NORMAL, args.pin) diff --git a/tests/test-ecdsa.py b/tests/test-ecdsa.py index f50cf59..cf21019 100644 --- a/tests/test-ecdsa.py +++ b/tests/test-ecdsa.py @@ -38,6 +38,7 @@ p384_u2 = 0xf3b240751d5d8ed394a4b5bf8e2a4c0e1e21aa51f2620a08b8c55a2bc334c96899 p384_v = 0xa0c27ec893092dea1e1bd2ccfed3cf945c8134ed0c9f81311a0f4a05942db8dbed8dd59f267471d5462aa14fe72de856 from textwrap import TextWrapper +from binascii import hexlify, unhexlify from os.path import basename from sys import argv from pyasn1.type.univ import Sequence, Choice, Integer, OctetString, ObjectIdentifier, BitString @@ -54,16 +55,16 @@ def long_to_bytes(number, order): # # This is just plain nasty. # - s = "%x" % number + s = "{:x}".format(number) s = ("0" * (order/8 - len(s))) + s - return s.decode("hex") + return unhexlify(s) -def bytes_to_bits(bytes): +def bytes_to_bits(b): # # This, on the other hand, is not just plain nasty, this is fancy nasty. # This is nasty with raisins in it. # - s = bin(long(bytes.encode("hex"), 16))[2:] + s = bin(int(hexlify(b), 16))[2:] if len(s) % 8: s = ("0" * (8 - len(s) % 8)) + s return tuple(int(i) for i in s) @@ -102,10 +103,10 @@ p384_key = encode_key(p384_d, p384_Qx, p384_Qy, 384, "1.3.132.0.34") ### -print "/*" -print " * ECDSA test data." -print " * File automatically generated by", basename(argv[0]) -print " */" +print("/*") +print(" * ECDSA test data.") +print(" * File automatically generated by", basename(argv[0])) +print(" */") curves = ("p256", "p384") vars = set() @@ -122,29 +123,29 @@ for curve in curves: for var in vars: name = curve + "_" + var value = globals().get(name, None) - if isinstance(value, (int, long)): + if isinstance(value, int): value = long_to_bytes(value, order) if value is not None: - print - print "static const uint8_t %s[] = { /* %d bytes */" % (name, len(value)) - print wrapper.fill(", ".join("0x%02x" % ord(v) for v in value)) - print "};" - -print -print "typedef struct {" -print " hal_curve_name_t curve;" + print() + print("static const uint8_t {}[] = {{ /* {:d} bytes */".format(name, len(value))) + print(wrapper.fill(", ".join("0x" + hexlify(v) for v in value))) + print("};") + +print() +print("typedef struct {") +print(" hal_curve_name_t curve;") for var in vars: - print " const uint8_t *%8s; size_t %8s_len;" % (var, var) -print "} ecdsa_tc_t;" -print -print "static const ecdsa_tc_t ecdsa_tc[] = {" + print(" const uint8_t *{0:>8}; size_t {0:>8}_len;".format(var)) +print("} ecdsa_tc_t;") +print() +print("static const ecdsa_tc_t ecdsa_tc[] = {") for curve in curves: - print " { HAL_CURVE_%s," % curve.upper() + print(" {{ HAL_CURVE_{},".format(curve.upper())) for var in vars: name = curve + "_" + var if name in globals(): - print " %-14s sizeof(%s)," % (name + ",", name) + print(" {:<14} sizeof({}),".format(name + ",", name)) else: - print " %-14s 0," % "NULL," - print " }," -print "};" + print(" {:<14} 0,".format("NULL,")) + print(" },") +print("};") diff --git a/tests/test-rsa.py b/tests/test-rsa.py index 6b52eb9..57c554d 100644 --- a/tests/test-rsa.py +++ b/tests/test-rsa.py @@ -35,6 +35,7 @@ Use PyCrypto to generate test data for Cryptech ModExp core. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from argparse import ArgumentParser, FileType +from binascii import hexlify from Crypto import __version__ as PyCryptoVersion from Crypto.PublicKey import RSA from Crypto.Hash import SHA256 @@ -71,32 +72,32 @@ wrapper = TextWrapper(width = 78, initial_indent = " " * 2, subsequent_indent = def printlines(*lines, **kwargs): for line in lines: - args.output.write(line % kwargs + "\n") + args.output.write(line.format(**kwargs) + "\n") def trailing_comma(item, sequence): return "" if item == sequence[-1] else "," def print_hex(name, value, comment): - printlines("static const uint8_t %(name)s[] = { /* %(comment)s, %(length)d bytes */", - wrapper.fill(", ".join("0x%02x" % ord(v) for v in value)), - "};", "", + printlines("static const uint8_t {name}[] = {{ /* {comment}, {length:d} bytes */", + wrapper.fill(", ".join("0x" + hexlify(v) for v in value)), + "}};", "", name = name, comment = comment, length = len(value)) def pad_to_blocksize(value, blocksize): extra = len(value) % blocksize - return value if extra == 0 else ("\x00" * (blocksize - extra)) + value + return value if extra == 0 else (b"\x00" * (blocksize - extra)) + value # Funnily enough, PyCrypto and Cryptlib use exactly the same names for # RSA key components, see Cryptlib documentation pages 186-187 & 339. -h = SHA256.new(plaintext) +h = SHA256.new(plaintext.encode("ascii")) printlines("/*", " * RSA signature test data for Cryptech project, automatically generated by", - " * %(scriptname)s using PyCrypto version %(version)s. Do not edit.", + " * {scriptname} using PyCrypto version {version}. Do not edit.", " *", - " * Plaintext: \"%(plaintext)s\"", - " * SHA-256: %(digest)s", + " * Plaintext: \"{plaintext}\"", + " * SHA-256: {digest}", " */", "", scriptname = scriptname, version = PyCryptoVersion, @@ -122,7 +123,7 @@ for k_len in args.key_lengths: else: blocksize = 4 - printlines("/* %(k_len)d-bit RSA private key (PKCS #%(pkcs)d)", + printlines("/* {k_len:d}-bit RSA private key (PKCS #{pkcs:d})", k.exportKey(format = "PEM", pkcs = args.pkcs_encoding), "*/", "", k_len = k_len, pkcs = args.pkcs_encoding) @@ -143,22 +144,22 @@ for k_len in args.key_lengths: else: value = getattr(k, name) - print_hex("%s_%d" % (name, k_len), + print_hex("{}_{:d}".format(name, k_len), long_to_bytes(value, blocksize = blocksize), - "key component %s" % name) + "key component {}".format(name)) - print_hex("m_%d" % k_len, pad_to_blocksize(m, blocksize), "message to be signed") - print_hex("s_%d" % k_len, pad_to_blocksize(s, blocksize), "signed message") + print_hex("m_{:d}".format(k_len), pad_to_blocksize(m, blocksize), "message to be signed") + print_hex("s_{:d}".format(k_len), pad_to_blocksize(s, blocksize), "signed message") -printlines("typedef struct { const uint8_t *val; size_t len; } rsa_tc_bn_t;", - "typedef struct { size_t size; rsa_tc_bn_t %(fields)s; } rsa_tc_t;", +printlines("typedef struct {{ const uint8_t *val; size_t len; }} rsa_tc_bn_t;", + "typedef struct {{ size_t size; rsa_tc_bn_t {fields}; }} rsa_tc_t;", "", - "static const rsa_tc_t rsa_tc[] = {", + "static const rsa_tc_t rsa_tc[] = {{", fields = ", ".join(fields)) for k_len in args.key_lengths: - printlines(" { %(k_len)d,", k_len = k_len) + printlines(" {{ {k_len:d},", k_len = k_len) for field in fields: - printlines(" { %(field)s_%(k_len)d, sizeof(%(field)s_%(k_len)d) }%(comma)s", + printlines(" {{ {field}_{k_len:d}, sizeof({field}_{k_len:d}) }}{comma}", field = field, k_len = k_len, comma = trailing_comma(field, fields)) - printlines(" }%(comma)s", comma = trailing_comma(k_len, args.key_lengths)) -printlines("};") + printlines(" }}{comma}", comma = trailing_comma(k_len, args.key_lengths)) +printlines("}};") diff --git a/tests/time-keygen.py b/tests/time-keygen.py index b7311ba..14a8119 100755 --- a/tests/time-keygen.py +++ b/tests/time-keygen.py @@ -22,7 +22,7 @@ hsm.login(HAL_USER_NORMAL, args.pin) flags = HAL_KEY_FLAG_USAGE_DIGITALSIGNATURE | (HAL_KEY_FLAG_TOKEN if args.token else 0) sum = timedelta() -for n in xrange(1, args.iterations + 1): +for n in range(1, args.iterations + 1): t0 = datetime.now() @@ -34,4 +34,4 @@ for n in xrange(1, args.iterations + 1): sum += t1 - t0 - print "{:4d} this {} mean {}".format(n, t1 - t0, sum / n) + print("{:4d} this {} mean {}".format(n, t1 - t0, sum / n)) diff --git a/unit-tests.py b/unit-tests.py index 6419b27..2bb80a8 100644 --- a/unit-tests.py +++ b/unit-tests.py @@ -39,6 +39,7 @@ LibHAL unit tests, using libhal.py and the Python unit_test framework. import unittest import datetime +import binascii import logging import sys @@ -221,7 +222,7 @@ class TestDigest(TestCase): """ def v(*bytes): - return "".join(chr(b) for b in bytes) + return b"".join(pack("B", b) for b in bytes) # NIST sample messages. @@ -1210,7 +1211,7 @@ class TestPKeyMatch(TestCaseLoggedIn): token = HAL_KEY_FLAG_TOKEN, public = HAL_KEY_FLAG_PUBLIC, exportable = HAL_KEY_FLAG_EXPORTABLE) - return ", ".join(sorted(k for k, v in names.iteritems() if (flags & v) != 0)) + return ", ".join(sorted(k for k, v in names.items() if (flags & v) != 0)) @staticmethod def cleanup_key(uuid): @@ -1222,7 +1223,7 @@ class TestPKeyMatch(TestCaseLoggedIn): def load_keys(self, flags): uuids = set() - for obj in PreloadedKey.db.itervalues(): + for obj in PreloadedKey.db.values(): with hsm.pkey_load(obj.der, flags) as k: self.addCleanup(self.cleanup_key, k.uuid) uuids.add(k.uuid) @@ -1241,7 +1242,7 @@ class TestPKeyMatch(TestCaseLoggedIn): def ks_match(self, mask, flags): tags = [] uuids = set() - for i in xrange(2): + for i in range(2): uuids |= self.load_keys(flags if mask else HAL_KEY_FLAG_TOKEN * i) tags.extend(PreloadedKey.db) self.assertEqual(len(tags), len(uuids)) @@ -1251,14 +1252,14 @@ class TestPKeyMatch(TestCaseLoggedIn): flags = flags, uuids = uuids))) - for keytype in set(HALKeyType.index.itervalues()) - {HAL_KEY_TYPE_NONE}: + for keytype in set(HALKeyType.index.values()) - {HAL_KEY_TYPE_NONE}: n = 0 for n, k in self.match(mask = mask, flags = flags, uuids = uuids, type = keytype): self.assertEqual(k.key_type, keytype) self.assertEqual(k.get_attributes({0}).pop(0), str(keytype)) self.assertEqual(n, sum(1 for t1, t2 in tags if t1 == keytype)) - for curve in set(HALCurve.index.itervalues()) - {HAL_CURVE_NONE}: + for curve in set(HALCurve.index.values()) - {HAL_CURVE_NONE}: n = 0 for n, k in self.match(mask = mask, flags = flags, uuids = uuids, curve = curve): self.assertEqual(k.key_curve, curve) @@ -1309,13 +1310,13 @@ class TestPKeyAttribute(TestCaseLoggedIn): def load_and_fill(self, flags, n_keys = 1, n_attrs = 2, n_fill = 0): pinwheel = Pinwheel() - for i in xrange(n_keys): - for obj in PreloadedKey.db.itervalues(): + for i in range(n_keys): + for obj in PreloadedKey.db.values(): with hsm.pkey_load(obj.der, flags) as k: pinwheel() self.addCleanup(self.cleanup_key, k.uuid) k.set_attributes(dict((j, "Attribute {}{}".format(j, "*" * n_fill)) - for j in xrange(n_attrs))) + for j in range(n_attrs))) pinwheel() # These sizes work with a 8192-byte keystore block; if you tweak @@ -1360,55 +1361,55 @@ class TestPKeyAttributeP11(TestCaseLoggedIn): def test_set_many_attributes(self): self.k.set_attributes({ - 0x001 : "\x01", - 0x108 : "\x01", - 0x105 : "\x00", - 0x002 : "\x01", - 0x107 : "\x00", - 0x102 : "\x45\x43\x2d\x50\x32\x35\x36", - 0x003 : "\x45\x43\x2d\x50\x32\x35\x36", - 0x162 : "\x00", - 0x103 : "\x01", - 0x000 : "\x03\x00\x00\x00", - 0x100 : "\x03\x00\x00\x00", - 0x101 : "", - 0x109 : "\x00", - 0x10c : "\x00", - 0x110 : "", - 0x111 : "", - 0x163 : "\x00", - 0x166 : "\xff\xff\xff\xff", - 0x170 : "\x01", - 0x210 : "\x00", - 0x163 : "\x01", - 0x166 : "\x40\x10\x00\x00", - 0x180 : "\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" }) + 0x001 : b"\x01", + 0x108 : b"\x01", + 0x105 : b"\x00", + 0x002 : b"\x01", + 0x107 : b"\x00", + 0x102 : b"\x45\x43\x2d\x50\x32\x35\x36", + 0x003 : b"\x45\x43\x2d\x50\x32\x35\x36", + 0x162 : b"\x00", + 0x103 : b"\x01", + 0x000 : b"\x03\x00\x00\x00", + 0x100 : b"\x03\x00\x00\x00", + 0x101 : b"", + 0x109 : b"\x00", + 0x10c : b"\x00", + 0x110 : b"", + 0x111 : b"", + 0x163 : b"\x00", + 0x166 : b"\xff\xff\xff\xff", + 0x170 : b"\x01", + 0x210 : b"\x00", + 0x163 : b"\x01", + 0x166 : b"\x40\x10\x00\x00", + 0x180 : b"\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" }) def test_set_many_attributes_with_deletions(self): self.k.set_attributes({ - 0x001 : "\x01", - 0x108 : "\x01", - 0x105 : "\x00", - 0x002 : "\x01", - 0x107 : "\x00", - 0x102 : "\x45\x43\x2d\x50\x32\x35\x36", - 0x003 : "\x45\x43\x2d\x50\x32\x35\x36", - 0x162 : "\x00", - 0x103 : "\x01", - 0x000 : "\x03\x00\x00\x00", - 0x100 : "\x03\x00\x00\x00", + 0x001 : b"\x01", + 0x108 : b"\x01", + 0x105 : b"\x00", + 0x002 : b"\x01", + 0x107 : b"\x00", + 0x102 : b"\x45\x43\x2d\x50\x32\x35\x36", + 0x003 : b"\x45\x43\x2d\x50\x32\x35\x36", + 0x162 : b"\x00", + 0x103 : b"\x01", + 0x000 : b"\x03\x00\x00\x00", + 0x100 : b"\x03\x00\x00\x00", 0x101 : None, - 0x109 : "\x00", - 0x10c : "\x00", + 0x109 : b"\x00", + 0x10c : b"\x00", 0x110 : None, 0x111 : None, - 0x163 : "\x00", - 0x166 : "\xff\xff\xff\xff", - 0x170 : "\x01", - 0x210 : "\x00", - 0x163 : "\x01", - 0x166 : "\x40\x10\x00\x00", - 0x180 : "\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" }) + 0x163 : b"\x00", + 0x166 : b"\xff\xff\xff\xff", + 0x170 : b"\x01", + 0x210 : b"\x00", + 0x163 : b"\x01", + 0x166 : b"\x40\x10\x00\x00", + 0x180 : b"\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" }) @unittest.skipUnless(ecdsa_loaded, "Requires Python ECDSA package") @@ -1425,7 +1426,7 @@ class TestPKeyAttributeWriteSpeedToken(TestCaseLoggedIn): def set_attributes(self, n_attrs): self.k.set_attributes(dict((i, "Attribute {}".format(i)) - for i in xrange(n_attrs))) + for i in range(n_attrs))) def test_set_1_attribute(self): self.set_attributes(1) @@ -1450,7 +1451,7 @@ class TestPKeyAttributeWriteSpeedVolatile(TestCaseLoggedIn): def set_attributes(self, n_attrs): self.k.set_attributes(dict((i, "Attribute {}".format(i)) - for i in xrange(n_attrs))) + for i in range(n_attrs))) def test_set_1_attribute(self): self.set_attributes(1) @@ -1472,16 +1473,16 @@ class TestPKeyAttributeReadSpeedToken(TestCaseLoggedIn): self.k = hsm.pkey_load(der, HAL_KEY_FLAG_TOKEN) self.addCleanup(self.k.delete) self.k.set_attributes(dict((i, "Attribute {}".format(i)) - for i in xrange(12))) + for i in range(12))) super(TestPKeyAttributeReadSpeedToken, self).setUp() def verify_attributes(self, n_attrs, attributes): expected = dict((i, "Attribute {}".format(i)) - for i in xrange(n_attrs)) + for i in range(n_attrs)) self.assertEqual(attributes, expected) def get_attributes(self, n_attrs): - attributes = self.k.get_attributes(range(n_attrs)) + attributes = self.k.get_attributes(list(range(n_attrs))) self.verify_attributes(n_attrs, attributes) def test_get_1_attribute(self): @@ -1504,16 +1505,16 @@ class TestPKeyAttributeReadSpeedVolatile(TestCaseLoggedIn): self.k = hsm.pkey_load(der, 0) self.addCleanup(self.k.delete) self.k.set_attributes(dict((i, "Attribute {}".format(i)) - for i in xrange(12))) + for i in range(12))) super(TestPKeyAttributeReadSpeedVolatile, self).setUp() def verify_attributes(self, n_attrs, attributes): expected = dict((i, "Attribute {}".format(i)) - for i in xrange(n_attrs)) + for i in range(n_attrs)) self.assertEqual(attributes, expected) def get_attributes(self, n_attrs): - attributes = self.k.get_attributes(range(n_attrs)) + attributes = self.k.get_attributes(list(range(n_attrs))) self.verify_attributes(n_attrs, attributes) def test_get_1_attribute(self): @@ -1537,7 +1538,7 @@ class TestPkeyECDSAVerificationNIST(TestCaseLoggedIn): py_curve, py_hash).to_der() k = hsm.pkey_load(Q, HAL_KEY_FLAG_USAGE_DIGITALSIGNATURE) self.addCleanup(k.delete) - k.verify(signature = (r + s).decode("hex"), data = H.decode("hex")) + k.verify(signature = binascii.unhexlify(r + s), data = binascii.unhexlify(H)) def test_suite_b_p256_verify(self): self.verify( @@ -1563,8 +1564,8 @@ class TestPkeyECDSAVerificationNIST(TestCaseLoggedIn): @unittest.skipUnless(pycrypto_loaded, "Requires Python Crypto package") class TestPKeyBackup(TestCaseLoggedIn): - oid_rsaEncryption = "\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01" - oid_aesKeyWrap = "\x60\x86\x48\x01\x65\x03\x04\x01\x30" + oid_rsaEncryption = b"\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01" + oid_aesKeyWrap = b"\x60\x86\x48\x01\x65\x03\x04\x01\x30" @staticmethod def parse_EncryptedPrivateKeyInfo(der, oid): @@ -1593,7 +1594,7 @@ class TestPKeyBackup(TestCaseLoggedIn): def encode_EncryptedPrivateKeyInfo(der, oid): from Crypto.Util.asn1 import DerSequence, DerOctetString return DerSequence([ - DerSequence([chr(0x06) + chr(len(oid)) + oid]).encode(), + DerSequence([pack("BB", 0x06, len(oid)) + oid]).encode(), DerOctetString(der).encode() ]).encode() @@ -1676,7 +1677,7 @@ class AESKeyWrapWithPadding(object): @staticmethod def _start_stop(start, stop): # Syntactic sugar step = -1 if start > stop else 1 - return xrange(start, stop + step, step) + return range(start, stop + step, step) @staticmethod def _xor(R0, t): @@ -1686,10 +1687,10 @@ class AESKeyWrapWithPadding(object): "RFC 5649 section 4.1." m = len(Q) # Plaintext length if m % 8 != 0: # Pad Q if needed - Q += "\x00" * (8 - (m % 8)) + Q += b"\x00" * (8 - (m % 8)) R = [pack(">LL", 0xa65959a6, m)] # Magic MSB(32,A), build LSB(32,A) R.extend(Q[i : i + 8] # Append Q - for i in xrange(0, len(Q), 8)) + for i in range(0, len(Q), 8)) n = len(R) - 1 if n == 1: R[0], R[1] = self._encrypt(R[0], R[1]) @@ -1700,7 +1701,7 @@ class AESKeyWrapWithPadding(object): R[0], R[i] = self._encrypt(R[0], R[i]) R[0] = self._xor(R[0], n * j + i) assert len(R) == (n + 1) and all(len(r) == 8 for r in R) - return "".join(R) + return b"".join(R) def unwrap(self, C): "RFC 5649 section 4.2." @@ -1708,7 +1709,7 @@ class AESKeyWrapWithPadding(object): raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks" .format(len(C))) n = (len(C) / 8) - 1 - R = [C[i : i + 8] for i in xrange(0, len(C), 8)] + R = [C[i : i + 8] for i in range(0, len(C), 8)] if n == 1: R[0], R[1] = self._decrypt(R[0], R[1]) else: @@ -1723,10 +1724,10 @@ class AESKeyWrapWithPadding(object): .format(magic)) if m <= 8 * (n - 1) or m > 8 * n: raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n)) - R = "".join(R[1:]) + R = b"".join(R[1:]) assert len(R) == 8 * n - if any(r != "\x00" for r in R[m:]): - raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex"))) + if any(r != b"\x00" for r in R[m:]): + raise self.UnwrapError("Nonzero trailing bytes {}".format(binascii.hexlify(R[m:]))) return R[:m] @@ -1802,7 +1803,7 @@ class PreloadedECKey(PreloadedKey): if ecdsa_loaded: der = ECDSA_DER.unpem(pem) car, cdr = ECDSA_DER.remove_sequence(der) - cls._check(cdr == "") + cls._check(cdr == b"") version, cdr = ECDSA_DER.remove_integer(car) cls._check(version == 0) algid, pkinfo = ECDSA_DER.remove_sequence(cdr) @@ -1810,11 +1811,11 @@ class PreloadedECKey(PreloadedKey): cls._check(oid == oid_ecPublicKey) oid, cdr = ECDSA_DER.remove_object(cdr) sk_curve = ECDSA_find_curve(oid) - cls._check(cdr == "") + cls._check(cdr == b"") car, cdr = ECDSA_DER.remove_octet_string(pkinfo) - cls._check(cdr == "") + cls._check(cdr == b"") car, cdr = ECDSA_DER.remove_sequence(car) - cls._check(cdr == "") + cls._check(cdr == b"") version, cdr = ECDSA_DER.remove_integer(car) cls._check(version == 1) privkey, cdr = ECDSA_DER.remove_octet_string(cdr) diff --git a/utils/last_gasp_default_pin b/utils/last_gasp_default_pin index 8a91b8a..4dd1d54 100755 --- a/utils/last_gasp_default_pin +++ b/utils/last_gasp_default_pin @@ -44,6 +44,7 @@ generates the default. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +from binascii import hexlify from os import urandom from Crypto.Protocol.KDF import PBKDF2 from Crypto.Hash import SHA256, HMAC @@ -65,9 +66,6 @@ args = parser.parse_args() def HMAC_SHA256(pin, salt): return HMAC.new(pin, salt, SHA256).digest() -def hexify(value): - return ", ".join("0x%02x" % ord(v) for v in value) - salt = urandom(16) pin = PBKDF2(password = args.pin, @@ -76,7 +74,7 @@ pin = PBKDF2(password = args.pin, count = args.iterations, prf = HMAC_SHA256) -print '''\ +print('''\ /* * Automatically generated by a script, do not edit. */ @@ -86,5 +84,5 @@ static const hal_ks_pin_t hal_last_gasp_pin = {{ {{{pin}}}, {{{salt}}} }};'''.format(iterations = args.iterations, - pin = hexify(pin), - salt = hexify(salt)) + pin = ", ".join(hexlify(v) for v in pin), + salt = ", ".join(hexlify(v) for v in salt))) -- cgit v1.2.3