aboutsummaryrefslogblamecommitdiff
path: root/py11/__init__.py
blob: 80495d9cfe6b02faeb7dc0fa114f5e5b2d5497e2 (plain) (tree)
1
2
3
4
5
6
7
8






                                                                      
 



                         

                         
 

                      



























                                                                                                   
 
                                               




                                       
                                             


                                     
                            












                                  








                                                                      
 


                                                                                                                       

                                



                                                                                                    

                                                                                                                        


                            
                                  











                                                       

                                                                                                  











                                                                          


                                                                            
                                                            
                                                                                       
                                                 
                                                                                       
                                         
 



                                                                  
                
                                                                                     




                                                    

                          
                                                                         

                                   
 





                                                       


















                                                                                         












                                                                 

                                                                                                                  
              
                                                         
                                                                                        

                                                           
                                                     







                                                                          










                                                              




                                                  
                                                
                                       
                                               

                  





                                                                         
 
                                          
                                                    

                                              
                                                             
 



                                                               




                                                                           















                                                              
                                                  
                                        
                                                  

                   



                                               
"""
This is a Python interface to PKCS #11, using the ctypes module from
the Python standard library.

This is not (yet?) a complete implementation.  It's intended primarily
to simplify testing of the underlying PKCS #11 shared library.
"""

from ctypes      import *
from .exceptions import *
from .types      import *
from .constants  import *
from .attributes import *
from .prototypes import *


