aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2017-06-03 10:56:47 -0400
committerRob Austein <sra@hactrn.net>2017-06-03 10:56:47 -0400
commit61029eb57165c181497c09549cc2dd0fa9928f16 (patch)
tree497efa9d96e449afde9090ea5357592b069fd6d8
parent6a47490407210471afdd80f009123bd72014db3a (diff)
Add --soft-backup option to cryptech_backup.
cryptech_backup is designed to help the user transfer keys from one Cryptech HSM to another, but what is is a user who has no second HSM supposed to do for backup? The --soft-backup option enables a mode in which cryptech_backup generates its own KEKEK instead of getting one from the (nonexistent) target HSM. We make a best-effort attempt to keep this soft KEKEK secure, by wrapping it with a symmetric key derived from a passphrase, using AESKeyWrapWithPadding and PBKDF2, but there's a limit to what a software-only solution can do here. The --soft-backup code depends (heavily) on PyCrypto.
-rwxr-xr-xcryptech_backup187
1 files changed, 183 insertions, 4 deletions
diff --git a/cryptech_backup b/cryptech_backup
index f14f119..76cdfbb 100755
--- a/cryptech_backup
+++ b/cryptech_backup
@@ -17,6 +17,15 @@ sure only to export keys using a KEKEK known to have been generated by
the target HSM. See the unit tests in the source repository for
an example of how to fake this in a few lines of Python.
+We also implement a software-based variant on this backup mechanism,
+for cases where there is no second HSM. The protocol is much the
+same, but the KEKEK is generated in software and encrypted using a
+symmetric key derived from a passphrase using PBKDF2. This requires
+the PyCrypto library, and is only as secure as memory on the machine
+where you're running it (so it's theoretically vulnerable to root or
+anybody with access to /dev/mem). Don't use this mode unless you
+understand the risks, and see the "NOTE WELL" above.
+
YOU HAVE BEEN WARNED. Be careful out there.
"""
@@ -72,6 +81,11 @@ def main():
"-u", "--uuid",
help = "UUID of existing KEKEK to use")
+ setup_mutex_group.add_argument(
+ "-s", "--soft-backup",
+ action = "store_true",
+ help = "software-based backup, see warnings")
+
setup_parser.add_argument(
"-k", "--keylen",
type = int,
@@ -147,9 +161,11 @@ def cmd_setup(args, hsm):
"""
result = {}
+ uuids = []
- uuids = []
- if args.uuid:
+ if args.soft_backup:
+ SoftKEKEK.generate(args, result)
+ elif args.uuid:
uuids.append(args.uuid)
elif not args.new:
uuids.extend(hsm.pkey_match(
@@ -177,7 +193,11 @@ def cmd_setup(args, hsm):
if not result:
sys.exit("Could not find suitable KEKEK")
- result.update(comment = "KEKEK public key")
+ if args.soft_backup:
+ result.update(comment = "KEKEK software keypair")
+ else:
+ result.update(comment = "KEKEK public key")
+
json.dump(result, args.output, indent = 4, sort_keys = True)
args.output.write("\n")
@@ -249,7 +269,14 @@ def cmd_import(args, hsm):
"""
db = json.load(args.input)
- with hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes) as kekek:
+
+ soft_key = SoftKEKEK.is_soft_key(db)
+
+ with (hsm.pkey_load(SoftKEKEK.recover(db), HAL_KEY_FLAG_USAGE_KEYENCIPHERMENT)
+ if soft_key else
+ hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes)
+ ) as kekek:
+
for k in db["keys"]:
pkcs8 = b64join(k.get("pkcs8", ""))
spki = b64join(k.get("spki", ""))
@@ -262,6 +289,158 @@ def cmd_import(args, hsm):
with hsm.pkey_load(der = spki, flags = flags) as pkey:
print "Loaded {} as {}".format(k["uuid"], pkey.uuid)
+ if soft_key:
+ kekek.delete()
+
+
+class AESKeyWrapWithPadding(object):
+ """
+ Implementation of AES Key Wrap With Padding from RFC 5649.
+ """
+
+ class UnwrapError(Exception):
+ "Something went wrong during unwrap."
+
+ def __init__(self, key):
+ from Crypto.Cipher import AES
+ self.ctx = AES.new(key, AES.MODE_ECB)
+
+ def _encrypt(self, b1, b2):
+ aes_block = self.ctx.encrypt(b1 + b2)
+ return aes_block[:8], aes_block[8:]
+
+ def _decrypt(self, b1, b2):
+ aes_block = self.ctx.decrypt(b1 + b2)
+ return aes_block[:8], aes_block[8:]
+
+ @staticmethod
+ def _start_stop(start, stop): # Syntactic sugar
+ step = -1 if start > stop else 1
+ return xrange(start, stop + step, step)
+
+ @staticmethod
+ def _xor(R0, t):
+ from struct import pack, unpack
+ return pack(">Q", unpack(">Q", R0)[0] ^ t)
+
+ def wrap(self, Q):
+ "RFC 5649 section 4.1."
+ from struct import pack
+ m = len(Q) # Plaintext length
+ if m % 8 != 0: # Pad Q if needed
+ Q += "\x00" * (8 - (m % 8))
+ R = [pack(">LL", 0xa65959a6, m)] # Magic MSB(32,A), build LSB(32,A)
+ R.extend(Q[i : i + 8] # Append Q
+ for i in xrange(0, len(Q), 8))
+ n = len(R) - 1
+ if n == 1:
+ R[0], R[1] = self._encrypt(R[0], R[1])
+ else:
+ # RFC 3394 section 2.2.1
+ for j in self._start_stop(0, 5):
+ for i in self._start_stop(1, n):
+ R[0], R[i] = self._encrypt(R[0], R[i])
+ R[0] = self._xor(R[0], n * j + i)
+ assert len(R) == (n + 1) and all(len(r) == 8 for r in R)
+ return "".join(R)
+
+ def unwrap(self, C):
+ "RFC 5649 section 4.2."
+ from struct import unpack
+ if len(C) % 8 != 0:
+ raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks"
+ .format(len(C)))
+ n = (len(C) / 8) - 1
+ R = [C[i : i + 8] for i in xrange(0, len(C), 8)]
+ if n == 1:
+ R[0], R[1] = self._decrypt(R[0], R[1])
+ else:
+ # RFC 3394 section 2.2.2 steps (1), (2), and part of (3)
+ for j in self._start_stop(5, 0):
+ for i in self._start_stop(n, 1):
+ R[0] = self._xor(R[0], n * j + i)
+ R[0], R[i] = self._decrypt(R[0], R[i])
+ magic, m = unpack(">LL", R[0])
+ if magic != 0xa65959a6:
+ raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}"
+ .format(magic))
+ if m <= 8 * (n - 1) or m > 8 * n:
+ raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n))
+ R = "".join(R[1:])
+ assert len(R) == 8 * n
+ if any(r != "\x00" for r in R[m:]):
+ raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex")))
+ return R[:m]
+
+
+class SoftKEKEK(object):
+ """
+ Wrapper around all the goo we need to implement soft backups.
+ Requires PyCrypto on about every other line.
+ """
+
+ oid_aesKeyWrap = "\x60\x86\x48\x01\x65\x03\x04\x01\x30"
+
+ def parse_EncryptedPrivateKeyInfo(self, der):
+ from Crypto.Util.asn1 import DerObject, DerSequence, DerOctetString, DerObjectId
+ encryptedPrivateKeyInfo = DerSequence()
+ encryptedPrivateKeyInfo.decode(der)
+ encryptionAlgorithm = DerSequence()
+ algorithm = DerObjectId()
+ encryptedData = DerOctetString()
+ encryptionAlgorithm.decode(encryptedPrivateKeyInfo[0])
+ DerObject.decode(algorithm, encryptionAlgorithm[0])
+ DerObject.decode(encryptedData, encryptedPrivateKeyInfo[1])
+ if algorithm.payload != self.oid_aesKeyWrap:
+ raise ValueError
+ return encryptedData.payload
+
+ def encode_EncryptedPrivateKeyInfo(self, der):
+ from Crypto.Util.asn1 import DerSequence, DerOctetString
+ return DerSequence([
+ DerSequence([
+ chr(0x06) + chr(len(self.oid_aesKeyWrap)) + self.oid_aesKeyWrap
+ ]).encode(),
+ DerOctetString(der).encode()
+ ]).encode()
+
+ def gen_salt(self, bytes = 16):
+ from Crypto import Random
+ return Random.new().read(bytes)
+
+ def wrapper(self, salt, keylen = 256, iterations = 8000):
+ from Crypto.Protocol.KDF import PBKDF2
+ from Crypto.Hash import SHA256, HMAC
+ return AESKeyWrapWithPadding(PBKDF2(
+ password = getpass.getpass("KEKEK Passphrase: "),
+ salt = salt,
+ dkLen = keylen/8,
+ count = iterations,
+ prf = lambda p, s: HMAC.new(p, s, SHA256).digest()))
+
+ @classmethod
+ def is_soft_key(cls, db):
+ return all(k in db for k in ("kekek_pkcs8", "kekek_salt"))
+
+ @classmethod
+ def generate(cls, args, result):
+ from Crypto.PublicKey import RSA
+ self = cls()
+ k = RSA.generate(args.keylen)
+ salt = self.gen_salt()
+ spki = k.publickey().exportKey(format = "DER")
+ pkcs8 = self.encode_EncryptedPrivateKeyInfo(self.wrapper(salt).wrap(
+ k.exportKey(format = "DER", pkcs = 8)))
+ result.update(kekek_salt = b64(salt),
+ kekek_pkcs8 = b64(pkcs8),
+ kekek_pubkey = b64(spki))
+
+ @classmethod
+ def recover(cls, db):
+ self = cls()
+ return self.wrapper(b64join(db["kekek_salt"])).unwrap(
+ self.parse_EncryptedPrivateKeyInfo(b64join(db["kekek_pkcs8"])))
+
if __name__ == "__main__":
main()