#!/usr/bin/env python # -*- coding: utf-8 -*- #======================================================================= # # trng_extract.py # -------------- # This program extracts data values from the trng. The program supports # reading from the entropy providers as well as the rng output. # Extraxted data can be delivered in text or binary form. # # # Author: Joachim Strömbergson, Paul Sekirk # Copyright (c) 2014, SUNET # All rights reserved. # # Redistribution and use in source and binary forms, with or # without modification, are permitted provided that the following # conditions are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # #======================================================================= #------------------------------------------------------------------- # Python module imports. #------------------------------------------------------------------- import sys import io import fcntl import argparse #------------------------------------------------------------------- # Defines. #------------------------------------------------------------------- # Output control. Shall be flags set by the argument parser. VERBOSE = False DEBUG = False BINARY_OUTPUT = True # TRNG defines. TRNG_PREFIX = 0x00 TRNG_ADDR_NAME0 = 0x00 TRNG_ADDR_NAME1 = 0x01 TRNG_ADDR_VERSION = 0x02 ENT1_PREFIX = 0x05 CSPRNG_PREFIX = 0x0b CSPRNG_DATA = 0x20 # ENT1 defines. This is the Avalanche noise based entropy provider. ENT11_PREFIX = 0x05 ENT1_NOISE = 0x20 ENT1_DELTA = 0x30 # ENT2 defines. This is the ROSC entropy provider. ENT2_PREFIX = 0x06 ENT2_ADDR_NAME0 = 0x00 ENT2_ADDR_NAME1 = 0x01 ENT2_ADDR_VERSION = 0x02 ENT2_DATA = 0x20 ENT2_CTRL = 0x10 ENT2_STATUS = 0x11 ENT2_ENT_DATA = 0x20 ENT2_ENT_RAW = 0x21 ENT2_ROSC_OUT = 0x22 # Mixer defines MIXER_PREFIX = 0x0a # CSPRNG defines CSPRNG_PREFIX = 0x0b # Command codes SOC = 0x55 EOC = 0xaa READ_CMD = 0x10 WRITE_CMD = 0x11 RESET_CMD = 0x01 # Response codes SOR = 0xaa EOR = 0x55 READ_OK = 0x7f WRITE_OK = 0x7e RESET_OK = 0x7d UNKNOWN = 0xfe ERROR = 0xfd # I2C interface defines # from /usr/include/linux/i2c-dev.h I2C_SLAVE = 0x0703 I2C_DEVICE = "/dev/i2c-2" I2C_ADDR = 0x0f # Number of 32 bit data words extracted in a run. # Should be set by the arg parser. NUM_WORDS = 40000000 #---------------------------------------------------------------- # hexlist() # # Helper function to cretae a list of hex numbers from a # given list of values. #---------------------------------------------------------------- def hexlist(list): return "[ " + ' '.join('%02x' % b for b in list) + " ]" #---------------------------------------------------------------- # I2C class # # Handles the actual device including reading and writing # bytes from the device. #---------------------------------------------------------------- class I2C: # file handle for the i2c device file = None # constructor: initialize the i2c communications channel def __init__(self, dev, addr): self.dev = dev self.addr = addr try: self.file = io.FileIO(self.dev, 'r+b') except IOError as e: print "Unable to open %s: %s" % (self.dev, e.strerror) sys.exit(1) try: fcntl.ioctl(self.file, I2C_SLAVE, self.addr) except IOError as e: print "Unable to set I2C slave device 0x%02x: %s" % (self.addr, e.strerror) sys.exit(1) # destructor: close the i2c communications channel def __del__(self): if (self.file): self.file.close() # write a command to the i2c device def write(self, buf): if DEBUG: print "write %s" % hexlist(buf) self.file.write(bytearray(buf)) # read one response byte from the i2c device def read(self): # read() on the i2c device will only return one byte at a time, # and tc.get_resp() needs to parse the response one byte at a time return ord(self.file.read(1)) #---------------------------------------------------------------- # Commerror() # # Empty class exception eater. #---------------------------------------------------------------- class Commerror(Exception): pass #---------------------------------------------------------------- # Comm # # Class for communicating with the HW via the I2C interface #---------------------------------------------------------------- class Comm: def __init__(self): self.i2c = I2C(I2C_DEVICE, I2C_ADDR) def send_write_cmd(self, prefix, addr, data): buf = [SOC, WRITE_CMD, prefix, addr] for s in (24, 16, 8, 0): buf.append(data >> s & 0xff) buf.append(EOC) self.i2c.write(buf) def send_read_cmd(self, prefix, addr): buf = [SOC, READ_CMD, prefix, addr, EOC] self.i2c.write(buf) def get_resp(self): buf = [] len = 2 i = 0 while i < len: b = self.i2c.read() if ((i == 0) and (b != SOR)): # we've gotten out of sync, and there's probably nothing we can do print "response byte 0: expected 0x%02x (SOR), got 0x%02x" % (SOR, b) raise CommError() elif (i == 1): # response code try: # anonymous dictionary of message lengths len = {READ_OK:9, WRITE_OK:5, RESET_OK:3, ERROR:4, UNKNOWN:4}[b] except KeyError: # unknown response code # we've gotten out of sync, and there's probably nothing we can do print "unknown response code 0x%02x" % b raise CommError() buf.append(b) i += 1 if DEBUG: print "read %s" % hexlist(buf) return buf def write_data(self, prefix, addr, data): self.send_write_cmd(prefix, addr, data) return self.get_resp() def read_data(self, prefix, addr): self.send_read_cmd(prefix, addr) return self.get_resp() #---------------------------------------------------------------- # print_data() # # Print either text or binary data to std out. #---------------------------------------------------------------- def print_data(my_data): my_bytes = my_data[4 : 8] if (BINARY_OUTPUT): for my_byte in my_bytes: sys.stdout.write(chr(my_byte)) else: print("0x%02x 0x%02x 0x%02x 0x%02x" % (my_bytes[0], my_bytes[1], my_bytes[2], my_bytes[3])) #---------------------------------------------------------------- # wait_ready() # # Wait for the ready bit in the status register for the # given core to be set accessible via the given device. #---------------------------------------------------------------- def wait_ready(dev, prefix, addr): my_status = False while not my_status: my_status = dev.read_data(prefix, addr)[7] if VERBOSE: print("Status: %s" % my_status) #---------------------------------------------------------------- # get_avalanche_entropy() #---------------------------------------------------------------- def get_avalanche_entropy(dev): if VERBOSE: print "Reading avalanche entropy data." for i in range(NUM_WORDS): dev.read_data(ENT1_PREFIX, ENT1_DATA) #---------------------------------------------------------------- # get_rosc_entropy() #---------------------------------------------------------------- def get_rosc_entropy(dev): if VERBOSE: print "Reading rosc entropy data." # for i in range(NUM_WORDS): for i in range(10): wait_ready(dev, ENT2_PREFIX, ENT2_STATUS) my_data = dev.read_data(ENT2_PREFIX, ENT2_ENT_DATA) print_data(my_data) #---------------------------------------------------------------- # looptest() # # Simple test that loops a large number of times to see # if the ready bit is ever cleared. #---------------------------------------------------------------- def looptest(dev): print("TRNG name: ", my_commdev.read_data(TRNG_PREFIX, TRNG_ADDR_NAME0)) print("ENT2 status: ", my_commdev.read_data(ENT2_PREFIX, ENT2_STATUS)) for i in range(100000000): ent_data = dev.read_data(ENT2_PREFIX, ENT2_ENT_DATA) ent_status = dev.read_data(ENT2_PREFIX, ENT2_STATUS) if not ent_status[7]: print("Got: %d" % ent_status[7]) #---------------------------------------------------------------- # main #---------------------------------------------------------------- def main(): if VERBOSE: print("Starting trng data extraction.") my_commdev = Comm() # looptest(my_commdev) # get_avalanche_entropy() # get_avalanche_delta() get_rosc_entropy(my_commdev) # get_rosc_raw() # get_rng_data() #------------------------------------------------------------------- # __name__ # Python thingy which allows the file to be run standalone as # well as parsed from within a Python interpreter. #------------------------------------------------------------------- if __name__=="__main__": # Run the main function. sys.exit(main()) #======================================================================= # EOF trng_extract.py #=======================================================================