aboutsummaryrefslogblamecommitdiff
path: root/cryptech/py11/__init__.py
blob: 0c794ff3279c3c5469d963b4095b4fb8bdac5ec2 (plain) (tree)
1
2
3
4
5
6
7
8






                                                                      
 



                         
                         
 








                                                               

                      










































































































































































































































































                                                                                                                         
 



                                               
"""
This is a Python interface to PKCS #11, using the ctypes module from
the Python standard library.

This is not (yet?) a complete implementation.  It's intended primarily
to simplify testing of the underlying PKCS #11 shared library.
"""

from ctypes      import *
from .exceptions import *
from .types      import *
from .constants  import *
from .attributes import *

import platform

if platform.system() == "Darwin":
    default_so_name = "/usr/local/lib/libcryptech-pkcs11.dylib"
elif platform.system() == "Linux":
    default_so_name = "/usr/lib/libcryptech-pkcs11.so"
else:
    default_so_name = "/usr/local/lib/libcryptech-pkcs11.so"


class PKCS11 (object):
    """
    PKCS #11 API object, encapsulating the PKCS #11 library itself.
    Sample usage:

      from cryptech.py11 import *

      p11 = PKCS11()
      p11.C_Initialize()
      session = p11.C_OpenSession()
      p11.C_login(session, CK_USER, "secret")
      p11.C_FindObjectsInit(session, {CKA_CLASS: CKO_PRIVATE_KEY, CKA_KEY_TYPE: CKK_EC, CKA_ID: foo})
      keys = list(p11.C_FindObjects(session))
      p11.C_FindObjectsFinal(session)
      if len(keys) != 1:
        raise RuntimeError
      p11.C_SignInit(session, CK_ECDSA_SHA256, keys[0])
      sig = p11.Sign(session, "Your mother was a hamster")
      p11.C_CloseAllSessions(slot)
      p11.C_Finalize()

    The full raw PKCS #11 API is available via the .so attribute, but
    using this can be tricky, both because it requires strict adherence
    to the C API and because one needs to be careful not to run afoul of
    the Python garbage collector.

    The example above uses a set of interface routines built on top of the
    raw PKCS #11 API, which map the API into something a bit more Pythonic.
    """

    # Whether to use C_GetFunctionList() instead of dynamic link symbols.
    use_C_GetFunctionList = False

    def __init__(self, so_name = default_so_name):
        self.so_name = so_name
        self.so = CDLL(so_name)
        self.d = type("Dispatch", (object,), {})()
        for name, args in Prototypes:
            try:
                func = getattr(self.so, name)
            except AttributeError:
                self.use_C_GetFunctionList = True
            else:
                func.restype  = CK_RV
                func.argtypes = args
                func.errcheck = CKR_Exception.raise_on_failure
                setattr(self.d, name, func)
        if self.use_C_GetFunctionList:
            functions = CK_FUNCTION_LIST_PTR()
            self.so.C_GetFunctionList(byref(functions))
            for name, args in Prototypes:
                func = getattr(functions.contents, name)
                func.errcheck = CKR_Exception.raise_on_failure
                setattr(self.d, name, func)
        self.adb = AttributeDB()

    def __getattr__(self, name):
        try:
            return getattr(self.d, name)
        except AttributeError:
            return getattr(self.so, name)

    def C_GetFunctionList(self):
        return self

    @property
    def version(self):
        info = CK_INFO()
        self.d.C_GetInfo(byref(info))
        return info.cryptokiVersion

    # Be very careful if you try to provide your own locking functions.
    # For most purposes, if you really just want locking, you're best
    # off specifying CKF_OS_LOCKING_OK and letting the C code deal with
    # it.  The one case where you might want to provide your own locking
    # is when writing tests to verify behavior of the locking code.
    #
    # We have to stash references to the callback functions passed to
    # C_Initialize() to avoid dumping core when the garbage collector
    # deletes the function pointer instances out from under the C code.

    def C_Initialize(self, flags = 0, create_mutex = None, destroy_mutex = None, lock_mutex = None, unlock_mutex = None):
        if flags == 0 and create_mutex is None and destroy_mutex is None and lock_mutex is None and unlock_mutex is None:
            self._C_Initialize_args = None
            self.d.C_Initialize(None)
        else:
            create_mutex  = CK_CREATEMUTEX()  if create_mutex  is None else CK_CREATEMUTEX(create_mutex)
            destroy_mutex = CK_DESTROYMUTEX() if destroy_mutex is None else CK_DESTROYMUTEX(destroy_mutex)
            lock_mutex    = CK_LOCKMUTEX()    if lock_mutex    is None else CK_LOCKMUTEX(lock_mutex)
            unlock_mutex  = CK_UNLOCKMUTEX()  if unlock_mutex  is None else CK_UNLOCKMUTEX(unlock_mutex)
            self._C_Initialize_args = CK_C_INITIALIZE_ARGS(create_mutex, destroy_mutex,
                                                           lock_mutex, unlock_mutex, flags, None)
            self.d.C_Initialize(cast(byref(self._C_Initialize_args), CK_VOID_PTR))

    def C_Finalize(self):
        self.d.C_Finalize(None)
        self._C_Initialize_args = None

    def C_GetSlotList(self):
        count = CK_ULONG()
        self.d.C_GetSlotList(CK_TRUE, None, byref(count))
        slots = (CK_SLOT_ID * count.value)()
        self.d.C_GetSlotList(CK_TRUE, slots, byref(count))
        return tuple(slots[i] for i in range(count.value))

    def C_GetTokenInfo(self, slot_id):
        token_info = CK_TOKEN_INFO()
        self.d.C_GetTokenInfo(slot_id, byref(token_info))
        return token_info

    def C_OpenSession(self, slot, flags = CKF_RW_SESSION, application = None, notify = CK_NOTIFY()):
        flags |= CKF_SERIAL_SESSION
        handle = CK_SESSION_HANDLE()
        self.d.C_OpenSession(slot, flags, application, notify, byref(handle))
        return handle.value

    def C_GenerateRandom(self, session, n):
        buffer = create_string_buffer(n)
        self.d.C_GenerateRandom(session, buffer, sizeof(buffer))
        return buffer.raw

    def C_Login(self, session, user, pin):
        pin = pin.encode()
        self.d.C_Login(session, user, pin, len(pin))

    def C_GetAttributeValue(self, session_handle, object_handle, *attributes):
        if len(attributes) == 1 and isinstance(attributes[0], (list, tuple)):
            attributes = attributes[0]
        template = self.adb.getvalue_create_template(attributes)
        self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template))
        self.adb.getvalue_allocate_template(template)
        self.d.C_GetAttributeValue(session_handle, object_handle, template, len(template))
        return self.adb.from_ctypes(template)

    def C_FindObjectsInit(self, session, template = None, **kwargs):
        if kwargs:
            assert not template
            template = kwargs
        if template:
            self.d.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template))
        else:
            self.d.C_FindObjectsInit(session, None, 0)

    def C_FindObjects(self, session, chunk_size = 10):
        objects = (CK_OBJECT_HANDLE * chunk_size)()
        count   = CK_ULONG(1)
        while count.value > 0:
            self.d.C_FindObjects(session, objects, len(objects), byref(count))
            for i in range(count.value):
                yield objects[i]

    def FindObjects(self, session, template = None, **kwargs):
        self.C_FindObjectsInit(session, template, **kwargs)
        result = tuple(self.C_FindObjects(session))
        self.C_FindObjectsFinal(session)
        return result

    def _parse_GenerateKeyPair_template(self,
                                        # Attributes common to public and private templates
                                        CKA_ID,
                                        CKA_LABEL         = None,
                                        CKA_TOKEN         = False,
                                        # Attributes only in private template
                                        CKA_SIGN          = False,
                                        CKA_DECRYPT       = False,
                                        CKA_UNWRAP        = False,
                                        CKA_SENSITIVE     = True,
                                        CKA_PRIVATE       = True,
                                        CKA_EXTRACTABLE   = False,
                                        # Finer-grained control for CKA_TOKEN
                                        public_CKA_TOKEN  = False,
                                        private_CKA_TOKEN = False,
                                        # Anything else is only in public template
                                        **kwargs):
        if CKA_LABEL is None:
            CKA_LABEL = CKA_ID
        return (dict(kwargs,
                     CKA_LABEL      = CKA_LABEL,
                     CKA_ID         = CKA_ID,
                     CKA_TOKEN      = public_CKA_TOKEN or CKA_TOKEN),
                dict(CKA_LABEL      = CKA_LABEL,
                     CKA_ID         = CKA_ID,
                     CKA_TOKEN      = private_CKA_TOKEN or CKA_TOKEN,
                     CKA_SIGN       = CKA_SIGN,
                     CKA_DECRYPT    = CKA_DECRYPT,
                     CKA_UNWRAP     = CKA_UNWRAP,
                     CKA_SENSITIVE  = CKA_SENSITIVE,
                     CKA_PRIVATE    = CKA_PRIVATE,
                     CKA_EXTRACTABLE = CKA_EXTRACTABLE))

    def C_GenerateKeyPair(self, session, mechanism_type, public_template = None, private_template = None, **kwargs):
        if kwargs:
            assert not public_template and not private_template
            public_template, private_template = self._parse_GenerateKeyPair_template(**kwargs)
        public_template  = self.adb.to_ctypes(public_template)
        private_template = self.adb.to_ctypes(private_template)
        mechanism = CK_MECHANISM(mechanism_type, None, 0)
        public_handle  = CK_OBJECT_HANDLE()
        private_handle = CK_OBJECT_HANDLE()
        self.d.C_GenerateKeyPair(session, byref(mechanism),
                                 public_template,  len(public_template),
                                 private_template, len(private_template),
                                 byref(public_handle), byref(private_handle))
        return public_handle.value, private_handle.value

    def C_SignInit(self, session, mechanism_type, private_key):
        mechanism = CK_MECHANISM(mechanism_type, None, 0)
        self.d.C_SignInit(session, byref(mechanism), private_key)

    def C_Sign(self, session, data):
        n = CK_ULONG()
        self.d.C_Sign(session, data, len(data), None, byref(n))
        sig = create_string_buffer(n.value)
        self.d.C_Sign(session, data, len(data), sig, byref(n))
        return sig.raw

    def C_SignUpdate(self, session, data):
        self.d.C_SignUpdate(session, data, len(data))

    def C_SignFinal(self, session):
        n = CK_ULONG()
        self.d.C_SignFinal(session, None, byref(n))
        sig = create_string_buffer(n.value)
        self.d.C_SignFinal(session, sig, byref(n))
        return sig.raw

    def C_VerifyInit(self, session, mechanism_type, public_key):
        mechanism = CK_MECHANISM(mechanism_type, None, 0)
        self.d.C_VerifyInit(session, byref(mechanism), public_key)

    def C_Verify(self, session, data, signature):
        self.d.C_Verify(session, data, len(data), signature, len(signature))

    def C_VerifyUpdate(self, session, data):
        self.d.C_VerifyUpdate(session, data, len(data))

    def C_VerifyFinal(self, session, signature):
        self.d.C_VerifyFinal(session, signature, len(signature))

    def C_CreateObject(self, session, template = None, **kwargs):
        if kwargs:
            assert not template
            template = kwargs
        template = self.adb.to_ctypes(template)
        handle = CK_OBJECT_HANDLE()
        self.d.C_CreateObject(session, template, len(template), byref(handle))
        return handle.value

    def C_DigestInit(self, session, mechanism_type):
        mechanism = CK_MECHANISM(mechanism_type, None, 0)
        self.d.C_DigestInit(session, byref(mechanism))

    def C_Digest(self, session, data):
        n = CK_ULONG()
        self.d.C_Digest(session, data, len(data), None, byref(n))
        hash = create_string_buffer(n.value)
        self.d.C_Digest(session, data, len(data), hash, byref(n))
        return hash.raw

    def C_DigestUpdate(self, session, data):
        self.d.C_DigestUpdate(session, data, len(data))

    def C_DigestFinal(self, session):
        n = CK_ULONG()
        self.d.C_DigestFinal(session, None, byref(n))
        hash = create_string_buffer(n.value)
        self.d.C_DigestFinal(session, hash, byref(n))
        return hash.raw

__all__ = ["PKCS11"]
__all__.extend(name for name in globals()
               if name.startswith("CK")
               or name.startswith("CRYPTOKI_"))