Reworked queue and buffering

This commit is contained in:
Mark Qvist 2020-05-28 22:18:19 +02:00
parent 417f39d02a
commit 163c6b021f
4 changed files with 290 additions and 134 deletions

View file

@ -2,27 +2,49 @@
#include <SPI.h>
#include "Utilities.h"
FIFOBuffer serialFIFO;
uint8_t serialBuffer[CONFIG_UART_BUFFER_SIZE];
FIFOBuffer16 packet_starts;
size_t packet_starts_buf[CONFIG_QUEUE_MAX_LENGTH+1];
FIFOBuffer16 packet_lengths;
size_t packet_lengths_buf[CONFIG_QUEUE_MAX_LENGTH+1];
uint8_t packet_queue[CONFIG_QUEUE_SIZE];
volatile uint8_t queue_height = 0;
volatile size_t queued_bytes = 0;
volatile size_t queue_cursor = 0;
volatile size_t current_packet_start = 0;
void setup() {
// Seed the PRNG
randomSeed(analogRead(0));
// Initialise serial communication
memset(serialBuffer, 0, sizeof(serialBuffer));
fifo_init(&serialFIFO, serialBuffer, sizeof(serialBuffer));
Serial.begin(serial_baudrate);
while (!Serial);
serial_timer_init();
// Configure input and output pins
pinMode(pin_led_rx, OUTPUT);
pinMode(pin_led_tx, OUTPUT);
// Initialise buffers
memset(pbuf, 0, sizeof(pbuf));
memset(sbuf, 0, sizeof(sbuf));
memset(cbuf, 0, sizeof(cbuf));
#if QUEUE_SIZE > 0
memset(qbuf, 0, sizeof(qbuf));
memset(queued_lengths, 0, sizeof(queued_lengths));
#endif
memset(packet_queue, 0, sizeof(packet_queue));
memset(packet_starts_buf, 0, sizeof(packet_starts));
memset(packet_lengths_buf, 0, sizeof(packet_lengths));
fifo16_init(&packet_starts, packet_starts_buf, sizeof(packet_starts_buf));
fifo16_init(&packet_lengths, packet_lengths_buf, sizeof(packet_lengths_buf));
// Set chip select, reset and interrupt
// pins for the LoRa module
@ -188,67 +210,38 @@ void receiveCallback(int packet_size) {
}
}
bool outboundReady() {
#if QUEUE_SIZE > 0
if (queue_head != queue_tail) {
return true;
} else {
return false;
}
#else
return outbound_ready;
#endif
}
bool queueFull() {
size_t new_queue_head = (queue_head+1)%QUEUE_BUF_SIZE;
if (new_queue_head == queue_tail) {
return true;
} else {
return false;
}
return (queue_height < CONFIG_QUEUE_MAX_LENGTH && queued_bytes < CONFIG_QUEUE_SIZE);
}
void enqueuePacket(size_t length) {
size_t new_queue_head = (queue_head+1)%QUEUE_BUF_SIZE;
if (new_queue_head != queue_tail) {
queued_lengths[queue_head] = length;
size_t insert_addr = queue_head * MTU;
for (int i = 0; i < length; i++) {
qbuf[insert_addr+i] = sbuf[i];
volatile bool queue_flushing = false;
void flushQueue(void) {
if (!queue_flushing) {
queue_flushing = true;
size_t processed = 0;
for (size_t n = 0; n < queue_height; n++) {
size_t start = fifo16_pop_locked(&packet_starts);
size_t length = fifo16_pop_locked(&packet_lengths);
if (length >= MIN_L) {
for (size_t i = 0; i < length; i++) {
size_t pos = (start+i)%CONFIG_QUEUE_SIZE;
tbuf[i] = packet_queue[pos];
}
transmit(length);
processed++;
}
}
queue_head = new_queue_head;
if (!queueFull()) {
kiss_indicate_ready();
}
} else {
kiss_indicate_error(ERROR_QUEUE_FULL);
}
queue_height = 0;
queued_bytes = 0;
queue_flushing = false;
kiss_indicate_ready();
}
#if QUEUE_SIZE > 0
void processQueue() {
size_t fetch_address = queue_tail*MTU;
size_t fetch_length = queued_lengths[queue_tail];
for (int i = 0; i < fetch_length; i++) {
tbuf[i] = qbuf[fetch_address+i];
qbuf[fetch_address+i] = 0x00;
}
queued_lengths[queue_tail] = 0;
queue_tail = ++queue_tail%QUEUE_BUF_SIZE;
transmit(fetch_length);
if (!queueFull()) {
kiss_indicate_ready();
}
}
#endif
void transmit(size_t size) {
if (radio_online) {
if (!promisc) {
@ -264,11 +257,7 @@ void transmit(size_t size) {
LoRa.write(header); written++;
for (size_t i; i < size; i++) {
#if QUEUE_SIZE > 0
LoRa.write(tbuf[i]);
#else
LoRa.write(sbuf[i]);
#endif
LoRa.write(tbuf[i]);
written++;
@ -298,11 +287,7 @@ void transmit(size_t size) {
LoRa.beginPacket();
for (size_t i; i < size; i++) {
#if QUEUE_SIZE > 0
LoRa.write(tbuf[i]);
#else
LoRa.write(sbuf[i]);
#endif
LoRa.write(tbuf[i]);
written++;
}
@ -315,26 +300,35 @@ void transmit(size_t size) {
kiss_indicate_error(ERROR_TXFAILED);
led_indicate_error(5);
}
#if QUEUE_SIZE == 0
if (FLOW_CONTROL_ENABLED)
kiss_indicate_ready();
#endif
}
void serialCallback(uint8_t sbyte) {
if (IN_FRAME && sbyte == FEND && command == CMD_DATA) {
IN_FRAME = false;
if (QUEUE_SIZE == 0) {
if (outbound_ready) {
kiss_indicate_error(ERROR_QUEUE_FULL);
} else {
outbound_ready = true;
}
} else {
enqueuePacket(frame_len);
if (queue_height < CONFIG_QUEUE_MAX_LENGTH && queued_bytes < CONFIG_QUEUE_SIZE) {
size_t s = current_packet_start;
size_t e = queue_cursor-1; if (e == -1) e = CONFIG_QUEUE_SIZE-1;
size_t l;
if (s != e) {
l = (s < e) ? e - s + 1 : CONFIG_QUEUE_SIZE - s + e + 1;
} else {
l = 1;
}
if (l >= MIN_L) {
queue_height++;
fifo16_push_locked(&packet_starts, s);
fifo16_push_locked(&packet_lengths, l);
current_packet_start = queue_cursor;
}
}
if (!queueFull()) kiss_indicate_ready();
} else if (sbyte == FEND) {
IN_FRAME = true;
command = CMD_UNKNOWN;
@ -352,7 +346,11 @@ void serialCallback(uint8_t sbyte) {
if (sbyte == TFESC) sbyte = FESC;
ESCAPE = false;
}
sbuf[frame_len++] = sbyte;
if (queue_height < CONFIG_QUEUE_MAX_LENGTH && queued_bytes < CONFIG_QUEUE_SIZE) {
queued_bytes++;
packet_queue[queue_cursor++] = sbyte;
if (queue_cursor == CONFIG_QUEUE_SIZE) queue_cursor = 0;
}
}
} else if (command == CMD_FREQUENCY) {
if (sbyte == FESC) {
@ -566,19 +564,20 @@ void validateStatus() {
void loop() {
if (radio_online) {
checkModemStatus();
if (outboundReady() && !SERIAL_READING) {
if (queue_height > 0) {
if (!dcd_waiting) updateModemStatus();
if (!dcd && !dcd_led) {
if (dcd_waiting) delay(lora_rx_turnaround_ms);
updateModemStatus();
if (!dcd) {
dcd_waiting = false;
#if QUEUE_SIZE > 0
processQueue();
#else
outbound_ready = false;
transmit(frame_len);
#endif
flushQueue();
}
} else {
dcd_waiting = true;
@ -594,14 +593,36 @@ void loop() {
}
}
if (Serial.available()) {
SERIAL_READING = true;
char sbyte = Serial.read();
serial_poll();
}
void serial_poll() {
while (!fifo_isempty_locked(&serialFIFO)) {
char sbyte = fifo_pop_locked(&serialFIFO);
serialCallback(sbyte);
last_serial_read = millis();
} else {
if (SERIAL_READING && millis()-last_serial_read >= serial_read_timeout_ms) {
SERIAL_READING = false;
}
}
void buffer_serial() {
while (Serial.available()) {
char c = Serial.read();
if (!fifo_isfull_locked(&serialFIFO)) {
fifo_push_locked(&serialFIFO, c);
}
}
}
void serial_timer_init() {
TCCR3A = 0;
TCCR3B = _BV(CS10) |
_BV(WGM33)|
_BV(WGM32);
ICR3 = 23704; // Approximation of 16Mhz / 675
TIMSK3 = _BV(ICIE3);
}
ISR(TIMER3_CAPT_vect) {
buffer_serial();
}