#!/usr/bin/python
#
# Copyright (c) 2016, NORDUnet A/S All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# - Neither the name of the NORDUnet nor the names of its contributors may
# be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Utility to upload a new firmware image or FPGA bitstream
"""
import os
import sys
import time
import struct
import serial
import argparse
import getpass
from binascii import crc32
# NOTE(review): CHUNK_SIZE is not referenced in the visible code; confirm
# whether anything else still uses it before removing it.
CHUNK_SIZE = 256
# Number of bytes sent per chunk when uploading a firmware image (see send_file).
FIRMWARE_CHUNK_SIZE = 256
# Number of bytes sent per chunk when uploading an FPGA bitstream (see send_file).
FPGA_CHUNK_SIZE = 4096
def parse_args():
    """
    Build the command line parser and parse sys.argv.

    Returns the argparse namespace with the attributes `device`, `fpga`,
    `firmware`, `bootloader` and the positional `filename`.
    """
    parser = argparse.ArgumentParser(
        description="File uploader",
        add_help=True,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        '-d', '--device',
        dest='device',
        default='/dev/ttyUSB0',
        help='Name of management port USB serial device',
    )

    # The upload-type switches are all plain boolean flags that differ only
    # in their name and help text, so they are declared from a table.
    upload_flags = (
        ('fpga', 'Upload FPGA bitstream'),
        ('firmware', 'Upload firmware image'),
        ('bootloader', 'Upload bootloader image'),
    )
    for flag_name, help_text in upload_flags:
        parser.add_argument(
            '--' + flag_name,
            dest=flag_name,
            action='store_true', default=False,
            help=help_text,
        )

    # positional argument(s)
    parser.add_argument('filename')

    return parser.parse_args()
def _write(dst, data):
    """
    Send `data` to the device by calling `dst.write`.

    All command/handshake output goes through this one function, which
    makes it a convenient single place to add tracing of outgoing data
    when debugging.
    """
    dst.write(data)
def _read(dst):
    """
    Read one burst of output from the device and return it as a string.

    Blocks (retrying one-byte reads) until the first byte arrives, then
    keeps reading one byte at a time until a read comes back empty, e.g.
    because the port's read timeout expired with nothing more to deliver.
    """
    # Phase 1: keep polling until the device says anything at all.
    current = dst.read(1)
    while not current:
        current = dst.read(1)

    # Phase 2: drain everything that follows until the first empty read.
    received = []
    while current:
        received.append(current)
        current = dst.read(1)

    return ''.join(received)
def _execute(dst, cmd):
    """
    Run the CLI command `cmd` on the device and return what it answered.

    A carriage return is sent first to provoke a prompt. If the device asks
    for a username, 'so' is sent; if it asks for a password, the PIN is
    requested interactively from the user and sent. If the device still does
    not show a '> ' prompt afterwards, the command is NOT sent and the text
    that was received instead of the prompt is returned.
    """
    # Poke the device so that it prints whatever prompt it is currently at.
    _write(dst, '\r')
    banner = _read(dst)

    # Log in if the CLI is asking for credentials.
    if banner.endswith('Username: '):
        _write(dst, 'so\r')
        banner = _read(dst)
    if banner.endswith('Password: '):
        pin = getpass.getpass('SO PIN: ')
        _write(dst, pin + '\r')
        banner = _read(dst)

    if banner.endswith('> '):
        # At the command prompt: issue the command and hand back its output.
        _write(dst, cmd + '\r')
        return _read(dst)

    # Not at a command prompt; let the caller inspect what we got instead.
    return banner
def send_file(filename, args, dst):
    """
    Upload the file `filename` to the device connected to serial port `dst`.

    `args.fpga` selects an FPGA bitstream upload and `args.firmware` a
    firmware upload (FPGA wins if both are set). If neither is set, nothing
    is sent to the device and False is returned.

    Transfer sequence, as implemented below:
      1. The upload command is issued; the device must answer with 'OK'.
      2. The file size is sent as a 4 byte little-endian integer.
      3. The file is sent in chunks. After every chunk the device must
         acknowledge with a 4 byte little-endian count of chunks received.
      4. The CRC-32 of the file contents is sent as a 4 byte little-endian
         integer. The device's reply to the CRC is read but not inspected.
      5. For FPGA uploads 'fpga reset' is executed, then 'exit'.

    Returns True if the whole sequence was carried out, False if the upload
    type is unsupported, access was denied, the device rejected the upload
    command, or an unexpected ACK counter was received.
    Raises OSError/IOError if the file can not be stat'ed or opened.
    """
    # Decide what to do before touching the file or the device. Other
    # options (such as --bootloader) have no upload command defined here;
    # previously that case crashed on unbound local variables.
    if args.fpga:
        # The bitstream file is sent as-is; its header is not stripped.
        chunk_size = FPGA_CHUNK_SIZE
        command = 'fpga bitstream upload'
    elif args.firmware:
        chunk_size = FIRMWARE_CHUNK_SIZE
        command = 'firmware upload'
    else:
        sys.stderr.write('No supported upload type selected (use --fpga or --firmware)\n')
        return False

    size = os.stat(filename).st_size

    # The context manager closes the file on every exit path, including the
    # early 'return False' ones.
    with open(filename, 'rb') as src:
        response = _execute(dst, command)
        if 'Access denied' in response:
            print('Access denied')
            return False
        if 'OK' not in response:
            sys.stderr.write('Device did not accept the upload command (got {!r})\n'.format(response))
            return False

        crc = 0
        counter = 0
        # Only used for the progress message.
        total_chunks = size // chunk_size

        # 1. Write size of file (4 bytes)
        _write(dst, struct.pack('<I', size))
        _read(dst)

        # 2. Write file contents while calculating CRC-32
        while True:
            data = src.read(chunk_size)
            if not data:
                break
            dst.write(data)
            print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, total_chunks))

            # Read the ACK (the device's count of chunks received so far).
            # Keep retrying until a full 4 byte answer has arrived.
            while True:
                ack_bytes = dst.read(4)
                if len(ack_bytes) == 4:
                    break
                print('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes))
                dst.write('\r')  # eventually get back to the CLI prompt

            ack = struct.unpack('<I', ack_bytes)[0]
            expected_ack = counter + 1
            if ack != expected_ack:
                # Report the value that was actually expected (counter + 1).
                print('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, expected_ack))
                flush = dst.read(100)
                print('FLUSH data: {!r}'.format(flush))
                return False
            counter += 1

            # Mask to keep the running CRC an unsigned 32 bit value.
            crc = crc32(data, crc) & 0xffffffff

        _read(dst)

        # 3. Write CRC-32 (4 bytes)
        _write(dst, struct.pack('<I', crc))
        _read(dst)

    if args.fpga:
        _execute(dst, 'fpga reset')

    _execute(dst, 'exit')

    return True
def main(args):
    """
    Open the serial device named by `args.device` and upload `args.filename`.

    Returns the result of send_file(): True if the upload sequence completed,
    False otherwise, so that the caller can turn it into the process exit
    status. The serial port is closed even if send_file() raises.
    """
    dst = serial.Serial(args.device, 921600, timeout=2)
    try:
        # Propagate the outcome instead of unconditionally reporting success.
        return send_file(args.filename, args, dst)
    finally:
        dst.close()
if __name__ == '__main__':
    try:
        args = parse_args()
        # Exit status 0 when main() reports success, 1 otherwise.
        sys.exit(0 if main(args) else 1)
    except KeyboardInterrupt:
        # Ctrl-C: stop quietly without printing a traceback.
        pass