#!/usr/bin/env python3
#
# Copyright (c) 2016-2017, NORDUnet A/S All rights reserved.
# Copyright: 2020-2021, The Commons Conservancy Cryptech Project
# SPDX-License-Identifier: BSD-3-Clause
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# - Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Implementation of Cryptech RPC protocol multiplexer in Python.
Unlike the original C implementation, this uses SLIP encapsulation
over a SOCK_STREAM channel, because support for SOCK_SEQPACKET is not
what we might wish. We outsource all the heavy lifting for serial and
network I/O to the PySerial and Tornado libraries, respectively.
"""
import os
import sys
import time
import struct
import atexit
import logging
import argparse
import logging.handlers
import serial
import serial.tools.list_ports_posix
import tornado.tcpserver
import tornado.iostream
import tornado.netutil
import tornado.ioloop
import tornado.queues
import tornado.locks
import tornado.gen
from cryptech.libhal import HAL_OK, RPC_FUNC_GET_VERSION, RPC_FUNC_LOGOUT, RPC_FUNC_LOGOUT_ALL
# Module-level logger, registered under the name "cryptech_muxd".
logger = logging.getLogger("cryptech_muxd")
# Iterating a byte string yields one-character strings on Python 2 but
# integers on Python 3, so the helper is defined once per major version.
if sys.version_info.major == 2:

    def colon_hex(raw):
        "Format a Python 2 byte string as colon-separated lowercase hex octets."
        octets = ["{:02x}".format(ord(char)) for char in raw]
        return ":".join(octets)

else:

    def colon_hex(raw):
        "Format a bytes-like object as colon-separated lowercase hex octets."
        octets = ["{:02x}".format(value) for value in raw]
        return ":".join(octets)
# SLIP framing octets (written as octal escapes), used by slip_encode()
# and slip_decode() and as the packet delimiter for stream reads.
SLIP_END = b"\300"  # Indicates end of SLIP packet
SLIP_ESC = b"\333"  # Indicates byte stuffing
SLIP_ESC_END = b"\334"  # ESC ESC_END means END data byte
SLIP_ESC_ESC = b"\335"  # ESC ESC_ESC means ESC data byte
# Console control characters, sent as part of the serial port probe string.
Control_U = b"\025"  # Console: clear line
Control_M = b"\015"  # Console: end of line
def slip_encode(buffer):
    "Wrap a buffer in a SLIP frame, byte-stuffing any ESC and END octets."
    # ESC is stuffed first so that the ESC octets introduced while
    # stuffing END are not themselves escaped a second time.
    stuffed = buffer.replace(SLIP_ESC, SLIP_ESC + SLIP_ESC_ESC)
    stuffed = stuffed.replace(SLIP_END, SLIP_ESC + SLIP_ESC_END)
    return SLIP_END + stuffed + SLIP_END
def slip_decode(buffer):
    "Remove SLIP framing from a buffer and undo its byte stuffing."
    # Drop the END delimiters at either edge, then reverse the two
    # escape sequences in the same order the original code applied them.
    payload = buffer.strip(SLIP_END)
    payload = payload.replace(SLIP_ESC + SLIP_ESC_END, SLIP_END)
    payload = payload.replace(SLIP_ESC + SLIP_ESC_ESC, SLIP_ESC)
    return payload
def client_handle_get(msg):
    "Return the client_handle field (bytes 4..7, big-endian) of an RPC message."
    handle_field = msg[4:8]
    # struct.error is raised here if the message is too short to hold the field.
    (handle,) = struct.unpack(">L", handle_field)
    return handle
def client_handle_set(msg, handle):
    "Return a copy of an RPC message with its client_handle field set to handle."
    function_code = msg[:4]
    remainder = msg[8:]
    # The handle occupies bytes 4..7 as a big-endian unsigned 32-bit value.
    packed_handle = struct.pack(">L", handle)
    return function_code + packed_handle + remainder
# Pre-built, not yet SLIP-encoded RPC requests: a big-endian function code
# followed by a zero client_handle field. RPCServer.handle_stream() patches
# the real handle into logout_msg; logout_all_msg is sent with handle zero.
logout_msg = struct.pack(">LL", RPC_FUNC_LOGOUT, 0)
logout_all_msg = struct.pack(">LL", RPC_FUNC_LOGOUT_ALL, 0)
class SerialIOStream(tornado.iostream.BaseIOStream):
    """
    Implementation of a Tornado IOStream over a PySerial device.

    The device is opened at 921600 baud with a non-blocking read timeout
    and a 100 ms write timeout.  The device name is kept in
    self.serial_device for use in log messages.
    """

    # In theory, we want zero (non-blocking mode) for both the read
    # and write timeouts here so that PySerial will let Tornado handle
    # all the select()/poll()/epoll()/kqueue() fun, delivering maximum
    # throughput to all.  In practice, this has always worked for the
    # author, but another developer reports that on some (not all)
    # platforms this fails consistently with Tornado reporting write
    # timeout errors, presumably as the result of receiving an IOError
    # or OSError exception from PySerial.  For reasons we don't really
    # understand, setting a PySerial write timeout on the order of
    # 50-100 ms "solves" this problem.  Again in theory, this will
    # result in lower throughput if PySerial spends too much time
    # blocking on a single serial device when Tornado could be doing
    # something useful elsewhere, but such is life.

    def __init__(self, device):
        self.serial = serial.Serial(device, 921600, timeout = 0, write_timeout = 0.1)
        self.serial_device = device
        super(SerialIOStream, self).__init__()

    def fileno(self):
        "Return the file descriptor of the underlying serial port."
        return self.serial.fileno()

    def close_fd(self):
        "Close the underlying serial port."
        self.serial.close()

    def write_to_fd(self, data):
        "Write data to the serial port, returning PySerial's write() result."
        return self.serial.write(data)

    # Compare the numeric version tuple, not the version string: a string
    # comparison is lexicographic, so e.g. "10.0" > "5" would be False.
    if tornado.version_info >= (5,):

        # .. versionchanged:: 5.0
        #    Interface redesigned to take a buffer and return a number
        #    of bytes instead of a freshly-allocated object.

        def read_from_fd(self, buf):
            """
            Read up to len(buf) bytes from the serial port into buf.

            Returns the number of bytes stored, or None if nothing was
            available to read.
            """
            data = self.serial.read(len(buf))
            count = len(data)
            if not count:
                return None
            # Copy into the front of the buffer without resizing it, so
            # this works for a fixed-size memoryview as well as a bytearray.
            buf[:count] = data
            return count

    else:

        def read_from_fd(self):
            "Return up to read_chunk_size bytes from the port, or None if none are available."
            return self.serial.read(self.read_chunk_size) or None
class PFUnixServer(tornado.tcpserver.TCPServer):
    """
    Variant on tornado.tcpserver.TCPServer, listening on a PF_UNIX
    (aka PF_LOCAL) socket instead of a TCP socket.

    serial_stream is stored as self.serial for use by subclasses;
    socket_filename is the path of the socket to bind, and mode is the
    permission mode passed to tornado.netutil.bind_unix_socket().
    """

    def __init__(self, serial_stream, socket_filename, mode = 0o600):
        super(PFUnixServer, self).__init__()
        self.serial = serial_stream
        self.socket_filename = socket_filename
        self.add_socket(tornado.netutil.bind_unix_socket(socket_filename, mode))
        # Remove the socket inode again when the interpreter exits.
        atexit.register(self.atexit_unlink)

    def atexit_unlink(self):
        "Best-effort removal of the socket file at interpreter exit."
        try:
            os.unlink(self.socket_filename)
        except OSError:
            # The file may already be gone or not removable; cleanup is
            # best-effort, but unrelated exceptions are no longer hidden.
            pass
class RPCIOStream(SerialIOStream):
    """
    Tornado IOStream for a serial RPC channel.

    Replies read from the device are routed to per-client queues held
    in self.queues, keyed by the client_handle field of each message.
    """

    def __init__(self, device):
        super(RPCIOStream, self).__init__(device)
        # Maps client_handle -> queue that receives that client's replies.
        self.queues = dict()
        # Held around every write of a query to the serial port.
        self.rpc_input_lock = tornado.locks.Lock()

    @tornado.gen.coroutine
    def rpc_input(self, query, handle = 0, queue = None):
        """
        Send a query to the HSM.

        query is written to the serial port unchanged.  If queue is not
        None it is registered as the reply queue for handle; otherwise
        any queue registered for handle is dropped.
        """
        logger.debug("RPC send: %s", colon_hex(query))
        if queue is not None:
            self.queues[handle] = queue
        else:
            # pop() rather than del: the handle may never have been
            # registered (logout_all() uses the default handle 0, and a
            # client can disconnect before sending any query), and that
            # must not raise KeyError.
            self.queues.pop(handle, None)
        with (yield self.rpc_input_lock.acquire()):
            yield self.write(query)
        logger.debug("RPC sent")

    @tornado.gen.coroutine
    def rpc_output_loop(self):
        """
        Handle reply stream HSM -> network.

        Reads SLIP-delimited packets from the serial port and puts each
        one, still SLIP-encoded, on the queue registered for its
        client_handle.  When the serial stream closes, None is put on
        every registered queue and the loop returns.
        """
        while True:
            try:
                logger.debug("RPC UART read")
                reply = yield self.read_until(SLIP_END)
            except tornado.iostream.StreamClosedError:
                logger.info("RPC UART closed")
                for q in self.queues.values():
                    q.put_nowait(None)
                return
            logger.debug("RPC recv: %s", colon_hex(reply))
            if reply == SLIP_END:
                # A lone delimiter carries no packet.
                continue
            try:
                handle = client_handle_get(slip_decode(reply))
            except Exception:
                # Typically a packet too short to hold a client_handle.
                # Not a bare except, so interpreter-level exceptions such
                # as GeneratorExit and KeyboardInterrupt still propagate.
                logger.debug("RPC skipping bad packet")
                continue
            if handle not in self.queues:
                logger.debug("RPC ignoring response: handle 0x%x", handle)
                continue
            logger.debug("RPC queue put: handle 0x%x, qsize %s", handle, self.queues[handle].qsize())
            self.queues[handle].put_nowait(reply)

    def logout_all(self):
        "Execute an RPC LOGOUT_ALL operation."
        return self.rpc_input(slip_encode(logout_all_msg))
class QueuedStreamClosedError(tornado.iostream.StreamClosedError):
    "Deferred StreamClosedError passed through a Queue."
class RPCServer(PFUnixServer):
    """
    Serve multiplexed Cryptech RPC over a PF_UNIX socket.

    Each connection is given its own client handle, which is written
    into every query before it is forwarded to the serial RPC stream.
    """

    @tornado.gen.coroutine
    def handle_stream(self, stream, address):
        """
        Handle one network connection.

        Reads SLIP-framed queries from stream, forwards them to the
        serial RPC stream with this connection's handle, and writes one
        reply back per query.  When either side closes, the socket is
        closed and an RPC LOGOUT for the handle is sent.
        """
        handle = self.next_client_handle()
        queue = tornado.queues.Queue()
        logger.info("RPC connected %r, handle 0x%x", stream, handle)
        while True:
            try:
                logger.debug("RPC socket read, handle 0x%x", handle)
                query = yield stream.read_until(SLIP_END)
                if len(query) < 9:
                    # Too short to hold function code, handle and delimiter.
                    continue
                query = slip_encode(client_handle_set(slip_decode(query), handle))
                yield self.serial.rpc_input(query, handle, queue)
                logger.debug("RPC queue wait, handle 0x%x", handle)
                reply = yield queue.get()
                if reply is None:
                    # None on the queue signals that the serial stream closed.
                    raise QueuedStreamClosedError()
                reply = SLIP_END + reply
                logger.debug("RPC socket write, handle 0x%x: %s", handle, colon_hex(reply))
                yield stream.write(reply)
            except tornado.iostream.StreamClosedError:
                logger.info("RPC closing %r, handle 0x%x", stream, handle)
                stream.close()
                query = slip_encode(client_handle_set(logout_msg, handle))
                try:
                    yield self.serial.rpc_input(query, handle)
                except tornado.iostream.StreamClosedError:
                    # The serial stream itself is closed, so the logout
                    # cannot be delivered; don't let a second exception
                    # escape from the connection handler.
                    logger.debug("RPC UART closed, logout not sent, handle 0x%x", handle)
                return

    # Seed for client handles, derived from the start time.
    client_handle = int(time.time()) << 4

    @classmethod
    def next_client_handle(cls):
        "Return the next client handle: the class counter plus one, truncated to 32 bits."
        cls.client_handle += 1
        cls.client_handle &= 0xFFFFFFFF
        return cls.client_handle
class CTYIOStream(SerialIOStream):
    """
    Tornado IOStream for a serial console channel.

    Console output is copied to an optional log stream and to the
    network stream currently attached to the console, if there is one.
    """

    def __init__(self, device, console_log = None):
        super(CTYIOStream, self).__init__(device)
        self.console_log = console_log
        # Set by CTYServer while a client owns the console.
        self.attached_cty = None

    @tornado.gen.coroutine
    def cty_output_loop(self):
        "Copy console output from the serial port to the log and the attached client."
        while True:
            try:
                chunk = yield self.read_bytes(self.read_chunk_size, partial = True)
            except tornado.iostream.StreamClosedError:
                logger.info("CTY UART closed")
                if self.attached_cty is not None:
                    self.attached_cty.close()
                return
            try:
                # Log first, then the attached client; absent sinks are skipped.
                pending = [sink.write(chunk)
                           for sink in (self.console_log, self.attached_cty)
                           if sink is not None]
                if pending:
                    yield pending
            except tornado.iostream.StreamClosedError:
                # A sink went away; keep draining the console regardless.
                pass
class CTYServer(PFUnixServer):
    """
    Serve Cryptech console over a PF_UNIX socket.

    Only one client may be attached to the console at a time.
    """

    @tornado.gen.coroutine
    def handle_stream(self, stream, address):
        """
        Handle one network connection.

        If the console already has a client, tell the new one so and
        close it.  Otherwise attach the stream to the console and copy
        its input to the serial port until it closes or is detached.
        """
        if self.serial.attached_cty is not None:
            # Tornado streams only accept bytes, so this has to be a bytes
            # literal; a text string fails on Python 3.
            yield stream.write(b"[Console already in use, sorry]\n")
            stream.close()
            return
        logger.info("CTY connected to %r", stream)
        try:
            self.serial.attached_cty = stream
            while self.serial.attached_cty is stream:
                yield self.serial.write((yield stream.read_bytes(1024, partial = True)))
        except tornado.iostream.StreamClosedError:
            stream.close()
        finally:
            logger.info("CTY disconnected from %r", stream)
            # Release the console only if this stream still owns it.
            if self.serial.attached_cty is stream:
                self.serial.attached_cty = None
class ProbeIOStream(SerialIOStream):
    """
    Tornado IOStream for probing a serial port.  This is nasty.
    """

    def __init__(self, device):
        super(ProbeIOStream, self).__init__(device)

    @classmethod
    @tornado.gen.coroutine
    def run_probes(cls, args):
        """
        Probe candidate serial devices and fill in missing device names.

        args is the parsed command line namespace.  args.rpc_device and
        args.cty_device are set in place when a probe identifies a
        device of that kind and the attribute is still None.  Candidates
        are args.probe if non-empty, otherwise every port whose hardware
        ID contains "VID:PID=0403:6014"; devices already assigned are
        not probed.
        """
        if args.rpc_device is not None and args.cty_device is not None:
            return
        if args.probe:
            devs = set(args.probe)
        else:
            devs = set(str(port)
                       for port, desc, hwid in serial.tools.list_ports_posix.comports()
                       if "VID:PID=0403:6014" in hwid)
        devs.discard(args.rpc_device)
        devs.discard(args.cty_device)
        if not devs:
            return
        # Use the module logger, like every other message in this file.
        logger.debug("Probing candidate devices %s", " ".join(devs))
        # Yielding a dict of futures runs all probes concurrently.
        results = yield dict((dev, ProbeIOStream(dev).run_probe()) for dev in devs)
        for dev, result in results.items():
            if result == "cty" and args.cty_device is None:
                logger.info("Selecting %s as CTY device", dev)
                args.cty_device = dev
            if result == "rpc" and args.rpc_device is None:
                logger.info("Selecting %s as RPC device", dev)
                args.rpc_device = dev

    @tornado.gen.coroutine
    def run_probe(self):
        """
        Probe this port and report what is on the other end.

        Sends a string mixing console control characters with an RPC
        GET_VERSION query, waits half a second, and inspects whatever
        came back.  Returns "cty", "rpc", or None.  The port is closed
        before returning, including when the probe raises.
        """
        RPC_query = struct.pack(">LL", RPC_FUNC_GET_VERSION, 0)
        RPC_reply = struct.pack(">LLL", RPC_FUNC_GET_VERSION, 0, HAL_OK)
        probe_string = SLIP_END + Control_U + SLIP_END + RPC_query + SLIP_END + Control_U + Control_M
        rpc_marker = SLIP_END + RPC_reply
        try:
            yield self.write(probe_string)
            yield tornado.gen.sleep(0.5)
            response = yield self.read_bytes(self.read_chunk_size, partial = True)
            logger.debug("Probing %s: %r %s", self.serial_device, response, colon_hex(response))
            is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>"))
            try:
                # An RPC reply is the marker, four more bytes (presumably
                # the version value -- confirm against the RPC spec), then
                # the closing SLIP END.
                is_rpc = response[response.index(rpc_marker) + len(rpc_marker) + 4] == SLIP_END[0]
            except (ValueError, IndexError):
                # Marker absent, or response too short to hold a full reply.
                is_rpc = False
            # A port should not look like both a console and an RPC channel.
            assert not is_cty or not is_rpc
            result = None
            if is_cty:
                result = "cty"
                yield self.write(Control_U)
            if is_rpc:
                result = "rpc"
                yield self.write(SLIP_END)
        finally:
            # Always release the port so a failed probe does not leak it.
            self.close()
        raise tornado.gen.Return(result)
@tornado.gen.coroutine
def main():
    """
    Parse the command line, set up logging, and run the multiplexer.

    Optionally probes for the RPC and console serial devices, then
    starts an RPC server and/or a console server for whichever devices
    are known and waits on their serial reader loops.
    """
    parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument("-v", "--verbose",
                        action = "count",
                        help = "blather about what we're doing")

    parser.add_argument("-l", "--log-file",
                        help = "log to file instead of stderr")

    parser.add_argument("-L", "--console-log",
                        type = argparse.FileType("a"),
                        help = "log console output to file")

    parser.add_argument("-p", "--probe",
                        nargs = "*",
                        metavar = "DEVICE",
                        help = "probe for device UARTs")

    parser.add_argument("--rpc-device",
                        help = "RPC serial device name",
                        default = os.getenv("CRYPTECH_RPC_CLIENT_SERIAL_DEVICE"))

    parser.add_argument("--rpc-socket",
                        help = "RPC PF_UNIX socket name",
                        default = os.getenv("CRYPTECH_RPC_CLIENT_SOCKET_NAME",
                                            "/tmp/.cryptech_muxd.rpc"))

    parser.add_argument("--rpc-socket-mode",
                        help = "permission bits for RPC socket inode",
                        default = 0o600, type = lambda s: int(s, 8))

    parser.add_argument("--cty-device",
                        help = "CTY serial device name",
                        default = os.getenv("CRYPTECH_CTY_CLIENT_SERIAL_DEVICE"))

    parser.add_argument("--cty-socket",
                        help = "CTY PF_UNIX socket name",
                        default = os.getenv("CRYPTECH_CTY_CLIENT_SOCKET_NAME",
                                            "/tmp/.cryptech_muxd.cty"))

    parser.add_argument("--cty-socket-mode",
                        help = "permission bits for CTY socket inode",
                        default = 0o600, type = lambda s: int(s, 8))

    args = parser.parse_args()

    if args.log_file is not None:
        # Replace whatever root handlers exist with a single file handler.
        logging.getLogger().handlers[:] = [logging.handlers.WatchedFileHandler(args.log_file)]
        logging.getLogger().handlers[0].setFormatter(
            logging.Formatter("%(asctime)-15s %(name)s[%(process)d]:%(levelname)s: %(message)s",
                              "%Y-%m-%d %H:%M:%S"))

    if args.verbose:
        # -v selects INFO, -vv or more selects DEBUG.
        logging.getLogger().setLevel(logging.DEBUG if args.verbose > 1 else logging.INFO)

    if args.probe is not None:
        yield ProbeIOStream.run_probes(args)

    if args.console_log is not None:
        console_log = tornado.iostream.PipeIOStream(args.console_log.fileno())
    else:
        console_log = None

    futures = []

    if args.rpc_device is None:
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        logger.warning("No RPC device found")
    else:
        rpc_stream = RPCIOStream(device = args.rpc_device)
        rpc_server = RPCServer(rpc_stream, args.rpc_socket, args.rpc_socket_mode)
        futures.append(rpc_stream.rpc_output_loop())
        futures.append(rpc_stream.logout_all())

    if args.cty_device is None:
        logger.warning("No CTY device found")
    else:
        cty_stream = CTYIOStream(device = args.cty_device, console_log = console_log)
        cty_server = CTYServer(cty_stream, args.cty_socket, args.cty_socket_mode)
        futures.append(cty_stream.cty_output_loop())

    # Might want to use WaitIterator(dict(...)) here so we can
    # diagnose and restart output loops if they fail?

    if futures:
        yield futures
# Script entry point: run main() on the current IOLoop until it finishes.
if __name__ == "__main__":
    try:
        tornado.ioloop.IOLoop.current().run_sync(main)
    except (SystemExit, KeyboardInterrupt):
        # Normal ways of stopping the daemon; exit quietly.
        pass
    except Exception:
        # Top-level boundary: log anything unexpected with its traceback.
        logger.exception("Unhandled exception")
    else:
        logger.debug("Main loop exited")