Merge pull request #24 from markqvist/adaptive_compression

Adaptive Compression
This commit is contained in:
acehoss 2023-09-24 11:36:29 -07:00 committed by GitHub
commit ffccb2511b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 16 deletions

4
.gitignore vendored
View File

@ -5,4 +5,6 @@ testconfig/
/rnsh.egg-info/
/build/
/dist/
.pytest_cache/
.pytest_cache/
*__pycache__
/RNS

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "rnsh"
version = "0.1.1"
version = "0.1.2"
description = "Shell over Reticulum"
authors = ["acehoss <acehoss@acehoss.net>"]
license = "MIT"
@ -9,7 +9,7 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.7"
docopt = "^0.6.2"
rns = "^0.5.3"
rns = "^0.5.9"
# rns = { git = "https://github.com/acehoss/Reticulum.git", branch = "feature/channel" }
# rns = { path = "../Reticulum/", develop = true }
tomli = "^2.0.1"

View File

@ -49,6 +49,7 @@ import re
import contextlib
import rnsh.args
import pwd
import bz2
import rnsh.protocol as protocol
import rnsh.helpers as helpers
import rnsh.rnsh
@ -278,7 +279,6 @@ async def initiate(configdir: str, identitypath: str, verbosity: int, quietness:
nonlocal stdin_eof
try:
data = process.tty_read(sys.stdin.fileno())
log.debug(f"stdin {data}")
if data is not None:
data_buffer.extend(data)
except EOFError:
@ -362,11 +362,42 @@ async def initiate(configdir: str, identitypath: str, verbosity: int, quietness:
processed = False
if channel.is_ready_to_send():
stdin = data_buffer[:mdu]
data_buffer = data_buffer[mdu:]
def compress_adaptive(buf: bytes):
comp_tries = RNS.RawChannelWriter.COMPRESSION_TRIES
comp_try = 1
comp_success = False
chunk_len = len(buf)
if chunk_len > RNS.RawChannelWriter.MAX_CHUNK_LEN:
chunk_len = RNS.RawChannelWriter.MAX_CHUNK_LEN
chunk_segment = None
chunk_segment = None
while chunk_len > 32 and comp_try < comp_tries:
chunk_segment_length = int(chunk_len/comp_try)
compressed_chunk = bz2.compress(buf[:chunk_segment_length])
compressed_length = len(compressed_chunk)
if compressed_length < protocol.StreamDataMessage.MAX_DATA_LEN and compressed_length < chunk_segment_length:
comp_success = True
break
else:
comp_try += 1
if comp_success:
chunk = compressed_chunk
processed_length = chunk_segment_length
else:
chunk = bytes(buf[:protocol.StreamDataMessage.MAX_DATA_LEN])
processed_length = len(chunk)
return comp_success, processed_length, chunk
comp_success, processed_length, chunk = compress_adaptive(data_buffer)
stdin = chunk
data_buffer = data_buffer[processed_length:]
eof = not sent_eof and stdin_eof and len(stdin) == 0
if len(stdin) > 0 or eof:
channel.send(protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN, stdin, eof))
channel.send(protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN, stdin, eof, comp_success))
sent_eof = eof
processed = True

View File

@ -525,7 +525,6 @@ class CallbackSubprocess:
Write bytes to the stdin of the child process.
:param data: bytes to write
"""
self._log.debug(f"write({data})")
os.write(self._child_stdin, data)
def set_winsize(self, r: int, c: int, h: int, v: int):

View File

@ -12,6 +12,7 @@ from typing import TypeVar, Generic, Callable, List
from abc import abstractmethod, ABC
from multiprocessing import Manager
import os
import bz2
import RNS
import logging as __logging
@ -204,31 +205,59 @@ class ListenerSession:
await asyncio.sleep(0)
def pump(self) -> bool:
def compress_adaptive(buf: bytes):
comp_tries = RNS.RawChannelWriter.COMPRESSION_TRIES
comp_try = 1
comp_success = False
chunk_len = len(buf)
if chunk_len > RNS.RawChannelWriter.MAX_CHUNK_LEN:
chunk_len = RNS.RawChannelWriter.MAX_CHUNK_LEN
chunk_segment = None
chunk_segment = None
while chunk_len > 32 and comp_try < comp_tries:
chunk_segment_length = int(chunk_len/comp_try)
compressed_chunk = bz2.compress(buf[:chunk_segment_length])
compressed_length = len(compressed_chunk)
if compressed_length < protocol.StreamDataMessage.MAX_DATA_LEN and compressed_length < chunk_segment_length:
comp_success = True
break
else:
comp_try += 1
if comp_success:
chunk = compressed_chunk
processed_length = chunk_segment_length
else:
chunk = bytes(buf[:protocol.StreamDataMessage.MAX_DATA_LEN])
processed_length = len(chunk)
return comp_success, processed_length, chunk
try:
if self.state != LSState.LSSTATE_RUNNING:
return False
elif not self.channel.is_ready_to_send():
return False
elif len(self.stderr_buf) > 0:
mdu = protocol.StreamDataMessage.MAX_DATA_LEN
data = self.stderr_buf[:mdu]
self.stderr_buf = self.stderr_buf[mdu:]
comp_success, processed_length, data = compress_adaptive(self.stderr_buf)
self.stderr_buf = self.stderr_buf[processed_length:]
send_eof = self.process.stderr_eof and len(data) == 0 and not self.stderr_eof_sent
self.stderr_eof_sent = self.stderr_eof_sent or send_eof
msg = protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDERR,
data, send_eof)
data, send_eof, comp_success)
self.send(msg)
if send_eof:
self.stderr_eof_sent = True
return True
elif len(self.stdout_buf) > 0:
mdu = protocol.StreamDataMessage.MAX_DATA_LEN
data = self.stdout_buf[:mdu]
self.stdout_buf = self.stdout_buf[mdu:]
comp_success, processed_length, data = compress_adaptive(self.stdout_buf)
self.stdout_buf = self.stdout_buf[processed_length:]
send_eof = self.process.stdout_eof and len(data) == 0 and not self.stdout_eof_sent
self.stdout_eof_sent = self.stdout_eof_sent or send_eof
msg = protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDOUT,
data, send_eof)
data, send_eof, comp_success)
self.send(msg)
if send_eof:
self.stdout_eof_sent = True