aboutsummaryrefslogtreecommitdiff
path: root/cryptech/libhal.py
diff options
context:
space:
mode:
Diffstat (limited to 'cryptech/libhal.py')
-rw-r--r--cryptech/libhal.py78
1 files changed, 44 insertions, 34 deletions
diff --git a/cryptech/libhal.py b/cryptech/libhal.py
index 647dbd6..d7bc328 100644
--- a/cryptech/libhal.py
+++ b/cryptech/libhal.py
@@ -43,14 +43,15 @@ import uuid
import xdrlib
import socket
import logging
+import binascii
logger = logging.getLogger(__name__)
-SLIP_END = chr(0300) # indicates end of packet
-SLIP_ESC = chr(0333) # indicates byte stuffing
-SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte
-SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte
+SLIP_END = b"\300" # indicates end of packet
+SLIP_ESC = b"\333" # indicates byte stuffing
+SLIP_ESC_END = b"\334" # ESC ESC_END means END data byte
+SLIP_ESC_ESC = b"\335" # ESC ESC_ESC means ESC data byte
def slip_encode(buffer):
@@ -70,7 +71,7 @@ class HALError(Exception):
@classmethod
def define(cls, **kw):
assert len(kw) == 1
- name, text = kw.items()[0]
+ name, text = list(kw.items())[0]
e = type(name, (cls,), dict(__doc__ = text))
cls.table.append(e)
globals()[name] = e
@@ -122,7 +123,10 @@ HALError.define(HAL_ERROR_RPC_PROTOCOL_ERROR = "RPC protocol error")
HALError.define(HAL_ERROR_NOT_IMPLEMENTED = "Not implemented")
-class Enum(int):
+# When we finally drop Python 2 support, this class should probably be
+# replaced by enum.IntEnum from the Python 3 standard library.
+
+class CEnum(int):
def __new__(cls, name, value):
self = int.__new__(cls, value)
@@ -134,14 +138,14 @@ class Enum(int):
return self._name
def __repr__(self):
- return "<Enum:{0.__class__.__name__} {0._name}:{0:d}>".format(self)
+ return "<CEnum:{0.__class__.__name__} {0._name}:{0:d}>".format(self)
_counter = 0
@classmethod
def define(cls, names):
symbols = []
- for name in names.translate(None, "{}").split(","):
+ for name in names.split(","):
if "=" in name:
name, sep, expr = name.partition("=")
cls._counter = eval(expr.strip())
@@ -156,7 +160,7 @@ class Enum(int):
packer.pack_uint(self)
-class RPCFunc(Enum): pass
+class RPCFunc(CEnum): pass
RPCFunc.define('''
RPC_FUNC_GET_VERSION,
@@ -193,7 +197,7 @@ RPCFunc.define('''
RPC_FUNC_PKEY_GENERATE_HASHSIG,
''')
-class HALDigestAlgorithm(Enum): pass
+class HALDigestAlgorithm(CEnum): pass
HALDigestAlgorithm.define('''
HAL_DIGEST_ALGORITHM_NONE,
@@ -206,7 +210,7 @@ HALDigestAlgorithm.define('''
HAL_DIGEST_ALGORITHM_SHA512
''')
-class HALKeyType(Enum): pass
+class HALKeyType(CEnum): pass
HALKeyType.define('''
HAL_KEY_TYPE_NONE,
@@ -220,7 +224,7 @@ HALKeyType.define('''
HAL_KEY_TYPE_HASHSIG_LMOTS
''')
-class HALCurve(Enum): pass
+class HALCurve(CEnum): pass
HALCurve.define('''
HAL_CURVE_NONE,
@@ -229,7 +233,7 @@ HALCurve.define('''
HAL_CURVE_P521
''')
-class HALUser(Enum): pass
+class HALUser(CEnum): pass
HALUser.define('''
HAL_USER_NONE,
@@ -238,7 +242,7 @@ HALUser.define('''
HAL_USER_WHEEL
''')
-class HALLmotsAlgorithm(Enum): pass
+class HALLmotsAlgorithm(CEnum): pass
HALLmotsAlgorithm.define('''
HAL_LMOTS_RESERVED = 0,
@@ -248,7 +252,7 @@ HALLmotsAlgorithm.define('''
HAL_LMOTS_SHA256_N32_W8 = 4
''')
-class HALLmsAlgorithm(Enum): pass
+class HALLmsAlgorithm(CEnum): pass
HALLmsAlgorithm.define('''
HAL_LMS_RESERVED = 0,
@@ -336,6 +340,7 @@ class LocalDigest(object):
def __init__(self, hsm, handle, algorithm, key):
from Crypto.Hash import HMAC, SHA, SHA224, SHA256, SHA384, SHA512
+ from struct import pack
self.hsm = hsm
self.handle = handle
self.algorithm = algorithm
@@ -351,7 +356,7 @@ class LocalDigest(object):
}
h = self._algorithms[algorithm]
self.digest_length = h.digest_size
- self.algorithm_id = chr(0x30) + chr(2 + len(h.oid)) + h.oid
+ self.algorithm_id = pack("BB", 0x30, 2 + len(h.oid)) + h.oid
self._context = HMAC.HMAC(key = key, digestmod = h) if key else h()
def update(self, data):
@@ -411,10 +416,10 @@ class PKey(Handle):
def public_key(self):
return self.hsm.pkey_get_public_key(self, self.public_key_len)
- def sign(self, hash = 0, data = "", length = 1024):
+ def sign(self, hash = 0, data = b"", length = 1024):
return self.hsm.pkey_sign(self, hash = hash, data = data, length = length)
- def verify(self, hash = 0, data = "", signature = None):
+ def verify(self, hash = 0, data = b"", signature = None):
self.hsm.pkey_verify(self, hash = hash, data = data, signature = signature)
def set_attributes(self, attributes = None, **kwargs):
@@ -423,9 +428,9 @@ class PKey(Handle):
def get_attributes(self, attributes):
attrs = self.hsm.pkey_get_attributes(self, attributes, 0)
- attrs = dict((k, v) for k, v in attrs.iteritems() if v != HAL_PKEY_ATTRIBUTE_NIL)
+ attrs = dict((k, v) for k, v in attrs.items() if v != HAL_PKEY_ATTRIBUTE_NIL)
result = dict((a, None) for a in attributes)
- result.update(self.hsm.pkey_get_attributes(self, attrs.iterkeys(), sum(attrs.itervalues())))
+ result.update(self.hsm.pkey_get_attributes(self, iter(attrs.keys()), sum(attrs.values())))
return result
def export_pkey(self, pkey):
@@ -461,7 +466,7 @@ class HSM(object):
def _send(self, msg): # Expects an xdrlib.Packer
msg = slip_encode(msg.get_buffer())
if self.debug_io:
- logger.debug("send: %s", ":".join("{:02x}".format(ord(c)) for c in msg))
+ logger.debug("send: %s", ":".join(binascii.hexlify(c) for c in msg))
self.socket.sendall(msg)
def _recv(self, code): # Returns a ContextManagedUnpacker
@@ -469,24 +474,29 @@ class HSM(object):
while True:
msg = [self.sockfile.read(1)]
while msg[-1] != SLIP_END:
- if msg[-1] == "":
+ if msg[-1] == b"":
raise HAL_ERROR_RPC_TRANSPORT()
msg.append(self.sockfile.read(1))
if self.debug_io:
- logger.debug("recv: %s", ":".join("{:02x}".format(ord(c)) for c in msg))
- msg = slip_decode("".join(msg))
+ logger.debug("recv: %s", ":".join(binascii.hexlify(c) for c in msg))
+ msg = slip_decode(b"".join(msg))
if not msg:
continue
- msg = ContextManagedUnpacker("".join(msg))
+ msg = ContextManagedUnpacker(b"".join(msg))
if msg.unpack_uint() != code:
continue
return msg
- _pack_builtin = (((int, long), "_pack_uint"),
+ _pack_builtin = ((int, "_pack_uint"),
(str, "_pack_bytes"),
((list, tuple, set), "_pack_array"),
(dict, "_pack_items"))
+ try:
+ _pack_builtin += (long, "_pack_uint")
+ except NameError: # "long" merged with "int" in Python 3
+ pass
+
def _pack_arg(self, packer, arg):
if hasattr(arg, "xdr_packer"):
return arg.xdr_packer(packer)
@@ -511,7 +521,7 @@ class HSM(object):
def _pack_items(self, packer, arg):
packer.pack_uint(len(arg))
- for name, value in arg.iteritems():
+ for name, value in arg.items():
self._pack_arg(packer, name)
self._pack_arg(packer, HAL_PKEY_ATTRIBUTE_NIL if value is None else value)
@@ -569,7 +579,7 @@ class HSM(object):
def hash_initialize(self, alg, key = None, client = 0, session = 0, mixed_mode = None):
if key is None:
- key = ""
+ key = b""
if mixed_mode is None:
mixed_mode = self.mixed_mode
if mixed_mode:
@@ -600,7 +610,7 @@ class HSM(object):
logger.debug("Opened pkey %s", pkey.uuid)
return pkey
- def pkey_generate_rsa(self, keylen, flags = 0, exponent = "\x01\x00\x01", client = 0, session = 0):
+ def pkey_generate_rsa(self, keylen, flags = 0, exponent = b"\x01\x00\x01", client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_GENERATE_RSA, session, keylen, exponent, flags, client = client) as r:
pkey = PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes()))
logger.debug("Generated RSA pkey %s", pkey.uuid)
@@ -656,14 +666,14 @@ class HSM(object):
with self.rpc(RPC_FUNC_PKEY_GET_PUBLIC_KEY, pkey, length) as r:
return r.unpack_bytes()
- def pkey_sign(self, pkey, hash = 0, data = "", length = 1024):
+ def pkey_sign(self, pkey, hash = 0, data = b"", length = 1024):
assert not hash or not data
if isinstance(hash, LocalDigest):
hash, data = 0, hash.finalize_padded(pkey)
with self.rpc(RPC_FUNC_PKEY_SIGN, pkey, hash, data, length) as r:
return r.unpack_bytes()
- def pkey_verify(self, pkey, hash = 0, data = "", signature = None):
+ def pkey_verify(self, pkey, hash = 0, data = b"", signature = None):
assert not hash or not data
if isinstance(hash, LocalDigest):
hash, data = 0, hash.finalize_padded(pkey)
@@ -680,7 +690,7 @@ class HSM(object):
attributes, s, length, u, client = client) as r:
s = r.unpack_uint()
n = r.unpack_uint()
- for i in xrange(n):
+ for i in range(n):
u = UUID(bytes = r.unpack_bytes())
yield u
@@ -695,9 +705,9 @@ class HSM(object):
if n != len(attributes):
raise HAL_ERROR_RPC_PROTOCOL_ERROR
if attributes_buffer_len > 0:
- return dict((r.unpack_uint(), r.unpack_bytes()) for i in xrange(n))
+ return dict((r.unpack_uint(), r.unpack_bytes()) for i in range(n))
else:
- return dict((r.unpack_uint(), r.unpack_uint()) for i in xrange(n))
+ return dict((r.unpack_uint(), r.unpack_uint()) for i in range(n))
def pkey_export(self, pkey, kekek, pkcs8_max = 2560, kek_max = 512):
with self.rpc(RPC_FUNC_PKEY_EXPORT, pkey, kekek, pkcs8_max, kek_max) as r: