# Copyright (c) 2016, NORDUnet A/S
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# - Neither the name of the NORDUnet nor the names of its contributors may
# be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
A Python interface to the Cryptech libhal RPC API.
"""
# A lot of this is hand-generated XDR data structure encoding. If and
# when we ever convert the C library to use data structures processed
# by rpcgen, we may want to rewrite this code to use the output of
# something like https://github.com/floodlight/xdr.git -- in either
# case the generated code would just be for the data structures, we're
# not likely to want to use the full ONC RPC mechanism.
import os
import sys
import time
import uuid
import xdrlib
import serial
import contextlib
SLIP_END = chr(0300) # indicates end of packet
SLIP_ESC = chr(0333) # indicates byte stuffing
SLIP_ESC_END = chr(0334) # ESC ESC_END means END data byte
SLIP_ESC_ESC = chr(0335) # ESC ESC_ESC means ESC data byte
HAL_OK = 0
class HALError(Exception):
"LibHAL error"
table = [None]
@classmethod
def define(cls, **kw):
assert len(kw) == 1
name, text = kw.items()[0]
e = type(name, (cls,), dict(__doc__ = text))
cls.table.append(e)
globals()[name] = e
HALError.define(HAL_ERROR_BAD_ARGUMENTS = "Bad arguments given")
HALError.define(HAL_ERROR_UNSUPPORTED_KEY = "Unsupported key type or key length")
HALError.define(HAL_ERROR_IO_SETUP_FAILED = "Could not set up I/O with FPGA")
HALError.define(HAL_ERROR_IO_TIMEOUT = "I/O with FPGA timed out")
HALError.define(HAL_ERROR_IO_UNEXPECTED = "Unexpected response from FPGA")
HALError.define(HAL_ERROR_IO_OS_ERROR = "Operating system error talking to FPGA")
HALError.define(HAL_ERROR_IO_BAD_COUNT = "Bad byte count")
HALError.define(HAL_ERROR_CSPRNG_BROKEN = "CSPRNG is returning nonsense")
HALError.define(HAL_ERROR_KEYWRAP_BAD_MAGIC = "Bad magic number while unwrapping key")
HALError.define(HAL_ERROR_KEYWRAP_BAD_LENGTH = "Length out of range while unwrapping key")
HALError.define(HAL_ERROR_KEYWRAP_BAD_PADDING = "Non-zero padding detected unwrapping key")
HALError.define(HAL_ERROR_IMPOSSIBLE = "\"Impossible\" error")
HALError.define(HAL_ERROR_ALLOCATION_FAILURE = "Memory allocation failed")
HALError.define(HAL_ERROR_RESULT_TOO_LONG = "Result too long for buffer")
HALError.define(HAL_ERROR_ASN1_PARSE_FAILED = "ASN.1 parse failed")
HALError.define(HAL_ERROR_KEY_NOT_ON_CURVE = "EC key is not on its purported curve")
HALError.define(HAL_ERROR_INVALID_SIGNATURE = "Invalid signature")
HALError.define(HAL_ERROR_CORE_NOT_FOUND = "Requested core not found")
HALError.define(HAL_ERROR_CORE_BUSY = "Requested core busy")
HALError.define(HAL_ERROR_KEYSTORE_ACCESS = "Could not access keystore")
HALError.define(HAL_ERROR_KEY_NOT_FOUND = "Key not found")
HALError.define(HAL_ERROR_KEY_NAME_IN_USE = "Key name in use")
HALError.define(HAL_ERROR_NO_KEY_SLOTS_AVAILABLE = "No key slots available")
HALError.define(HAL_ERROR_PIN_INCORRECT = "PIN incorrect")
HALError.define(HAL_ERROR_NO_CLIENT_SLOTS_AVAILABLE = "No client slots available")
HALError.define(HAL_ERROR_FORBIDDEN = "Forbidden")
HALError.define(HAL_ERROR_XDR_BUFFER_OVERFLOW = "XDR buffer overflow")
HALError.define(HAL_ERROR_RPC_TRANSPORT = "RPC transport error")
HALError.define(HAL_ERROR_RPC_PACKET_OVERFLOW = "RPC packet overflow")
HALError.define(HAL_ERROR_RPC_BAD_FUNCTION = "Bad RPC function number")
HALError.define(HAL_ERROR_KEY_NAME_TOO_LONG = "Key name too long")
HALError.define(HAL_ERROR_MASTERKEY_NOT_SET = "Master key (Key Encryption Key) not set")
HALError.define(HAL_ERROR_MASTERKEY_FAIL = "Master key generic failure")
HALError.define(HAL_ERROR_MASTERKEY_BAD_LENGTH = "Master key of unacceptable length")
HALError.define(HAL_ERROR_KS_DRIVER_NOT_FOUND = "Keystore driver not found")
HALError.define(HAL_ERROR_KEYSTORE_BAD_CRC = "Bad CRC in keystore")
HALError.define(HAL_ERROR_KEYSTORE_BAD_BLOCK_TYPE = "Unsupported keystore block type")
HALError.define(HAL_ERROR_KEYSTORE_LOST_DATA = "Keystore appears to have lost data")
HALError.define(HAL_ERROR_BAD_ATTRIBUTE_LENGTH = "Bad attribute length")
HALError.define(HAL_ERROR_ATTRIBUTE_NOT_FOUND = "Attribute not found")
HALError.define(HAL_ERROR_NO_KEY_INDEX_SLOTS = "No key index slots available")
class Enum(int):
def __new__(cls, name, value):
self = int.__new__(cls, value)
self._name = name
setattr(self.__class__, name, self)
return self
def __str__(self):
return self._name
def __repr__(self):
return "<Enum:{0.__class__.__name__} {0._name}:{0:d}>".format(self)
_counter = 0
@classmethod
def define(cls, names):
symbols = []
for name in names.translate(None, "{}").split(","):
if "=" in name:
name, sep, expr = name.partition("=")
cls._counter = eval(expr.strip())
if not isinstance(cls._counter, int):
raise TypeError
symbols.append(cls(name.strip(), cls._counter))
cls._counter += 1
cls.index = dict((int(symbol), symbol) for symbol in symbols)
globals().update((symbol._name, symbol) for symbol in symbols)
def xdr_packer(self, packer):
packer.pack_uint(self)
class RPCFunc(Enum): pass
RPCFunc.define('''
RPC_FUNC_GET_VERSION = 0,
RPC_FUNC_GET_RANDOM,
RPC_FUNC_SET_PIN,
RPC_FUNC_LOGIN,
RPC_FUNC_LOGOUT,
RPC_FUNC_LOGOUT_ALL,
RPC_FUNC_IS_LOGGED_IN,
RPC_FUNC_HASH_GET_DIGEST_LEN,
RPC_FUNC_HASH_GET_DIGEST_ALGORITHM_ID,
RPC_FUNC_HASH_GET_ALGORITHM,
RPC_FUNC_HASH_INITIALIZE,
RPC_FUNC_HASH_UPDATE,
RPC_FUNC_HASH_FINALIZE,
RPC_FUNC_PKEY_LOAD,
RPC_FUNC_PKEY_FIND,
RPC_FUNC_PKEY_GENERATE_RSA,
RPC_FUNC_PKEY_GENERATE_EC,
RPC_FUNC_PKEY_CLOSE,
RPC_FUNC_PKEY_DELETE,
RPC_FUNC_PKEY_GET_KEY_TYPE,
RPC_FUNC_PKEY_GET_KEY_FLAGS,
RPC_FUNC_PKEY_GET_PUBLIC_KEY_LEN,
RPC_FUNC_PKEY_GET_PUBLIC_KEY,
RPC_FUNC_PKEY_SIGN,
RPC_FUNC_PKEY_VERIFY,
RPC_FUNC_PKEY_LIST,
RPC_FUNC_PKEY_RENAME,
RPC_FUNC_PKEY_MATCH,
RPC_FUNC_PKEY_SET_ATTRIBUTE,
RPC_FUNC_PKEY_GET_ATTRIBUTE,
RPC_FUNC_PKEY_DELETE_ATTRIBUTE,
''')
class HALDigestAlgorithm(Enum): pass
HALDigestAlgorithm.define('''
HAL_DIGEST_ALGORITHM_NONE,
HAL_DIGEST_ALGORITHM_SHA1,
HAL_DIGEST_ALGORITHM_SHA224,
HAL_DIGEST_ALGORITHM_SHA256,
HAL_DIGEST_ALGORITHM_SHA512_224,
HAL_DIGEST_ALGORITHM_SHA512_256,
HAL_DIGEST_ALGORITHM_SHA384,
HAL_DIGEST_ALGORITHM_SHA512
''')
class HALKeyType(Enum): pass
HALKeyType.define('''
HAL_KEY_TYPE_NONE,
HAL_KEY_TYPE_RSA_PRIVATE,
HAL_KEY_TYPE_RSA_PUBLIC,
HAL_KEY_TYPE_EC_PRIVATE,
HAL_KEY_TYPE_EC_PUBLIC
''')
class HALCurve(Enum): pass
HALCurve.define('''
HAL_CURVE_NONE,
HAL_CURVE_P256,
HAL_CURVE_P384,
HAL_CURVE_P521
''')
class HALUser(Enum): pass
HALUser.define('''
HAL_USER_NONE,
HAL_USER_NORMAL,
HAL_USER_SO,
HAL_USER_WHEEL
''')
HAL_KEY_FLAG_USAGE_DIGITALSIGNATURE = (1 << 0)
HAL_KEY_FLAG_USAGE_KEYENCIPHERMENT = (1 << 1)
HAL_KEY_FLAG_USAGE_DATAENCIPHERMENT = (1 << 2)
HAL_KEY_FLAG_TOKEN = (1 << 3)
HAL_KEY_FLAG_PUBLIC = (1 << 4)
class Attribute(object):
def __init__(self, type, value):
self.type = type
self.value = value
def xdr_packer(self, packer):
packer.pack_uint(self.type)
packer.pack_bytes(self.value)
class UUID(uuid.UUID):
def xdr_packer(self, packer):
packer.pack_bytes(self.bytes)
def cached_property(func):
attr_name = "_" + func.__name__
def wrapped(self):
try:
value = getattr(self, attr_name)
except AttributeError:
value = func(self)
setattr(self, attr_name, value)
return value
wrapped.__name__ = func.__name__
return property(wrapped)
class Handle(object):
def __int__(self):
return self.handle
def __cmp__(self, other):
return cmp(self.handle, int(other))
def xdr_packer(self, packer):
packer.pack_uint(self.handle)
class Digest(Handle):
def __init__(self, hsm, handle, algorithm):
self.hsm = hsm
self.handle = handle
self.algorithm = algorithm
def update(self, data):
self.hsm.hash_update(self, data)
def finalize(self, length = None):
return self.hsm.hash_finalize(self, length or self.digest_length)
@cached_property
def algorithm_id(self):
return self.hsm.hash_get_digest_algorithm_id(self.algorithm)
@cached_property
def digest_length(self):
return self.hsm.hash_get_digest_length(self.algorithm)
class LocalDigest(object):
"""
Implements same interface as Digest class, but using PyCrypto, to
support mixed-mode PKey operations. This only supports algorithms
that PyCrypto supports, so no SHA512/224 or SHA512/256, sorry.
"""
def __init__(self, hsm, handle, algorithm, key):
from Crypto.Hash import HMAC, SHA, SHA224, SHA256, SHA384, SHA512
self.hsm = hsm
self.handle = handle
self.algorithm = algorithm
try:
h = self._algorithms[algorithm]
except AttributeError:
self._algorithms = {
HAL_DIGEST_ALGORITHM_SHA1 : SHA.SHA1Hash,
HAL_DIGEST_ALGORITHM_SHA224 : SHA224.SHA224Hash,
HAL_DIGEST_ALGORITHM_SHA256 : SHA256.SHA256Hash,
HAL_DIGEST_ALGORITHM_SHA384 : SHA384.SHA384Hash,
HAL_DIGEST_ALGORITHM_SHA512 : SHA512.SHA512Hash
}
h = self._algorithms[algorithm]
self.digest_length = h.digest_size
self.algorithm_id = chr(0x30) + chr(2 + len(h.oid)) + h.oid
self._context = HMAC.HMAC(key = key, digestmod = h) if key else h()
def update(self, data):
self._context.update(data)
def finalize(self, length = None):
return self._context.digest()
def finalize_padded(self, pkey):
if pkey.key_type not in (HAL_KEY_TYPE_RSA_PRIVATE, HAL_KEY_TYPE_RSA_PUBLIC):
return self.finalize()
# PKCS #1.5 requires the digest to be wrapped up in an ASN.1 DigestInfo object.
from Crypto.Util.asn1 import DerSequence, DerNull, DerOctetString
return DerSequence([DerSequence([self._context.oid, DerNull().encode()]).encode(),
DerOctetString(self.finalize()).encode()]).encode()
class PKey(Handle):
def __init__(self, hsm, handle, uuid):
self.hsm = hsm
self.handle = handle
self.uuid = uuid
def close(self):
self.hsm.pkey_close(self)
def delete(self):
self.hsm.pkey_delete(self)
@cached_property
def key_type(self):
return self.hsm.pkey_get_key_type(self)
@cached_property
def key_flags(self):
return self.hsm.pkey_get_key_flags(self)
@cached_property
def public_key_len(self):
return self.hsm.pkey_get_public_key_len(self)
@cached_property
def public_key(self):
return self.hsm.pkey_get_public_key(self, self.public_key_len)
def sign(self, hash = 0, data = "", length = 1024):
return self.hsm.pkey_sign(self, hash = hash, data = data, length = length)
def verify(self, hash = 0, data = "", signature = None):
self.hsm.pkey_verify(self, hash = hash, data = data, signature = signature)
def set_attribute(self, attr_type, attr_value = None):
self.hsm.pkey_set_attribute(self, attr_type, attr_value)
def get_attribute(self, attr_type):
return self.hsm.pkey_get_attribute(self, attr_type)
def delete_attribute(self, attr_type):
self.hsm.pkey_delete_attribute(self, attr_type)
class HSM(object):
debug = False
mixed_mode = False
_send_delay = 0 # 0.1
def _raise_if_error(self, status):
if status != 0:
raise HALError.table[status]()
def __init__(self, device = os.getenv("CRYPTECH_RPC_CLIENT_SERIAL_DEVICE", "/dev/ttyUSB0")):
while True:
try:
self.tty = serial.Serial(device, 921600, timeout = 0.1)
break
except serial.SerialException:
time.sleep(0.2)
def _write(self, c):
if self.debug:
sys.stdout.write("{:02x}".format(ord(c)))
self.tty.write(c)
if self._send_delay > 0:
time.sleep(self._send_delay)
def _send(self, msg): # Expects an xdrlib.Packer
if self.debug:
sys.stdout.write("+send: ")
self._write(SLIP_END)
for c in msg.get_buffer():
if c == SLIP_END:
self._write(SLIP_ESC)
self._write(SLIP_ESC_END)
elif c == SLIP_ESC:
self._write(SLIP_ESC)
self._write(SLIP_ESC_ESC)
else:
self._write(c)
self._write(SLIP_END)
if self.debug:
sys.stdout.write("\n")
def _recv(self, code): # Returns an xdrlib.Unpacker
if self.debug:
sys.stdout.write("+recv: ")
msg = []
esc = False
while True:
c = self.tty.read(1)
if self.debug and c:
sys.stdout.write("{:02x}".format(ord(c)))
if not c:
time.sleep(0.1)
elif c == SLIP_END and not msg:
continue
elif c == SLIP_END:
if self.debug:
sys.stdout.write("\n")
msg = xdrlib.Unpacker("".join(msg))
if msg.unpack_uint() == code:
return msg
msg = []
if self.debug:
sys.stdout.write("+recv: ")
elif c == SLIP_ESC:
esc = True
elif esc and c == SLIP_ESC_END:
esc = False
msg.append(SLIP_END)
elif esc and c == SLIP_ESC_ESC:
esc = False
msg.append(SLIP_ESC)
else:
msg.append(c)
def _pack(self, packer, args):
for arg in args:
if hasattr(arg, "xdr_packer"):
arg.xdr_packer(packer)
else:
try:
func = getattr(self, "_pack_" + type(arg).__name__)
except AttributeError:
raise RuntimeError("Don't know how to pack {!r} ({!r})".format(arg, type(arg)))
else:
func(packer, arg)
@staticmethod
def _pack_int(packer, arg):
packer.pack_uint(arg)
@staticmethod
def _pack_str(packer, arg):
packer.pack_bytes(arg)
def _pack_tuple(self, packer, arg):
packer.pack_uint(len(arg))
self._pack(packer, arg)
_pack_long = _pack_int
_pack_list = _pack_tuple
@contextlib.contextmanager
def rpc(self, code, *args, **kwargs):
client = kwargs.get("client", 0)
packer = xdrlib.Packer()
packer.pack_uint(code)
packer.pack_uint(client)
self._pack(packer, args)
self._send(packer)
unpacker = self._recv(code)
client = unpacker.unpack_uint()
self._raise_if_error(unpacker.unpack_uint())
yield unpacker
unpacker.done()
def get_version(self):
with self.rpc(RPC_FUNC_GET_VERSION) as r:
return r.unpack_uint()
def get_random(self, n):
with self.rpc(RPC_FUNC_GET_RANDOM, n) as r:
return r.unpack_bytes()
def set_pin(self, user, pin, client = 0):
with self.rpc(RPC_FUNC_SET_PIN, user, pin, client = client):
return
def login(self, user, pin, client = 0):
with self.rpc(RPC_FUNC_LOGIN, user, pin, client = client):
return
def logout(self, client = 0):
with self.rpc(RPC_FUNC_LOGOUT, client = client):
return
def logout_all(self):
with self.rpc(RPC_FUNC_LOGOUT_ALL):
return
def is_logged_in(self, user, client = 0):
with self.rpc(RPC_FUNC_IS_LOGGED_IN, user, client = client):
return
def hash_get_digest_length(self, alg):
with self.rpc(RPC_FUNC_HASH_GET_DIGEST_LEN, alg) as r:
return r.unpack_uint()
def hash_get_digest_algorithm_id(self, alg, max_len = 256):
with self.rpc(RPC_FUNC_HASH_GET_DIGEST_ALGORITHM_ID, alg, max_len) as r:
return r.unpack_bytes()
def hash_get_algorithm(self, handle):
with self.rpc(RPC_FUNC_HASH_GET_ALGORITHM, handle) as r:
return HALDigestAlgorithm.index[r.unpack_uint()]
def hash_initialize(self, alg, key = "", client = 0, session = 0, mixed_mode = None):
if mixed_mode is None:
mixed_mode = self.mixed_mode
if mixed_mode:
return LocalDigest(self, 0, alg, key)
else:
with self.rpc(RPC_FUNC_HASH_INITIALIZE, session, alg, key, client = client) as r:
return Digest(self, r.unpack_uint(), alg)
def hash_update(self, handle, data):
with self.rpc(RPC_FUNC_HASH_UPDATE, handle, data):
return
def hash_finalize(self, handle, length = None):
if length is None:
length = self.hash_get_digest_length(self.hash_get_algorithm(handle))
with self.rpc(RPC_FUNC_HASH_FINALIZE, handle, length) as r:
return r.unpack_bytes()
def pkey_load(self, type, curve, der, flags = 0, client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_LOAD, session, type, curve, der, flags, client = client) as r:
return PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes()))
def pkey_find(self, uuid, flags = 0, client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_FIND, session, uuid, flags, client = client) as r:
return PKey(self, r.unpack_uint(), uuid)
def pkey_generate_rsa(self, keylen, exponent = "\x01\x00\x01", flags = 0, client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_GENERATE_RSA, session, keylen, exponent, flags, client = client) as r:
return PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes()))
def pkey_generate_ec(self, curve, flags = 0, client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_GENERATE_EC, session, curve, flags, client = client) as r:
return PKey(self, r.unpack_uint(), UUID(bytes = r.unpack_bytes()))
def pkey_close(self, pkey):
with self.rpc(RPC_FUNC_PKEY_CLOSE, pkey):
return
def pkey_delete(self, pkey):
with self.rpc(RPC_FUNC_PKEY_DELETE, pkey):
return
def pkey_get_key_type(self, pkey):
with self.rpc(RPC_FUNC_PKEY_GET_KEY_TYPE, pkey) as r:
return HALKeyType.index[r.unpack_uint()]
def pkey_get_key_flags(self, pkey):
with self.rpc(RPC_FUNC_PKEY_GET_KEY_FLAGS, pkey) as r:
return r.unpack_uint()
def pkey_get_public_key_len(self, pkey):
with self.rpc(RPC_FUNC_PKEY_GET_PUBLIC_KEY_LEN, pkey) as r:
return r.unpack_uint()
def pkey_get_public_key(self, pkey, length = None):
if length is None:
length = self.pkey_get_public_key_len(pkey)
with self.rpc(RPC_FUNC_PKEY_GET_PUBLIC_KEY, pkey, length) as r:
return r.unpack_bytes()
def pkey_sign(self, pkey, hash = 0, data = "", length = 1024):
assert not hash or not data
if isinstance(hash, LocalDigest):
hash, data = 0, hash.finalize_padded(pkey)
with self.rpc(RPC_FUNC_PKEY_SIGN, pkey, hash, data, length) as r:
return r.unpack_bytes()
def pkey_verify(self, pkey, hash = 0, data = "", signature = None):
assert not hash or not data
if isinstance(hash, LocalDigest):
hash, data = 0, hash.finalize_padded(pkey)
with self.rpc(RPC_FUNC_PKEY_VERIFY, pkey, hash, data, signature):
return
def pkey_list(self, flags = 0, client = 0, session = 0, length = 512):
with self.rpc(RPC_FUNC_PKEY_LIST, session, length, flags, client = client) as r:
return tuple((HALKeyType.index[r.unpack_uint()],
HALCurve.index[r.unpack_uint()],
r.unpack_uint(),
UUID(bytes = r.unpack_bytes()))
for i in xrange(r.unpack_uint()))
def pkey_match(self, type = 0, curve = 0, flags = 0, attributes = (),
previous_uuid = UUID(int = 0), length = 512, client = 0, session = 0):
with self.rpc(RPC_FUNC_PKEY_MATCH, session, type, curve, flags,
attributes, length, previous_uuid, client = client) as r:
return tuple(UUID(bytes = r.unpack_bytes())
for i in xrange(r.unpack_uint()))
def pkey_set_attribute(self, pkey, attr_type, attr_value = None):
if attr_value is None and isinstance(attr_type, Attribute):
attr_type, attr_value = attr_type.type, attr_type.attr_value
with self.rpc(RPC_FUNC_PKEY_SET_ATTRIBUTE, pkey, attr_type, attr_value):
return
def pkey_get_attribute(self, pkey, attr_type):
with self.rpc(RPC_FUNC_PKEY_GET_ATTRIBUTE, pkey, attr_type) as r:
return Attribute(attr_type, r.unpack_bytes())
def pkey_delete_attribute(self, pkey, attr_type):
with self.rpc(RPC_FUNC_PKEY_DELETE_ATTRIBUTE, pkey, attr_type):
return