aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoachim Strömbergson <joachim@secworks.se>2014-05-24 21:46:55 +0200
committerJoachim Strömbergson <joachim@secworks.se>2014-05-24 21:46:55 +0200
commit42a1d0d74e7da2c4502e57fc979a16d9a0206029 (patch)
treed1d602ca9b4cfca9d0f7df0941d3ac329f897be7
parentace1519dcef56e7397330093bd3a55e796f23a09 (diff)
Adding Python code to extract data from the bp entropy source. Note: Not tested yet.
-rwxr-xr-xsrc/sw/entropy_tester.py279
1 files changed, 279 insertions, 0 deletions
diff --git a/src/sw/entropy_tester.py b/src/sw/entropy_tester.py
new file mode 100755
index 0000000..1b0e19d
--- /dev/null
+++ b/src/sw/entropy_tester.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#=======================================================================
+#
+# entropy_tester.py
+# -----------------
+# Test SW for the FPGA entropy tester project.
+#
+# Note: This program requires the PySerial module.
+# http://pyserial.sourceforge.net/
+#
+#
+# Author: Joachim Strömbergson
+# Copyright (c) 2014, Secworks Sweden AB
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+#=======================================================================
+
+#-------------------------------------------------------------------
+# Python module imports.
+#-------------------------------------------------------------------
+import sys
+import serial
+import os
+import time
+import threading
+
+
+#-------------------------------------------------------------------
+# Defines.
+#-------------------------------------------------------------------
+# Serial port defines.
+# CONFIGURE YOUR DEVICE HERE!
+SERIAL_DEVICE = '/dev/cu.usbserial-A801SA6T'
+BAUD_RATE = 9600
+BIT_RATE = int(50E6 / BAUD_RATE)
+
+BAUD_RATE2 = 256000
+BIT_RATE2 = int(50E6 / BAUD_RATE2)
+
+DATA_BITS = 8
+STOP_BITS = 1
+
+# Delay times.
+PROC_DELAY_TIME = 0.001
+COMM_DELAY_TIME = 0.01
+
+
+# Verbose operation on/off
+VERBOSE = False
+
+# Memory map.
+SOC = '\x55'
+EOC = '\xaa'
+READ_CMD = '\x10'
+WRITE_CMD = '\x11'
+
+UART_ADDR_PREFIX = '\x00'
+UART_ADDR_BIT_RATE = '\x10'
+UART_ADDR_DATA_BITS = '\x11'
+UART_ADDR_STOP_BITS = '\x12'
+
+BPENT_ADDR_PREFIX = '\x10'
+BPENT_ADDR_WR_RNG1 = '\x00'
+BPENT_ADDR_WR_RNG2 = '\x01'
+BPENT_ADDR_RD_RNG1_RNG2 = '\x10'
+BPENT_ADDR_RD_P = '\x11'
+BPENT_ADDR_RD_N = '\x12'
+
+
+#-------------------------------------------------------------------
+# print_response()
+#
+# Parses a received buffer and prints the response.
+#-------------------------------------------------------------------
+def print_response(buffer):
+ if VERBOSE:
+ print "Length of response: %d" % len(buffer)
+ if buffer[0] == '\xaa':
+ print "Response contains correct Start of Response (SOR)"
+ if buffer[-1] == '\x55':
+ print "Response contains correct End of Response (EOR)"
+
+ response_code = ord(buffer[1])
+
+ if response_code == 0xfe:
+ print "UNKNOWN response code received."
+
+ elif response_code == 0xfd:
+ print "ERROR response code received."
+
+ elif response_code == 0x7f:
+ read_addr = ord(buffer[2]) * 256 + ord(buffer[3])
+ read_data = (ord(buffer[4]) * 16777216) + (ord(buffer[5]) * 65536) +\
+ (ord(buffer[6]) * 256) + ord(buffer[7])
+ print "READ_OK. address 0x%02x = 0x%08x." % (read_addr, read_data)
+
+ elif response_code == 0x7e:
+ read_addr = ord(buffer[2]) * 256 + ord(buffer[3])
+ print "WRITE_OK. address 0x%02x." % (read_addr)
+
+ elif response_code == 0x7d:
+ print "RESET_OK."
+
+ else:
+ print "Response 0x%02x is unknown." % response_code
+ print buffer
+
+
+#-------------------------------------------------------------------
+# read_serial_thread()
+#
+# Function used in a thread to read from the serial port and
+# collect response from coretest.
+#-------------------------------------------------------------------
+def read_serial_thread(serialport):
+ if VERBOSE:
+ print "Serial port response thread started. Waiting for response..."
+
+ buffer = []
+ while True:
+ if serialport.isOpen():
+ response = serialport.read()
+ buffer.append(response)
+ if ((response == '\x55') and len(buffer) > 7):
+ print_response(buffer)
+ buffer = []
+ else:
+ print "No open device yet."
+ time.sleep(COMM_DELAY_TIME)
+
+
+#-------------------------------------------------------------------
+# write_serial_bytes()
+#
+# Send the bytes in the buffer to coretest over the serial port.
+#-------------------------------------------------------------------
+def write_serial_bytes(tx_cmd, serialport):
+ if VERBOSE:
+ print "Command to be sent:", tx_cmd
+
+ for tx_byte in tx_cmd:
+ serialport.write(tx_byte)
+
+ # Allow the device to complete the transaction.
+ time.sleep(COMM_DELAY_TIME)
+
+
+#-------------------------------------------------------------------
+# read_word()
+#-------------------------------------------------------------------
+def read_word(prefix, addr, ser):
+ cmd = [SOC, READ_CMD] + [prefix, addr] + [EOC]
+ write_serial_bytes(cmd , ser)
+
+
+#-------------------------------------------------------------------
+# read_rng1_rng2()
+#-------------------------------------------------------------------
+def read_rng1_rng2(ser):
+ for i in range(3):
+ read_word(BPENT_ADDR_PREFIX, BPENT_ADDR_RD_RNG1_RNG2, ser)
+
+
+#-------------------------------------------------------------------
+# read_n_data()
+#-------------------------------------------------------------------
+def read_n_data(ser):
+ for i in range(10):
+ read_word(BPENT_ADDR_PREFIX, BPENT_ADDR_RD_N, ser)
+
+
+#-------------------------------------------------------------------
+# read_p_data()
+#-------------------------------------------------------------------
+def read_p_data(ser):
+ for i in range(10):
+ read_word(BPENT_ADDR_PREFIX, BPENT_ADDR_RD_P, ser)
+
+
+#-------------------------------------------------------------------
+# main()
+#
+# Parse any arguments and run the tests.
+#-------------------------------------------------------------------
+def main():
+ # Open device
+ ser = serial.Serial()
+ ser.port=SERIAL_DEVICE
+ ser.baudrate=BAUD_RATE
+ ser.bytesize=DATA_BITS
+ ser.parity='N'
+ ser.stopbits=STOP_BITS
+ ser.timeout=1
+ ser.writeTimeout=0
+
+ if VERBOSE:
+ print "Setting up a serial port and starting a receive thread."
+
+ try:
+ ser.open()
+ except:
+ print "Error: Can't open serial device."
+ sys.exit(1)
+
+ # Try and switch baud rate in the FPGA and then here.
+# bit_rate_high = chr((BIT_RATE2 >> 8) & 0xff)
+# bit_rate_low = chr(BIT_RATE2 & 0xff)
+#
+# if VERBOSE:
+# print("Changing to new baud rate.")
+# print("Baud rate: %d" % BAUD_RATE2)
+# print("Bit rate high byte: 0x%02x" % ord(bit_rate_high))
+# print("Bit rate low byte: 0x%02x" % ord(bit_rate_low))
+#
+# write_serial_bytes([SOC, WRITE_CMD, UART_ADDR_PREFIX, UART_ADDR_BIT_RATE,
+# '\x00', '\x00', bit_rate_high, bit_rate_low, EOC], ser)
+# ser.baudrate=BAUD_RATE2
+
+ try:
+ my_thread = threading.Thread(target=read_serial_thread, args=(ser,))
+ except:
+ print "Error: Can't start thread."
+ sys.exit()
+
+ my_thread.daemon = True
+ my_thread.start()
+
+
+ # Perform RNG read ops.
+ read_rng1_rng2(ser)
+ read_n_data(ser)
+ read_p_data(ser)
+
+
+ # Exit nicely.
+ time.sleep(50 * COMM_DELAY_TIME)
+ if VERBOSE:
+ print "Done. Closing device."
+ ser.close()
+
+
+#-------------------------------------------------------------------
+# __name__
+# Python thingy which allows the file to be run standalone as
+# well as parsed from within a Python interpreter.
+#-------------------------------------------------------------------
+if __name__=="__main__":
+ # Run the main function.
+ sys.exit(main())
+
+
+#=======================================================================
+# EOF hash_tester.py
+#=======================================================================