From 1cd42f6d3332e1edf78b06bd7dcf51f5a1a7bb23 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Mon, 25 May 2020 19:33:38 -0400 Subject: Untested conversion to support Python 3 --- cryptech/libhal.py | 78 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 34 deletions(-) (limited to 'cryptech') diff --git a/cryptech/libhal.py b/cryptech/libhal.py index 647dbd6..d7bc328 100644 --- a/cryptech/libhal.py +++ b/cryptech/libhal.py @@ -43,14 +43,15 @@ import uuid import xdrlib import socket import logging +import binascii logger = logging.getLogger(__name__) -SLIP_END = chr(0300) # indicates end of packet -SLIP_ESC = chr(0333) # indicates byte stuffing -SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte -SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte +SLIP_END = b"\300" # indicates end of packet +SLIP_ESC = b"\333" # indicates byte stuffing +SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte +SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte def slip_encode(buffer): @@ -70,7 +71,7 @@ class HALError(Exception): @classmethod def define(cls, **kw): assert len(kw) == 1 - name, text = kw.items()[0] + name, text = list(kw.items())[0] e = type(name, (cls,), dict(__doc__ = text)) cls.table.append(e) globals()[name] = e @@ -122,7 +123,10 @@ HALError.define(HAL_ERROR_RPC_PROTOCOL_ERROR = "RPC protocol error") HALError.define(HAL_ERROR_NOT_IMPLEMENTED = "Not implemented") -class Enum(int): +# When we finally drop Python 2 support, this class should probably be +# replaced by enum.IntEnum from the Python 3 standard library. + +class CEnum(int): def __new__(cls, name, value): self = int.__new__(cls, value) @@ -134,14 +138,14 @@ class Enum(int): return self._name def __repr__(self): - return "".format(self) + return "".format(self) _counter = 0 @classmethod def define(cls, names): symbols = [] - for name in names.translate(None, "{}").split(","): + for name in names.split(","): if "=" in name: name, sep, expr = name.partition("=") cls._counter = eval(expr.strip()) @@ -156,7 +160,7 @@ class Enum(int): packer.pack_uint(self) -class RPCFunc(Enum): pass +class RPCFunc(CEnum): pass RPCFunc.define(''' RPC_FUNC_GET_VERSION, @@ -193,7 +197,7 @@ RPCFunc.define(''' RPC_FUNC_PKEY_GENERATE_HASHSIG, ''') -class HALDigestAlgorithm(Enum): pass +class HALDigestAlgorithm(CEnum): pass HALDigestAlgorithm.define(''' HAL_DIGEST_ALGORITHM_NONE, @@ -206,7 +210,7 @@ HALDigestAlgorithm.define(''' HAL_DIGEST_ALGORITHM_SHA512 ''') -class HALKeyType(Enum): pass +class HALKeyType(CEnum): pass HALKeyType.define(''' HAL_KEY_TYPE_NONE, @@ -220,7 +224,7 @@ HALKeyType.define(''' HAL_KEY_TYPE_HASHSIG_LMOTS ''') -class HALCurve(Enum): pass +class HALCurve(CEnum): pass HALCurve.define(''' HAL_CURVE_NONE, @@ -229,7 +233,7 @@ HALCurve.define(''' HAL_CURVE_P521 ''') -class HALUser(Enum): pass +class HALUser(CEnum): pass HALUser.define(''' HAL_USER_NONE, @@ -238,7 +242,7 @@ HALUser.define(''' HAL_USER_WHEEL ''') -class HALLmotsAlgorithm(Enum): pass +class HALLmotsAlgorithm(CEnum): pass HALLmotsAlgorithm.define(''' HAL_LMOTS_RESERVED = 0, @@ -248,7 +252,7 @@ HALLmotsAlgorithm.define(''' HAL_LMOTS_SHA256_N32_W8 = 4 ''') -class HALLmsAlgorithm(Enum): pass +class HALLmsAlgorithm(CEnum): pass HALLmsAlgorithm.define(''' HAL_LMS_RESERVED = 0, @@ -336,6 +340,7 @@ class LocalDigest(object): def __init__(self, hsm, handle, algorithm, key): from Crypto.Hash import HMAC, SHA, SHA224, SHA256, SHA384, SHA512 + from struct import pack self.hsm = hsm self.handle = handle self.algorithm = algorithm @@ -351,7 +356,7 @@ class LocalDigest(object): } h = self._algorithms[algorithm] self.digest_length = h.digest_size - self.algorithm_id = chr(0x30) + chr(2 + len(h.oid)) + h.oid + self.algorithm_id = pack("BB", 0x30, 2 + len(h.oid)) + h.oid self._context = HMAC.HMAC(key = key, digestmod = h) if key else h() def update(self, data): @@ -411,10 +416,10 @@ class PKey(Handle): def public_key(self): return self.hsm.pkey_get_public_key(self, self.public_key_len) - def sign(self, hash = 0, data = "", length = 1024): + def sign(self, hash = 0, data = b"", length = 1024): return self.hsm.pkey_sign(self, hash = hash, data = data, length = length) - def verify(self, hash = 0, data = "", signature = None): + def verify(self, hash = 0, data = b"", signature = None): self.hsm.pkey_verify(self, hash = hash, data = data, signature = signature) def set_attributes(self, attributes = None, **kwargs): @@ -423,9 +428,9 @@ class PKey(Handle): def get_attributes(self, attributes): attrs = self.hsm.pkey_get_attributes(self, attributes, 0) - attrs = dict((k, v) for k, v in attrs.iteritems() if v != HAL_PKEY_ATTRIBUTE_NIL) + attrs = dict((k, v) for k, v in attrs.items() if v != HAL_PKEY_ATTRIBUTE_NIL) result = dict((a, None) for a in attributes) - result.update(self.hsm.pkey_get_attributes(self, attrs.iterkeys(), sum(attrs.itervalues()))) + result.update(self.hsm.pkey_get_attributes(self, iter(attrs.keys()), sum(attrs.values()))) return result def export_pkey(self, pkey): @@ -461,7 +466,7 @@ class HSM(object): def _send(self, msg): # Expects an xdrlib.Packer msg = slip_encode(msg.get_buffer()) if self.debug_io: - logger.debug("send: %s", ":".join("{:02x}".format(ord(c)) for c in msg)) + logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in msg)) self.socket.sendall(msg) def _recv(self, code): # Returns a ContextManagedUnpacker @@ -469,24 +474,29 @@ class HSM(object): while True: msg = [self.sockfile.read(1)] while msg[-1] != SLIP_END: - if msg[-1] == "": + if msg[-1] == b"": raise HAL_ERROR_RPC_TRANSPORT() msg.append(self.sockfile.read(1)) if self.debug_io: - logger.debug("recv: %s", ":".join("{:02x}".format(ord(c)) for c in msg)) - msg = slip_decode("".join(msg)) + logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in msg)) + msg = slip_decode(b"".join(msg)) if not msg: continue - msg = ContextManagedUnpacker("".join(msg)) + msg = ContextManagedUnpacker(b"".join(msg)) if msg.unpack_uint() != code: continue return msg - _pack_builtin = (((int, long), "_pack_uint"), + _pack_builtin = ((int, "_pack_uint"), (str, "_pack_bytes"), ((list, tuple, set), "_pack_array"), (dict, "_pack_items")) + try: + _pack_builtin += (long, "_pack_uint") + except NameError: # "long" merged with "int" in Python 3 + pass + def _pack_arg(self, packer, arg): if hasattr(arg, "xdr_packer"): return arg.xdr_packer(packer) @@ -511,7 +521,7 @@ class HSM(object): def _pack_items(self, packer, arg): packer.pack_uint(len(arg)) - for name, value in arg.iteritems(): + for name, value in arg.items(): self._pack_arg(packer, name) self._pack_arg(packer, HAL_PKEY_ATTRIBUTE_NIL if value is None else value) @@ -569,7 +579,7 @@ class HSM(object): def hash_initialize(self, alg, key = None, client = 0, session = 0, mixed_mode = None): if key is None: - key = "" + key = b"" if mixed_mode is None: mixed_mode = self.mixed_mode if mixed_mode: @@ -600,7 +610,7 @@ class HSM(object): logger.debug("Opened pkey %s", pkey.uuid) return pkey - def pkey_generate_rsa(self, keylen, flags = 0, exponent = "\x01\x00\x01", client = 0, session = 0): + def pkey_generate_rsa(self, keylen, flags = 0, exponent = b"\x01\x00\x01", client = 0, session = 0): with self.rpc(RPC_FUNC_PKEY_GENERATE_RSA, session, keylen, exponent, flags, client = client) as r: pkey = PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes())) logger.debug("Generated RSA pkey %s", pkey.uuid) @@ -656,14 +666,14 @@ class HSM(object): with self.rpc(RPC_FUNC_PKEY_GET_PUBLIC_KEY, pkey, length) as r: return r.unpack_bytes() - def pkey_sign(self, pkey, hash = 0, data = "", length = 1024): + def pkey_sign(self, pkey, hash = 0, data = b"", length = 1024): assert not hash or not data if isinstance(hash, LocalDigest): hash, data = 0, hash.finalize_padded(pkey) with self.rpc(RPC_FUNC_PKEY_SIGN, pkey, hash, data, length) as r: return r.unpack_bytes() - def pkey_verify(self, pkey, hash = 0, data = "", signature = None): + def pkey_verify(self, pkey, hash = 0, data = b"", signature = None): assert not hash or not data if isinstance(hash, LocalDigest): hash, data = 0, hash.finalize_padded(pkey) @@ -680,7 +690,7 @@ class HSM(object): attributes, s, length, u, client = client) as r: s = r.unpack_uint() n = r.unpack_uint() - for i in xrange(n): + for i in range(n): u = UUID(bytes = r.unpack_bytes()) yield u @@ -695,9 +705,9 @@ class HSM(object): if n != len(attributes): raise HAL_ERROR_RPC_PROTOCOL_ERROR if attributes_buffer_len > 0: - return dict((r.unpack_uint(), r.unpack_bytes()) for i in xrange(n)) + return dict((r.unpack_uint(), r.unpack_bytes()) for i in range(n)) else: - return dict((r.unpack_uint(), r.unpack_uint()) for i in xrange(n)) + return dict((r.unpack_uint(), r.unpack_uint()) for i in range(n)) def pkey_export(self, pkey, kekek, pkcs8_max = 2560, kek_max = 512): with self.rpc(RPC_FUNC_PKEY_EXPORT, pkey, kekek, pkcs8_max, kek_max) as r: -- cgit v1.2.3