"""
This is a Python interface to PKCS #11, using the ctypes module from
the Python standard library.
This is not (yet?) a complete implementation. It's intended primarily
to simplify testing of the underlying PKCS #11 shared library.
"""
from ctypes import *
from .exceptions import *
from .types import *
from .constants import *
from .attributes import *
from .prototypes import *
class PKCS11 (object):
"""
PKCS #11 API object, encapsulating the PKCS #11 library itself.
Sample usage:
from py11 import *
p11 = PKCS11()
p11.C_Initialize()
session = p11.C_OpenSession()
p11.C_login(session, CK_USER, "secret")
p11.C_FindObjectsInit(session, {CKA_CLASS: CKO_PRIVATE_KEY, CKA_KEY_TYPE: CKK_EC, CKA_ID: foo})
keys = list(p11.C_FindObjects(session))
p11.C_FindObjectsFinal(session)
if len(keys) != 1:
raise RuntimeError
p11.C_SignInit(session, CK_ECDSA_SHA256, keys[0])
sig = p11.Sign(session, "Your mother was a hamster")
p11.C_CloseAllSessions(slot)
p11.C_Finalize()
The full raw PKCS #11 API is available via the .so attribute, but
using this can be tricky, both because it requires strict adherence
to the C API and because one needs to be careful not to run afoul of
the Python garbage collector.
The example above uses a set of interface routines built on top of the
raw PKCS #11 API, which map the API into something a bit more Pythonic.
"""
def __init__(self, so_name = "libpkcs11.so"):
self.so_name = so_name
self.so = CDLL(so_name)
def raise_on_failure(rv):
if rv != CKR_OK:
raise CKR_Exception.ckr_map[rv]
for name, args in Prototypes.iteritems():
func = getattr(self.so, name)
func.restype = raise_on_failure
func.argtypes = args
self.adb = AttributeDB()
def __getattr__(self, name):
return getattr(self.so, name)
def C_GetFunctionList(self):
return self
@property
def version(self):
info = CK_INFO()
self.so.C_GetInfo(byref(info))
return info.cryptokiVersion
# Be very careful if you try to provide your own locking functions.
# For most purposes, if you really just want locking, you're best
# off specifying CKF_OS_LOCKING_OK and letting the C code deal with
# it. The one case where you might want to provide your own locking
# is when writing tests to verify behavior of the locking code.
#
# We have to stash references to the callback functions passed to
# C_Initialize() to avoid dumping core when the garbage collector
# deletes the function pointer instances out from under the C code.
def C_Initialize(self, flags = 0, create_mutex = None, destroy_mutex = None, lock_mutex = None, unlock_mutex = None):
if flags == 0 and create_mutex is None and destroy_mutex is None and lock_mutex is None and unlock_mutex is None:
self._C_Initialize_args = None
self.so.C_Initialize(None)
else:
create_mutex = CK_CREATEMUTEX() if create_mutex is None else CK_CREATEMUTEX(create_mutex)
destroy_mutex = CK_DESTROYMUTEX() if destroy_mutex is None else CK_DESTROYMUTEX(destroy_mutex)
lock_mutex = CK_LOCKMUTEX() if lock_mutex is None else CK_LOCKMUTEX(lock_mutex)
unlock_mutex = CK_UNLOCKMUTEX() if unlock_mutex is None else CK_UNLOCKMUTEX(unlock_mutex)
self._C_Initialize_args = CK_C_INITIALIZE_ARGS(create_mutex, destroy_mutex, lock_mutex, unlock_mutex, flags, None)
self.so.C_Initialize(cast(byref(self._C_Initialize_args), CK_VOID_PTR))
def C_Finalize(self):
self.so.C_Finalize(None)
self._C_Initialize_args = None
def C_GetSlotList(self):
count = CK_ULONG()
self.so.C_GetSlotList(CK_TRUE, None, byref(count))
slots = (CK_SLOT_ID * count.value)()
self.so.C_GetSlotList(CK_TRUE, slots, byref(count))
return tuple(slots[i] for i in xrange(count.value))
def C_GetTokenInfo(self, slot_id):
token_info = CK_TOKEN_INFO()
self.so.C_GetTokenInfo(slot_id, byref(token_info))
return token_info
def C_OpenSession(self, slot, flags = CKF_RW_SESSION, application = None, notify = CK_NOTIFY()):
flags |= CKF_SERIAL_SESSION
handle = CK_SESSION_HANDLE()
self.so.C_OpenSession(slot, flags, application, notify, byref(handle))
return handle.value
def C_GenerateRandom(self, session, n):
buffer = create_string_buffer(n)
self.so.C_GenerateRandom(session, buffer, sizeof(buffer))
return buffer.raw
def C_Login(self, session, user, pin):
self.so.C_Login(session, user, pin, len(pin))
def C_GetAttributeValue(self, session_handle, object_handle, *attributes):
if len(attributes) == 1 and isinstance(attributes[0], (list, tuple)):
attributes = attributes[0]
template = self.adb.getvalue_create_template(attributes)
self.so.C_GetAttributeValue(session_handle, object_handle, template, len(template))
self.adb.getvalue_allocate_template(template)
self.so.C_GetAttributeValue(session_handle, object_handle, template, len(template))
return self.adb.from_ctypes(template)
def C_FindObjectsInit(self, session, template = None, **kwargs):
if kwargs:
assert not template
template = kwargs
if template:
self.so.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template))
else:
self.so.C_FindObjectsInit(session, None, 0)
def C_FindObjects(self, session, chunk_size = 10):
objects = (CK_OBJECT_HANDLE * chunk_size)()
count = CK_ULONG(1)
while count.value > 0:
self.so.C_FindObjects(session, objects, len(objects), byref(count))
for i in xrange(count.value):
yield objects[i]
def FindObjects(self, session, template = None, **kwargs):
self.C_FindObjectsInit(session, template, **kwargs)
result = tuple(self.C_FindObjects(session))
self.C_FindObjectsFinal(session)
return result
def _parse_GenerateKeyPair_template(self,
# Attributes common to public and private templates
CKA_ID,
CKA_LABEL = None,
CKA_TOKEN = False,
# Attributes only in private template
CKA_SIGN = False,
CKA_DECRYPT = False,
CKA_UNWRAP = False,
CKA_SENSITIVE = True,
CKA_PRIVATE = True,
CKA_EXTRACTABLE = False,
# Finer-grained control for CKA_TOKEN
public_CKA_TOKEN = False,
private_CKA_TOKEN = False,
# Anything else is only in public template
**kwargs):
if CKA_LABEL is None:
CKA_LABEL = CKA_ID
return (dict(kwargs,
CKA_LABEL = CKA_LABEL,
CKA_ID = CKA_ID,
CKA_TOKEN = public_CKA_TOKEN or CKA_TOKEN),
dict(CKA_LABEL = CKA_LABEL,
CKA_ID = CKA_ID,
CKA_TOKEN = private_CKA_TOKEN or CKA_TOKEN,
CKA_SIGN = CKA_SIGN,
CKA_DECRYPT = CKA_DECRYPT,
CKA_UNWRAP = CKA_UNWRAP,
CKA_SENSITIVE = CKA_SENSITIVE,
CKA_PRIVATE = CKA_PRIVATE,
CKA_EXTRACTABLE = CKA_EXTRACTABLE))
def C_GenerateKeyPair(self, session, mechanism_type, public_template = None, private_template = None, **kwargs):
if kwargs:
assert not public_template and not private_template
public_template, private_template = self._parse_GenerateKeyPair_template(**kwargs)
public_template = self.adb.to_ctypes(public_template)
private_template = self.adb.to_ctypes(private_template)
mechanism = CK_MECHANISM(mechanism_type, None, 0)
public_handle = CK_OBJECT_HANDLE()
private_handle = CK_OBJECT_HANDLE()
self.so.C_GenerateKeyPair(session, byref(mechanism),
public_template, len(public_template),
private_template, len(private_template),
byref(public_handle), byref(private_handle))
return public_handle.value, private_handle.value
def C_SignInit(self, session, mechanism_type, private_key):
mechanism = CK_MECHANISM(mechanism_type, None, 0)
self.so.C_SignInit(session, byref(mechanism), private_key)
def C_Sign(self, session, data):
n = CK_ULONG()
self.so.C_Sign(session, data, len(data), None, byref(n))
sig = create_string_buffer(n.value)
self.so.C_Sign(session, data, len(data), sig, byref(n))
return sig.raw
def C_SignUpdate(self, session, data):
self.so.C_SignUpdate(session, data, len(data))
def C_SignFinal(self, session):
n = CK_ULONG()
self.so.C_SignFinal(session, None, byref(n))
sig = create_string_buffer(n.value)
self.so.C_SignFinal(session, sig, byref(n))
return sig.raw
def C_VerifyInit(self, session, mechanism_type, public_key):
mechanism = CK_MECHANISM(mechanism_type, None, 0)
self.so.C_VerifyInit(session, byref(mechanism), public_key)
def C_Verify(self, session, data, signature):
self.so.C_Verify(session, data, len(data), signature, len(signature))
def C_VerifyUpdate(self, session, data):
self.so.C_VerifyUpdate(session, data, len(data))
def C_VerifyFinal(self, session, signature):
self.so.C_VerifyFinal(session, signature, len(signature))
def C_CreateObject(self, session, template = None, **kwargs):
if kwargs:
assert not template
template = kwargs
template = self.adb.to_ctypes(template)
handle = CK_OBJECT_HANDLE()
self.so.C_CreateObject(session, template, len(template), byref(handle))
return handle.value
def C_DigestInit(self, session, mechanism_type):
mechanism = CK_MECHANISM(mechanism_type, None, 0)
self.so.C_DigestInit(session, byref(mechanism))
def C_Digest(self, session, data):
n = CK_ULONG()
self.so.C_Digest(session, data, len(data), None, byref(n))
hash = create_string_buffer(n.value)
self.so.C_Digest(session, data, len(data), hash, byref(n))
return hash.raw
def C_DigestUpdate(self, session, data):
self.so.C_DigestUpdate(session, data, len(data))
def C_DigestFinal(self, session):
n = CK_ULONG()
self.so.C_DigestFinal(session, None, byref(n))
hash = create_string_buffer(n.value)
self.so.C_DigestFinal(session, hash, byref(n))
return hash.raw
__all__ = ["PKCS11"]
__all__.extend(name for name in globals()
if name.startswith("CK")
or name.startswith("CRYPTOKI_"))