diff options
Diffstat (limited to 'cryptech/py11/__init__.py')
-rw-r--r-- | cryptech/py11/__init__.py | 533 |
1 files changed, 267 insertions, 266 deletions
diff --git a/cryptech/py11/__init__.py b/cryptech/py11/__init__.py index 740ecb6..0c794ff 100644 --- a/cryptech/py11/__init__.py +++ b/cryptech/py11/__init__.py @@ -23,272 +23,273 @@ else: class PKCS11 (object): - """ - PKCS #11 API object, encapsulating the PKCS #11 library itself. - Sample usage: - - from cryptech.py11 import * - - p11 = PKCS11() - p11.C_Initialize() - session = p11.C_OpenSession() - p11.C_login(session, CK_USER, "secret") - p11.C_FindObjectsInit(session, {CKA_CLASS: CKO_PRIVATE_KEY, CKA_KEY_TYPE: CKK_EC, CKA_ID: foo}) - keys = list(p11.C_FindObjects(session)) - p11.C_FindObjectsFinal(session) - if len(keys) != 1: - raise RuntimeError - p11.C_SignInit(session, CK_ECDSA_SHA256, keys[0]) - sig = p11.Sign(session, "Your mother was a hamster") - p11.C_CloseAllSessions(slot) - p11.C_Finalize() - - The full raw PKCS #11 API is available via the .so attribute, but - using this can be tricky, both because it requires strict adherence - to the C API and because one needs to be careful not to run afoul of - the Python garbage collector. - - The example above uses a set of interface routines built on top of the - raw PKCS #11 API, which map the API into something a bit more Pythonic. - """ - - # Whether to use C_GetFunctionList() instead of dynamic link symbols. - use_C_GetFunctionList = False - - def __init__(self, so_name = default_so_name): - self.so_name = so_name - self.so = CDLL(so_name) - self.d = type("Dispatch", (object,), {})() - for name, args in Prototypes: - try: - func = getattr(self.so, name) - except AttributeError: - self.use_C_GetFunctionList = True - else: - func.restype = CK_RV - func.argtypes = args - func.errcheck = CKR_Exception.raise_on_failure - setattr(self.d, name, func) - if self.use_C_GetFunctionList: - functions = CK_FUNCTION_LIST_PTR() - self.so.C_GetFunctionList(byref(functions)) - for name, args in Prototypes: - func = getattr(functions.contents, name) - func.errcheck = CKR_Exception.raise_on_failure - setattr(self.d, name, func) - self.adb = AttributeDB() - - def __getattr__(self, name): - try: - return getattr(self.d, name) - except AttributeError: - return getattr(self.so, name) - - def C_GetFunctionList(self): - return self - - @property - def version(self): - info = CK_INFO() - self.d.C_GetInfo(byref(info)) - return info.cryptokiVersion - - # Be very careful if you try to provide your own locking functions. - # For most purposes, if you really just want locking, you're best - # off specifying CKF_OS_LOCKING_OK and letting the C code deal with - # it. The one case where you might want to provide your own locking - # is when writing tests to verify behavior of the locking code. - # - # We have to stash references to the callback functions passed to - # C_Initialize() to avoid dumping core when the garbage collector - # deletes the function pointer instances out from under the C code. - - def C_Initialize(self, flags = 0, create_mutex = None, destroy_mutex = None, lock_mutex = None, unlock_mutex = None): - if flags == 0 and create_mutex is None and destroy_mutex is None and lock_mutex is None and unlock_mutex is None: - self._C_Initialize_args = None - self.d.C_Initialize(None) - else: - create_mutex = CK_CREATEMUTEX() if create_mutex is None else CK_CREATEMUTEX(create_mutex) - destroy_mutex = CK_DESTROYMUTEX() if destroy_mutex is None else CK_DESTROYMUTEX(destroy_mutex) - lock_mutex = CK_LOCKMUTEX() if lock_mutex is None else CK_LOCKMUTEX(lock_mutex) - unlock_mutex = CK_UNLOCKMUTEX() if unlock_mutex is None else CK_UNLOCKMUTEX(unlock_mutex) - self._C_Initialize_args = CK_C_INITIALIZE_ARGS(create_mutex, destroy_mutex, - lock_mutex, unlock_mutex, flags, None) - self.d.C_Initialize(cast(byref(self._C_Initialize_args), CK_VOID_PTR)) - - def C_Finalize(self): - self.d.C_Finalize(None) - self._C_Initialize_args = None - - def C_GetSlotList(self): - count = CK_ULONG() - self.d.C_GetSlotList(CK_TRUE, None, byref(count)) - slots = (CK_SLOT_ID * count.value)() - self.d.C_GetSlotList(CK_TRUE, slots, byref(count)) - return tuple(slots[i] for i in range(count.value)) - - def C_GetTokenInfo(self, slot_id): - token_info = CK_TOKEN_INFO() - self.d.C_GetTokenInfo(slot_id, byref(token_info)) - return token_info - - def C_OpenSession(self, slot, flags = CKF_RW_SESSION, application = None, notify = CK_NOTIFY()): - flags |= CKF_SERIAL_SESSION - handle = CK_SESSION_HANDLE() - self.d.C_OpenSession(slot, flags, application, notify, byref(handle)) - return handle.value - - def C_GenerateRandom(self, session, n): - buffer = create_string_buffer(n) - self.d.C_GenerateRandom(session, buffer, sizeof(buffer)) - return buffer.raw - - def C_Login(self, session, user, pin): - self.d.C_Login(session, user, pin, len(pin)) - - def C_GetAttributeValue(self, session_handle, object_handle, *attributes): - if len(attributes) == 1 and isinstance(attributes[0], (list, tuple)): - attributes = attributes[0] - template = self.adb.getvalue_create_template(attributes) - self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template)) - self.adb.getvalue_allocate_template(template) - self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template)) - return self.adb.from_ctypes(template) - - def C_FindObjectsInit(self, session, template = None, **kwargs): - if kwargs: - assert not template - template = kwargs - if template: - self.d.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template)) - else: - self.d.C_FindObjectsInit(session, None, 0) - - def C_FindObjects(self, session, chunk_size = 10): - objects = (CK_OBJECT_HANDLE * chunk_size)() - count = CK_ULONG(1) - while count.value > 0: - self.d.C_FindObjects(session, objects, len(objects), byref(count)) - for i in range(count.value): - yield objects[i] - - def FindObjects(self, session, template = None, **kwargs): - self.C_FindObjectsInit(session, template, **kwargs) - result = tuple(self.C_FindObjects(session)) - self.C_FindObjectsFinal(session) - return result - - def _parse_GenerateKeyPair_template(self, - # Attributes common to public and private templates - CKA_ID, - CKA_LABEL = None, - CKA_TOKEN = False, - # Attributes only in private template - CKA_SIGN = False, - CKA_DECRYPT = False, - CKA_UNWRAP = False, - CKA_SENSITIVE = True, - CKA_PRIVATE = True, - CKA_EXTRACTABLE = False, - # Finer-grained control for CKA_TOKEN - public_CKA_TOKEN = False, - private_CKA_TOKEN = False, - # Anything else is only in public template - **kwargs): - if CKA_LABEL is None: - CKA_LABEL = CKA_ID - return (dict(kwargs, - CKA_LABEL = CKA_LABEL, - CKA_ID = CKA_ID, - CKA_TOKEN = public_CKA_TOKEN or CKA_TOKEN), - dict(CKA_LABEL = CKA_LABEL, - CKA_ID = CKA_ID, - CKA_TOKEN = private_CKA_TOKEN or CKA_TOKEN, - CKA_SIGN = CKA_SIGN, - CKA_DECRYPT = CKA_DECRYPT, - CKA_UNWRAP = CKA_UNWRAP, - CKA_SENSITIVE = CKA_SENSITIVE, - CKA_PRIVATE = CKA_PRIVATE, - CKA_EXTRACTABLE = CKA_EXTRACTABLE)) - - def C_GenerateKeyPair(self, session, mechanism_type, public_template = None, private_template = None, **kwargs): - if kwargs: - assert not public_template and not private_template - public_template, private_template = self._parse_GenerateKeyPair_template(**kwargs) - public_template = self.adb.to_ctypes(public_template) - private_template = self.adb.to_ctypes(private_template) - mechanism = CK_MECHANISM(mechanism_type, None, 0) - public_handle = CK_OBJECT_HANDLE() - private_handle = CK_OBJECT_HANDLE() - self.d.C_GenerateKeyPair(session, byref(mechanism), - public_template, len(public_template), - private_template, len(private_template), - byref(public_handle), byref(private_handle)) - return public_handle.value, private_handle.value - - def C_SignInit(self, session, mechanism_type, private_key): - mechanism = CK_MECHANISM(mechanism_type, None, 0) - self.d.C_SignInit(session, byref(mechanism), private_key) - - def C_Sign(self, session, data): - n = CK_ULONG() - self.d.C_Sign(session, data, len(data), None, byref(n)) - sig = create_string_buffer(n.value) - self.d.C_Sign(session, data, len(data), sig, byref(n)) - return sig.raw - - def C_SignUpdate(self, session, data): - self.d.C_SignUpdate(session, data, len(data)) - - def C_SignFinal(self, session): - n = CK_ULONG() - self.d.C_SignFinal(session, None, byref(n)) - sig = create_string_buffer(n.value) - self.d.C_SignFinal(session, sig, byref(n)) - return sig.raw - - def C_VerifyInit(self, session, mechanism_type, public_key): - mechanism = CK_MECHANISM(mechanism_type, None, 0) - self.d.C_VerifyInit(session, byref(mechanism), public_key) - - def C_Verify(self, session, data, signature): - self.d.C_Verify(session, data, len(data), signature, len(signature)) - - def C_VerifyUpdate(self, session, data): - self.d.C_VerifyUpdate(session, data, len(data)) - - def C_VerifyFinal(self, session, signature): - self.d.C_VerifyFinal(session, signature, len(signature)) - - def C_CreateObject(self, session, template = None, **kwargs): - if kwargs: - assert not template - template = kwargs - template = self.adb.to_ctypes(template) - handle = CK_OBJECT_HANDLE() - self.d.C_CreateObject(session, template, len(template), byref(handle)) - return handle.value - - def C_DigestInit(self, session, mechanism_type): - mechanism = CK_MECHANISM(mechanism_type, None, 0) - self.d.C_DigestInit(session, byref(mechanism)) - - def C_Digest(self, session, data): - n = CK_ULONG() - self.d.C_Digest(session, data, len(data), None, byref(n)) - hash = create_string_buffer(n.value) - self.d.C_Digest(session, data, len(data), hash, byref(n)) - return hash.raw - - def C_DigestUpdate(self, session, data): - self.d.C_DigestUpdate(session, data, len(data)) - - def C_DigestFinal(self, session): - n = CK_ULONG() - self.d.C_DigestFinal(session, None, byref(n)) - hash = create_string_buffer(n.value) - self.d.C_DigestFinal(session, hash, byref(n)) - return hash.raw + """ + PKCS #11 API object, encapsulating the PKCS #11 library itself. + Sample usage: + + from cryptech.py11 import * + + p11 = PKCS11() + p11.C_Initialize() + session = p11.C_OpenSession() + p11.C_login(session, CK_USER, "secret") + p11.C_FindObjectsInit(session, {CKA_CLASS: CKO_PRIVATE_KEY, CKA_KEY_TYPE: CKK_EC, CKA_ID: foo}) + keys = list(p11.C_FindObjects(session)) + p11.C_FindObjectsFinal(session) + if len(keys) != 1: + raise RuntimeError + p11.C_SignInit(session, CK_ECDSA_SHA256, keys[0]) + sig = p11.Sign(session, "Your mother was a hamster") + p11.C_CloseAllSessions(slot) + p11.C_Finalize() + + The full raw PKCS #11 API is available via the .so attribute, but + using this can be tricky, both because it requires strict adherence + to the C API and because one needs to be careful not to run afoul of + the Python garbage collector. + + The example above uses a set of interface routines built on top of the + raw PKCS #11 API, which map the API into something a bit more Pythonic. + """ + + # Whether to use C_GetFunctionList() instead of dynamic link symbols. + use_C_GetFunctionList = False + + def __init__(self, so_name = default_so_name): + self.so_name = so_name + self.so = CDLL(so_name) + self.d = type("Dispatch", (object,), {})() + for name, args in Prototypes: + try: + func = getattr(self.so, name) + except AttributeError: + self.use_C_GetFunctionList = True + else: + func.restype = CK_RV + func.argtypes = args + func.errcheck = CKR_Exception.raise_on_failure + setattr(self.d, name, func) + if self.use_C_GetFunctionList: + functions = CK_FUNCTION_LIST_PTR() + self.so.C_GetFunctionList(byref(functions)) + for name, args in Prototypes: + func = getattr(functions.contents, name) + func.errcheck = CKR_Exception.raise_on_failure + setattr(self.d, name, func) + self.adb = AttributeDB() + + def __getattr__(self, name): + try: + return getattr(self.d, name) + except AttributeError: + return getattr(self.so, name) + + def C_GetFunctionList(self): + return self + + @property + def version(self): + info = CK_INFO() + self.d.C_GetInfo(byref(info)) + return info.cryptokiVersion + + # Be very careful if you try to provide your own locking functions. + # For most purposes, if you really just want locking, you're best + # off specifying CKF_OS_LOCKING_OK and letting the C code deal with + # it. The one case where you might want to provide your own locking + # is when writing tests to verify behavior of the locking code. + # + # We have to stash references to the callback functions passed to + # C_Initialize() to avoid dumping core when the garbage collector + # deletes the function pointer instances out from under the C code. + + def C_Initialize(self, flags = 0, create_mutex = None, destroy_mutex = None, lock_mutex = None, unlock_mutex = None): + if flags == 0 and create_mutex is None and destroy_mutex is None and lock_mutex is None and unlock_mutex is None: + self._C_Initialize_args = None + self.d.C_Initialize(None) + else: + create_mutex = CK_CREATEMUTEX() if create_mutex is None else CK_CREATEMUTEX(create_mutex) + destroy_mutex = CK_DESTROYMUTEX() if destroy_mutex is None else CK_DESTROYMUTEX(destroy_mutex) + lock_mutex = CK_LOCKMUTEX() if lock_mutex is None else CK_LOCKMUTEX(lock_mutex) + unlock_mutex = CK_UNLOCKMUTEX() if unlock_mutex is None else CK_UNLOCKMUTEX(unlock_mutex) + self._C_Initialize_args = CK_C_INITIALIZE_ARGS(create_mutex, destroy_mutex, + lock_mutex, unlock_mutex, flags, None) + self.d.C_Initialize(cast(byref(self._C_Initialize_args), CK_VOID_PTR)) + + def C_Finalize(self): + self.d.C_Finalize(None) + self._C_Initialize_args = None + + def C_GetSlotList(self): + count = CK_ULONG() + self.d.C_GetSlotList(CK_TRUE, None, byref(count)) + slots = (CK_SLOT_ID * count.value)() + self.d.C_GetSlotList(CK_TRUE, slots, byref(count)) + return tuple(slots[i] for i in range(count.value)) + + def C_GetTokenInfo(self, slot_id): + token_info = CK_TOKEN_INFO() + self.d.C_GetTokenInfo(slot_id, byref(token_info)) + return token_info + + def C_OpenSession(self, slot, flags = CKF_RW_SESSION, application = None, notify = CK_NOTIFY()): + flags |= CKF_SERIAL_SESSION + handle = CK_SESSION_HANDLE() + self.d.C_OpenSession(slot, flags, application, notify, byref(handle)) + return handle.value + + def C_GenerateRandom(self, session, n): + buffer = create_string_buffer(n) + self.d.C_GenerateRandom(session, buffer, sizeof(buffer)) + return buffer.raw + + def C_Login(self, session, user, pin): + pin = pin.encode() + self.d.C_Login(session, user, pin, len(pin)) + + def C_GetAttributeValue(self, session_handle, object_handle, *attributes): + if len(attributes) == 1 and isinstance(attributes[0], (list, tuple)): + attributes = attributes[0] + template = self.adb.getvalue_create_template(attributes) + self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template)) + self.adb.getvalue_allocate_template(template) + self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template)) + return self.adb.from_ctypes(template) + + def C_FindObjectsInit(self, session, template = None, **kwargs): + if kwargs: + assert not template + template = kwargs + if template: + self.d.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template)) + else: + self.d.C_FindObjectsInit(session, None, 0) + + def C_FindObjects(self, session, chunk_size = 10): + objects = (CK_OBJECT_HANDLE * chunk_size)() + count = CK_ULONG(1) + while count.value > 0: + self.d.C_FindObjects(session, objects, len(objects), byref(count)) + for i in range(count.value): + yield objects[i] + + def FindObjects(self, session, template = None, **kwargs): + self.C_FindObjectsInit(session, template, **kwargs) + result = tuple(self.C_FindObjects(session)) + self.C_FindObjectsFinal(session) + return result + + def _parse_GenerateKeyPair_template(self, + # Attributes common to public and private templates + CKA_ID, + CKA_LABEL = None, + CKA_TOKEN = False, + # Attributes only in private template + CKA_SIGN = False, + CKA_DECRYPT = False, + CKA_UNWRAP = False, + CKA_SENSITIVE = True, + CKA_PRIVATE = True, + CKA_EXTRACTABLE = False, + # Finer-grained control for CKA_TOKEN + public_CKA_TOKEN = False, + private_CKA_TOKEN = False, + # Anything else is only in public template + **kwargs): + if CKA_LABEL is None: + CKA_LABEL = CKA_ID + return (dict(kwargs, + CKA_LABEL = CKA_LABEL, + CKA_ID = CKA_ID, + CKA_TOKEN = public_CKA_TOKEN or CKA_TOKEN), + dict(CKA_LABEL = CKA_LABEL, + CKA_ID = CKA_ID, + CKA_TOKEN = private_CKA_TOKEN or CKA_TOKEN, + CKA_SIGN = CKA_SIGN, + CKA_DECRYPT = CKA_DECRYPT, + CKA_UNWRAP = CKA_UNWRAP, + CKA_SENSITIVE = CKA_SENSITIVE, + CKA_PRIVATE = CKA_PRIVATE, + CKA_EXTRACTABLE = CKA_EXTRACTABLE)) + + def C_GenerateKeyPair(self, session, mechanism_type, public_template = None, private_template = None, **kwargs): + if kwargs: + assert not public_template and not private_template + public_template, private_template = self._parse_GenerateKeyPair_template(**kwargs) + public_template = self.adb.to_ctypes(public_template) + private_template = self.adb.to_ctypes(private_template) + mechanism = CK_MECHANISM(mechanism_type, None, 0) + public_handle = CK_OBJECT_HANDLE() + private_handle = CK_OBJECT_HANDLE() + self.d.C_GenerateKeyPair(session, byref(mechanism), + public_template, len(public_template), + private_template, len(private_template), + byref(public_handle), byref(private_handle)) + return public_handle.value, private_handle.value + + def C_SignInit(self, session, mechanism_type, private_key): + mechanism = CK_MECHANISM(mechanism_type, None, 0) + self.d.C_SignInit(session, byref(mechanism), private_key) + + def C_Sign(self, session, data): + n = CK_ULONG() + self.d.C_Sign(session, data, len(data), None, byref(n)) + sig = create_string_buffer(n.value) + self.d.C_Sign(session, data, len(data), sig, byref(n)) + return sig.raw + + def C_SignUpdate(self, session, data): + self.d.C_SignUpdate(session, data, len(data)) + + def C_SignFinal(self, session): + n = CK_ULONG() + self.d.C_SignFinal(session, None, byref(n)) + sig = create_string_buffer(n.value) + self.d.C_SignFinal(session, sig, byref(n)) + return sig.raw + + def C_VerifyInit(self, session, mechanism_type, public_key): + mechanism = CK_MECHANISM(mechanism_type, None, 0) + self.d.C_VerifyInit(session, byref(mechanism), public_key) + + def C_Verify(self, session, data, signature): + self.d.C_Verify(session, data, len(data), signature, len(signature)) + + def C_VerifyUpdate(self, session, data): + self.d.C_VerifyUpdate(session, data, len(data)) + + def C_VerifyFinal(self, session, signature): + self.d.C_VerifyFinal(session, signature, len(signature)) + + def C_CreateObject(self, session, template = None, **kwargs): + if kwargs: + assert not template + template = kwargs + template = self.adb.to_ctypes(template) + handle = CK_OBJECT_HANDLE() + self.d.C_CreateObject(session, template, len(template), byref(handle)) + return handle.value + + def C_DigestInit(self, session, mechanism_type): + mechanism = CK_MECHANISM(mechanism_type, None, 0) + self.d.C_DigestInit(session, byref(mechanism)) + + def C_Digest(self, session, data): + n = CK_ULONG() + self.d.C_Digest(session, data, len(data), None, byref(n)) + hash = create_string_buffer(n.value) + self.d.C_Digest(session, data, len(data), hash, byref(n)) + return hash.raw + + def C_DigestUpdate(self, session, data): + self.d.C_DigestUpdate(session, data, len(data)) + + def C_DigestFinal(self, session): + n = CK_ULONG() + self.d.C_DigestFinal(session, None, byref(n)) + hash = create_string_buffer(n.value) + self.d.C_DigestFinal(session, hash, byref(n)) + return hash.raw __all__ = ["PKCS11"] __all__.extend(name for name in globals() |