mirror of
https://github.com/tillitis/tillitis-key1.git
synced 2024-10-01 01:45:38 -04:00
USB speed improvements
* Add write-only commands to avoid unneded reads * Add clock-only command to avoid excessive writes * Add partial async support to reduce inter-command delays
This commit is contained in:
parent
3fd4ec44b4
commit
6371ab68fb
@ -77,23 +77,34 @@ class Nvcm():
|
|||||||
)
|
)
|
||||||
|
|
||||||
def power_on(self) -> None:
|
def power_on(self) -> None:
|
||||||
|
"""Disable power to the DUT"""
|
||||||
self.flasher.gpio_put(self.pins['5v_en'], True)
|
self.flasher.gpio_put(self.pins['5v_en'], True)
|
||||||
|
|
||||||
def power_off(self) -> None:
|
def power_off(self) -> None:
|
||||||
|
"""Enable power to the DUT"""
|
||||||
self.flasher.gpio_put(self.pins['5v_en'], False)
|
self.flasher.gpio_put(self.pins['5v_en'], False)
|
||||||
|
|
||||||
def enable(self, cs: bool, reset: bool = True) -> None:
|
def enable(self, cs: bool, reset: bool = True) -> None:
|
||||||
|
"""Set the CS and Reset pin states"""
|
||||||
# gpio.write(cs << cs_pin | reset << reset_pin)
|
# gpio.write(cs << cs_pin | reset << reset_pin)
|
||||||
self.flasher.gpio_put(self.pins['ss'], cs)
|
self.flasher.gpio_put(self.pins['ss'], cs)
|
||||||
self.flasher.gpio_put(self.pins['crst'], reset)
|
self.flasher.gpio_put(self.pins['crst'], reset)
|
||||||
|
|
||||||
def sendhex_cs(self, s: str, toggle_cs: bool = True) -> bytes:
|
def writehex(self, s: str, toggle_cs: bool = True) -> None:
|
||||||
if self.debug and not s == "0500":
|
if self.debug and not s == "0500":
|
||||||
print("TX", s)
|
print("TX", s)
|
||||||
x = bytes.fromhex(s)
|
x = bytes.fromhex(s)
|
||||||
|
|
||||||
# b = dev.exchange(x, duplex=True, readlen=len(x))
|
# b = dev.exchange(x, duplex=True, readlen=len(x))
|
||||||
b = self.flasher.spi_bitbang(x, toggle_cs)
|
self.flasher.spi_write(x, toggle_cs)
|
||||||
|
|
||||||
|
def sendhex(self, s: str, toggle_cs: bool = True) -> bytes:
|
||||||
|
if self.debug and not s == "0500":
|
||||||
|
print("TX", s)
|
||||||
|
x = bytes.fromhex(s)
|
||||||
|
|
||||||
|
# b = dev.exchange(x, duplex=True, readlen=len(x))
|
||||||
|
b = self.flasher.spi_rxtx(x, toggle_cs)
|
||||||
|
|
||||||
if self.debug and not s == "0500":
|
if self.debug and not s == "0500":
|
||||||
print("RX", b.hex())
|
print("RX", b.hex())
|
||||||
@ -102,7 +113,8 @@ class Nvcm():
|
|||||||
def delay(self, count: int) -> None:
|
def delay(self, count: int) -> None:
|
||||||
# run the clock with no CS asserted
|
# run the clock with no CS asserted
|
||||||
# dev.exchange(b'\x00', duplex=True, readlen=count)
|
# dev.exchange(b'\x00', duplex=True, readlen=count)
|
||||||
self.sendhex_cs('00' * count, False)
|
# self.sendhex('00' * count, False)
|
||||||
|
self.flasher.spi_clk_out(count)
|
||||||
|
|
||||||
def tck(self, count: int) -> None:
|
def tck(self, count: int) -> None:
|
||||||
self.delay((count >> 3) * 2)
|
self.delay((count >> 3) * 2)
|
||||||
@ -110,21 +122,17 @@ class Nvcm():
|
|||||||
def init(self) -> None:
|
def init(self) -> None:
|
||||||
if self.debug:
|
if self.debug:
|
||||||
print("init")
|
print("init")
|
||||||
self.enable(True, True)
|
self.enable(cs=True, reset=True)
|
||||||
self.enable(True, False) # reset high
|
self.enable(cs=True, reset=False)
|
||||||
sleep(0.15)
|
self.enable(cs=False, reset=False)
|
||||||
|
self.enable(cs=False, reset=True)
|
||||||
self.enable(False, False) # enable and reset high
|
sleep(0.1)
|
||||||
sleep(0.12)
|
self.enable(cs=True, reset=True)
|
||||||
self.enable(False, True) # enable low, reset high
|
|
||||||
sleep(0.12)
|
|
||||||
self.enable(True, True) # enable and reset low
|
|
||||||
sleep(0.12)
|
|
||||||
|
|
||||||
def status_wait(self, count: int = 1000) -> None:
|
def status_wait(self, count: int = 1000) -> None:
|
||||||
for i in range(0, count):
|
for i in range(0, count):
|
||||||
self.tck(5000)
|
self.tck(5000)
|
||||||
ret = self.sendhex_cs("0500")
|
ret = self.sendhex("0500")
|
||||||
x = int.from_bytes(ret, byteorder='big')
|
x = int.from_bytes(ret, byteorder='big')
|
||||||
|
|
||||||
# print("x=%04x" %(x))
|
# print("x=%04x" %(x))
|
||||||
@ -132,10 +140,11 @@ class Nvcm():
|
|||||||
if (x & 0x00c1) == 0:
|
if (x & 0x00c1) == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
print("x=%04x" % (x))
|
||||||
raise Exception("status failed to clear")
|
raise Exception("status failed to clear")
|
||||||
|
|
||||||
def command(self, cmd: str) -> None:
|
def command(self, cmd: str) -> None:
|
||||||
self.sendhex_cs(cmd)
|
self.writehex(cmd)
|
||||||
self.status_wait()
|
self.status_wait()
|
||||||
self.tck(8)
|
self.tck(8)
|
||||||
|
|
||||||
@ -160,7 +169,7 @@ class Nvcm():
|
|||||||
msg += ("%02x%06x" % (cmd, address))
|
msg += ("%02x%06x" % (cmd, address))
|
||||||
msg += ("00" * 9) # dummy bytes
|
msg += ("00" * 9) # dummy bytes
|
||||||
msg += ("00" * length) # read
|
msg += ("00" * length) # read
|
||||||
ret = self.sendhex_cs(msg)
|
ret = self.sendhex(msg)
|
||||||
|
|
||||||
return ret[4 + 9:]
|
return ret[4 + 9:]
|
||||||
|
|
||||||
@ -178,7 +187,7 @@ class Nvcm():
|
|||||||
return x
|
return x
|
||||||
|
|
||||||
def write(self, address: int, data: str, cmd: int = 0x02) -> None:
|
def write(self, address: int, data: str, cmd: int = 0x02) -> None:
|
||||||
self.sendhex_cs("%02x%06x" % (cmd, address) + data)
|
self.writehex("%02x%06x" % (cmd, address) + data)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.status_wait()
|
self.status_wait()
|
||||||
@ -418,7 +427,7 @@ class Nvcm():
|
|||||||
contents = bytearray()
|
contents = bytearray()
|
||||||
|
|
||||||
for offset in range(0, length, 8):
|
for offset in range(0, length, 8):
|
||||||
if offset % 1024 == 0:
|
if offset % (1024*8) == 0:
|
||||||
print("%6d / %6d bytes" % (offset, length))
|
print("%6d / %6d bytes" % (offset, length))
|
||||||
|
|
||||||
nvcm_addr = int(offset / 328) * 4096 + (offset % 328)
|
nvcm_addr = int(offset / 328) * 4096 + (offset % 328)
|
||||||
@ -474,6 +483,7 @@ class Nvcm():
|
|||||||
contents = contents[:len(compare)]
|
contents = contents[:len(compare)]
|
||||||
|
|
||||||
assert (compare == contents)
|
assert (compare == contents)
|
||||||
|
print('Verification complete, NVCM contents match file')
|
||||||
|
|
||||||
|
|
||||||
def sleep_flash(pins: dict) -> None:
|
def sleep_flash(pins: dict) -> None:
|
||||||
@ -505,10 +515,10 @@ def sleep_flash(pins: dict) -> None:
|
|||||||
12
|
12
|
||||||
)
|
)
|
||||||
|
|
||||||
flasher.spi_bitbang(bytes([0xAB]))
|
flasher.spi_write(bytes([0xAB]))
|
||||||
|
|
||||||
# Confirm we can talk to flash
|
# Confirm we can talk to flash
|
||||||
data = flasher.spi_bitbang(bytes([0x9f, 0, 0]))
|
data = flasher.spi_rxtx(bytes([0x9f, 0, 0]))
|
||||||
|
|
||||||
print('flash ID while awake:', ' '.join(
|
print('flash ID while awake:', ' '.join(
|
||||||
['{:02x}'.format(b) for b in data]))
|
['{:02x}'.format(b) for b in data]))
|
||||||
@ -516,20 +526,20 @@ def sleep_flash(pins: dict) -> None:
|
|||||||
|
|
||||||
# Test that the flash will ignore a sleep command that doesn't
|
# Test that the flash will ignore a sleep command that doesn't
|
||||||
# start on the first byte
|
# start on the first byte
|
||||||
flasher.spi_bitbang(bytes([0, 0xb9]))
|
flasher.spi_write(bytes([0, 0xb9]))
|
||||||
|
|
||||||
# Confirm we can talk to flash
|
# Confirm we can talk to flash
|
||||||
data = flasher.spi_bitbang(bytes([0x9f, 0, 0]))
|
data = flasher.spi_rxtx(bytes([0x9f, 0, 0]))
|
||||||
|
|
||||||
print('flash ID while awake:', ' '.join(
|
print('flash ID while awake:', ' '.join(
|
||||||
['{:02x}'.format(b) for b in data]))
|
['{:02x}'.format(b) for b in data]))
|
||||||
assert (data == bytes([0xff, 0xef, 0x40]))
|
assert (data == bytes([0xff, 0xef, 0x40]))
|
||||||
|
|
||||||
# put the flash to sleep
|
# put the flash to sleep
|
||||||
flasher.spi_bitbang(bytes([0xb9]))
|
flasher.spi_write(bytes([0xb9]))
|
||||||
|
|
||||||
# Confirm flash is asleep
|
# Confirm flash is asleep
|
||||||
data = flasher.spi_bitbang(bytes([0x9f, 0, 0]))
|
data = flasher.spi_rxtx(bytes([0x9f, 0, 0]))
|
||||||
|
|
||||||
print('flash ID while asleep:', ' '.join(
|
print('flash ID while asleep:', ' '.join(
|
||||||
['{:02x}'.format(b) for b in data]))
|
['{:02x}'.format(b) for b in data]))
|
||||||
@ -568,6 +578,13 @@ if __name__ == "__main__":
|
|||||||
action='store_true',
|
action='store_true',
|
||||||
help='Deassert the reset line to allow the FPGA to boot')
|
help='Deassert the reset line to allow the FPGA to boot')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--speed',
|
||||||
|
dest='spi_speed',
|
||||||
|
type=int,
|
||||||
|
default=15,
|
||||||
|
help='SPI clock speed, in MHz')
|
||||||
|
|
||||||
parser.add_argument('-i', '--info',
|
parser.add_argument('-i', '--info',
|
||||||
dest='read_info',
|
dest='read_info',
|
||||||
action='store_true',
|
action='store_true',
|
||||||
@ -633,7 +650,7 @@ if __name__ == "__main__":
|
|||||||
if args.sleep_flash:
|
if args.sleep_flash:
|
||||||
sleep_flash(tp1_pins)
|
sleep_flash(tp1_pins)
|
||||||
|
|
||||||
nvcm = Nvcm(tp1_pins, debug=args.verbose)
|
nvcm = Nvcm(tp1_pins, spi_speed=args.spi_speed, debug=args.verbose)
|
||||||
nvcm.power_on()
|
nvcm.power_on()
|
||||||
|
|
||||||
# # Turn on ICE40 in CRAM boot mode
|
# # Turn on ICE40 in CRAM boot mode
|
||||||
@ -675,6 +692,6 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
if args.do_boot:
|
if args.do_boot:
|
||||||
# hold reset low for half a second
|
# hold reset low for half a second
|
||||||
nvcm.enable(True, False)
|
nvcm.enable(cs=True, reset=False)
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
nvcm.enable(True, True)
|
nvcm.enable(cs=True, reset=True)
|
||||||
|
@ -1,7 +1,31 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
# pyusb
|
||||||
import usb.core # type: ignore
|
import usb.core # type: ignore
|
||||||
import usb.util # type: ignore
|
import usb.util # type: ignore
|
||||||
|
|
||||||
|
# libusb
|
||||||
|
import usb1 # type: ignore
|
||||||
|
|
||||||
import struct
|
import struct
|
||||||
|
from typing import List, Any
|
||||||
|
|
||||||
|
# def processReceivedData(transfer):
|
||||||
|
# # print('got rx data',
|
||||||
|
# transfer.getStatus(),
|
||||||
|
# transfer.getActualLength())
|
||||||
|
#
|
||||||
|
# if transfer.getStatus() != usb1.TRANSFER_COMPLETED:
|
||||||
|
# # Transfer did not complete successfully, there is no
|
||||||
|
# # data to read. This example does not resubmit transfers
|
||||||
|
# # on errors. You may want to resubmit in some cases (timeout,
|
||||||
|
# # ...).
|
||||||
|
# return
|
||||||
|
# data = transfer.getBuffer()[:transfer.getActualLength()]
|
||||||
|
# # Process data...
|
||||||
|
# # Resubmit transfer once data is processed.
|
||||||
|
# transfer.submit()
|
||||||
|
|
||||||
|
|
||||||
class ice40_flasher:
|
class ice40_flasher:
|
||||||
@ -13,37 +37,105 @@ class ice40_flasher:
|
|||||||
FLASHER_REQUEST_SPI_BITBANG_CS = 0x41
|
FLASHER_REQUEST_SPI_BITBANG_CS = 0x41
|
||||||
FLASHER_REQUEST_SPI_BITBANG_NO_CS = 0x42
|
FLASHER_REQUEST_SPI_BITBANG_NO_CS = 0x42
|
||||||
FLASHER_REQUEST_SPI_PINS_SET = 0x43
|
FLASHER_REQUEST_SPI_PINS_SET = 0x43
|
||||||
|
FLASHER_REQUEST_SPI_CLKOUT = 0x44
|
||||||
FLASHER_REQUEST_ADC_READ = 0x50
|
FLASHER_REQUEST_ADC_READ = 0x50
|
||||||
FLASHER_REQUEST_BOOTLOADER = 0xFF
|
FLASHER_REQUEST_BOOTLOADER = 0xFF
|
||||||
|
|
||||||
|
SPI_MAX_TRANSFER_SIZE = (2048 - 8)
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
# See:
|
self.backend = 'libusb'
|
||||||
# https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst
|
|
||||||
self.dev = usb.core.find(idVendor=0xcafe, idProduct=0x4010)
|
|
||||||
|
|
||||||
if self.dev is None:
|
if self.backend == 'pyusb':
|
||||||
raise ValueError('Device not found')
|
# See:
|
||||||
|
# https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst
|
||||||
|
self.dev = usb.core.find(
|
||||||
|
idVendor=0xcafe, idProduct=0x4010)
|
||||||
|
|
||||||
self.dev.set_configuration()
|
if self.dev is None:
|
||||||
|
raise ValueError('Device not found')
|
||||||
|
|
||||||
def _write(self, request_id: int, data: bytes) -> None:
|
self.dev.set_configuration()
|
||||||
self.dev.ctrl_transfer(0x40, request_id, 0, 0, data)
|
|
||||||
|
|
||||||
def _write_bulk(self, request_id: int, data: bytes) -> None:
|
elif self.backend == 'libusb':
|
||||||
msg = bytearray()
|
# See: https://github.com/vpelletier/python-libusb1#usage
|
||||||
msg.append(request_id)
|
self.context = usb1.USBContext()
|
||||||
msg.extend(data)
|
self.handle = self.context.openByVendorIDAndProductID(
|
||||||
self.dev.write(0x01, data)
|
0xcafe,
|
||||||
|
0x4010,
|
||||||
|
skip_on_error=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.handle is None:
|
||||||
|
# Device not present, or user is not allowed to access
|
||||||
|
# device.
|
||||||
|
raise ValueError('Device not found')
|
||||||
|
self.handle.claimInterface(0)
|
||||||
|
|
||||||
|
self.transfer_list: List[Any] = []
|
||||||
|
|
||||||
|
def _wait_async(self) -> None:
|
||||||
|
if self.backend == 'libusb':
|
||||||
|
while any(transfer.isSubmitted()
|
||||||
|
for transfer in self.transfer_list):
|
||||||
|
try:
|
||||||
|
self.context.handleEvents()
|
||||||
|
except usb1.USBErrorInterrupted:
|
||||||
|
pass
|
||||||
|
|
||||||
|
for transfer in reversed(self.transfer_list):
|
||||||
|
if transfer.getStatus() == \
|
||||||
|
usb1.TRANSFER_COMPLETED:
|
||||||
|
self.transfer_list.remove(transfer)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
transfer.getStatus(),
|
||||||
|
usb1.TRANSFER_COMPLETED)
|
||||||
|
|
||||||
|
def _write(self, request_id: int, data: bytes,
|
||||||
|
nonblocking: bool = False) -> None:
|
||||||
|
if self.backend == 'pyusb':
|
||||||
|
self.dev.ctrl_transfer(0x40, request_id, 0, 0, data)
|
||||||
|
|
||||||
|
elif self.backend == 'libusb':
|
||||||
|
if nonblocking:
|
||||||
|
transfer = self.handle.getTransfer()
|
||||||
|
transfer.setControl(
|
||||||
|
# usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR |
|
||||||
|
# usb1.RECIPIENT_DEVICE, #request type
|
||||||
|
0x40,
|
||||||
|
request_id, # request
|
||||||
|
0, # index
|
||||||
|
0,
|
||||||
|
data, # data
|
||||||
|
callback=None, # callback functiopn
|
||||||
|
user_data=None, # userdata
|
||||||
|
timeout=1000
|
||||||
|
)
|
||||||
|
transfer.submit()
|
||||||
|
self.transfer_list.append(transfer)
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.handle.controlWrite(0x40, request_id, 0, 0, data)
|
||||||
|
|
||||||
def _read(self, request_id: int, length: int) -> bytes:
|
def _read(self, request_id: int, length: int) -> bytes:
|
||||||
# ctrl_transfer(self, bmRequestType, bRequest, wValue=0,
|
if self.backend == 'pyusb':
|
||||||
# wIndex=0, data_or_wLength = None, timeout = None):
|
# ctrl_transfer(self, bmRequestType, bRequest, wValue=0,
|
||||||
# Request type:
|
# wIndex=0, data_or_wLength = None, timeout = None):
|
||||||
# bit 7: direction 0:host to device (OUT),
|
# Request type:
|
||||||
# 1: device to host (IN)
|
# bit 7: direction 0:host to device (OUT),
|
||||||
# bits 5-6: type: 0:standard 1:class 2:vendor 3:reserved
|
# 1: device to host (IN)
|
||||||
# bits 0-4: recipient: 0:device 1:interface 2:endpoint 3:other
|
# bits 5-6: type: 0:standard 1:class 2:vendor 3:reserved
|
||||||
ret = self.dev.ctrl_transfer(0xC0, request_id, 0, 0, length)
|
# bits 0-4: recipient: 0:device 1:interface 2:endpoint
|
||||||
|
# 3:other
|
||||||
|
ret = self.dev.ctrl_transfer(
|
||||||
|
0xC0, request_id, 0, 0, length)
|
||||||
|
|
||||||
|
elif self.backend == 'libusb':
|
||||||
|
self._wait_async()
|
||||||
|
ret = self.handle.controlRead(
|
||||||
|
0xC0, request_id, 0, 0, length)
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def gpio_set_direction(self, pin: int, direction: bool) -> None:
|
def gpio_set_direction(self, pin: int, direction: bool) -> None:
|
||||||
@ -51,14 +143,13 @@ class ice40_flasher:
|
|||||||
|
|
||||||
Keyword arguments:
|
Keyword arguments:
|
||||||
pin -- GPIO pin number
|
pin -- GPIO pin number
|
||||||
value -- True: Set pin as output, False: set pin as input
|
direction -- True: Set pin as output, False: set pin as input
|
||||||
"""
|
"""
|
||||||
msg = struct.pack('>II',
|
msg = struct.pack('>II',
|
||||||
(1 << pin),
|
(1 << pin),
|
||||||
((1 if direction else 0) << pin),
|
((1 if direction else 0) << pin),
|
||||||
)
|
)
|
||||||
|
|
||||||
# self._write_bulk(self.FLASHER_REQUEST_PIN_DIRECTION_SET, msg)
|
|
||||||
self._write(self.FLASHER_REQUEST_PIN_DIRECTION_SET, msg)
|
self._write(self.FLASHER_REQUEST_PIN_DIRECTION_SET, msg)
|
||||||
|
|
||||||
def gpio_set_pulls(
|
def gpio_set_pulls(
|
||||||
@ -138,84 +229,94 @@ class ice40_flasher:
|
|||||||
|
|
||||||
self._write(self.FLASHER_REQUEST_SPI_PINS_SET, msg)
|
self._write(self.FLASHER_REQUEST_SPI_PINS_SET, msg)
|
||||||
|
|
||||||
def spi_bitbang(
|
def spi_write(
|
||||||
|
self,
|
||||||
|
buf: bytes,
|
||||||
|
toggle_cs: bool = True) -> bytes:
|
||||||
|
"""Write data to the SPI port
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
buf -- Byte buffer to send.
|
||||||
|
toggle_cs: (Optional) If true, toggle the CS line
|
||||||
|
"""
|
||||||
|
max_chunk_size = self.SPI_MAX_TRANSFER_SIZE
|
||||||
|
for i in range(0, len(buf), max_chunk_size):
|
||||||
|
chunk = buf[i:i + max_chunk_size]
|
||||||
|
self._spi_bitbang_inner(
|
||||||
|
buf=chunk,
|
||||||
|
toggle_cs=toggle_cs,
|
||||||
|
read_after_write=False)
|
||||||
|
|
||||||
|
def spi_rxtx(
|
||||||
self,
|
self,
|
||||||
buf: bytes,
|
buf: bytes,
|
||||||
toggle_cs: bool = True) -> bytes:
|
toggle_cs: bool = True) -> bytes:
|
||||||
"""Bitbang a SPI transfer
|
"""Bitbang a SPI transfer
|
||||||
|
|
||||||
Keyword arguments:
|
Keyword arguments:
|
||||||
buf -- Byte buffer to send. If the bit_count is smaller than
|
buf -- Byte buffer to send.
|
||||||
the buffer size, some data will not be sent.
|
|
||||||
toggle_cs: (Optional) If true, toggle the CS line
|
toggle_cs: (Optional) If true, toggle the CS line
|
||||||
"""
|
"""
|
||||||
|
|
||||||
ret = bytearray()
|
ret = bytearray()
|
||||||
|
|
||||||
max_chunk_size = (2048 - 8)
|
max_chunk_size = self.SPI_MAX_TRANSFER_SIZE
|
||||||
for i in range(0, len(buf), max_chunk_size):
|
for i in range(0, len(buf), max_chunk_size):
|
||||||
chunk = buf[i:i + max_chunk_size]
|
chunk = buf[i:i + max_chunk_size]
|
||||||
ret.extend(
|
ret.extend(
|
||||||
self.spi_bitbang_inner(
|
self._spi_bitbang_inner(
|
||||||
buf=chunk,
|
buf=chunk,
|
||||||
toggle_cs=toggle_cs))
|
toggle_cs=toggle_cs))
|
||||||
|
|
||||||
return bytes(ret)
|
return bytes(ret)
|
||||||
|
|
||||||
def spi_bitbang_inner(
|
def _spi_bitbang_inner(
|
||||||
self,
|
self,
|
||||||
buf: bytes,
|
buf: bytes,
|
||||||
bit_count: int = -1,
|
toggle_cs: bool = True,
|
||||||
toggle_cs: bool = True) -> bytes:
|
read_after_write: bool = True) -> bytes:
|
||||||
"""Bitbang a SPI transfer using the specificed GPIO pins
|
"""Bitbang a SPI transfer using the specificed GPIO pins
|
||||||
|
|
||||||
Keyword arguments:
|
Keyword arguments:
|
||||||
buf -- Byte buffer to send. If the bit_count is smaller than
|
buf -- Byte buffer to send.
|
||||||
the buffer size, some data will not be sent.
|
|
||||||
bit_count -- (Optional) Number of bits (not bytes) to
|
|
||||||
bitbang. If left unspecificed, defaults to the size
|
|
||||||
of buf.
|
|
||||||
toggle_cs: (Optional) If true, toggle the CS line
|
toggle_cs: (Optional) If true, toggle the CS line
|
||||||
"""
|
"""
|
||||||
if bit_count == -1:
|
|
||||||
bit_count = len(buf) * 8
|
|
||||||
|
|
||||||
byte_length = (bit_count + 7) // 8
|
if len(buf) > self.SPI_MAX_TRANSFER_SIZE:
|
||||||
|
raise Exception(
|
||||||
|
'Message too large, size:{:} max:{:}'.format(
|
||||||
|
len(buf), self.SPI_MAX_TRANSFER_SIZE))
|
||||||
|
|
||||||
if byte_length > (2048 - 8):
|
header = struct.pack('>I', len(buf))
|
||||||
print(
|
|
||||||
'Message too large, bit_count:{:}'.format(bit_count))
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
if byte_length != len(buf):
|
|
||||||
print(
|
|
||||||
'Size mismatch, bit_count:{:} len(buf):{:}'.format(
|
|
||||||
bit_count, len(buf) * 8))
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
header = struct.pack('>I',
|
|
||||||
bit_count)
|
|
||||||
msg = bytearray()
|
msg = bytearray()
|
||||||
msg.extend(header)
|
msg.extend(header)
|
||||||
msg.extend(buf)
|
msg.extend(buf)
|
||||||
|
|
||||||
if toggle_cs:
|
if toggle_cs:
|
||||||
self._write(self.FLASHER_REQUEST_SPI_BITBANG_CS, msg)
|
cmd = self.FLASHER_REQUEST_SPI_BITBANG_CS
|
||||||
msg_in = self._read(
|
|
||||||
self.FLASHER_REQUEST_SPI_BITBANG_CS,
|
|
||||||
byte_length)
|
|
||||||
else:
|
else:
|
||||||
self._write(self.FLASHER_REQUEST_SPI_BITBANG_NO_CS, msg)
|
cmd = self.FLASHER_REQUEST_SPI_BITBANG_NO_CS
|
||||||
msg_in = self._read(
|
|
||||||
self.FLASHER_REQUEST_SPI_BITBANG_NO_CS,
|
self._write(cmd, msg, nonblocking=True)
|
||||||
byte_length)
|
|
||||||
|
if not read_after_write:
|
||||||
|
return bytes()
|
||||||
|
|
||||||
|
msg_in = self._read(
|
||||||
|
self.FLASHER_REQUEST_SPI_BITBANG_CS,
|
||||||
|
len(buf))
|
||||||
|
|
||||||
return msg_in
|
return msg_in
|
||||||
|
|
||||||
def nvcm_command(self, cmd: bytes) -> None:
|
def spi_clk_out(self, byte_count: int) -> None:
|
||||||
"""NVCM fast path: Run a command on the NVCM memory, then
|
header = struct.pack('>I',
|
||||||
"""
|
byte_count)
|
||||||
pass
|
msg = bytearray()
|
||||||
|
msg.extend(header)
|
||||||
|
self._write(
|
||||||
|
self.FLASHER_REQUEST_SPI_CLKOUT,
|
||||||
|
msg,
|
||||||
|
nonblocking=True)
|
||||||
|
|
||||||
def adc_read_all(self) -> tuple[float, float, float]:
|
def adc_read_all(self) -> tuple[float, float, float]:
|
||||||
"""Read the voltage values of ADC 0, 1, and 2
|
"""Read the voltage values of ADC 0, 1, and 2
|
||||||
|
Loading…
Reference in New Issue
Block a user