From ec3be7d441b7bc1888186c05db6e6b46abe63879 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Fri, 18 Sep 2015 01:38:53 -0400 Subject: MUTEX callbacks via ctypes. Beware of Garbage Collector. --- scripts/py11-test.py | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) (limited to 'scripts') diff --git a/scripts/py11-test.py b/scripts/py11-test.py index 8e189e3..a5e9c88 100644 --- a/scripts/py11-test.py +++ b/scripts/py11-test.py @@ -2,6 +2,7 @@ # doodles figuring out what else py11 should be automating for us. from py11 import * +from struct import pack, unpack def show_token(i, t, strip = True): print "Token in slot #%d:" % i @@ -66,7 +67,76 @@ ec_curve_oid_p521 = "".join(chr(i) for i in (0x06, 0x05, 0x2b, 0x81, 0x04, 0x00, p11 = PKCS11("./libpkcs11.so") -p11.C_Initialize(CKF_OS_LOCKING_OK) +if False: + print "Not using MUTEXes at all" + p11.C_Initialize() + +elif True: + print "Using OS MUTEXes" + p11.C_Initialize(CKF_OS_LOCKING_OK) + +else: + print "Using our own pseudo-MUTEXes" + + # This class probably belongs in the library. + + class Mutexes(object): + + format = "=L" + length = len(pack(format, 0)) + + def __init__(self, verbose = True, flags = 0): + self.init_args = (flags, self.create, self.destroy, self.lock, self.unlock) + self.next_mutex = 0 + self.mutexes = {} + self.verbose = verbose + + def find(self): + if len(self.mutexes) > 0xFFFFFFFF: + raise RuntimeError + while self.next_mutex in self.mutexes: + self.next_mutex = (self.next_mutex + 1) & 0xFFFFFFFF + + def encode(self, n): + return (CK_BYTE * self.length)(*pack(self.format, n)) + + def decode(self, m): + return unpack(self.format, m[:self.length])[0] + + def create(self, mm): + self.find() + if self.verbose: + print "Creating mutex", self.next_mutex + m = self.encode(self.next_mutex) + self.mutexes[self.next_mutex] = [False, m] + mm[0] = m + return CKR_OK + + def destroy(self, m): + mutex = self.decode(m) + if self.verbose: + print "Destroying mutex", mutex + del self.mutexes[mutex] + return CKR_OK + + def lock(self, m): + mutex = self.decode(m) + if self.verbose: + print "Locking mutex", mutex + self.mutexes[mutex][0] = True + return CKR_OK + + def unlock(self, m): + mutex = self.decode(m) + if self.verbose: + print "Unlocking mutex", mutex + self.mutexes[mutex][0] = False + return CKR_OK + + mutexes = Mutexes() + p11.C_Initialize(*mutexes.init_args) + +# Dun with MUTEX insanit and C_Initialize() call slots = p11.C_GetSlotList() -- cgit v1.2.3