class PKCS11 (object):
  """
  PKCS #11 API object, encapsulating the PKCS #11 library itself.
  Sample usage:

    from py11 import *

    p11 = PKCS11()
    p11.C_Initialize()
    slot = p11.C_GetSlotList()[0]
    session = p11.C_OpenSession(slot)
    p11.C_Login(session, CKU_USER, "secret")
    p11.C_FindObjectsInit(session, {CKA_CLASS: CKO_PRIVATE_KEY, CKA_KEY_TYPE: CKK_EC, CKA_ID: foo})
    keys = list(p11.C_FindObjects(session))
    p11.C_FindObjectsFinal(session)
    if len(keys) != 1:
      raise RuntimeError
    p11.C_SignInit(session, CKM_ECDSA_SHA256, keys[0])
    sig = p11.C_Sign(session, "Your mother was a hamster")
    p11.C_CloseAllSessions(slot)
    p11.C_Finalize()

  The full raw PKCS #11 API is available via the .so attribute, but
  using this can be tricky, both because it requires strict adherence
  to the C API and because one needs to be careful not to run afoul of
  the Python garbage collector.

  The example above uses a set of interface routines built on top of the
  raw PKCS #11 API, which map the API into something a bit more Pythonic.
  """

  def __init__(self, so_name = "libpkcs11.so"):
    """
    Load the PKCS #11 shared library and configure its functions.

    so_name is the library name or path handed to ctypes.CDLL().

    Every function listed in Prototypes gets its argument types set
    and a result checker installed, so that any return value other
    than CKR_OK surfaces as a Python exception rather than as a
    number the caller has to inspect.
    """
    self.so_name = so_name
    self.so = CDLL(so_name)
    def raise_on_failure(rv):
      if rv == CKR_OK:
        return
      # Return values with no entry in ckr_map (vendor-defined or
      # otherwise unknown codes) are reported through the base
      # exception class instead of leaking a KeyError from the lookup.
      # NOTE(review): assumes CKR_Exception accepts a message argument
      # like an ordinary Exception subclass -- confirm in .exceptions.
      try:
        failure = CKR_Exception.ckr_map[rv]
      except KeyError:
        failure = CKR_Exception("Unknown PKCS #11 return value 0x%x" % rv)
      raise failure
    for name, args in Prototypes.iteritems():
      func = getattr(self.so, name)
      # A non-ctypes callable as restype makes ctypes pass the C return
      # value through that callable after every call.
      func.restype = raise_on_failure
      func.argtypes = args
    self.adb = AttributeDB()

  def __getattr__(self, name):
    """
    Fall back to the underlying library object for any attribute not
    found on this object, so raw PKCS #11 functions that have no
    wrapper method here can still be called directly.

    Raises AttributeError if the library has not been loaded yet.
    """
    # __getattr__ only runs after normal lookup has failed.  If "so"
    # itself is what is missing (instance created without __init__,
    # e.g. by copy or pickle), fetching it via self.so would re-enter
    # this method without end, so read the instance dict directly.
    try:
      library = self.__dict__["so"]
    except KeyError:
      raise AttributeError(name)
    return getattr(library, name)

  def C_GetFunctionList(self):
    """
    Return this object itself.

    NOTE(review): presumably this lets code written in the style of
    the C API, which obtains a function list first, call the
    functions as attributes of the returned object -- confirm intent.
    """
    return self

  @property
  def version(self):
    """
    Cryptoki version reported by the library: the cryptokiVersion
    field of the CK_INFO structure filled in by C_GetInfo().
    """
    library_info = CK_INFO()
    self.so.C_GetInfo(byref(library_info))
    return library_info.cryptokiVersion

  # Be very careful if you try to provide your own locking functions.
  # For most purposes, if you really just want locking, you're best
  # off specifying CKF_OS_LOCKING_OK and letting the C code deal with
  # it.  The one case where you might want to provide your own locking
  # is when writing tests to verify behavior of the locking code.
  #
  # We have to stash references to the callback functions passed to
  # C_Initialize() to avoid dumping core when the garbage collector
  # deletes the function pointer instances out from under the C code.

  def C_Initialize(self, flags = 0, create_mutex = None, destroy_mutex = None, lock_mutex = None, unlock_mutex = None):
    """
    Initialize the PKCS #11 library.

    With no flags and no mutex callbacks the library is initialized
    with a NULL argument.  Otherwise a CK_C_INITIALIZE_ARGS structure
    is built from the flags and callbacks (a callback left as None
    becomes an empty function pointer) and passed to the library.

    The structure is stored on self so that it, and the callback
    pointers it was built from, stay referenced while the library
    may still use them.
    """
    callbacks = (create_mutex, destroy_mutex, lock_mutex, unlock_mutex)
    if flags == 0 and all(callback is None for callback in callbacks):
      self._C_Initialize_args = None
      self.so.C_Initialize(None)
      return

    def wrap(pointer_type, callback):
      # Empty (NULL) function pointer when no callback was supplied.
      if callback is None:
        return pointer_type()
      return pointer_type(callback)

    self._C_Initialize_args = CK_C_INITIALIZE_ARGS(wrap(CK_CREATEMUTEX,  create_mutex),
                                                   wrap(CK_DESTROYMUTEX, destroy_mutex),
                                                   wrap(CK_LOCKMUTEX,    lock_mutex),
                                                   wrap(CK_UNLOCKMUTEX,  unlock_mutex),
                                                   flags, None)
    self.so.C_Initialize(cast(byref(self._C_Initialize_args), CK_VOID_PTR))

  def C_Finalize(self):
    """
    Shut down the PKCS #11 library by calling C_Finalize(NULL), then
    drop the argument structure stashed by C_Initialize().
    """
    self.so.C_Finalize(None)
    # Only released after the library call has returned without raising.
    self._C_Initialize_args = None

  def C_GetSlotList(self):
    """
    Return a list of the slot IDs reported by the library.

    CK_TRUE is passed as the tokenPresent argument, as before.  The
    library is asked for the number of slots first, so the result is
    not limited to a fixed-size buffer.
    """
    count = CK_ULONG()
    # A NULL slot buffer makes C_GetSlotList report only the count.
    self.so.C_GetSlotList(CK_TRUE, None, byref(count))
    slots = (CK_SLOT_ID * count.value)()
    # count goes in as the buffer capacity and comes back as the
    # number of entries actually written.
    self.so.C_GetSlotList(CK_TRUE, slots, byref(count))
    return [slots[i] for i in xrange(count.value)]

  def C_GetTokenInfo(self, slot_id):
    """
    Return the CK_TOKEN_INFO structure the library fills in for the
    given slot.
    """
    info = CK_TOKEN_INFO()
    self.so.C_GetTokenInfo(slot_id, byref(info))
    return info

  def C_OpenSession(self, slot, flags = CKF_RW_SESSION, application = None, notify = CK_NOTIFY()):
    """
    Open a session on the given slot and return its handle value.

    CKF_SERIAL_SESSION is always OR-ed into flags before the call;
    application and notify are passed through to the library.
    """
    session = CK_SESSION_HANDLE()
    self.so.C_OpenSession(slot, flags | CKF_SERIAL_SESSION, application, notify, byref(session))
    return session.value

  def C_GenerateRandom(self, session, n):
    """
    Have the library fill a buffer created from n (a byte count) and
    return the buffer's raw contents.
    """
    random_buffer = create_string_buffer(n)
    buffer_size = sizeof(random_buffer)
    self.so.C_GenerateRandom(session, random_buffer, buffer_size)
    return random_buffer.raw

  def C_Login(self, session, user, pin):
    """
    Log in to the session as the given user type with the given PIN.
    The PIN length passed to the library is len(pin).
    """
    pin_length = len(pin)
    self.so.C_Login(session, user, pin, pin_length)

  def C_GetAttributeValue(self, session_handle, object_handle, *attributes):
    """
    Fetch attribute values of an object.

    The attributes may be given as separate arguments or as a single
    list or tuple.  The library is queried twice with the same
    template, with the attribute database's allocation step run in
    between (presumably sizing the value buffers from the first
    query -- see AttributeDB), and the result is the template
    converted back from ctypes by the attribute database.
    """
    wanted = attributes
    if len(wanted) == 1 and isinstance(wanted[0], (list, tuple)):
      wanted = wanted[0]
    template = self.adb.getvalue_create_template(wanted)

    def query():
      self.so.C_GetAttributeValue(session_handle, object_handle, template, len(template))

    query()
    self.adb.getvalue_allocate_template(template)
    query()
    return self.adb.from_ctypes(template)

  def C_FindObjectsInit(self, session, template = None, **kwargs):
    """
    Start an object search on the session.

    The search template may be given either as a mapping in template
    or as keyword arguments, not both.  An empty or missing template
    is passed to the library as NULL with a count of zero.
    """
    if kwargs:
      assert not template
      template = kwargs
    if not template:
      self.so.C_FindObjectsInit(session, None, 0)
      return
    self.so.C_FindObjectsInit(session, self.adb.to_ctypes(template), len(template))

  def C_FindObjects(self, session, chunk_size = 10):
    """
    Generator yielding the object handles found by the search that is
    active on the session.

    Handles are fetched from the library chunk_size at a time; the
    generator ends once a fetch reports no handles.
    """
    batch = (CK_OBJECT_HANDLE * chunk_size)()
    found = CK_ULONG()
    while True:
      self.so.C_FindObjects(session, batch, len(batch), byref(found))
      if not found.value > 0:
        break
      for position in xrange(found.value):
        yield batch[position]

  def Find(self, session, template = None, **kwargs):
    """
    Generator that runs a complete object search: C_FindObjectsInit(),
    every handle from C_FindObjects(), then C_FindObjectsFinal().

    The template is given as for C_FindObjectsInit().  Because this is
    a generator, nothing is called until iteration starts.
    """
    self.C_FindObjectsInit(session, template, **kwargs)
    # Once the search has been started it is always finished, even if
    # the caller stops iterating early or a fetch raises; otherwise the
    # search would be left active on the session.
    try:
      for obj in self.C_FindObjects(session):
        yield obj
    finally:
      self.C_FindObjectsFinal(session)

  def _parse_GenerateKeyPair_template(self,
                                      # Attributes common to public and private templates
                                      CKA_ID,
                                      CKA_LABEL         = None,
                                      CKA_TOKEN         = False,
                                      # Attributes only in private template
                                      CKA_SIGN          = False,
                                      CKA_DECRYPT       = False,
                                      CKA_UNWRAP        = False,
                                      CKA_SENSITIVE     = True,
                                      CKA_PRIVATE       = True,
                                      CKA_EXTRACTABLE   = False,
                                      # Finer-grained control for CKA_TOKEN
                                      public_CKA_TOKEN  = False,
                                      private_CKA_TOKEN = False,
                                      # Anything else is only in public template
                                      **kwargs):
    """
    Split keyword arguments into a (public, private) pair of template
    dictionaries keyed by attribute name.

    CKA_LABEL defaults to CKA_ID.  CKA_TOKEN applies to both
    templates; public_CKA_TOKEN and private_CKA_TOKEN can turn it on
    for one side only.  Unrecognized keywords go into the public
    template only.
    """
    label = CKA_ID if CKA_LABEL is None else CKA_LABEL

    # Public template: caller's extra attributes first, then the
    # shared attributes.
    public = dict(kwargs)
    public["CKA_LABEL"] = label
    public["CKA_ID"]    = CKA_ID
    public["CKA_TOKEN"] = public_CKA_TOKEN or CKA_TOKEN

    # Private template: shared attributes plus the private-only ones.
    private = dict()
    private["CKA_LABEL"]       = label
    private["CKA_ID"]          = CKA_ID
    private["CKA_TOKEN"]       = private_CKA_TOKEN or CKA_TOKEN
    private["CKA_SIGN"]        = CKA_SIGN
    private["CKA_DECRYPT"]     = CKA_DECRYPT
    private["CKA_UNWRAP"]      = CKA_UNWRAP
    private["CKA_SENSITIVE"]   = CKA_SENSITIVE
    private["CKA_PRIVATE"]     = CKA_PRIVATE
    private["CKA_EXTRACTABLE"] = CKA_EXTRACTABLE

    return (public, private)

  def C_GenerateKeyPair(self, session, mechanism_type, public_template = None, private_template = None, **kwargs):
    """
    Generate a key pair and return (public_handle, private_handle).

    The templates may be given explicitly, or keyword arguments may
    be given instead, in which case they are split into the two
    templates by _parse_GenerateKeyPair_template().  The mechanism is
    built from mechanism_type with no parameter.
    """
    if kwargs:
      assert not (public_template or private_template)
      public_template, private_template = self._parse_GenerateKeyPair_template(**kwargs)
    ck_public  = self.adb.to_ctypes(public_template)
    ck_private = self.adb.to_ctypes(private_template)
    mechanism  = CK_MECHANISM(mechanism_type, None, 0)
    new_public  = CK_OBJECT_HANDLE()
    new_private = CK_OBJECT_HANDLE()
    self.so.C_GenerateKeyPair(session, byref(mechanism),
                              ck_public,  len(ck_public),
                              ck_private, len(ck_private),
                              byref(new_public), byref(new_private))
    return (new_public.value, new_private.value)

  def C_SignInit(self, session, mechanism_type, private_key):
    """
    Start a signature operation on the session with the given key,
    using a mechanism built from mechanism_type with no parameter.
    """
    signing_mechanism = CK_MECHANISM(mechanism_type, None, 0)
    mechanism_ref = byref(signing_mechanism)
    self.so.C_SignInit(session, mechanism_ref, private_key)

  def C_Sign(self, session, data):
    """
    Sign data in a single operation and return the signature.

    The library is called twice: first with a NULL output buffer to
    learn how much space is needed, then with a buffer of that size.
    """
    n = CK_ULONG()
    self.so.C_Sign(session, data, len(data), None, byref(n))
    sig = create_string_buffer(n.value)
    self.so.C_Sign(session, data, len(data), sig, byref(n))
    # PKCS #11 allows the length query to over-estimate; the second
    # call leaves the real length in n, so trim the buffer to it.
    return sig.raw[:n.value]

  def C_SignUpdate(self, session, data):
    """
    Feed another piece of data into the signature operation active
    on the session.
    """
    data_length = len(data)
    self.so.C_SignUpdate(session, data, data_length)

  def C_SignFinal(self, session):
    """
    Finish a multi-part signature operation and return the signature.

    The library is called twice: first with a NULL output buffer to
    learn how much space is needed, then with a buffer of that size.
    """
    n = CK_ULONG()
    self.so.C_SignFinal(session, None, byref(n))
    sig = create_string_buffer(n.value)
    self.so.C_SignFinal(session, sig, byref(n))
    # PKCS #11 allows the length query to over-estimate; the second
    # call leaves the real length in n, so trim the buffer to it.
    return sig.raw[:n.value]

  def C_VerifyInit(self, session, mechanism_type, public_key):
    """
    Start a verification operation on the session with the given key,
    using a mechanism built from mechanism_type with no parameter.
    """
    verify_mechanism = CK_MECHANISM(mechanism_type, None, 0)
    mechanism_ref = byref(verify_mechanism)
    self.so.C_VerifyInit(session, mechanism_ref, public_key)

  def C_Verify(self, session, data, signature):
    """
    Verify signature over data in a single operation.  Failure is
    reported by the exception raised for the library's return value.
    """
    data_length = len(data)
    signature_length = len(signature)
    self.so.C_Verify(session, data, data_length, signature, signature_length)

  def C_VerifyUpdate(self, session, data):
    """
    Feed another piece of data into the verification operation active
    on the session.
    """
    data_length = len(data)
    self.so.C_VerifyUpdate(session, data, data_length)

  def C_VerifyFinal(self, session, signature):
    """
    Finish a multi-part verification operation by checking the
    accumulated data against signature.
    """
    signature_length = len(signature)
    self.so.C_VerifyFinal(session, signature, signature_length)

  def C_CreateObject(self, session, template = None, **kwargs):
    """
    Create an object from a template and return its handle value.

    The template may be given either as a mapping in template or as
    keyword arguments, not both.
    """
    if kwargs:
      assert not template
      template = kwargs
    ck_template = self.adb.to_ctypes(template)
    new_object = CK_OBJECT_HANDLE()
    self.so.C_CreateObject(session, ck_template, len(ck_template), byref(new_object))
    return new_object.value

  def C_DigestInit(self, session, mechanism_type):
    """
    Start a digest operation on the session, using a mechanism built
    from mechanism_type with no parameter.
    """
    digest_mechanism = CK_MECHANISM(mechanism_type, None, 0)
    mechanism_ref = byref(digest_mechanism)
    self.so.C_DigestInit(session, mechanism_ref)

  def C_Digest(self, session, data):
    """
    Digest data in a single operation and return the digest.

    The library is called twice: first with a NULL output buffer to
    learn how much space is needed, then with a buffer of that size.
    """
    n = CK_ULONG()
    self.so.C_Digest(session, data, len(data), None, byref(n))
    digest = create_string_buffer(n.value)
    self.so.C_Digest(session, data, len(data), digest, byref(n))
    # PKCS #11 allows the length query to over-estimate; the second
    # call leaves the real length in n, so trim the buffer to it.
    return digest.raw[:n.value]

  def C_DigestUpdate(self, session, data):
    """
    Feed another piece of data into the digest operation active on
    the session.
    """
    data_length = len(data)
    self.so.C_DigestUpdate(session, data, data_length)

  def C_DigestFinal(self, session):
    """
    Finish a multi-part digest operation and return the digest.

    The library is called twice: first with a NULL output buffer to
    learn how much space is needed, then with a buffer of that size.
    """
    n = CK_ULONG()
    self.so.C_DigestFinal(session, None, byref(n))
    digest = create_string_buffer(n.value)
    self.so.C_DigestFinal(session, digest, byref(n))
    # PKCS #11 allows the length query to over-estimate; the second
    # call leaves the real length in n, so trim the buffer to it.
    return digest.raw[:n.value]

# Public names: the PKCS11 class plus every CK* / CRYPTOKI_* name present
# in this module's namespace.  Under Python 2 a list comprehension here
# would bind its loop variable at module level while globals() is being
# iterated, so this stays a generator expression.
__all__ = ["PKCS11"]
__all__.extend(symbol for symbol in globals()
               if symbol.startswith(("CK", "CRYPTOKI_")))