From fa3feddf3a25e34db2ac57ff8e962f13db07bf40 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Mon, 25 May 2020 19:33:47 -0400 Subject: Untested conversion to support Python 3 --- cryptech/py11/__init__.py | 4 +- cryptech/py11/attributes.py | 164 ++++++------ cryptech/py11/mutex.py | 92 +++---- cryptech/py11/types.py | 106 ++++---- scripts/build-attributes | 526 +++++++++++++++++++------------------- scripts/build-py11-attributes | 8 +- scripts/py11-test.py | 193 +++++++------- scripts/test-hsmcheck | 202 +++++++-------- scripts/thready-time-signature.py | 16 +- scripts/time-signature.py | 10 +- unit_tests.py | 44 ++-- 11 files changed, 685 insertions(+), 680 deletions(-) diff --git a/cryptech/py11/__init__.py b/cryptech/py11/__init__.py index 1618a76..740ecb6 100644 --- a/cryptech/py11/__init__.py +++ b/cryptech/py11/__init__.py @@ -125,7 +125,7 @@ class PKCS11 (object): self.d.C_GetSlotList(CK_TRUE, None, byref(count)) slots = (CK_SLOT_ID * count.value)() self.d.C_GetSlotList(CK_TRUE, slots, byref(count)) - return tuple(slots[i] for i in xrange(count.value)) + return tuple(slots[i] for i in range(count.value)) def C_GetTokenInfo(self, slot_id): token_info = CK_TOKEN_INFO() @@ -169,7 +169,7 @@ class PKCS11 (object): count = CK_ULONG(1) while count.value > 0: self.d.C_FindObjects(session, objects, len(objects), byref(count)) - for i in xrange(count.value): + for i in range(count.value): yield objects[i] def FindObjects(self, session, template = None, **kwargs): diff --git a/cryptech/py11/attributes.py b/cryptech/py11/attributes.py index 56473ad..c6a87da 100644 --- a/cryptech/py11/attributes.py +++ b/cryptech/py11/attributes.py @@ -2,100 +2,102 @@ # module from the Python standard library. from struct import pack, unpack +from binascii import hexlify, unhexlify from ctypes import * from .types import * from .constants import * class Attribute(object): - @classmethod - def new(cls, attribute_name, type_name): - from . import constants - from . import types - assert attribute_name.startswith("CKA_") - attribute_number = getattr(constants, attribute_name) - type_class = getattr(types, type_name, str) - if type_class is CK_BBOOL: - cls = Attribute_CK_BBOOL - elif type_class is CK_ULONG: - cls = Attribute_CK_ULONG - elif type_name == "biginteger": - cls = Attribute_biginteger - return cls(attribute_name, attribute_number, type_name, type_class) - - def __init__(self, attribute_name, attribute_number, type_name, type_class): - assert attribute_name.startswith("CKA_") - self.name = attribute_name - self.code = attribute_number - self.type_name = type_name - self.type_class = type_class - - def encode(self, x): return x - def decode(self, x): return x + @classmethod + def new(cls, attribute_name, type_name): + from . import constants + from . import types + assert attribute_name.startswith("CKA_") + attribute_number = getattr(constants, attribute_name) + type_class = getattr(types, type_name, str) + if type_class is CK_BBOOL: + cls = Attribute_CK_BBOOL + elif type_class is CK_ULONG: + cls = Attribute_CK_ULONG + elif type_name == "biginteger": + cls = Attribute_biginteger + return cls(attribute_name, attribute_number, type_name, type_class) + + def __init__(self, attribute_name, attribute_number, type_name, type_class): + assert attribute_name.startswith("CKA_") + self.name = attribute_name + self.code = attribute_number + self.type_name = type_name + self.type_class = type_class + + def encode(self, x): return x + def decode(self, x): return x class Attribute_CK_BBOOL(Attribute): - def encode(self, x): return chr(int(x)) - def decode(self, x): return bool(ord(x)) + def encode(self, x): return chr(int(x)) + def decode(self, x): return bool(ord(x)) class Attribute_CK_ULONG(Attribute): - def encode(self, x): return pack("L", x) - def decode(self, x): return unpack("L", x)[0] + def encode(self, x): return pack("L", x) + def decode(self, x): return unpack("L", x)[0] class Attribute_biginteger(Attribute): - def encode(self, x): return "\x00" if x == 0 else ("%0*x" % (((x.bit_length() + 7) / 8) * 2, x)).decode("hex") - def decode(self, x): return long(x.encode("hex"), 16) + def encode(self, x): return "\x00" if x == 0 else unhexlify("{0:0{1}x".format(x, ((x.bit_length() + 7) / 8) * 2)) + def decode(self, x): return int(hexlify(x), 16) class AttributeDB(object): - def __init__(self): - from .attribute_map import attribute_map - self.db = {} - for attribute_name, type_name in attribute_map.iteritems(): - a = Attribute.new(attribute_name, type_name) - self.db[a.name] = a - self.db[a.code] = a - - def encode(self, k, v): - return self.db[k].encode(v) if k in self.db else v - - def decode(self, k, v): - return self.db[k].decode(v) if k in self.db else v - - def getvalue_create_template(self, attributes): - attributes = tuple(self.db[a].code for a in attributes) - template = (CK_ATTRIBUTE * len(attributes))() - for i in xrange(len(attributes)): - template[i].type = attributes[i] - template[i].pValue = None - template[i].ulValueLen = 0 - return template - - def getvalue_allocate_template(self, template): - for t in template: - t.pValue = create_string_buffer(t.ulValueLen) - - def from_ctypes(self, template): - return dict((t.type, self.decode(t.type, t.pValue[:t.ulValueLen])) - for t in template) - - def to_ctypes(self, attributes): - attributes = tuple(attributes.iteritems() - if isinstance(attributes, dict) else - attributes) - template = (CK_ATTRIBUTE * len(attributes))() - for i, kv in enumerate(attributes): - k, v = kv - if k in self.db: - a = self.db[k] - k, v = a.code, a.encode(v) - template[i].type = k - template[i].pValue = create_string_buffer(v) - template[i].ulValueLen = len(v) - return template - - def attribute_name(self, code): - return self.db[code].name - - def attribute_code(self, name): - return self.db[name].code + def __init__(self): + from .attribute_map import attribute_map + self.db = {} + for attribute_name, type_name in attribute_map.items(): + a = Attribute.new(attribute_name, type_name) + self.db[a.name] = a + self.db[a.code] = a + + def encode(self, k, v): + return self.db[k].encode(v) if k in self.db else v + + def decode(self, k, v): + return self.db[k].decode(v) if k in self.db else v + + def getvalue_create_template(self, attributes): + attributes = tuple(self.db[a].code for a in attributes) + template = (CK_ATTRIBUTE * len(attributes))() + for i in range(len(attributes)): + template[i].type = attributes[i] + template[i].pValue = None + template[i].ulValueLen = 0 + return template + + def getvalue_allocate_template(self, template): + for t in template: + t.pValue = create_string_buffer(t.ulValueLen) + + def from_ctypes(self, template): + return dict((t.type, self.decode(t.type, t.pValue[:t.ulValueLen])) + for t in template) + + def to_ctypes(self, attributes): + if isinstance(attributes, dict): + attributes = tuple(iter(attributes.items())) + else: + attributes = tuple(attributes) + template = (CK_ATTRIBUTE * len(attributes))() + for i, kv in enumerate(attributes): + k, v = kv + if k in self.db: + a = self.db[k] + k, v = a.code, a.encode(v) + template[i].type = k + template[i].pValue = create_string_buffer(v) + template[i].ulValueLen = len(v) + return template + + def attribute_name(self, code): + return self.db[code].name + + def attribute_code(self, name): + return self.db[name].code diff --git a/cryptech/py11/mutex.py b/cryptech/py11/mutex.py index da2123c..d3807f6 100644 --- a/cryptech/py11/mutex.py +++ b/cryptech/py11/mutex.py @@ -33,57 +33,57 @@ encoded_format = "=L" encoded_length = len(pack(encoded_format, 0)) encoded_type = CK_BYTE * encoded_length -handle_max = unpack(encoded_format, chr(0xff) * encoded_length)[0] +handle_max = unpack(encoded_format, b"\xff" * encoded_length)[0] class Mutex(object): - def __init__(self, handle): - from threading import Lock - self.encoded = encoded_type(*handle) - self.lock = Lock() + def __init__(self, handle): + from threading import Lock + self.encoded = encoded_type(*handle) + self.lock = Lock() def p11_callback(func): - from threading import ThreadError - def wrapper(self, arg): - try: - func(self, arg) - except ThreadError, e: - print "Failed: %s" % e - return CKR_MUTEX_NOT_LOCKED.ckr_code - except Exception, e: - print "Failed: %s" % e - return CKR_FUNCTION_FAILED.ckr_code - else: - return CKR_OK - return wrapper + from threading import ThreadError + def wrapper(self, arg): + try: + func(self, arg) + except ThreadError as e: + print("Failed: %s" % e) + return CKR_MUTEX_NOT_LOCKED.ckr_code + except Exception as e: + print("Failed: %s" % e) + return CKR_FUNCTION_FAILED.ckr_code + else: + return CKR_OK + return wrapper class MutexDB(object): - def __init__(self): - self.mutexes = {} - self.next_handle = 0 - - def find_free_handle(self): - if len(self.mutexes) > handle_max: - raise RuntimeError - while self.next_handle in self.mutexes: - self.next_handle = (self.next_handle + 1) & handle_max - return pack(encoded_format, self.next_handle) - - @p11_callback - def create(self, handle_ptr): - handle = self.find_free_handle() - self.mutexes[handle] = Mutex(handle) - handle_ptr[0] = self.mutexes[handle].encoded - - @p11_callback - def destroy(self, handle): - del self.mutexes[handle[:encoded_length]] - - @p11_callback - def lock(self, handle): - self.mutexes[handle[:encoded_length]].lock.acquire() - - @p11_callback - def unlock(self, handle): - self.mutexes[handle[:encoded_length]].lock.release() + def __init__(self): + self.mutexes = {} + self.next_handle = 0 + + def find_free_handle(self): + if len(self.mutexes) > handle_max: + raise RuntimeError + while self.next_handle in self.mutexes: + self.next_handle = (self.next_handle + 1) & handle_max + return pack(encoded_format, self.next_handle) + + @p11_callback + def create(self, handle_ptr): + handle = self.find_free_handle() + self.mutexes[handle] = Mutex(handle) + handle_ptr[0] = self.mutexes[handle].encoded + + @p11_callback + def destroy(self, handle): + del self.mutexes[handle[:encoded_length]] + + @p11_callback + def lock(self, handle): + self.mutexes[handle[:encoded_length]].lock.acquire() + + @p11_callback + def unlock(self, handle): + self.mutexes[handle[:encoded_length]].lock.release() diff --git a/cryptech/py11/types.py b/cryptech/py11/types.py index 1b233e7..91e2d8b 100644 --- a/cryptech/py11/types.py +++ b/cryptech/py11/types.py @@ -49,17 +49,17 @@ CK_ULONG_PTR = POINTER(CK_ULONG) CK_VOID_PTR_PTR = POINTER(CK_VOID_PTR) class CK_VERSION (Structure): - _fields_ = [("major", c_ubyte), - ("minor", c_ubyte)] + _fields_ = [("major", c_ubyte), + ("minor", c_ubyte)] CK_VERSION_PTR = POINTER(CK_VERSION) class CK_INFO (Structure): - _fields_ = [("cryptokiVersion", CK_VERSION), - ("manufacturerID", CK_UTF8CHAR * 32), - ("flags", CK_FLAGS), - ("libraryDescription", CK_UTF8CHAR * 32), - ("libraryVersion", CK_VERSION)] + _fields_ = [("cryptokiVersion", CK_VERSION), + ("manufacturerID", CK_UTF8CHAR * 32), + ("flags", CK_FLAGS), + ("libraryDescription", CK_UTF8CHAR * 32), + ("libraryVersion", CK_VERSION)] CK_INFO_PTR = POINTER(CK_INFO) @@ -70,33 +70,33 @@ CK_SLOT_ID = CK_ULONG CK_SLOT_ID_PTR = POINTER(CK_SLOT_ID) class CK_SLOT_INFO (Structure): - _fields_ = [("slotDescription", CK_UTF8CHAR * 64), - ("manufacturerID", CK_UTF8CHAR * 32), - ("flags", CK_FLAGS), - ("hardwareVersion", CK_VERSION), - ("firmwareVersion", CK_VERSION)] + _fields_ = [("slotDescription", CK_UTF8CHAR * 64), + ("manufacturerID", CK_UTF8CHAR * 32), + ("flags", CK_FLAGS), + ("hardwareVersion", CK_VERSION), + ("firmwareVersion", CK_VERSION)] CK_SLOT_INFO_PTR = POINTER(CK_SLOT_INFO) class CK_TOKEN_INFO (Structure): - _fields_ = [("label", CK_UTF8CHAR * 32), - ("manufacturerID", CK_UTF8CHAR * 32), - ("model", CK_UTF8CHAR * 16), - ("serialNumber", CK_CHAR * 16), - ("flags", CK_FLAGS), - ("ulMaxSessionCount", CK_ULONG), - ("ulSessionCount", CK_ULONG), - ("ulMaxRwSessionCount", CK_ULONG), - ("ulRwSessionCount", CK_ULONG), - ("ulMaxPinLen", CK_ULONG), - ("ulMinPinLen", CK_ULONG), - ("ulTotalPublicMemory", CK_ULONG), - ("ulFreePublicMemory", CK_ULONG), - ("ulTotalPrivateMemory", CK_ULONG), - ("ulFreePrivateMemory", CK_ULONG), - ("hardwareVersion", CK_VERSION), - ("firmwareVersion", CK_VERSION), - ("utcTime", CK_CHAR * 16)] + _fields_ = [("label", CK_UTF8CHAR * 32), + ("manufacturerID", CK_UTF8CHAR * 32), + ("model", CK_UTF8CHAR * 16), + ("serialNumber", CK_CHAR * 16), + ("flags", CK_FLAGS), + ("ulMaxSessionCount", CK_ULONG), + ("ulSessionCount", CK_ULONG), + ("ulMaxRwSessionCount", CK_ULONG), + ("ulRwSessionCount", CK_ULONG), + ("ulMaxPinLen", CK_ULONG), + ("ulMinPinLen", CK_ULONG), + ("ulTotalPublicMemory", CK_ULONG), + ("ulFreePublicMemory", CK_ULONG), + ("ulTotalPrivateMemory",CK_ULONG), + ("ulFreePrivateMemory", CK_ULONG), + ("hardwareVersion", CK_VERSION), + ("firmwareVersion", CK_VERSION), + ("utcTime", CK_CHAR * 16)] CK_TOKEN_INFO_PTR = POINTER(CK_TOKEN_INFO) @@ -109,10 +109,10 @@ CK_USER_TYPE = CK_ULONG CK_STATE = CK_ULONG class CK_SESSION_INFO (Structure): - _fields_ = [("slotID", CK_SLOT_ID), - ("state", CK_STATE), - ("flags", CK_FLAGS), - ("ulDeviceError", CK_ULONG)] + _fields_ = [("slotID", CK_SLOT_ID), + ("state", CK_STATE), + ("flags", CK_FLAGS), + ("ulDeviceError", CK_ULONG)] CK_SESSION_INFO_PTR = POINTER(CK_SESSION_INFO) @@ -133,32 +133,32 @@ CK_CERTIFICATE_TYPE = CK_ULONG CK_ATTRIBUTE_TYPE = CK_ULONG class CK_ATTRIBUTE (Structure): - _fields_ = [("type", CK_ATTRIBUTE_TYPE), - ("pValue", CK_VOID_PTR), - ("ulValueLen", CK_ULONG)] + _fields_ = [("type", CK_ATTRIBUTE_TYPE), + ("pValue", CK_VOID_PTR), + ("ulValueLen", CK_ULONG)] CK_ATTRIBUTE_PTR = POINTER(CK_ATTRIBUTE) class CK_DATE (Structure): - _fields_ = [("year", CK_CHAR * 4), - ("month", CK_CHAR * 2), - ("day", CK_CHAR * 2)] + _fields_ = [("year", CK_CHAR * 4), + ("month", CK_CHAR * 2), + ("day", CK_CHAR * 2)] CK_MECHANISM_TYPE = CK_ULONG CK_MECHANISM_TYPE_PTR = POINTER(CK_MECHANISM_TYPE) class CK_MECHANISM (Structure): - _fields_ = [("mechanism", CK_MECHANISM_TYPE), - ("pParameter", CK_VOID_PTR), - ("ulParameterLen", CK_ULONG)] + _fields_ = [("mechanism", CK_MECHANISM_TYPE), + ("pParameter", CK_VOID_PTR), + ("ulParameterLen", CK_ULONG)] CK_MECHANISM_PTR = POINTER(CK_MECHANISM) class CK_MECHANISM_INFO (Structure): - _fields_ = [("ulMinKeySize", CK_ULONG), - ("ulMaxKeySize", CK_ULONG), - ("flags", CK_FLAGS)] + _fields_ = [("ulMinKeySize", CK_ULONG), + ("ulMaxKeySize", CK_ULONG), + ("flags", CK_FLAGS)] CK_MECHANISM_INFO_PTR = POINTER(CK_MECHANISM_INFO) @@ -172,12 +172,12 @@ CK_LOCKMUTEX = CFUNCTYPE(CK_RV, CK_VOID_PTR) CK_UNLOCKMUTEX = CFUNCTYPE(CK_RV, CK_VOID_PTR) class CK_C_INITIALIZE_ARGS (Structure): - _fields_ = [("CreateMutex", CK_CREATEMUTEX), - ("DestroyMutex", CK_DESTROYMUTEX), - ("LockMutex", CK_LOCKMUTEX), - ("UnlockMutex", CK_UNLOCKMUTEX), - ("flags", CK_FLAGS), - ("pReserved", CK_VOID_PTR)] + _fields_ = [("CreateMutex", CK_CREATEMUTEX), + ("DestroyMutex", CK_DESTROYMUTEX), + ("LockMutex", CK_LOCKMUTEX), + ("UnlockMutex", CK_UNLOCKMUTEX), + ("flags", CK_FLAGS), + ("pReserved", CK_VOID_PTR)] CK_C_INITIALIZE_ARGS_PTR = POINTER(CK_C_INITIALIZE_ARGS) @@ -188,7 +188,7 @@ CK_C_INITIALIZE_ARGS_PTR = POINTER(CK_C_INITIALIZE_ARGS) # gratuitous Python module loop too. class CK_FUNCTION_LIST (Structure): - pass + pass CK_FUNCTION_LIST_PTR = POINTER(CK_FUNCTION_LIST) CK_FUNCTION_LIST_PTR_PTR = POINTER(CK_FUNCTION_LIST_PTR) diff --git a/scripts/build-attributes b/scripts/build-attributes index 7a36bdc..e7c500e 100755 --- a/scripts/build-attributes +++ b/scripts/build-attributes @@ -47,318 +47,318 @@ import argparse def define_flags(flag_names): - """ - Flag definitions. Called later, here at front of program just to - make them easier to find. - """ - - flag_names.create("DEFAULT_VALUE", "Value field contains default") - flag_names.footnote( 1, "REQUIRED_BY_CREATEOBJECT") - flag_names.footnote( 2, "FORBIDDEN_BY_CREATEOBJECT") - flag_names.footnote( 3, "REQUIRED_BY_GENERATE") - flag_names.footnote( 4, "FORBIDDEN_BY_GENERATE") - flag_names.footnote( 5, "REQUIRED_BY_UNWRAP") - flag_names.footnote( 6, "FORBIDDEN_BY_UNWRAP") - flag_names.footnote( 7, "SENSITIVE") - flag_names.footnote( 8, "PERHAPS_MODIFIABLE") - flag_names.footnote( 9, "DEFAULT_IS_TOKEN_SPECIFIC") - flag_names.footnote(10, "ONLY_SO_USER_CAN_SET") - flag_names.footnote(11, "LATCHES_WHEN_TRUE") - flag_names.footnote(12, "LATCHES_WHEN_FALSE") + """ + Flag definitions. Called later, here at front of program just to + make them easier to find. + """ + + flag_names.create("DEFAULT_VALUE", "Value field contains default") + flag_names.footnote( 1, "REQUIRED_BY_CREATEOBJECT") + flag_names.footnote( 2, "FORBIDDEN_BY_CREATEOBJECT") + flag_names.footnote( 3, "REQUIRED_BY_GENERATE") + flag_names.footnote( 4, "FORBIDDEN_BY_GENERATE") + flag_names.footnote( 5, "REQUIRED_BY_UNWRAP") + flag_names.footnote( 6, "FORBIDDEN_BY_UNWRAP") + flag_names.footnote( 7, "SENSITIVE") + flag_names.footnote( 8, "PERHAPS_MODIFIABLE") + flag_names.footnote( 9, "DEFAULT_IS_TOKEN_SPECIFIC") + flag_names.footnote(10, "ONLY_SO_USER_CAN_SET") + flag_names.footnote(11, "LATCHES_WHEN_TRUE") + flag_names.footnote(12, "LATCHES_WHEN_FALSE") class PKCS11ParseError(Exception): - "Failure parsing PCKS #11 object definitions from YAML data." + "Failure parsing PCKS #11 object definitions from YAML data." def write_lines(*lines, **d): - """ - Utility to simplify writing formatted text to the output stream. - """ - - for line in lines: - args.output_file.write((line % d) + "\n") - - -class Flags(object): - """ - Descriptor flag database. - - Many of these are derived from PKCS #11 Table 15 footnotes - """ - - prefix = "P11_DESCRIPTOR_" # Prefix string for all descriptor flags - - def __init__(self): - self.names = [] - self.notes = {} - self.width = 0 - - def create(self, name, comment = None): """ - Create a descriptor flag. + Utility to simplify writing formatted text to the output stream. """ - assert len(self.names) < 32 - name = self.prefix + name - self.names.append((name, comment)) - if len(name) > self.width: - self.width = len(name) - - def footnote(self, number, name): - """ - Create a descriptor flag for a PKCS #11 table 15 footnote. - """ + for line in lines: + args.output_file.write((line % d) + "\n") - assert number not in self.notes - self.create(name, "Section 10.2 table 15 footnote #%2d" % number) - self.notes[number] = self.prefix + name - def write(self): +class Flags(object): """ - Generate the flags, assigning bit positions as we go. + Descriptor flag database. + + Many of these are derived from PKCS #11 Table 15 footnotes """ - assert len(self.names) < 32 - self.width = (((self.width + 4) >> 2) << 2) - 1 - bit = 1 - for name, comment in self.names: - format = "#define %(name)s 0x%(bit)08x" - if comment is not None: - format += " /* %(comment)s */" - write_lines(format, bit = bit, comment = comment, name = "%-*s" % (self.width, name)) - bit <<= 1 + prefix = "P11_DESCRIPTOR_" # Prefix string for all descriptor flags + def __init__(self): + self.names = [] + self.notes = {} + self.width = 0 -class AttributeNumbers(dict): - """ - Attribute names and numbers scraped (yuck) from pkcs11t.h. - """ - - def __init__(self, filename): - with open(filename, "r") as f: - for line in f: - word = line.split() - if len(word) <= 2 or word[0] != "#define" or not word[1].startswith("CKA_"): - continue - if word[2] in self: - continue - if word[2].startswith("(CKF_ARRAY_ATTRIBUTE|"): - word[2] = word[2].translate(None, "()").split("|")[1] - self[word[1]] = int(word[2], 16) + def create(self, name, comment = None): + """ + Create a descriptor flag. + """ + assert len(self.names) < 32 + name = self.prefix + name + self.names.append((name, comment)) + if len(name) > self.width: + self.width = len(name) -class Attribute(object): - """ - Definition of one attribute. - """ - - def __init__(self, name, type = None, footnotes = None, default = None, value = None, unimplemented = False): - assert value is None or default is None - self.name = name - self.type = type - self.footnotes = footnotes - self.default = self.convert_integers(default) - self.value = self.convert_integers(value) - self.unimplemented = unimplemented - - @staticmethod - def convert_integers(val): - """ - Convert a non-negative integer initialization value into a byte array. - """ + def footnote(self, number, name): + """ + Create a descriptor flag for a PKCS #11 table 15 footnote. + """ - if not isinstance(val, (int, long)): - return val - if val < 0: - raise ValueError("Negative integers not legal here: %s" % val) - bytes = [] - while val > 0: - bytes.insert(0, val & 0xFF) - val >>= 8 - return bytes or [0] - - def inherit(self, other): - """ - Merge values from paraent attribute definition, if any. - """ + assert number not in self.notes + self.create(name, "Section 10.2 table 15 footnote #%2d" % number) + self.notes[number] = self.prefix + name - for k in ("type", "footnotes", "default", "value"): - if getattr(self, k) is None: - setattr(self, k, getattr(other, k)) - self.unimplemented = self.unimplemented or other.unimplemented + def write(self): + """ + Generate the flags, assigning bit positions as we go. + """ - def format_flags(self): - """ - Generate the descriptor flags field. - """ + assert len(self.names) < 32 + self.width = (((self.width + 4) >> 2) << 2) - 1 + bit = 1 + for name, comment in self.names: + format = "#define %(name)s 0x%(bit)08x" + if comment is not None: + format += " /* %(comment)s */" + write_lines(format, bit = bit, comment = comment, name = "%-*s" % (self.width, name)) + bit <<= 1 - flags = [] - if self.footnotes: - flags.extend(flag_names.notes[f] for f in self.footnotes) - if self.value is None and self.default is not None: - flags.append("P11_DESCRIPTOR_DEFAULT_VALUE") - flags = " | ".join(flags) - return flags or "0" - def format_size(self): +class AttributeNumbers(dict): """ - Generate the descriptor size field. + Attribute names and numbers scraped (yuck) from pkcs11t.h. """ - if isinstance(self.type, str) and self.type.startswith("CK_"): - return "sizeof(%s)" % self.type - elif self.type in ("rfc2279string", "biginteger", "bytearray"): - return "0" - else: - raise PKCS11ParseError("Unknown meta-type %r" % self.type) - - def format_length(self): - """ - Generate the descriptor length field. - """ + def __init__(self, filename): + with open(filename, "r") as f: + for line in f: + word = line.split() + if len(word) <= 2 or word[0] != "#define" or not word[1].startswith("CKA_"): + continue + if word[2] in self: + continue + if word[2].startswith("(CKF_ARRAY_ATTRIBUTE|"): + word[2] = word[2].translate(None, "()").split("|")[1] + self[word[1]] = int(word[2], 16) - value = self.value or self.default - if isinstance(value, list): - return "sizeof(const_0x%s)" % "".join("%02x" % v for v in value) - elif value and isinstance(self.type, str) and self.type.startswith("CK_"): - return "sizeof(%s)" % self.type - else: - return "0" - def format_value(self): - """ - Generate the descriptor value field. +class Attribute(object): """ + Definition of one attribute. + """ + + def __init__(self, name, type = None, footnotes = None, default = None, value = None, unimplemented = False): + assert value is None or default is None + self.name = name + self.type = type + self.footnotes = footnotes + self.default = self.convert_integers(default) + self.value = self.convert_integers(value) + self.unimplemented = unimplemented + + @staticmethod + def convert_integers(val): + """ + Convert a non-negative integer initialization value into a byte array. + """ + + if not isinstance(val, int): + return val + if val < 0: + raise ValueError("Negative integers not legal here: %s" % val) + bytes = [] + while val > 0: + bytes.insert(0, val & 0xFF) + val >>= 8 + return bytes or [0] + + def inherit(self, other): + """ + Merge values from paraent attribute definition, if any. + """ + + for k in ("type", "footnotes", "default", "value"): + if getattr(self, k) is None: + setattr(self, k, getattr(other, k)) + self.unimplemented = self.unimplemented or other.unimplemented + + def format_flags(self): + """ + Generate the descriptor flags field. + """ + + flags = [] + if self.footnotes: + flags.extend(flag_names.notes[f] for f in self.footnotes) + if self.value is None and self.default is not None: + flags.append("P11_DESCRIPTOR_DEFAULT_VALUE") + flags = " | ".join(flags) + return flags or "0" + + def format_size(self): + """ + Generate the descriptor size field. + """ + + if isinstance(self.type, str) and self.type.startswith("CK_"): + return "sizeof(%s)" % self.type + elif self.type in ("rfc2279string", "biginteger", "bytearray"): + return "0" + else: + raise PKCS11ParseError("Unknown meta-type %r" % self.type) + + def format_length(self): + """ + Generate the descriptor length field. + """ + + value = self.value or self.default + if isinstance(value, list): + return "sizeof(const_0x%s)" % "".join("%02x" % v for v in value) + elif value and isinstance(self.type, str) and self.type.startswith("CK_"): + return "sizeof(%s)" % self.type + else: + return "0" + + def format_value(self): + """ + Generate the descriptor value field. + """ + + value = self.value or self.default + if not value: + return "NULL_PTR" + elif isinstance(value, list): + return "const_0x" + "".join("%02x" % v for v in value) + else: + return "&const_" + value + + def format_constant(self, constants): + """ + Generate constant initializer values. These are merged so that we + only end up declaring one copy of each initializer value no matter + how many attributes use it. + """ + + value = self.value or self.default + if not self.unimplemented and value: + if isinstance(value, list): + constants.add("static const CK_BYTE const_%s[] = { %s };" % ( + "0x" + "".join("%02x" % v for v in value), + ", ".join("0x%02x" % v for v in value))) + else: + constants.add("static const %s const_%s = %s;" % (self.type, value, value)) + + def generate(self): + """ + Generate the descriptor line for this attribute. + """ + + if not self.unimplemented: + args.output_file.write(" { %s, %s, %s, %s, %s },\n" % ( + self.name, self.format_size(), self.format_length(), self.format_value(), self.format_flags())) - value = self.value or self.default - if not value: - return "NULL_PTR" - elif isinstance(value, list): - return "const_0x" + "".join("%02x" % v for v in value) - else: - return "&const_" + value - def format_constant(self, constants): +class Class(object): """ - Generate constant initializer values. These are merged so that we - only end up declaring one copy of each initializer value no matter - how many attributes use it. + A PKCS #11 class. """ - value = self.value or self.default - if not self.unimplemented and value: - if isinstance(value, list): - constants.add("static const CK_BYTE const_%s[] = { %s };" % ( - "0x" + "".join("%02x" % v for v in value), - ", ".join("0x%02x" % v for v in value))) - else: - constants.add("static const %s const_%s = %s;" % (self.type, value, value)) + def __init__(self, db, name, superclass = None, concrete = False, **attrs): + assert all(a.startswith("CKA_") for a in attrs), "Non-attribute: %r" % [a for a in attrs if not a.startswith("CKA_")] + self.attributes = dict((k, Attribute(k, **v)) for k, v in attrs.items()) + self.db = db + self.name = name + self.superclass = superclass + self.concrete = concrete - def generate(self): - """ - Generate the descriptor line for this attribute. - """ + def inherit(self, other): + """ + Inherit attributes from parent type. + """ - if not self.unimplemented: - args.output_file.write(" { %s, %s, %s, %s, %s },\n" % ( - self.name, self.format_size(), self.format_length(), self.format_value(), self.format_flags())) + for k, v in other.attributes.items(): + if k not in self.attributes: + self.attributes[k] = v + else: + self.attributes[k].inherit(v) + def collect_constants(self, constants): + """ + Collect initialization constants for all attributes. + """ -class Class(object): - """ - A PKCS #11 class. - """ - - def __init__(self, db, name, superclass = None, concrete = False, **attrs): - assert all(a.startswith("CKA_") for a in attrs), "Non-attribute: %r" % [a for a in attrs if not a.startswith("CKA_")] - self.attributes = dict((k, Attribute(k, **v)) for k, v in attrs.iteritems()) - self.db = db - self.name = name - self.superclass = superclass - self.concrete = concrete - - def inherit(self, other): - """ - Inherit attributes from parent type. - """ + if self.concrete: + for a in self.attributes.values(): + a.format_constant(constants) - for k, v in other.attributes.iteritems(): - if k not in self.attributes: - self.attributes[k] = v - else: - self.attributes[k].inherit(v) + def generate(self): + """ + Generate a descriptor for this type. + """ - def collect_constants(self, constants): - """ - Collect initialization constants for all attributes. - """ + if self.concrete: - if self.concrete: - for a in self.attributes.itervalues(): - a.format_constant(constants) + write_lines("", + "static const p11_attribute_descriptor_t p11_attribute_descriptor_%(name)s[] = {", + name = self.name) - def generate(self): - """ - Generate a descriptor for this type. - """ + for a in sorted(self.attributes, key = lambda x: attribute_numbers[x]): + self.attributes[a].generate() - if self.concrete: + write_lines("};", + "", + "static const p11_descriptor_t p11_descriptor_%(name)s = {", + " p11_attribute_descriptor_%(name)s,", + " sizeof(p11_attribute_descriptor_%(name)s)/sizeof(p11_attribute_descriptor_t)", + "};", + name = self.name) - write_lines("", - "static const p11_attribute_descriptor_t p11_attribute_descriptor_%(name)s[] = {", - name = self.name) + def keyclassmap(self): + """ + Generate a keyclass map entry if this is a concrete key type. + """ - for a in sorted(self.attributes, key = lambda x: attribute_numbers[x]): - self.attributes[a].generate() - - write_lines("};", - "", - "static const p11_descriptor_t p11_descriptor_%(name)s = {", - " p11_attribute_descriptor_%(name)s,", - " sizeof(p11_attribute_descriptor_%(name)s)/sizeof(p11_attribute_descriptor_t)", - "};", - name = self.name) - - def keyclassmap(self): - """ - Generate a keyclass map entry if this is a concrete key type. - """ - - if self.concrete and all(k in self.attributes and self.attributes[k].value for k in ("CKA_CLASS", "CKA_KEY_TYPE")): - write_lines(" { %s, %s, &p11_descriptor_%s }," % ( - self.attributes["CKA_CLASS"].value, self.attributes["CKA_KEY_TYPE"].value, self.name)) + if self.concrete and all(k in self.attributes and self.attributes[k].value for k in ("CKA_CLASS", "CKA_KEY_TYPE")): + write_lines(" { %s, %s, &p11_descriptor_%s }," % ( + self.attributes["CKA_CLASS"].value, self.attributes["CKA_KEY_TYPE"].value, self.name)) class DB(object): - """ - Object type database parsed from YAML - """ - - def __init__(self, y): - self.ordered = [Class(self, **y) for y in y] - self.named = dict((c.name, c) for c in self.ordered) - for c in self.ordered: - if c.superclass is not None: - c.inherit(self.named[c.superclass]) - - def generate(self): """ - Generate output for everything in the database. - """ - - constants = set() - for c in self.ordered: - c.collect_constants(constants) - for constant in sorted(constants): - write_lines(constant) - for c in self.ordered: - c.generate() - write_lines("", - "static const p11_descriptor_keyclass_map_t p11_descriptor_keyclass_map[] = {") - for c in self.ordered: - c.keyclassmap() - write_lines("};") + Object type database parsed from YAML + """ + + def __init__(self, y): + self.ordered = [Class(self, **y) for y in y] + self.named = dict((c.name, c) for c in self.ordered) + for c in self.ordered: + if c.superclass is not None: + c.inherit(self.named[c.superclass]) + + def generate(self): + """ + Generate output for everything in the database. + """ + + constants = set() + for c in self.ordered: + c.collect_constants(constants) + for constant in sorted(constants): + write_lines(constant) + for c in self.ordered: + c.generate() + write_lines("", + "static const p11_descriptor_keyclass_map_t p11_descriptor_keyclass_map[] = {") + for c in self.ordered: + c.keyclassmap() + write_lines("};") # Main program diff --git a/scripts/build-py11-attributes b/scripts/build-py11-attributes index cacb63a..107d33b 100755 --- a/scripts/build-py11-attributes +++ b/scripts/build-py11-attributes @@ -54,10 +54,10 @@ parser.add_argument("output_file", help = "Output .py file", nargs = "?", typ args = parser.parse_args() attribute_map = dict( - (k, v["type"]) - for y in yaml.safe_load(args.yaml_file) - for k, v in y.iteritems() - if k.startswith("CKA_") and "type" in v) + (k, v["type"]) + for y in yaml.safe_load(args.yaml_file) + for k, v in y.items() + if k.startswith("CKA_") and "type" in v) args.output_file.write('''\ # This file was generated automatically from %(input)s by %(script)s. Do not edit this file directly. diff --git a/scripts/py11-test.py b/scripts/py11-test.py index 7a0e1d8..cf633a3 100644 --- a/scripts/py11-test.py +++ b/scripts/py11-test.py @@ -3,149 +3,150 @@ from cryptech.py11 import * from struct import pack, unpack +from binascii import hexlify, unhexlify def show_token(i, t, strip = True): - print "Token in slot #%d:" % i - fw = max(len(fn) for fn, ft in t.__class__._fields_) - for fn, ft in t.__class__._fields_: - fv = getattr(t, fn) - fn = "%-*s :" % (fw, fn) - if isinstance(fv, CK_VERSION): - print " ", fn, "%d.%d" % (fv.major, fv.minor) - elif isinstance(fv, (int, long)): - print " ", fn, fv - else: - print " ", fn, repr(str(fv).rstrip(" ") if strip else str(fv)) + print("Token in slot #%d:" % i) + fw = max(len(fn) for fn, ft in t.__class__._fields_) + for fn, ft in t.__class__._fields_: + fv = getattr(t, fn) + fn = "%-*s :" % (fw, fn) + if isinstance(fv, CK_VERSION): + print(" ", fn, "%d.%d" % (fv.major, fv.minor)) + elif isinstance(fv, int): + print(" ", fn, fv) + else: + print(" ", fn, repr(str(fv).rstrip(" ") if strip else str(fv))) def genkeypair_templates(name = "foo", **kwargs): - label_id = ((CKA_LABEL, name), (CKA_ID, name)) - return ( - # Public template - label_id + tuple(kwargs.iteritems()) + - ((CKA_VERIFY, True), - (CKA_ENCRYPT, False), - (CKA_WRAP, False), - (CKA_TOKEN, False)), - # Private template - label_id + - ((CKA_SIGN, True), - (CKA_DECRYPT, False), - (CKA_UNWRAP, False), - (CKA_SENSITIVE, True), - (CKA_TOKEN, False), - (CKA_PRIVATE, True), - (CKA_EXTRACTABLE, False))) + label_id = ((CKA_LABEL, name), (CKA_ID, name)) + return ( + # Public template + label_id + tuple(kwargs.items()) + + ((CKA_VERIFY, True), + (CKA_ENCRYPT, False), + (CKA_WRAP, False), + (CKA_TOKEN, False)), + # Private template + label_id + + ((CKA_SIGN, True), + (CKA_DECRYPT, False), + (CKA_UNWRAP, False), + (CKA_SENSITIVE, True), + (CKA_TOKEN, False), + (CKA_PRIVATE, True), + (CKA_EXTRACTABLE, False))) def show_keys(session, key_class, *attributes): - p11.C_FindObjectsInit(session, {CKA_CLASS: key_class}) - for o in p11.C_FindObjects(session): - a = p11.C_GetAttributeValue(session, o, attributes + (CKA_CLASS, CKA_KEY_TYPE, CKA_LABEL, CKA_ID, CKA_TOKEN, - CKA_PRIVATE, CKA_LOCAL, CKA_KEY_GEN_MECHANISM)) - w = max(len(p11.adb.attribute_name(k)) for k in a) - print "Object:", o - for k, v in a.iteritems(): - print " %*s (0x%08x): %r" % (w, p11.adb.attribute_name(k), k, v) - p11.C_FindObjectsFinal(session) + p11.C_FindObjectsInit(session, {CKA_CLASS: key_class}) + for o in p11.C_FindObjects(session): + a = p11.C_GetAttributeValue(session, o, attributes + (CKA_CLASS, CKA_KEY_TYPE, CKA_LABEL, CKA_ID, CKA_TOKEN, + CKA_PRIVATE, CKA_LOCAL, CKA_KEY_GEN_MECHANISM)) + w = max(len(p11.adb.attribute_name(k)) for k in a) + print("Object:", o) + for k, v in a.items(): + print(" %*s (0x%08x): %r" % (w, p11.adb.attribute_name(k), k, v)) + p11.C_FindObjectsFinal(session) def build_ecpoint(x, y): - h = (max(x.bit_length(), y.bit_length()) + 15) / 16 - d = chr(0x04) + ("%0*x%0*x" % (h, x, h, y)).decode("hex") - if len(d) < 128: - h = "%c%c" % (0x04, len(d)) - else: - n = len(d).bit_length() - h = ("%c%c" % (0x04, (n + 7) / 8)) + ("%0*x" % ((n + 15) / 16, len(d))).decode("hex") - return h + d + h = (max(x.bit_length(), y.bit_length()) + 15) / 16 + d = b"\04" + unhexlify("%0*x%0*x" % (h, x, h, y)) + if len(d) < 128: + h = b"%c%c" % (0x04, len(d)) + else: + n = len(d).bit_length() + h = (b"%c%c" % (0x04, (n + 7) / 8)) + unhexlify("%0*x" % ((n + 15) / 16, len(d))) + return h + d -ec_curve_oid_p256 = "".join(chr(i) for i in (0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07)) -ec_curve_oid_p384 = "".join(chr(i) for i in (0x06, 0x05, 0x2b, 0x81, 0x04, 0x00, 0x22)) -ec_curve_oid_p521 = "".join(chr(i) for i in (0x06, 0x05, 0x2b, 0x81, 0x04, 0x00, 0x23)) +ec_curve_oid_p256 = b"\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" +ec_curve_oid_p384 = b"\x06\x05\x2b\x81\x04\x00\x22" +ec_curve_oid_p521 = b"\x06\x05\x2b\x81\x04\x00\x23" p11 = PKCS11("./libcryptech-pkcs11.so") if False: - print "Not using MUTEXes at all" - p11.C_Initialize() + print("Not using MUTEXes at all") + p11.C_Initialize() elif False: - print "Using OS MUTEXes" - p11.C_Initialize(CKF_OS_LOCKING_OK) + print("Using OS MUTEXes") + p11.C_Initialize(CKF_OS_LOCKING_OK) else: - print "Using our own pseudo-MUTEXes" - from cryptech.py11.mutex import MutexDB - mdb = MutexDB() - p11.C_Initialize(0, mdb.create, mdb.destroy, mdb.lock, mdb.unlock) + print("Using our own pseudo-MUTEXes") + from cryptech.py11.mutex import MutexDB + mdb = MutexDB() + p11.C_Initialize(0, mdb.create, mdb.destroy, mdb.lock, mdb.unlock) slots = p11.C_GetSlotList() -print -print "Listing slots and tokens" +print() +print("Listing slots and tokens") for i in slots: - show_token(i, p11.C_GetTokenInfo(i)) + show_token(i, p11.C_GetTokenInfo(i)) slot = slots[0] -print -print "Generating some random data" +print() +print("Generating some random data") session = p11.C_OpenSession(slot) random = p11.C_GenerateRandom(session, 33) -print len(random), "".join("%02x" % ord(i) for i in random) +print(len(random), hexlify(random)) -print -print "Logging in" +print() +print("Logging in") p11.C_Login(session, CKU_USER, "fnord") if False: - print - print "Generating RSA keypair" - rsa_templates = genkeypair_templates("RSA", CKA_MODULUS_BITS = 1024) - rsa_keypair = p11.C_GenerateKeyPair(session, CKM_RSA_PKCS_KEY_PAIR_GEN, rsa_templates[0], rsa_templates[1]) - print rsa_keypair - -print -print "Generating EC P256 keypair" + print() + print("Generating RSA keypair") + rsa_templates = genkeypair_templates("RSA", CKA_MODULUS_BITS = 1024) + rsa_keypair = p11.C_GenerateKeyPair(session, CKM_RSA_PKCS_KEY_PAIR_GEN, rsa_templates[0], rsa_templates[1]) + print(rsa_keypair) + +print() +print("Generating EC P256 keypair") ec_templates = genkeypair_templates("EC-P256", CKA_EC_PARAMS = ec_curve_oid_p256) ec_p256_keypair = p11.C_GenerateKeyPair(session, CKM_EC_KEY_PAIR_GEN, ec_templates[0], ec_templates[1]) -print ec_p256_keypair +print(ec_p256_keypair) -print -print "Generating EC P384 keypair" +print() +print("Generating EC P384 keypair") ec_templates = genkeypair_templates("EC-P384", CKA_EC_PARAMS = ec_curve_oid_p384) ec_p384_keypair = p11.C_GenerateKeyPair(session, CKM_EC_KEY_PAIR_GEN, ec_templates[0], ec_templates[1]) -print ec_p384_keypair +print(ec_p384_keypair) -print -print "Generating EC P521 keypair" +print() +print("Generating EC P521 keypair") ec_templates = genkeypair_templates("EC-P521", CKA_EC_PARAMS = ec_curve_oid_p521) ec_p521_keypair = p11.C_GenerateKeyPair(session, CKM_EC_KEY_PAIR_GEN, ec_templates[0], ec_templates[1]) -print ec_p521_keypair +print(ec_p521_keypair) -print -print "Listing keys" +print() +print("Listing keys") show_keys(session, CKO_PUBLIC_KEY, CKA_VERIFY, CKA_ENCRYPT, CKA_WRAP) show_keys(session, CKO_PRIVATE_KEY, CKA_SIGN, CKA_DECRYPT, CKA_UNWRAP) hamster = "Your mother was a hamster" -print -print "Testing P-256 signature" +print() +print("Testing P-256 signature") p11.C_SignInit(session, CKM_ECDSA_SHA256, ec_p256_keypair[1]) sig_p256 = p11.C_Sign(session, hamster) -print "Signature:", sig_p256.encode("hex") +print("Signature:", hexlify(sig_p256.encode)) -print -print "Testing P256 verification" +print() +print("Testing P256 verification") p11.C_VerifyInit(session, CKM_ECDSA_SHA256, ec_p256_keypair[0]) p11.C_Verify(session, hamster, sig_p256) -print "OK" # Verification failure throws an exception +print("OK") # Verification failure throws an exception -print -print "Testing C_CreateObject()" +print() +print("Testing C_CreateObject()") handle = p11.C_CreateObject(session, dict( CKA_CLASS = CKO_PUBLIC_KEY, CKA_KEY_TYPE = CKK_EC, @@ -158,16 +159,16 @@ handle = p11.C_CreateObject(session, dict( CKA_EC_POINT = build_ecpoint(0x8101ece47464a6ead70cf69a6e2bd3d88691a3262d22cba4f7635eaff26680a8, 0xd8a12ba61d599235f67d9cb4d58f1783d3ca43e78f0a5abaa624079936c0c3a9), CKA_EC_PARAMS = ec_curve_oid_p256)) -print handle +print(handle) -print -print "Verifying canned signature with public key installed via C_CreateObject()" +print() +print("Verifying canned signature with public key installed via C_CreateObject()") p11.C_VerifyInit(session, CKM_ECDSA, handle) p11.C_Verify(session, - "7c3e883ddc8bd688f96eac5e9324222c8f30f9d6bb59e9c5f020bd39ba2b8377".decode("hex"), - "7214bc9647160bbd39ff2f80533f5dc6ddd70ddf86bb815661e805d5d4e6f27c".decode("hex") + - "7d1ff961980f961bdaa3233b6209f4013317d3e3f9e1493592dbeaa1af2bc367".decode("hex")) -print "OK" + unhexlify("7c3e883ddc8bd688f96eac5e9324222c8f30f9d6bb59e9c5f020bd39ba2b8377"), + unhexlify("7214bc9647160bbd39ff2f80533f5dc6ddd70ddf86bb815661e805d5d4e6f27c") + + unhexlify("7d1ff961980f961bdaa3233b6209f4013317d3e3f9e1493592dbeaa1af2bc367")) +print("OK") p11.C_CloseAllSessions(slot) p11.C_Finalize() diff --git a/scripts/test-hsmcheck b/scripts/test-hsmcheck index b28d578..cb2efce 100755 --- a/scripts/test-hsmcheck +++ b/scripts/test-hsmcheck @@ -49,134 +49,134 @@ from xml.etree.ElementTree import ElementTree, Element, SubElement def write_config(): - """ - Write hsmcheck configuration file. - """ + """ + Write hsmcheck configuration file. + """ - e = Element("Configuration") - r = SubElement(e, "RepositoryList") - r = SubElement(r, "Repository", name = "default") - SubElement(r, "Module").text = args.driver - SubElement(r, "TokenLabel").text = args.token_label - SubElement(r, "PIN").text = args.pin - ElementTree(e).write(args.write_config) - args.write_config.flush() + e = Element("Configuration") + r = SubElement(e, "RepositoryList") + r = SubElement(r, "Repository", name = "default") + SubElement(r, "Module").text = args.driver + SubElement(r, "TokenLabel").text = args.token_label + SubElement(r, "PIN").text = args.pin + ElementTree(e).write(args.write_config) + args.write_config.flush() def hsmcheck(flag): - """ - Run hsmcheck program with appropriate options and verbosity. - """ - - assert flag in "rgsd" - cmd = (args.hsmcheck_binary, "-c", args.write_config.name, "-" + flag) - if args.verbose: - sys.stdout.write("Running: %s\n" % " ".join(cmd)) - if flag == "s": - text = check_output(cmd) - sys.stdout.write(text) - if not args.no_dnssec: - check_dnssec(text) - else: - check_call(cmd) + """ + Run hsmcheck program with appropriate options and verbosity. + """ + + assert flag in "rgsd" + cmd = (args.hsmcheck_binary, "-c", args.write_config.name, "-" + flag) + if args.verbose: + sys.stdout.write("Running: %s\n" % " ".join(cmd)) + if flag == "s": + text = check_output(cmd) + sys.stdout.write(text) + if not args.no_dnssec: + check_dnssec(text) + else: + check_call(cmd) def check_dnssec(text): - """ - Use DNSPython to attempt DNSSEC validation on "hsmcheck -s" output. + """ + Use DNSPython to attempt DNSSEC validation on "hsmcheck -s" output. - This requires the DNSPython toolkit, which in turn requires - PyCrypto; ECDSA support (not yet tested) requires a third package. - On Debian-family Linux, you can install these with: + This requires the DNSPython toolkit, which in turn requires + PyCrypto; ECDSA support (not yet tested) requires a third package. + On Debian-family Linux, you can install these with: - sudo apt-get install python-dnspython python-crypto python-ecdsa + sudo apt-get install python-dnspython python-crypto python-ecdsa - Equivalent packages exist for other platforms. - """ + Equivalent packages exist for other platforms. + """ - try: - from dns.exception import DNSException - import dns.dnssec - import dns.rrset - import Crypto.PublicKey.RSA - #import ecdsa.ecdsa - except ImportError: - sys.exit("Problem importing DNSPython or supporting crypto packages, are they installed?") + try: + from dns.exception import DNSException + import dns.dnssec + import dns.rrset + import Crypto.PublicKey.RSA + #import ecdsa.ecdsa + except ImportError: + sys.exit("Problem importing DNSPython or supporting crypto packages, are they installed?") - wired_ttl = "3600" - wired_rdclass = "IN" + wired_ttl = "3600" + wired_rdclass = "IN" - rrs = {} + rrs = {} - for line in text.splitlines(): + for line in text.splitlines(): - try: - name, ttl, rdclass, rdtype, rdata = line.split(None, 4) - except ValueError: - continue + try: + name, ttl, rdclass, rdtype, rdata = line.split(None, 4) + except ValueError: + continue - if ttl != wired_ttl or rdclass != wired_rdclass: - continue + if ttl != wired_ttl or rdclass != wired_rdclass: + continue - try: - rrs[name, rdtype].append(rdata) - except KeyError: - rrs[name, rdtype] = [rdata] + try: + rrs[name, rdtype].append(rdata) + except KeyError: + rrs[name, rdtype] = [rdata] - # Done parsing. We expect to have seen an A RRset, an RRSIG of that - # A RRset, and the DNSKEY that we'll need to verify the RRSIG. + # Done parsing. We expect to have seen an A RRset, an RRSIG of that + # A RRset, and the DNSKEY that we'll need to verify the RRSIG. - if len(rrs) != 3: - sys.exit("Expected two RRsets and an RRSIG, got %r" % rrs) + if len(rrs) != 3: + sys.exit("Expected two RRsets and an RRSIG, got %r" % rrs) - rrs = dict((rdtype, dns.rrset.from_text_list(name, int(wired_ttl), wired_rdclass, rdtype, rrs[name, rdtype])) - for name, rdtype in rrs) + rrs = dict((rdtype, dns.rrset.from_text_list(name, int(wired_ttl), wired_rdclass, rdtype, rrs[name, rdtype])) + for name, rdtype in rrs) - try: - dns.dnssec.validate(rrs["A"], rrs["RRSIG"], { rrs["DNSKEY"].name : rrs["DNSKEY"] }) - except DNSException, e: - sys.exit("DNSSEC verification failed: %s" % e) + try: + dns.dnssec.validate(rrs["A"], rrs["RRSIG"], { rrs["DNSKEY"].name : rrs["DNSKEY"] }) + except DNSException as e: + sys.exit("DNSSEC verification failed: %s" % e) - sys.stdout.write("\nDNSSEC verification successful!\n\n") + sys.stdout.write("\nDNSSEC verification successful!\n\n") # Main program. try: - default_config = NamedTemporaryFile() - default_hsmcheck = os.getenv("HSMCHECK", "hsmcheck") - default_driver = os.getenv("PKCS11_DRIVER", - os.path.realpath(os.path.join(os.path.dirname(sys.argv[0]), "..", "libpkcs11.so"))) - - parser = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter) - one_of = parser.add_mutually_exclusive_group() - one_of.add_argument("-a", "--all", "--rgsd", const = "rgsd", dest = "test", action = "store_const", help = "run all tests") - one_of.add_argument("-r", "--random", const = "r", dest = "test", action = "store_const", help = "just test random numbers") - one_of.add_argument("-g", "--generate", const = "g", dest = "test", action = "store_const", help = "just test key generation") - one_of.add_argument("-s", "--sign", const = "s", dest = "test", action = "store_const", help = "just test DNSSEC-signature") - one_of.add_argument("-d", "--delete", const = "d", dest = "test", action = "store_const", help = "just delete key") - parser.add_argument("-b", "--hsmcheck-binary", default = default_hsmcheck, help = "location of hsmcheck program") - parser.add_argument("-p", "--pin", default = "12345", help = "HSM PIN to use for tests") - parser.add_argument("-t", "--token-label", default = "Cryptech Token", help = "PKCS #11 label of Cryptech token") - parser.add_argument("-n", "--no-dnssec", action = "store_true", help = "do not attempt DNSSEC validation") - parser.add_argument("-v", "--verbose", action = "store_true", help = "bark more") - parser.add_argument("-D", "--driver", default = default_driver, help = "location of PKCS #11 driver") - parser.add_argument("-w", "--write-config", default = default_config, help = "write generated configuration to this file", - type = ArgumentFileType("w")) - parser.add_argument("--debug", action = "store_true", help = "debug this script") - parser.set_defaults(test = "rgsd") - args = parser.parse_args() - - try: - write_config() - for flag in args.test: - hsmcheck(flag) - - except Exception as e: - if args.debug: - raise - sys.exit("Failed: %s" % e) + default_config = NamedTemporaryFile() + default_hsmcheck = os.getenv("HSMCHECK", "hsmcheck") + default_driver = os.getenv("PKCS11_DRIVER", + os.path.realpath(os.path.join(os.path.dirname(sys.argv[0]), "..", "libpkcs11.so"))) + + parser = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter) + one_of = parser.add_mutually_exclusive_group() + one_of.add_argument("-a", "--all", "--rgsd", const = "rgsd", dest = "test", action = "store_const", help = "run all tests") + one_of.add_argument("-r", "--random", const = "r", dest = "test", action = "store_const", help = "just test random numbers") + one_of.add_argument("-g", "--generate", const = "g", dest = "test", action = "store_const", help = "just test key generation") + one_of.add_argument("-s", "--sign", const = "s", dest = "test", action = "store_const", help = "just test DNSSEC-signature") + one_of.add_argument("-d", "--delete", const = "d", dest = "test", action = "store_const", help = "just delete key") + parser.add_argument("-b", "--hsmcheck-binary", default = default_hsmcheck, help = "location of hsmcheck program") + parser.add_argument("-p", "--pin", default = "12345", help = "HSM PIN to use for tests") + parser.add_argument("-t", "--token-label", default = "Cryptech Token", help = "PKCS #11 label of Cryptech token") + parser.add_argument("-n", "--no-dnssec", action = "store_true", help = "do not attempt DNSSEC validation") + parser.add_argument("-v", "--verbose", action = "store_true", help = "bark more") + parser.add_argument("-D", "--driver", default = default_driver, help = "location of PKCS #11 driver") + parser.add_argument("-w", "--write-config", default = default_config, help = "write generated configuration to this file", + type = ArgumentFileType("w")) + parser.add_argument("--debug", action = "store_true", help = "debug this script") + parser.set_defaults(test = "rgsd") + args = parser.parse_args() + + try: + write_config() + for flag in args.test: + hsmcheck(flag) + + except Exception as e: + if args.debug: + raise + sys.exit("Failed: %s" % e) finally: - default_config.close() + default_config.close() diff --git a/scripts/thready-time-signature.py b/scripts/thready-time-signature.py index fb84c1e..e3bd0bf 100755 --- a/scripts/thready-time-signature.py +++ b/scripts/thready-time-signature.py @@ -8,7 +8,7 @@ import collections import threading import argparse import datetime -import Queue +import queue import sys from Crypto.Hash.SHA256 import SHA256Hash as SHA256 @@ -101,7 +101,7 @@ class Results(object): self.sum += t1 - t0 self.n += 1 if not args.quiet: - print "{:4d} {}".format(self.n, delta) + print("{:4d} {}".format(self.n, delta)) @property def mean(self): @@ -119,7 +119,7 @@ class Results(object): with self.lock: if self.t1 is None: self.t1 = datetime.datetime.now() - print "{0.name} sigs/second {0.sigs_per_second} mean {0.mean} throughput {0.throughput} (n {0.n}, t0 {0.t0}, t1 {0.t1})".format(self) + print("{0.name} sigs/second {0.sigs_per_second} mean {0.mean} throughput {0.throughput} (n {0.n}, t0 {0.t0}, t1 {0.t1})".format(self)) class Worker(threading.Thread): @@ -158,19 +158,19 @@ def main(): args = parser.parse_args() global q - q = Queue.Queue() + q = queue.Queue() global p11 p11 = PKCS11(args.libpkcs11) if not args.quiet: - print "Initializing" + print("Initializing") p11.C_Initialize(CKF_OS_LOCKING_OK) session = p11.C_OpenSession(args.slot) p11.C_Login(session, CKU_USER, args.pin) - for i in xrange(args.threads): + for i in range(args.threads): w = Worker() w.setDaemon(True) w.start() @@ -178,7 +178,7 @@ def main(): for name in args.keys: if not args.quiet: - print "Starting test with key {}, {} iterations".format(name, args.iterations) + print("Starting test with key {}, {} iterations".format(name, args.iterations)) k = key_table[name] @@ -187,7 +187,7 @@ def main(): global results results = Results(name) - for i in xrange(args.iterations): + for i in range(args.iterations): q.put(k) q.join() diff --git a/scripts/time-signature.py b/scripts/time-signature.py index 732eaa4..5c816ee 100755 --- a/scripts/time-signature.py +++ b/scripts/time-signature.py @@ -99,7 +99,7 @@ def main(): p11 = PKCS11(args.libpkcs11) if not args.quiet: - print "Initializing" + print("Initializing") p11.C_Initialize() session = p11.C_OpenSession(args.slot) @@ -107,25 +107,25 @@ def main(): for name in args.keys: - print "Starting test with key {}, {} iterations".format(name, args.iterations) + print("Starting test with key {}, {} iterations".format(name, args.iterations)) k = key_table[name] k.create(args, p11, session, "Your mother was a hamster") mean = datetime.timedelta(seconds = 0) - for i in xrange(args.iterations): + for i in range(args.iterations): t0 = datetime.datetime.now() p11.C_SignInit(session, k.ckm, k.handle) sig = p11.C_Sign(session, k.tbs) t1 = datetime.datetime.now() k.verify(sig) if not args.quiet: - print "{:4d} {}".format(i, t1 - t0) + print("{:4d} {}".format(i, t1 - t0)) mean += t1 - t0 mean /= args.iterations - print "mean {}".format(mean) + print("mean {}".format(mean)) p11.C_DestroyObject(session, k.handle) diff --git a/unit_tests.py b/unit_tests.py index e4c4a97..186daa9 100644 --- a/unit_tests.py +++ b/unit_tests.py @@ -6,6 +6,8 @@ PKCS #11 unit tests, using cryptech.py11 and the Python unit_test framework. import unittest import datetime +import binascii +import struct import sys from cryptech.py11 import * @@ -187,7 +189,7 @@ class TestDevice(TestCase): with self.assertRaises(CKR_OPERATION_NOT_INITIALIZED): for handle in p11.C_FindObjects(session): - self.assertIsInstance(handle, (int, long)) + self.assertIsInstance(handle, (int, int)) with self.assertRaises(CKR_OPERATION_NOT_INITIALIZED): p11.C_FindObjectsFinal(session) @@ -198,7 +200,7 @@ class TestDevice(TestCase): p11.C_FindObjectsInit(session, CKA_CLASS = CKO_PRIVATE_KEY) for handle in p11.C_FindObjects(session): - self.assertIsInstance(handle, (int, long)) + self.assertIsInstance(handle, (int, int)) p11.C_FindObjectsFinal(session) @@ -208,9 +210,9 @@ class TestKeys(TestCase): Tests involving keys. """ - oid_p256 = "".join(chr(i) for i in (0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x03, 0x01, 0x07)) - oid_p384 = "".join(chr(i) for i in (0x06, 0x05, 0x2b, 0x81, 0x04, 0x00, 0x22)) - oid_p521 = "".join(chr(i) for i in (0x06, 0x05, 0x2b, 0x81, 0x04, 0x00, 0x23)) + oid_p256 = b"\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07" + oid_p384 = b"\x06\x05\x2b\x81\x04\x00\x22" + oid_p521 = b"\x06\x05\x2b\x81\x04\x00\x23" @classmethod def setUpClass(cls): @@ -337,22 +339,22 @@ class TestKeys(TestCase): @staticmethod def _build_ecpoint(x, y): bytes_per_coordinate = (max(x.bit_length(), y.bit_length()) + 15) / 16 - value = chr(0x04) + ("%0*x%0*x" % (bytes_per_coordinate, x, bytes_per_coordinate, y)).decode("hex") + value = b"\x04" + binascii.unhexlify("{0:0{2}x}{1:0{2}x}".format(x, y, bytes_per_coordinate)) if len(value) < 128: - length = chr(len(value)) + length = struct.pack("U", len(value)) else: n = len(value).bit_length() - length = chr((n + 7) / 8) + ("%0*x" % ((n + 15) / 16, len(value))).decode("hex") - tag = chr(0x04) + length = struct.pack("U", (n + 7) / 8) + binascii.unhexlify("{0:0{1}x}".format(len(value), (n + 15) / 16))) + tag = b"\x04" return tag + length + value def test_canned_ecdsa_p256_verify(self): "EC-P-256 verification test from Suite B Implementer's Guide to FIPS 186-3" Q = self._build_ecpoint(0x8101ece47464a6ead70cf69a6e2bd3d88691a3262d22cba4f7635eaff26680a8, 0xd8a12ba61d599235f67d9cb4d58f1783d3ca43e78f0a5abaa624079936c0c3a9) - H = "7c3e883ddc8bd688f96eac5e9324222c8f30f9d6bb59e9c5f020bd39ba2b8377".decode("hex") - r = "7214bc9647160bbd39ff2f80533f5dc6ddd70ddf86bb815661e805d5d4e6f27c".decode("hex") - s = "7d1ff961980f961bdaa3233b6209f4013317d3e3f9e1493592dbeaa1af2bc367".decode("hex") + H = binascii.unhexlify("7c3e883ddc8bd688f96eac5e9324222c8f30f9d6bb59e9c5f020bd39ba2b8377") + r = binascii.unhexlify("7214bc9647160bbd39ff2f80533f5dc6ddd70ddf86bb815661e805d5d4e6f27c") + s = binascii.unhexlify("7d1ff961980f961bdaa3233b6209f4013317d3e3f9e1493592dbeaa1af2bc367") handle = p11.C_CreateObject( session = self.session, CKA_CLASS = CKO_PUBLIC_KEY, @@ -370,9 +372,9 @@ class TestKeys(TestCase): "EC-P-384 verification test from Suite B Implementer's Guide to FIPS 186-3" Q = self._build_ecpoint(0x1fbac8eebd0cbf35640b39efe0808dd774debff20a2a329e91713baf7d7f3c3e81546d883730bee7e48678f857b02ca0, 0xeb213103bd68ce343365a8a4c3d4555fa385f5330203bdd76ffad1f3affb95751c132007e1b240353cb0a4cf1693bdf9) - H = "b9210c9d7e20897ab86597266a9d5077e8db1b06f7220ed6ee75bd8b45db37891f8ba5550304004159f4453dc5b3f5a1".decode("hex") - r = "a0c27ec893092dea1e1bd2ccfed3cf945c8134ed0c9f81311a0f4a05942db8dbed8dd59f267471d5462aa14fe72de856".decode("hex") - s = "20ab3f45b74f10b6e11f96a2c8eb694d206b9dda86d3c7e331c26b22c987b7537726577667adadf168ebbe803794a402".decode("hex") + H = binascii.unhexlify("b9210c9d7e20897ab86597266a9d5077e8db1b06f7220ed6ee75bd8b45db37891f8ba5550304004159f4453dc5b3f5a1") + r = binascii.unhexlify("a0c27ec893092dea1e1bd2ccfed3cf945c8134ed0c9f81311a0f4a05942db8dbed8dd59f267471d5462aa14fe72de856") + s = binascii.unhexlify("20ab3f45b74f10b6e11f96a2c8eb694d206b9dda86d3c7e331c26b22c987b7537726577667adadf168ebbe803794a402") handle = p11.C_CreateObject( session = self.session, CKA_CLASS = CKO_PUBLIC_KEY, @@ -422,13 +424,13 @@ class TestKeys(TestCase): def assertRawRSASignatureMatches(self, handle, plain, sig): pubkey = self._extract_rsa_public_key(handle) result = pubkey.encrypt(sig, 0)[0] - prefix = "\x00\x01" if False else "\x01" # XXX - expect = prefix + "\xff" * (len(result) - len(plain) - len(prefix) - 1) + "\x00" + plain + prefix = b"\x00\x01" if False else b"\x01" # XXX + expect = prefix + b"\xff" * (len(result) - len(plain) - len(prefix) - 1) + b"\x00" + plain self.assertEqual(result, expect) def test_gen_sign_verify_tralala_rsa_1024(self): "Generate/sign/verify with RSA-1024 (no hashing, message to be signed not a hash at all)" - tralala = "tralala-en-hopsasa" + tralala = b"tralala-en-hopsasa" public_key, private_key = p11.C_GenerateKeyPair( self.session, CKM_RSA_PKCS_KEY_PAIR_GEN, CKA_MODULUS_BITS = 1024, CKA_ID = "RSA-1024", CKA_SIGN = True, CKA_VERIFY = True, CKA_TOKEN = True) @@ -444,7 +446,7 @@ class TestKeys(TestCase): "Generate/sign/verify with RSA-3416 (no hashing, message to be signed not a hash at all)" if not args.all_tests: self.skipTest("Key length not a multiple of 32, so expected to fail (very slowly)") - tralala = "tralala-en-hopsasa" + tralala = b"tralala-en-hopsasa" public_key, private_key = p11.C_GenerateKeyPair( self.session, CKM_RSA_PKCS_KEY_PAIR_GEN, CKA_MODULUS_BITS = 3416, CKA_ID = "RSA-3416", CKA_SIGN = True, CKA_VERIFY = True, CKA_TOKEN = True) @@ -562,7 +564,7 @@ class TestKeys(TestCase): 0F 1F 86 AF 45 25 4D 8F E1 1F C9 EA B3 83 4A 41 17 C1 42 B7 43 AD 51 5E F5 A2 F8 E3 25 ''' - tbs = "".join(chr(int(i, 16)) for i in tbs.split()) + tbs = binascii.unhexlify("".join(tbs.split())) p11.C_SignInit(self.session, CKM_SHA256_RSA_PKCS, private_key) p11.C_SignUpdate(self.session, tbs) sig = p11.C_SignFinal(self.session) @@ -581,7 +583,7 @@ class TestKeys(TestCase): def _find_objects(self, chunk_size = 10, template = None, **kwargs): p11.C_FindObjectsInit(self.session, template, **kwargs) for handle in p11.C_FindObjects(self.session, chunk_size): - self.assertIsInstance(handle, (int, long)) + self.assertIsInstance(handle, (int, int)) p11.C_FindObjectsFinal(self.session) @unittest.skipUnless(pycrypto_loaded, "requires PyCrypto") -- cgit v1.2.3