From 3897a8269bf39f55314959e4284a01fccd7fa2b2 Mon Sep 17 00:00:00 2001 From: Matt Mets Date: Mon, 6 Mar 2023 12:41:21 +0100 Subject: [PATCH] More agressive python linting --- hw/production_test/Makefile | 9 +- hw/production_test/encode_usb_strings.py | 53 ++- hw/production_test/icebin2nvcm.py | 35 +- hw/production_test/icenvcm.py | 397 +++++++++++++---------- hw/production_test/production_test.py | 10 +- hw/production_test/requirements.txt | 6 +- hw/production_test/reset.py | 14 +- hw/production_test/usb_test.py | 188 +++++------ 8 files changed, 380 insertions(+), 332 deletions(-) diff --git a/hw/production_test/Makefile b/hw/production_test/Makefile index 6ca83eb..632c24b 100644 --- a/hw/production_test/Makefile +++ b/hw/production_test/Makefile @@ -7,10 +7,13 @@ PYTHON_FILES = \ encode_usb_strings.py \ reset.py +# autopep8: Fixes simple format errors automatically +# mypy: static type hint analysis +# pylint: pep8 and static code analysis lint: autopep8 --in-place --max-line-length 70 --aggressive --aggressive ${PYTHON_FILES} mypy --disallow-untyped-defs ${PYTHON_FILES} - pycodestyle --max-line-length 70 ${PYTHON_FILES} + pylint --generated-member=usb1.TRANSFER_COMPLETED,usb1.USBErrorInterrupted,usb1.USBErrorIO ${PYTHON_FILES} # Check that the NVCM generator gives a correct output for a known binary verify-nvcm: @@ -21,4 +24,6 @@ verify: time ./icenvcm.py --verify nvcm_test/application_fpga.bin program: - time ./icenvcm.py --my-design-is-good-enough --write ../application_fpga/application_fpga.bin --ignore-blank + ./icenvcm.py -i + time ./icenvcm.py --my-design-is-good-enough --ignore-blank --write ../application_fpga/application_fpga.bin --verify nvcm_test/application_fpga.bin + ./icenvcm.py -b diff --git a/hw/production_test/encode_usb_strings.py b/hw/production_test/encode_usb_strings.py index ef6bc63..eb4d2e4 100755 --- a/hw/production_test/encode_usb_strings.py +++ b/hw/production_test/encode_usb_strings.py @@ -1,8 +1,5 @@ #!/usr/bin/env python - -manufacturer = 'Mullvad' -product = 'MTA1-USB-V1' -serial = "68de5d27-e223-4874-bc76-a54d6e84068f" +"""Convert python strings to UTF16 USB descriptors""" def descriptor_to_string(descriptor: bytes) -> str: @@ -11,17 +8,17 @@ def descriptor_to_string(descriptor: bytes) -> str: Keyword arguments: descriptor -- UTF-16 formatted USB descriptor string """ - bLength = descriptor[0] - if bLength != len(descriptor): - raise Exception( - 'Length mismatch, length_field:{:} actual_length:{:}' - .format(bLength, len(descriptor))) + b_length = descriptor[0] + if b_length != len(descriptor): + raise ValueError( + 'Length mismatch, ' + + f'length_field:{b_length} length:{len(descriptor)}') - bDescriptorType = descriptor[1] - if bDescriptorType != 0x03: - raise Exception( - 'Type mismatch, bDescriptorType:{:02x} expected:0x03' - .format(bDescriptorType)) + b_descriptor_type = descriptor[1] + if b_descriptor_type != 0x03: + raise ValueError( + f'Type mismatch, bDescriptorType:{b_descriptor_type:02x}' + + 'expected:0x03') return descriptor[2:].decode('utf-16', errors='strict') @@ -42,6 +39,10 @@ def string_to_descriptor(string: str) -> bytes: if __name__ == "__main__": + MANUFACTURER = 'Mullvad' + PRODUCT = 'MTA1-USB-V1' + SERIAL = "68de5d27-e223-4874-bc76-a54d6e84068f" + # serial = bytes([ # 0x14,0x03, # 0x32,0x00,0x30,0x00,0x31,0x00,0x37,0x00,0x2D,0x00, @@ -73,32 +74,30 @@ if __name__ == "__main__": # print(['{:02x} '.format(i) for i in sample_mfr]) # print(['{:02x} '.format(i) for i in rt]) - with open('usb_strings.h', 'w') as f: + with open('usb_strings.h', 'w', encoding='utf-8') as f: f.write('#ifndef USB_STRINGS\n') f.write('#define USB_STRINGS\n') f.write( - 'unsigned char __code Prod_Des[]={{ // "{}"\n' - .format(product)) + f'unsigned char __code Prod_Des[]={{ // "{PRODUCT}"\n') f.write(' ') - f.write(', '.join(['0x{:02x}'.format(i) - for i in string_to_descriptor(product)])) + f.write(', '.join([f'0x{i:02x}' + for i in string_to_descriptor(PRODUCT)])) f.write('\n};\n') f.write( - 'unsigned char __code Manuf_Des[]={{ // "{}"\n' - .format(manufacturer)) + 'unsigned char __code Manuf_Des[]={ ' + + f'// "{MANUFACTURER}"\n') f.write(' ') - f.write(', '.join(['0x{:02x}'.format(i) - for i in string_to_descriptor(manufacturer)])) + f.write(', '.join([f'0x{i:02x}' + for i in string_to_descriptor(MANUFACTURER)])) f.write('\n};\n') f.write( - 'unsigned char __code SerDes[]={{ // "{}"\n' - .format(serial)) + f'unsigned char __code SerDes[]={{ // "{SERIAL}"\n') f.write(' ') - f.write(', '.join(['0x{:02x}'.format(i) - for i in string_to_descriptor(serial)])) + f.write(', '.join([f'0x{i:02x}' + for i in string_to_descriptor(SERIAL)])) f.write('\n};\n') f.write('#endif\n') diff --git a/hw/production_test/icebin2nvcm.py b/hw/production_test/icebin2nvcm.py index d67950a..685435d 100755 --- a/hw/production_test/icebin2nvcm.py +++ b/hw/production_test/icebin2nvcm.py @@ -1,30 +1,33 @@ #!/usr/bin/env python - -import sys - -# -# bistream to NVCM command conversion is based on majbthrd's work in -# https://github.com/YosysHQ/icestorm/pull/272 -# +""" bistream to NVCM command conversion is based on majbthrd's work +in https://github.com/YosysHQ/icestorm/pull/272 +""" def icebin2nvcm(bitstream: bytes) -> list[str]: + """Convert an ice40 bitstream into an NVCM program + + The NVCM format is a set of commands that are run against the + NVCM state machine, which instruct the state machine to write + the bitstream into the NVCM. It's somewhat convoluted! + + Keyword arguments: + bitstream -- Bitstream to convert into NVCM format + """ + # ensure that the file starts with the correct bistream preamble for origin in range(0, len(bitstream)): if bitstream[origin:origin + 4] == bytes.fromhex('7EAA997E'): break if origin == len(bitstream): - raise Exception("Preamble not found") + raise ValueError('Preamble not found') - print("Found preamable at %08x" % (origin), file=sys.stderr) + print(f'Found preamable at {origin:08x}') # there might be stuff in the header with vendor tools, # but not usually in icepack produced output, so ignore it for now - # todo: what is the correct size? - print(len(bitstream)) - rows = [] rows.append('06') @@ -43,7 +46,7 @@ def icebin2nvcm(bitstream: bytes) -> list[str]: addr = pos - origin nvcm_addr = int(addr / 328) * 4096 + (addr % 328) - row_str = "02%06x%s" % (nvcm_addr, row.hex()) + row_str = f'02{nvcm_addr:06x}{row.hex()}' row_str = ' '.join([row_str[i:i + 2] for i in range(0, len(row_str), 2)]) + ' ' @@ -71,11 +74,11 @@ if __name__ == "__main__": args = parser.parse_args() with open(args.infile, 'rb') as f_in: - bitstream = f_in.read() + data = f_in.read() - cmds = icebin2nvcm(bitstream) + cmds = icebin2nvcm(data) - with open(args.outfile, 'w') as f_out: + with open(args.outfile, 'w', encoding='utf-8') as f_out: for cmd in cmds: f_out.write(cmd) f_out.write('\n') diff --git a/hw/production_test/icenvcm.py b/hw/production_test/icenvcm.py index 0de07b1..cb3d735 100755 --- a/hw/production_test/icenvcm.py +++ b/hw/production_test/icenvcm.py @@ -20,16 +20,18 @@ # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # +"""NVCM programming tool for iCE40 FPGAs""" -import usb_test -from icebin2nvcm import icebin2nvcm -import sys -from time import sleep import os +import sys +import struct +from time import sleep +from usb_test import IceFlasher +from icebin2nvcm import icebin2nvcm class Nvcm(): - # todo: add expected bitstream sizes + """NVCM programming interface for ICE40 FPGAs""" id_table = { 0x06: "ICE40LP8K / ICE40HX8K", 0x07: "ICE40LP4K / ICE40HX4K", @@ -46,6 +48,12 @@ class Nvcm(): 0x21: "ICE40UP3K", } + banks = { + 'nvcm': 0x00, + 'trim': 0x10, + 'sig': 0x20 + } + def __init__( self, pins: dict, @@ -54,7 +62,7 @@ class Nvcm(): self.pins = pins self.debug = debug - self.flasher = usb_test.ice40_flasher() + self.flasher = IceFlasher() self.flasher.gpio_put(self.pins['5v_en'], False) self.flasher.gpio_put(self.pins['crst'], False) @@ -86,24 +94,35 @@ class Nvcm(): def enable(self, cs: bool, reset: bool = True) -> None: """Set the CS and Reset pin states""" - # gpio.write(cs << cs_pin | reset << reset_pin) self.flasher.gpio_put(self.pins['ss'], cs) self.flasher.gpio_put(self.pins['crst'], reset) def writehex(self, s: str, toggle_cs: bool = True) -> None: + """Write SPI data to the target device + + Keyword arguments: + s -- data to send (formatted as a string of hex data) + toggle_cs -- If true, automatically lower the CS pin before + transmit, and raise it after transmit + """ if self.debug and not s == "0500": print("TX", s) - x = bytes.fromhex(s) + data = bytes.fromhex(s) - # b = dev.exchange(x, duplex=True, readlen=len(x)) - self.flasher.spi_write(x, toggle_cs) + self.flasher.spi_write(data, toggle_cs) def sendhex(self, s: str, toggle_cs: bool = True) -> bytes: + """Perform a full-duplex write/read on the target device + + Keyword arguments: + s -- data to send (formatted as a string of hex data) + toggle_cs -- If true, automatically lower the CS pin before + transmit, and raise it after transmit + """ if self.debug and not s == "0500": print("TX", s) x = bytes.fromhex(s) - # b = dev.exchange(x, duplex=True, readlen=len(x)) b = self.flasher.spi_rxtx(x, toggle_cs) if self.debug and not s == "0500": @@ -111,15 +130,16 @@ class Nvcm(): return b def delay(self, count: int) -> None: - # run the clock with no CS asserted - # dev.exchange(b'\x00', duplex=True, readlen=count) - # self.sendhex('00' * count, False) + """'Delay' by sending clocks with CS de-asserted + + Keyword arguments: + count -- Number of bytes to clock + """ + self.flasher.spi_clk_out(count) - def tck(self, count: int) -> None: - self.delay((count >> 3) * 2) - def init(self) -> None: + """Reboot the part and enter SPI command mode""" if self.debug: print("init") self.enable(cs=True, reset=True) @@ -129,42 +149,53 @@ class Nvcm(): sleep(0.1) self.enable(cs=True, reset=True) - def status_wait(self, count: int = 1000) -> None: - for i in range(0, count): - self.tck(5000) + def status_wait(self) -> None: + """Wait for the status register to clear""" + for i in range(0, 1000): + self.delay(1250) ret = self.sendhex("0500") - x = int.from_bytes(ret, byteorder='big') + status = struct.unpack('>H', ret)[0] - # print("x=%04x" %(x)) - - if (x & 0x00c1) == 0: + if (status & 0x00c1) == 0: return - print("x=%04x" % (x)) - raise Exception("status failed to clear") + raise ValueError("status failed to clear") def command(self, cmd: str) -> None: + """Send a command to the NVCM state machine""" self.writehex(cmd) self.status_wait() - self.tck(8) + self.delay(2) def pgm_enable(self) -> None: + """Enable program mode""" self.command("06") def pgm_disable(self) -> None: + """Disable program mode""" self.command("04") def enable_access(self) -> None: - # ! Shift in Access-NVCM instruction; - # SMCInstruction[1] = 0x70807E99557E; + """Send the 'access NVCM' instruction""" self.command("7eaa997e010e") def read_bytes( self, + cmd: int, address: int, - length: int = 8, - cmd: int = 0x03) -> bytes: - """Returns a byte array of the contents""" + length: int = 8) -> bytes: + """Read NVCM memory and return as a byte array + + Known read commands are: + 0x03: Read NVCM bank + 0x84: Read RF + + Keyword arguments: + cmd -- Read command + address -- NVCM memory address to read from + length -- Number of bytes to read + """ + msg = '' msg += ("%02x%06x" % (cmd, address)) msg += ("00" * 9) # dummy bytes @@ -173,76 +204,79 @@ class Nvcm(): return ret[4 + 9:] - def read( + def read_int( self, - address: int, - length: int = 8, - cmd: int = 0x03) -> int: - """Returns a big integer""" + cmd: int, + address: int) -> int: + """Read NVCM memory and return as an integer - val = self.read_bytes(address, length, cmd) - x = 0 - for i in range(0, length): - x = x << 8 | val[i] - return x + Read commands are documented in read_bytes - def write(self, address: int, data: str, cmd: int = 0x02) -> None: + Keyword arguments: + cmd -- Read command + address -- NVCM memory address to read from + """ + + val = self.read_bytes(cmd, address, 8) + return struct.unpack('>Q', val)[0] + + def write(self, cmd: int, address: int, data: str) -> None: + """Write data to the NVCM memory + + Keyword arguments: + cmd -- Write command + address -- NVCM memory address to write to + length -- Number of bytes to write + """ self.writehex("%02x%06x" % (cmd, address) + data) try: self.status_wait() - except Exception as e: + except Exception as exc: raise Exception( "WRITE FAILED: cmd=%02x address=%06x data=%s" % - (cmd, address, data)) + (cmd, address, data)) from exc - self.tck(8) + self.delay(2) - def bank_select(self, bank: int) -> None: - self.write(cmd=0x83, address=0x000025, data="%02x" % (bank)) + def bank_select(self, bank: str) -> None: + """ Select the active NVCM bank to target - def select_nvcm(self) -> None: - # ! Shift in Restore Access-NVCM instruction; - # SDR 40 TDI(0x00A40000C1); - self.bank_select(0x00) + Keyword arguments: + bank -- NVCM bank: nvcm, trim, or sig + """ - def select_trim(self) -> None: - # ! Shift in Trim setup-NVCM instruction; - # SDR 40 TDI(0x08A40000C1); - self.bank_select(0x10) - - def select_sig(self) -> None: - # ! Shift in Access Silicon Signature instruction; - # IDInstruction[1] = 0x04A40000C1; - # SDR 40 TDI(IDInstruction[1]); - self.bank_select(0x20) + self.write(0x83, 0x000025, f"{self.banks[bank]:02x}") def read_trim(self) -> int: - # ! Shift in Access-NVCM instruction; - # SMCInstruction[1] = 0x70807E99557E; + """Read the RF trim register""" self.enable_access() # ! Shift in READ_RF(0x84) instruction; # SDR 104 TDI(0x00000000000000000004000021); - x = self.read(cmd=0x84, address=0x000020, length=8) - self.tck(8) + val = self.read_int(0x84, 0x000020) + self.delay(2) # print("FSM Trim Register %x" % (x)) - self.select_nvcm() - return x + self.bank_select('nvcm') + return val def write_trim(self, data: str) -> None: + """Write to the RF trim register + + Keyword arguments: + data -- Hex-formatted string, should be 8 bytes of data + """ # ! Setup Programming Parameter in Trim Registers; # ! Shift in Trim setup-NVCM instruction; # TRIMInstruction[1] = 0x000000430F4FA80004000041; - self.write(cmd=0x82, address=0x000020, data=data) + self.write(0x82, 0x000020, data) def nvcm_enable(self) -> None: + """Enable NVCM interface by sending knock command""" if self.debug: print("enable") - # ! Shift in Access-NVCM instruction; - # SMCInstruction[1] = 0x70807E99557E; self.enable_access() # ! Setup Reading Parameter in Trim Registers; @@ -253,76 +287,102 @@ class Nvcm(): self.write_trim("00000000c4000000") def enable_trim(self) -> None: + """Enable NVCM write commands""" # ! Setup Programming Parameter in Trim Registers; # ! Shift in Trim setup-NVCM instruction; # TRIMInstruction[1] = 0x000000430F4FA80004000041; self.write_trim("0015f2f0c2000000") - def trim_blank_check(self) -> bool: + def trim_blank_check(self) -> None: + """Check that the NVCM trim parameters are blank""" + print("NVCM Trim_Parameter_OTP blank check") - self.select_trim() + self.bank_select('trim') - x = self.read(0x000020, 1) - self.select_nvcm() + ret = self.read_bytes(0x03, 0x000020, 1)[0] + self.bank_select('nvcm') - if x != 0: - print( - "NVCM Trim_Parameter_OTP Block is not blank. (%02x)" % - x) - return False - return True + if ret != 0x00: + raise ValueError( + 'NVCM Trim_Parameter_OTP Block not blank. ' + + f'(read: 0x{ret:%02x})') - def blank_check(self, total_fuse: int) -> bool: - self.select_nvcm() + def blank_check(self, total_fuse: int) -> None: + """Check if sub-section of the NVCM memory is blank + + To check all of the memory, first determine how much NVCM + memory your part actually has, or at least the size of the + file that you plan to write to it. + + Keyword arguments: + total_fuse -- Number of fuse bytes to read before stopping + """ + self.bank_select('nvcm') status = True print("NVCM main memory blank check") - contents = self.read_bytes(0x000000, total_fuse) + contents = self.read_bytes(0x03, 0x000000, total_fuse) - for i in range(0, total_fuse): - x = contents[i] + for index in range(0, total_fuse): + val = contents[index] if self.debug: - print("%08x: %02x" % (i, x)) - if x != 0: + print(f"{index:08x}: {val:02x}") + if val != 0: print( - "%08x: NVCM Main Memory Block is not blank." % - (i), file=sys.stderr) + f"{index:08x}: NVCM Memory Block is not blank.", + file=sys.stderr) status = False - self.select_nvcm() - return status + self.bank_select('nvcm') + if not status: + raise ValueError("NVCM Main Memory not blank") def program(self, rows: list[str]) -> None: + """Program the memory by running an NVCM command sequence + + Keyword arguments: + rows -- List of NVCM commands to run, formatted as hex + strings + """ print("NVCM Program main memory") - self.select_nvcm() + self.bank_select('nvcm') self.enable_trim() self.pgm_enable() - status = True - i = 0 for row in rows: # print('data for row:',i, row) - if i % 1024 == 0: + if i % (1024 * 8) == 0: print("%6d / %6d bytes" % (i, len(rows) * 8)) i += 8 try: self.command(row) - except Exception as e: - raise Exception("programming failed, row:", row) + except Exception as exc: + raise Exception( + "programming failed, row:{row}" + ) from exc self.pgm_disable() def write_trim_pages(self, lock_bits: str) -> None: - self.select_nvcm() + """Write to the trim pages + + The trim pages can be written multiple times. Known usages + are to configure the device for NVCM boot, and to secure + the device by disabling the NVCM interface. + + Keyword arguments: + lock_bits -- Mas of bits to set in the trim pages + """ + self.bank_select('nvcm') self.enable_trim() - self.select_trim() + self.bank_select('trim') self.pgm_enable() @@ -335,28 +395,33 @@ class Nvcm(): # SDR 96 TDI(0x000000008000000C05000040); # ! Program Security Bit row 4; # SDR 96 TDI(0x00000000800000C07000040); - self.write(0x000020, lock_bits) - self.write(0x000060, lock_bits) - self.write(0x0000a0, lock_bits) - self.write(0x0000e0, lock_bits) + self.write(0x02, 0x000020, lock_bits) + self.write(0x02, 0x000060, lock_bits) + self.write(0x02, 0x0000a0, lock_bits) + self.write(0x02, 0x0000e0, lock_bits) self.pgm_disable() # verify a read back - x = self.read(0x000020, 8) + val = self.read_int(0x03, 0x000020) - self.select_nvcm() + self.bank_select('nvcm') lock_bits_int = int(lock_bits, 16) - if x & lock_bits_int != lock_bits_int: - raise Exception( + if val & lock_bits_int != lock_bits_int: + raise ValueError( "Failed to write trim lock bits: " + - "%016x != expected %016x" % - (x, lock_bits_int)) + f"{val:016x} != expected {lock_bits_int:016x}" + ) - print("New state %016x" % (x)) + print(f"New state {val:016x}") def trim_secure(self) -> None: + """Disable NVCM readout by programming the security bits + + Use with caution- the device will no longer respond to NVCM + commands after this command runs. + """ print("NVCM Secure") trim = self.read_trim() if (trim >> 60) & 0x3 != 0: @@ -367,38 +432,43 @@ class Nvcm(): self.write_trim_pages("3000000100000000") def trim_program(self) -> None: + """Configure the device to boot from NVCM (?) + + Use with caution- the device will no longer boot from + external SPI flash after this command runs. + """ print("NVCM Program Trim_Parameter_OTP") self.write_trim_pages("0015f2f1c4000000") def info(self) -> None: """ Print the contents of the configuration registers """ - self.select_sig() - sig1 = self.read(0x000000, 8) + self.bank_select('sig') + sig1 = self.read_int(0x03, 0x000000) - self.select_sig() - sig2 = self.read(0x000008, 8) + self.bank_select('sig') + sig2 = self.read_int(0x03, 0x000008) # have to switch back to nvcm bank before switching to trim? - self.select_nvcm() + self.bank_select('nvcm') trim = self.read_trim() -# self.select_nvcm() +# self.bank_select('nvcm') - self.select_trim() - trim0 = self.read(0x000020, 8) + self.bank_select('trim') + trim0 = self.read_int(0x03, 0x000020) - self.select_trim() - trim1 = self.read(0x000060, 8) + self.bank_select('trim') + trim1 = self.read_int(0x03, 0x000060) - self.select_trim() - trim2 = self.read(0x0000a0, 8) + self.bank_select('trim') + trim2 = self.read_int(0x03, 0x0000a0) - self.select_trim() - trim3 = self.read(0x0000e0, 8) + self.bank_select('trim') + trim3 = self.read_int(0x03, 0x0000e0) - self.select_nvcm() + self.bank_select('nvcm') - secured = ((trim >> 60) & 0x3) + secured = (trim >> 60) & 0x3 device_id = (sig1 >> 56) & 0xFF print("Device: %s (%02x) secure=%d" % ( @@ -422,17 +492,17 @@ class Nvcm(): length: Length of data to read """ - self.select_nvcm() + self.bank_select('nvcm') contents = bytearray() for offset in range(0, length, 8): - if offset % (1024*8) == 0: + if offset % (1024 * 8) == 0: print("%6d / %6d bytes" % (offset, length)) nvcm_addr = int(offset / 328) * 4096 + (offset % 328) - contents += self.read_bytes(nvcm_addr, 8) - self.tck(8) + contents += self.read_bytes(0x03, nvcm_addr, 8) + self.delay(2) return bytes(contents) @@ -441,6 +511,7 @@ class Nvcm(): Keyword arguments: filename -- File to write to, or '-' to write to stdout + length -- Number of bytes to read from NVCM """ contents = bytearray() @@ -454,13 +525,13 @@ class Nvcm(): if filename == '-': with os.fdopen(sys.stdout.fileno(), "wb", - closefd=False) as f: - f.write(contents) - f.flush() + closefd=False) as out_file: + out_file.write(contents) + out_file.flush() else: - with open(filename, "wb") as f: - f.write(contents) - f.flush() + with open(filename, "wb") as out_file: + out_file.write(contents) + out_file.flush() def verify(self, filename: str) -> None: """ Verify that the contents of the NVCM match a file @@ -468,10 +539,10 @@ class Nvcm(): Keyword arguments: filename -- File to compare """ - with open(filename, "rb") as f: - compare = f.read() + with open(filename, "rb") as verify_file: + compare = verify_file.read() - assert (len(compare) > 0) + assert len(compare) > 0 contents = bytearray() contents += bytes([0xff, 0x00, 0x00, 0xff]) @@ -482,13 +553,17 @@ class Nvcm(): if len(contents) > len(compare): contents = contents[:len(compare)] - assert (compare == contents) + assert compare == contents print('Verification complete, NVCM contents match file') def sleep_flash(pins: dict) -> None: - """ Put the SPI bootloader flash in deep sleep mode""" - flasher = usb_test.ice40_flasher() + """ Put the SPI bootloader flash in deep sleep mode + + Keyword arguments: + pins -- Dictionary of pins to use for SPI interface + """ + flasher = IceFlasher() # Disable board power flasher.gpio_put(pins['5v_en'], False) @@ -521,8 +596,8 @@ def sleep_flash(pins: dict) -> None: data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while awake:', ' '.join( - ['{:02x}'.format(b) for b in data])) - assert (data == bytes([0xff, 0xef, 0x40])) + [f'{b:02x}' for b in data])) + assert data == bytes([0xff, 0xef, 0x40]) # Test that the flash will ignore a sleep command that doesn't # start on the first byte @@ -532,8 +607,8 @@ def sleep_flash(pins: dict) -> None: data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while awake:', ' '.join( - ['{:02x}'.format(b) for b in data])) - assert (data == bytes([0xff, 0xef, 0x40])) + [f'{b:02x}' for b in data])) + assert data == bytes([0xff, 0xef, 0x40]) # put the flash to sleep flasher.spi_write(bytes([0xb9])) @@ -542,8 +617,8 @@ def sleep_flash(pins: dict) -> None: data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while asleep:', ' '.join( - ['{:02x}'.format(b) for b in data])) - assert (data == bytes([0xff, 0xff, 0xff])) + [f'{b:02x}' for b in data])) + assert data == bytes([0xff, 0xff, 0xff]) if __name__ == "__main__": @@ -552,11 +627,6 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--port', - type=str, - default='ftdi://::/1', - help='FTDI port of the form ftdi://::/1') - parser.add_argument( '-v', '--verbose', @@ -635,7 +705,7 @@ if __name__ == "__main__": print( "Are you sure your design is good enough?", file=sys.stderr) - exit(1) + sys.exit(1) tp1_pins = { '5v_en': 7, @@ -650,7 +720,10 @@ if __name__ == "__main__": if args.sleep_flash: sleep_flash(tp1_pins) - nvcm = Nvcm(tp1_pins, spi_speed=args.spi_speed, debug=args.verbose) + nvcm = Nvcm( + tp1_pins, + spi_speed=args.spi_speed, + debug=args.verbose) nvcm.power_on() # # Turn on ICE40 in CRAM boot mode @@ -661,17 +734,15 @@ if __name__ == "__main__": nvcm.info() if args.write_file: - with open(args.write_file, "rb") as f: - bitstream = f.read() - print("read %d bytes" % (len(bitstream))) + with open(args.write_file, "rb") as in_file: + bitstream = in_file.read() + print(f"read {len(bitstream)} bytes") cmds = icebin2nvcm(bitstream) - if not cmds: - exit(1) if not args.ignore_blank: - nvcm.trim_blank_check() or exit(1) + nvcm.trim_blank_check() # how much should we check? - nvcm.blank_check(0x100) or exit(1) + nvcm.blank_check(0x100) # this is it! nvcm.program(cmds) diff --git a/hw/production_test/production_test.py b/hw/production_test/production_test.py index 04a21df..80dd540 100755 --- a/hw/production_test/production_test.py +++ b/hw/production_test/production_test.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -import usb_test +from usb_test import IceFlasher import time import numpy from subprocess import run @@ -25,7 +25,7 @@ file_locations = { def enable_power(): """Enable power to the TK-1""" - d = usb_test.ice40_flasher() + d = IceFlasher() d.gpio_set_direction(7, True) d.gpio_put(7, True) d.close() @@ -36,7 +36,7 @@ def enable_power(): def disable_power(): """Disable power to the TK-1""" time.sleep(.1) - d = usb_test.ice40_flasher() + d = IceFlasher() d.gpio_set_direction(7, True) d.gpio_put(7, False) d.close() @@ -55,7 +55,7 @@ def voltage_test(): """Measure 3.3V 2.5V, and 1.2V voltage rails on the TK-1""" enable_power() - d = usb_test.ice40_flasher() + d = IceFlasher() vals = measure_voltages(d,20) d.close() @@ -128,7 +128,7 @@ def test_extra_io(): enable_power() time.sleep(.1) - d = usb_test.ice40_flasher() + d = IceFlasher() d.gpio_put(16, False) d.gpio_set_direction(16, True) diff --git a/hw/production_test/requirements.txt b/hw/production_test/requirements.txt index c976cb5..620349e 100644 --- a/hw/production_test/requirements.txt +++ b/hw/production_test/requirements.txt @@ -1,3 +1,7 @@ +libusb1==3.0.0 +pyusb==1.2.1 numpy==1.23.4 pyserial==3.5 -pyusb==1.2.1 +autopep8==2.0.1 +mypy==1.0.1 +pylint==2.16.3 diff --git a/hw/production_test/reset.py b/hw/production_test/reset.py index 6e7eeac..179556d 100755 --- a/hw/production_test/reset.py +++ b/hw/production_test/reset.py @@ -1,6 +1,7 @@ #!/usr/bin/env python -import hid_test # type: ignore -import time +"""Automatically reset a TK-1""" + +from usb_test import IceFlasher def reset_tk1() -> None: @@ -10,11 +11,10 @@ def reset_tk1() -> None: to the TK1. The result is that TK1 again will be in firmware mode, so a new app can be loaded. """ - d = hid_test.ice40_flasher() - d.gpio_set_direction(14, True) - d.gpio_put(14, False) - d.gpio_set_direction(14, False) - d.close() + flasher = IceFlasher() + flasher.gpio_set_direction(14, True) + flasher.gpio_put(14, False) + flasher.gpio_set_direction(14, False) reset_tk1() diff --git a/hw/production_test/usb_test.py b/hw/production_test/usb_test.py index c6e48ac..a69c5e4 100755 --- a/hw/production_test/usb_test.py +++ b/hw/production_test/usb_test.py @@ -1,16 +1,12 @@ #!/usr/bin/env python -from time import sleep - -# pyusb -import usb.core # type: ignore -import usb.util # type: ignore - -# libusb -import usb1 # type: ignore +"""IceFlasher, an iCE40 programming tool based on an RPi Pico""" import struct from typing import List, Any +import usb1 # type: ignore + + # def processReceivedData(transfer): # # print('got rx data', # transfer.getStatus(), @@ -28,8 +24,10 @@ from typing import List, Any # transfer.submit() -class ice40_flasher: - FLASHER_REQUEST_LED_SET = 0x00, +class IceFlasher: + """ iCE40 programming tool based on an RPi Pico """ + + FLASHER_REQUEST_LED_SET = 0x00 FLASHER_REQUEST_PIN_DIRECTION_SET = 0x10 FLASHER_REQUEST_PULLUPS_SET = 0x12 FLASHER_REQUEST_PIN_VALUES_SET = 0x20 @@ -41,109 +39,70 @@ class ice40_flasher: FLASHER_REQUEST_ADC_READ = 0x50 FLASHER_REQUEST_BOOTLOADER = 0xFF - SPI_MAX_TRANSFER_SIZE = (2048 - 8) + SPI_MAX_TRANSFER_SIZE = 2048 - 8 def __init__(self) -> None: - self.backend = 'libusb' + self.transfer_list: List[Any] = [] - if self.backend == 'pyusb': - # See: - # https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst - self.dev = usb.core.find( - idVendor=0xcafe, idProduct=0x4010) + # See: https://github.com/vpelletier/python-libusb1#usage + self.context = usb1.USBContext() + self.handle = self.context.openByVendorIDAndProductID( + 0xcafe, + 0x4010, + skip_on_error=True, + ) - if self.dev is None: - raise ValueError('Device not found') - - self.dev.set_configuration() - - elif self.backend == 'libusb': - # See: https://github.com/vpelletier/python-libusb1#usage - self.context = usb1.USBContext() - self.handle = self.context.openByVendorIDAndProductID( - 0xcafe, - 0x4010, - skip_on_error=True, - ) - - if self.handle is None: - # Device not present, or user is not allowed to access - # device. - raise ValueError('Device not found') - self.handle.claimInterface(0) - - self.transfer_list: List[Any] = [] + if self.handle is None: + # Device not present, or user is not allowed to access + # device. + raise ValueError('Device not found') + self.handle.claimInterface(0) def _wait_async(self) -> None: - if self.backend == 'libusb': - while any(transfer.isSubmitted() - for transfer in self.transfer_list): - try: - self.context.handleEvents() - except usb1.USBErrorInterrupted: - pass + while any(transfer.isSubmitted() + for transfer in self.transfer_list): + try: + self.context.handleEvents() + except usb1.USBErrorInterrupted: + pass - for transfer in reversed(self.transfer_list): - if transfer.getStatus() == \ - usb1.TRANSFER_COMPLETED: - self.transfer_list.remove(transfer) - else: - print( - transfer.getStatus(), - usb1.TRANSFER_COMPLETED) + for transfer in reversed(self.transfer_list): + if transfer.getStatus() == \ + usb1.TRANSFER_COMPLETED: + self.transfer_list.remove(transfer) + else: + print( + transfer.getStatus(), + usb1.TRANSFER_COMPLETED) - def _write(self, request_id: int, data: bytes, - nonblocking: bool = False) -> None: - if self.backend == 'pyusb': - self.dev.ctrl_transfer(0x40, request_id, 0, 0, data) - - elif self.backend == 'libusb': - if nonblocking: - transfer = self.handle.getTransfer() - transfer.setControl( - # usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | - # usb1.RECIPIENT_DEVICE, #request type - 0x40, - request_id, # request - 0, # index - 0, - data, # data - callback=None, # callback functiopn - user_data=None, # userdata - timeout=1000 - ) - transfer.submit() - self.transfer_list.append(transfer) - - else: - self.handle.controlWrite(0x40, request_id, 0, 0, data) + def _write(self, request_id: int, data: bytes) -> None: + transfer = self.handle.getTransfer() + transfer.setControl( + # usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | + # usb1.RECIPIENT_DEVICE, #request type + 0x40, + request_id, # request + 0, # index + 0, + data, # data + callback=None, # callback functiopn + user_data=None, # userdata + timeout=1000 + ) + transfer.submit() + self.transfer_list.append(transfer) def _read(self, request_id: int, length: int) -> bytes: - if self.backend == 'pyusb': - # ctrl_transfer(self, bmRequestType, bRequest, wValue=0, - # wIndex=0, data_or_wLength = None, timeout = None): - # Request type: - # bit 7: direction 0:host to device (OUT), - # 1: device to host (IN) - # bits 5-6: type: 0:standard 1:class 2:vendor 3:reserved - # bits 0-4: recipient: 0:device 1:interface 2:endpoint - # 3:other - ret = self.dev.ctrl_transfer( - 0xC0, request_id, 0, 0, length) - - elif self.backend == 'libusb': - self._wait_async() - ret = self.handle.controlRead( - 0xC0, request_id, 0, 0, length) - - return ret + self._wait_async() + return self.handle.controlRead( + 0xC0, request_id, 0, 0, length) def gpio_set_direction(self, pin: int, direction: bool) -> None: """Set the direction of a single GPIO pin Keyword arguments: pin -- GPIO pin number - direction -- True: Set pin as output, False: set pin as input + value -- True: Set pin as output, False: set pin as input """ msg = struct.pack('>II', (1 << pin), @@ -217,6 +176,7 @@ class ice40_flasher: cs_pin -- GPIO pin number to use as the CS signal mosi_pin -- GPIO pin number to use as the MOSI signal miso_pin -- GPIO pin number to use as the MISO signal + clock_speed -- SPI clock speed, in MHz """ header = struct.pack('>BBBBB', sck_pin, @@ -232,12 +192,12 @@ class ice40_flasher: def spi_write( self, buf: bytes, - toggle_cs: bool = True) -> bytes: + toggle_cs: bool = True) -> None: """Write data to the SPI port Keyword arguments: buf -- Byte buffer to send. - toggle_cs: (Optional) If true, toggle the CS line + toggle_cs -- (Optional) If true, toggle the CS line """ max_chunk_size = self.SPI_MAX_TRANSFER_SIZE for i in range(0, len(buf), max_chunk_size): @@ -255,7 +215,7 @@ class ice40_flasher: Keyword arguments: buf -- Byte buffer to send. - toggle_cs: (Optional) If true, toggle the CS line + toggle_cs -- (Optional) If true, toggle the CS line """ ret = bytearray() @@ -279,13 +239,13 @@ class ice40_flasher: Keyword arguments: buf -- Byte buffer to send. - toggle_cs: (Optional) If true, toggle the CS line + toggle_cs -- (Optional) If true, toggle the CS line """ if len(buf) > self.SPI_MAX_TRANSFER_SIZE: - raise Exception( - 'Message too large, size:{:} max:{:}'.format( - len(buf), self.SPI_MAX_TRANSFER_SIZE)) + raise ValueError( + 'Message too large, ' + + f'size:{len(buf)} max:{self.SPI_MAX_TRANSFER_SIZE}') header = struct.pack('>I', len(buf)) msg = bytearray() @@ -297,7 +257,7 @@ class ice40_flasher: else: cmd = self.FLASHER_REQUEST_SPI_BITBANG_NO_CS - self._write(cmd, msg, nonblocking=True) + self._write(cmd, msg) if not read_after_write: return bytes() @@ -309,14 +269,22 @@ class ice40_flasher: return msg_in def spi_clk_out(self, byte_count: int) -> None: + """Run the SPI clock without transferring data + + This function is useful for SPI devices that need a clock to + advance their state machines. + + Keyword arguments: + byte_count -- Number of bytes worth of clocks to send + """ + header = struct.pack('>I', byte_count) msg = bytearray() msg.extend(header) self._write( self.FLASHER_REQUEST_SPI_CLKOUT, - msg, - nonblocking=True) + msg) def adc_read_all(self) -> tuple[float, float, float]: """Read the voltage values of ADC 0, 1, and 2 @@ -337,13 +305,11 @@ class ice40_flasher: """ try: self._write(self.FLASHER_REQUEST_BOOTLOADER, bytes()) - except usb.core.USBError: - # We expect the device to disappear immediately, so mask - # the resulting error + except usb1.USBErrorIO: pass if __name__ == '__main__': - flasher = ice40_flasher() + flasher = IceFlasher() flasher.spi_pins_set(1, 2, 3, 4, 15)