path: root/py11/__init__.py
blob: bb4813f44dbfafe27e28036baef95f81310c746b (plain) (tree)

























# An attempt at a Python interface to PKCS 11 using the scary ctypes
# module from the Python standard library.

from ctypes      import *
from .exceptions import *
from .types      import *
from .constants  import *
from .attributes import *
from .prototypes import *

class PKCS11 (object):

  def __init__(self, so_name = "libpkcs11.so"):
    self.so_name = so_name
    self.so = CDLL(so_name)
    def raise_on_failure(rv):
      if rv != CKR_OK:
        raise CKR_Exception.ckr_map[rv]
    for name, args in Prototypes.iteritems():
      func = getattr(self.so, name)
      func.restype = raise_on_failure
      func.argtypes = args
    self.adb = AttributeDB()

  def __getattr__(self, name):
    return getattr(self.so, name)

  def C_GetFunctionList(self):
    return self

  def version(self):
    info = CK_INFO()
    return info.cryptokiVersion

  # http://stackoverflow.com/questions/15786635/using-ctypes-to-write-callbacks-that-pass-in-pointer-to-pointer-parameters-to-be
  # suggests that it should be straightforward to write callback
  # functions to implement the mutex semantics, and we get the
  # CK_CREATEMUTEX call, but we dump core on the CK_LOCKMUTEX call.
  # Specifying CKF_OS_LOCKING_OK does work, and is sufficient for most
  # purposes, so leaving the callbacks out of the API for now.

  def C_Initialize(self, flags = 0):
    if flags == 0:
      init_arg = CK_C_INITIALIZE_ARGS(cast(None, CK_CREATEMUTEX), cast(None, CK_DESTROYMUTEX),
                                      cast(None, CK_LOCKMUTEX),   cast(None, CK_UNLOCKMUTEX),
                                      flags, None)
      self.so.C_Initialize(cast(byref(init_arg), CK_VOID_PTR))

  def C_Finalize(self):

  def C_GetSlotList(self):
    slots = (CK_SLOT_ID * 10)()
    count = CK_ULONG(len(slots))
    self.so.C_GetSlotList(CK_TRUE, slots, byref(count))
    return [slots[i] for i in xrange(count.value)]

  def C_GetTokenInfo(self, slot_id):
    token_info = CK_TOKEN_INFO()
    self.so.C_GetTokenInfo(slot_id, byref(token_info))
    return token_info

  def C_OpenSession(self, slot, flags = CKF_SERIAL_SESSION | CKF_RW_SESSION, application = None, notify = CK_NOTIFY()):
    handle = CK_SESSION_HANDLE()
    self.so.C_OpenSession(slot, flags, application, notify, byref(handle))
    return handle.value

  def C_GenerateRandom(self, session, n):
    buffer = create_string_buffer(n)
    self.so.C_GenerateRandom(session, buffer, sizeof(buffer))
    return buffer.raw

  def C_Login(self, session, user, pin):
    self.so.C_Login(session, user, pin, len(pin))

  def C_GetAttributeValue(self, session_handle, object_handle, attributes):
    template = self.adb.getvalue_create_template(attributes)
    self.so.C_GetAttributeValue(session_handle, object_handle, template, len(template))
    self.so.C_GetAttributeValue(session_handle, object_handle, template, len(template))
    return self.adb.from_ctypes(template)

  def C_FindObjectsInit(self, session, template):
    if template:
      self.so.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template))
      self.so.C_FindObjectsInit(session, None, 0)

  def C_FindObjects(self, session, chunk_size = 10):
    objects = (CK_OBJECT_HANDLE * chunk_size)()
    count   = CK_ULONG(1)
    while count.value > 0:
      self.so.C_FindObjects(session, objects, len(objects), byref(count))
      for i in xrange(count.value):
        yield objects[i]

  def C_GenerateKeyPair(self, session, mechanism_type, public_template, private_template):
    mechanism = CK_MECHANISM(mechanism_type, None, 0)
    public_template  = self.adb.to_ctypes(public_template)
    private_template = self.adb.to_ctypes(private_template)
    public_handle  = CK_OBJECT_HANDLE()
    private_handle = CK_OBJECT_HANDLE()
    self.so.C_GenerateKeyPair(session, byref(mechanism),
                              public_template,  len(public_template),
                              private_template, len(private_template),
                              byref(public_handle), byref(private_handle))
    return public_handle.value, private_handle.value

  def C_SignInit(self, session, mechanism_type, private_key):
    mechanism = CK_MECHANISM(mechanism_type, None, 0)
    self.so.C_SignInit(session, byref(mechanism), private_key)

  def C_Sign(self, session, data):
    n = CK_ULONG()
    self.so.C_Sign(session, data, len(data), None, byref(n))
    sig = create_string_buffer(n.value)
    self.so.C_Sign(session, data, len(data), sig, byref(n))
    return sig.raw

  def C_SignUpdate(self, session, data):
    self.so.C_SignUpdate(session, data, len(data))

  def C_SignFinal(self, session):
    n = CK_ULONG()
    self.so.C_SignFinal(session, None, byref(n))
    sig = create_string_buffer(n.value)
    self.so.C_SignFinal(session, sig, byref(n))
    return sig.raw

  def C_VerifyInit(self, session, mechanism_type, public_key):
    mechanism = CK_MECHANISM(mechanism_type, None, 0)
    self.so.C_VerifyInit(session, byref(mechanism), public_key)

  def C_Verify(self, session, data, signature):
    self.so.C_Verify(session, data, len(data), signature, len(signature))

  def C_VerifyUpdate(self, session, data):
    self.so.C_VerifyUpdate(session, data, len(data))

  def C_VerifyFinal(self, session, signature):
    self.so.C_VerifyFinal(session, signature, len(signature))

  def C_CreateObject(self, session, template):
    template = self.adb.to_ctypes(template)
    handle = CK_OBJECT_HANDLE()
    self.so.C_CreateObject(session, template, len(template), byref(handle))
    return handle.value

  def C_DigestInit(self, session, mechanism_type):
    mechanism = CK_MECHANISM(mechanism_type, None, 0)
    self.so.C_DigestInit(session, byref(mechanism))

  def C_Digest(self, session, data):
    n = CK_ULONG()
    self.so.C_Digest(session, data, len(data), None, byref(n))
    hash = create_string_buffer(n.value)
    self.so.C_Digest(session, data, len(data), hash, byref(n))
    return hash.raw

  def C_DigestUpdate(self, session, data):
    self.so.C_DigestUpdate(session, data, len(data))

  def C_DigestFinal(self, session):
    n = CK_ULONG()
    self.so.C_DigestFinal(session, None, byref(n))
    hash = create_string_buffer(n.value)
    self.so.C_DigestFinal(session, hash, byref(n))
    return hash.raw

__all__ = ["PKCS11"]
__all__.extend(name for name in globals()
               if name.startswith("CK")
               or name.startswith("CRYPTOKI_"))