From 6371ab68fb5cf471b3722690f4aedeb6e3cc0186 Mon Sep 17 00:00:00 2001 From: Matt Mets Date: Sun, 5 Mar 2023 23:31:45 +0100 Subject: [PATCH] USB speed improvements * Add write-only commands to avoid unneded reads * Add clock-only command to avoid excessive writes * Add partial async support to reduce inter-command delays --- hw/production_test/icenvcm.py | 71 ++++++---- hw/production_test/usb_test.py | 229 ++++++++++++++++++++++++--------- 2 files changed, 209 insertions(+), 91 deletions(-) diff --git a/hw/production_test/icenvcm.py b/hw/production_test/icenvcm.py index c763639..0de07b1 100755 --- a/hw/production_test/icenvcm.py +++ b/hw/production_test/icenvcm.py @@ -77,23 +77,34 @@ class Nvcm(): ) def power_on(self) -> None: + """Disable power to the DUT""" self.flasher.gpio_put(self.pins['5v_en'], True) def power_off(self) -> None: + """Enable power to the DUT""" self.flasher.gpio_put(self.pins['5v_en'], False) def enable(self, cs: bool, reset: bool = True) -> None: + """Set the CS and Reset pin states""" # gpio.write(cs << cs_pin | reset << reset_pin) self.flasher.gpio_put(self.pins['ss'], cs) self.flasher.gpio_put(self.pins['crst'], reset) - def sendhex_cs(self, s: str, toggle_cs: bool = True) -> bytes: + def writehex(self, s: str, toggle_cs: bool = True) -> None: if self.debug and not s == "0500": print("TX", s) x = bytes.fromhex(s) # b = dev.exchange(x, duplex=True, readlen=len(x)) - b = self.flasher.spi_bitbang(x, toggle_cs) + self.flasher.spi_write(x, toggle_cs) + + def sendhex(self, s: str, toggle_cs: bool = True) -> bytes: + if self.debug and not s == "0500": + print("TX", s) + x = bytes.fromhex(s) + + # b = dev.exchange(x, duplex=True, readlen=len(x)) + b = self.flasher.spi_rxtx(x, toggle_cs) if self.debug and not s == "0500": print("RX", b.hex()) @@ -102,7 +113,8 @@ class Nvcm(): def delay(self, count: int) -> None: # run the clock with no CS asserted # dev.exchange(b'\x00', duplex=True, readlen=count) - self.sendhex_cs('00' * count, False) + # self.sendhex('00' * count, False) + self.flasher.spi_clk_out(count) def tck(self, count: int) -> None: self.delay((count >> 3) * 2) @@ -110,21 +122,17 @@ class Nvcm(): def init(self) -> None: if self.debug: print("init") - self.enable(True, True) - self.enable(True, False) # reset high - sleep(0.15) - - self.enable(False, False) # enable and reset high - sleep(0.12) - self.enable(False, True) # enable low, reset high - sleep(0.12) - self.enable(True, True) # enable and reset low - sleep(0.12) + self.enable(cs=True, reset=True) + self.enable(cs=True, reset=False) + self.enable(cs=False, reset=False) + self.enable(cs=False, reset=True) + sleep(0.1) + self.enable(cs=True, reset=True) def status_wait(self, count: int = 1000) -> None: for i in range(0, count): self.tck(5000) - ret = self.sendhex_cs("0500") + ret = self.sendhex("0500") x = int.from_bytes(ret, byteorder='big') # print("x=%04x" %(x)) @@ -132,10 +140,11 @@ class Nvcm(): if (x & 0x00c1) == 0: return + print("x=%04x" % (x)) raise Exception("status failed to clear") def command(self, cmd: str) -> None: - self.sendhex_cs(cmd) + self.writehex(cmd) self.status_wait() self.tck(8) @@ -160,7 +169,7 @@ class Nvcm(): msg += ("%02x%06x" % (cmd, address)) msg += ("00" * 9) # dummy bytes msg += ("00" * length) # read - ret = self.sendhex_cs(msg) + ret = self.sendhex(msg) return ret[4 + 9:] @@ -178,7 +187,7 @@ class Nvcm(): return x def write(self, address: int, data: str, cmd: int = 0x02) -> None: - self.sendhex_cs("%02x%06x" % (cmd, address) + data) + self.writehex("%02x%06x" % (cmd, address) + data) try: self.status_wait() @@ -418,7 +427,7 @@ class Nvcm(): contents = bytearray() for offset in range(0, length, 8): - if offset % 1024 == 0: + if offset % (1024*8) == 0: print("%6d / %6d bytes" % (offset, length)) nvcm_addr = int(offset / 328) * 4096 + (offset % 328) @@ -474,6 +483,7 @@ class Nvcm(): contents = contents[:len(compare)] assert (compare == contents) + print('Verification complete, NVCM contents match file') def sleep_flash(pins: dict) -> None: @@ -505,10 +515,10 @@ def sleep_flash(pins: dict) -> None: 12 ) - flasher.spi_bitbang(bytes([0xAB])) + flasher.spi_write(bytes([0xAB])) # Confirm we can talk to flash - data = flasher.spi_bitbang(bytes([0x9f, 0, 0])) + data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while awake:', ' '.join( ['{:02x}'.format(b) for b in data])) @@ -516,20 +526,20 @@ def sleep_flash(pins: dict) -> None: # Test that the flash will ignore a sleep command that doesn't # start on the first byte - flasher.spi_bitbang(bytes([0, 0xb9])) + flasher.spi_write(bytes([0, 0xb9])) # Confirm we can talk to flash - data = flasher.spi_bitbang(bytes([0x9f, 0, 0])) + data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while awake:', ' '.join( ['{:02x}'.format(b) for b in data])) assert (data == bytes([0xff, 0xef, 0x40])) # put the flash to sleep - flasher.spi_bitbang(bytes([0xb9])) + flasher.spi_write(bytes([0xb9])) # Confirm flash is asleep - data = flasher.spi_bitbang(bytes([0x9f, 0, 0])) + data = flasher.spi_rxtx(bytes([0x9f, 0, 0])) print('flash ID while asleep:', ' '.join( ['{:02x}'.format(b) for b in data])) @@ -568,6 +578,13 @@ if __name__ == "__main__": action='store_true', help='Deassert the reset line to allow the FPGA to boot') + parser.add_argument( + '--speed', + dest='spi_speed', + type=int, + default=15, + help='SPI clock speed, in MHz') + parser.add_argument('-i', '--info', dest='read_info', action='store_true', @@ -633,7 +650,7 @@ if __name__ == "__main__": if args.sleep_flash: sleep_flash(tp1_pins) - nvcm = Nvcm(tp1_pins, debug=args.verbose) + nvcm = Nvcm(tp1_pins, spi_speed=args.spi_speed, debug=args.verbose) nvcm.power_on() # # Turn on ICE40 in CRAM boot mode @@ -675,6 +692,6 @@ if __name__ == "__main__": if args.do_boot: # hold reset low for half a second - nvcm.enable(True, False) + nvcm.enable(cs=True, reset=False) sleep(0.5) - nvcm.enable(True, True) + nvcm.enable(cs=True, reset=True) diff --git a/hw/production_test/usb_test.py b/hw/production_test/usb_test.py index 62cf64c..c6e48ac 100755 --- a/hw/production_test/usb_test.py +++ b/hw/production_test/usb_test.py @@ -1,7 +1,31 @@ #!/usr/bin/env python +from time import sleep + +# pyusb import usb.core # type: ignore import usb.util # type: ignore + +# libusb +import usb1 # type: ignore + import struct +from typing import List, Any + +# def processReceivedData(transfer): +# # print('got rx data', +# transfer.getStatus(), +# transfer.getActualLength()) +# +# if transfer.getStatus() != usb1.TRANSFER_COMPLETED: +# # Transfer did not complete successfully, there is no +# # data to read. This example does not resubmit transfers +# # on errors. You may want to resubmit in some cases (timeout, +# # ...). +# return +# data = transfer.getBuffer()[:transfer.getActualLength()] +# # Process data... +# # Resubmit transfer once data is processed. +# transfer.submit() class ice40_flasher: @@ -13,37 +37,105 @@ class ice40_flasher: FLASHER_REQUEST_SPI_BITBANG_CS = 0x41 FLASHER_REQUEST_SPI_BITBANG_NO_CS = 0x42 FLASHER_REQUEST_SPI_PINS_SET = 0x43 + FLASHER_REQUEST_SPI_CLKOUT = 0x44 FLASHER_REQUEST_ADC_READ = 0x50 FLASHER_REQUEST_BOOTLOADER = 0xFF + SPI_MAX_TRANSFER_SIZE = (2048 - 8) + def __init__(self) -> None: - # See: - # https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst - self.dev = usb.core.find(idVendor=0xcafe, idProduct=0x4010) + self.backend = 'libusb' - if self.dev is None: - raise ValueError('Device not found') + if self.backend == 'pyusb': + # See: + # https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst + self.dev = usb.core.find( + idVendor=0xcafe, idProduct=0x4010) - self.dev.set_configuration() + if self.dev is None: + raise ValueError('Device not found') - def _write(self, request_id: int, data: bytes) -> None: - self.dev.ctrl_transfer(0x40, request_id, 0, 0, data) + self.dev.set_configuration() - def _write_bulk(self, request_id: int, data: bytes) -> None: - msg = bytearray() - msg.append(request_id) - msg.extend(data) - self.dev.write(0x01, data) + elif self.backend == 'libusb': + # See: https://github.com/vpelletier/python-libusb1#usage + self.context = usb1.USBContext() + self.handle = self.context.openByVendorIDAndProductID( + 0xcafe, + 0x4010, + skip_on_error=True, + ) + + if self.handle is None: + # Device not present, or user is not allowed to access + # device. + raise ValueError('Device not found') + self.handle.claimInterface(0) + + self.transfer_list: List[Any] = [] + + def _wait_async(self) -> None: + if self.backend == 'libusb': + while any(transfer.isSubmitted() + for transfer in self.transfer_list): + try: + self.context.handleEvents() + except usb1.USBErrorInterrupted: + pass + + for transfer in reversed(self.transfer_list): + if transfer.getStatus() == \ + usb1.TRANSFER_COMPLETED: + self.transfer_list.remove(transfer) + else: + print( + transfer.getStatus(), + usb1.TRANSFER_COMPLETED) + + def _write(self, request_id: int, data: bytes, + nonblocking: bool = False) -> None: + if self.backend == 'pyusb': + self.dev.ctrl_transfer(0x40, request_id, 0, 0, data) + + elif self.backend == 'libusb': + if nonblocking: + transfer = self.handle.getTransfer() + transfer.setControl( + # usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | + # usb1.RECIPIENT_DEVICE, #request type + 0x40, + request_id, # request + 0, # index + 0, + data, # data + callback=None, # callback functiopn + user_data=None, # userdata + timeout=1000 + ) + transfer.submit() + self.transfer_list.append(transfer) + + else: + self.handle.controlWrite(0x40, request_id, 0, 0, data) def _read(self, request_id: int, length: int) -> bytes: - # ctrl_transfer(self, bmRequestType, bRequest, wValue=0, - # wIndex=0, data_or_wLength = None, timeout = None): - # Request type: - # bit 7: direction 0:host to device (OUT), - # 1: device to host (IN) - # bits 5-6: type: 0:standard 1:class 2:vendor 3:reserved - # bits 0-4: recipient: 0:device 1:interface 2:endpoint 3:other - ret = self.dev.ctrl_transfer(0xC0, request_id, 0, 0, length) + if self.backend == 'pyusb': + # ctrl_transfer(self, bmRequestType, bRequest, wValue=0, + # wIndex=0, data_or_wLength = None, timeout = None): + # Request type: + # bit 7: direction 0:host to device (OUT), + # 1: device to host (IN) + # bits 5-6: type: 0:standard 1:class 2:vendor 3:reserved + # bits 0-4: recipient: 0:device 1:interface 2:endpoint + # 3:other + ret = self.dev.ctrl_transfer( + 0xC0, request_id, 0, 0, length) + + elif self.backend == 'libusb': + self._wait_async() + ret = self.handle.controlRead( + 0xC0, request_id, 0, 0, length) + return ret def gpio_set_direction(self, pin: int, direction: bool) -> None: @@ -51,14 +143,13 @@ class ice40_flasher: Keyword arguments: pin -- GPIO pin number - value -- True: Set pin as output, False: set pin as input + direction -- True: Set pin as output, False: set pin as input """ msg = struct.pack('>II', (1 << pin), ((1 if direction else 0) << pin), ) -# self._write_bulk(self.FLASHER_REQUEST_PIN_DIRECTION_SET, msg) self._write(self.FLASHER_REQUEST_PIN_DIRECTION_SET, msg) def gpio_set_pulls( @@ -138,84 +229,94 @@ class ice40_flasher: self._write(self.FLASHER_REQUEST_SPI_PINS_SET, msg) - def spi_bitbang( + def spi_write( + self, + buf: bytes, + toggle_cs: bool = True) -> bytes: + """Write data to the SPI port + + Keyword arguments: + buf -- Byte buffer to send. + toggle_cs: (Optional) If true, toggle the CS line + """ + max_chunk_size = self.SPI_MAX_TRANSFER_SIZE + for i in range(0, len(buf), max_chunk_size): + chunk = buf[i:i + max_chunk_size] + self._spi_bitbang_inner( + buf=chunk, + toggle_cs=toggle_cs, + read_after_write=False) + + def spi_rxtx( self, buf: bytes, toggle_cs: bool = True) -> bytes: """Bitbang a SPI transfer Keyword arguments: - buf -- Byte buffer to send. If the bit_count is smaller than - the buffer size, some data will not be sent. + buf -- Byte buffer to send. toggle_cs: (Optional) If true, toggle the CS line """ ret = bytearray() - max_chunk_size = (2048 - 8) + max_chunk_size = self.SPI_MAX_TRANSFER_SIZE for i in range(0, len(buf), max_chunk_size): chunk = buf[i:i + max_chunk_size] ret.extend( - self.spi_bitbang_inner( + self._spi_bitbang_inner( buf=chunk, toggle_cs=toggle_cs)) return bytes(ret) - def spi_bitbang_inner( + def _spi_bitbang_inner( self, buf: bytes, - bit_count: int = -1, - toggle_cs: bool = True) -> bytes: + toggle_cs: bool = True, + read_after_write: bool = True) -> bytes: """Bitbang a SPI transfer using the specificed GPIO pins Keyword arguments: - buf -- Byte buffer to send. If the bit_count is smaller than - the buffer size, some data will not be sent. - bit_count -- (Optional) Number of bits (not bytes) to - bitbang. If left unspecificed, defaults to the size - of buf. + buf -- Byte buffer to send. toggle_cs: (Optional) If true, toggle the CS line """ - if bit_count == -1: - bit_count = len(buf) * 8 - byte_length = (bit_count + 7) // 8 + if len(buf) > self.SPI_MAX_TRANSFER_SIZE: + raise Exception( + 'Message too large, size:{:} max:{:}'.format( + len(buf), self.SPI_MAX_TRANSFER_SIZE)) - if byte_length > (2048 - 8): - print( - 'Message too large, bit_count:{:}'.format(bit_count)) - exit(1) - - if byte_length != len(buf): - print( - 'Size mismatch, bit_count:{:} len(buf):{:}'.format( - bit_count, len(buf) * 8)) - exit(1) - - header = struct.pack('>I', - bit_count) + header = struct.pack('>I', len(buf)) msg = bytearray() msg.extend(header) msg.extend(buf) if toggle_cs: - self._write(self.FLASHER_REQUEST_SPI_BITBANG_CS, msg) - msg_in = self._read( - self.FLASHER_REQUEST_SPI_BITBANG_CS, - byte_length) + cmd = self.FLASHER_REQUEST_SPI_BITBANG_CS else: - self._write(self.FLASHER_REQUEST_SPI_BITBANG_NO_CS, msg) - msg_in = self._read( - self.FLASHER_REQUEST_SPI_BITBANG_NO_CS, - byte_length) + cmd = self.FLASHER_REQUEST_SPI_BITBANG_NO_CS + + self._write(cmd, msg, nonblocking=True) + + if not read_after_write: + return bytes() + + msg_in = self._read( + self.FLASHER_REQUEST_SPI_BITBANG_CS, + len(buf)) return msg_in - def nvcm_command(self, cmd: bytes) -> None: - """NVCM fast path: Run a command on the NVCM memory, then - """ - pass + def spi_clk_out(self, byte_count: int) -> None: + header = struct.pack('>I', + byte_count) + msg = bytearray() + msg.extend(header) + self._write( + self.FLASHER_REQUEST_SPI_CLKOUT, + msg, + nonblocking=True) def adc_read_all(self) -> tuple[float, float, float]: """Read the voltage values of ADC 0, 1, and 2