From ebd6c702e4426370a278b95becba3afb83715c0a Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Sat, 3 Jun 2017 10:56:47 -0400 Subject: Add --soft-backup option to cryptech_backup. cryptech_backup is designed to help the user transfer keys from one Cryptech HSM to another, but what is is a user who has no second HSM supposed to do for backup? The --soft-backup option enables a mode in which cryptech_backup generates its own KEKEK instead of getting one from the (nonexistent) target HSM. We make a best-effort attempt to keep this soft KEKEK secure, by wrapping it with a symmetric key derived from a passphrase, using AESKeyWrapWithPadding and PBKDF2, but there's a limit to what a software-only solution can do here. The --soft-backup code depends (heavily) on PyCrypto. --- cryptech_backup | 187 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 183 insertions(+), 4 deletions(-) diff --git a/cryptech_backup b/cryptech_backup index f14f119..76cdfbb 100755 --- a/cryptech_backup +++ b/cryptech_backup @@ -17,6 +17,15 @@ sure only to export keys using a KEKEK known to have been generated by the target HSM. See the unit tests in the source repository for an example of how to fake this in a few lines of Python. +We also implement a software-based variant on this backup mechanism, +for cases where there is no second HSM. The protocol is much the +same, but the KEKEK is generated in software and encrypted using a +symmetric key derived from a passphrase using PBKDF2. This requires +the PyCrypto library, and is only as secure as memory on the machine +where you're running it (so it's theoretically vulnerable to root or +anybody with access to /dev/mem). Don't use this mode unless you +understand the risks, and see the "NOTE WELL" above. + YOU HAVE BEEN WARNED. Be careful out there. """ @@ -72,6 +81,11 @@ def main(): "-u", "--uuid", help = "UUID of existing KEKEK to use") + setup_mutex_group.add_argument( + "-s", "--soft-backup", + action = "store_true", + help = "software-based backup, see warnings") + setup_parser.add_argument( "-k", "--keylen", type = int, @@ -147,9 +161,11 @@ def cmd_setup(args, hsm): """ result = {} + uuids = [] - uuids = [] - if args.uuid: + if args.soft_backup: + SoftKEKEK.generate(args, result) + elif args.uuid: uuids.append(args.uuid) elif not args.new: uuids.extend(hsm.pkey_match( @@ -177,7 +193,11 @@ def cmd_setup(args, hsm): if not result: sys.exit("Could not find suitable KEKEK") - result.update(comment = "KEKEK public key") + if args.soft_backup: + result.update(comment = "KEKEK software keypair") + else: + result.update(comment = "KEKEK public key") + json.dump(result, args.output, indent = 4, sort_keys = True) args.output.write("\n") @@ -249,7 +269,14 @@ def cmd_import(args, hsm): """ db = json.load(args.input) - with hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes) as kekek: + + soft_key = SoftKEKEK.is_soft_key(db) + + with (hsm.pkey_load(SoftKEKEK.recover(db), HAL_KEY_FLAG_USAGE_KEYENCIPHERMENT) + if soft_key else + hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes) + ) as kekek: + for k in db["keys"]: pkcs8 = b64join(k.get("pkcs8", "")) spki = b64join(k.get("spki", "")) @@ -262,6 +289,158 @@ def cmd_import(args, hsm): with hsm.pkey_load(der = spki, flags = flags) as pkey: print "Loaded {} as {}".format(k["uuid"], pkey.uuid) + if soft_key: + kekek.delete() + + +class AESKeyWrapWithPadding(object): + """ + Implementation of AES Key Wrap With Padding from RFC 5649. + """ + + class UnwrapError(Exception): + "Something went wrong during unwrap." + + def __init__(self, key): + from Crypto.Cipher import AES + self.ctx = AES.new(key, AES.MODE_ECB) + + def _encrypt(self, b1, b2): + aes_block = self.ctx.encrypt(b1 + b2) + return aes_block[:8], aes_block[8:] + + def _decrypt(self, b1, b2): + aes_block = self.ctx.decrypt(b1 + b2) + return aes_block[:8], aes_block[8:] + + @staticmethod + def _start_stop(start, stop): # Syntactic sugar + step = -1 if start > stop else 1 + return xrange(start, stop + step, step) + + @staticmethod + def _xor(R0, t): + from struct import pack, unpack + return pack(">Q", unpack(">Q", R0)[0] ^ t) + + def wrap(self, Q): + "RFC 5649 section 4.1." + from struct import pack + m = len(Q) # Plaintext length + if m % 8 != 0: # Pad Q if needed + Q += "\x00" * (8 - (m % 8)) + R = [pack(">LL", 0xa65959a6, m)] # Magic MSB(32,A), build LSB(32,A) + R.extend(Q[i : i + 8] # Append Q + for i in xrange(0, len(Q), 8)) + n = len(R) - 1 + if n == 1: + R[0], R[1] = self._encrypt(R[0], R[1]) + else: + # RFC 3394 section 2.2.1 + for j in self._start_stop(0, 5): + for i in self._start_stop(1, n): + R[0], R[i] = self._encrypt(R[0], R[i]) + R[0] = self._xor(R[0], n * j + i) + assert len(R) == (n + 1) and all(len(r) == 8 for r in R) + return "".join(R) + + def unwrap(self, C): + "RFC 5649 section 4.2." + from struct import unpack + if len(C) % 8 != 0: + raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks" + .format(len(C))) + n = (len(C) / 8) - 1 + R = [C[i : i + 8] for i in xrange(0, len(C), 8)] + if n == 1: + R[0], R[1] = self._decrypt(R[0], R[1]) + else: + # RFC 3394 section 2.2.2 steps (1), (2), and part of (3) + for j in self._start_stop(5, 0): + for i in self._start_stop(n, 1): + R[0] = self._xor(R[0], n * j + i) + R[0], R[i] = self._decrypt(R[0], R[i]) + magic, m = unpack(">LL", R[0]) + if magic != 0xa65959a6: + raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}" + .format(magic)) + if m <= 8 * (n - 1) or m > 8 * n: + raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n)) + R = "".join(R[1:]) + assert len(R) == 8 * n + if any(r != "\x00" for r in R[m:]): + raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex"))) + return R[:m] + + +class SoftKEKEK(object): + """ + Wrapper around all the goo we need to implement soft backups. + Requires PyCrypto on about every other line. + """ + + oid_aesKeyWrap = "\x60\x86\x48\x01\x65\x03\x04\x01\x30" + + def parse_EncryptedPrivateKeyInfo(self, der): + from Crypto.Util.asn1 import DerObject, DerSequence, DerOctetString, DerObjectId + encryptedPrivateKeyInfo = DerSequence() + encryptedPrivateKeyInfo.decode(der) + encryptionAlgorithm = DerSequence() + algorithm = DerObjectId() + encryptedData = DerOctetString() + encryptionAlgorithm.decode(encryptedPrivateKeyInfo[0]) + DerObject.decode(algorithm, encryptionAlgorithm[0]) + DerObject.decode(encryptedData, encryptedPrivateKeyInfo[1]) + if algorithm.payload != self.oid_aesKeyWrap: + raise ValueError + return encryptedData.payload + + def encode_EncryptedPrivateKeyInfo(self, der): + from Crypto.Util.asn1 import DerSequence, DerOctetString + return DerSequence([ + DerSequence([ + chr(0x06) + chr(len(self.oid_aesKeyWrap)) + self.oid_aesKeyWrap + ]).encode(), + DerOctetString(der).encode() + ]).encode() + + def gen_salt(self, bytes = 16): + from Crypto import Random + return Random.new().read(bytes) + + def wrapper(self, salt, keylen = 256, iterations = 8000): + from Crypto.Protocol.KDF import PBKDF2 + from Crypto.Hash import SHA256, HMAC + return AESKeyWrapWithPadding(PBKDF2( + password = getpass.getpass("KEKEK Passphrase: "), + salt = salt, + dkLen = keylen/8, + count = iterations, + prf = lambda p, s: HMAC.new(p, s, SHA256).digest())) + + @classmethod + def is_soft_key(cls, db): + return all(k in db for k in ("kekek_pkcs8", "kekek_salt")) + + @classmethod + def generate(cls, args, result): + from Crypto.PublicKey import RSA + self = cls() + k = RSA.generate(args.keylen) + salt = self.gen_salt() + spki = k.publickey().exportKey(format = "DER") + pkcs8 = self.encode_EncryptedPrivateKeyInfo(self.wrapper(salt).wrap( + k.exportKey(format = "DER", pkcs = 8))) + result.update(kekek_salt = b64(salt), + kekek_pkcs8 = b64(pkcs8), + kekek_pubkey = b64(spki)) + + @classmethod + def recover(cls, db): + self = cls() + return self.wrapper(b64join(db["kekek_salt"])).unwrap( + self.parse_EncryptedPrivateKeyInfo(b64join(db["kekek_pkcs8"]))) + if __name__ == "__main__": main() -- cgit v1.2.3