network fixes, still a lot more to do for tcp

This commit is contained in:
John Smith 2022-01-01 13:38:39 -05:00
parent 0e0209a54b
commit c2c5e3c299
10 changed files with 30 additions and 20 deletions

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
exec ./run_local_test.py 2 --config-file ./local-test.yml $1 exec ./run_local_test.py 2 --config-file ./local-test.yml $@

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
exec ./run_local_test.py 20 --config-file ./local-test.yml $1 exec ./run_local_test.py 20 --config-file ./local-test.yml $@

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
exec ./run_local_test.py 4 --config-file ./local-test.yml $1 exec ./run_local_test.py 4 --config-file ./local-test.yml $@

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
exec ./run_local_test.py 8 --config-file ./local-test.yml $1 exec ./run_local_test.py 8 --config-file ./local-test.yml $@

View File

@ -81,7 +81,7 @@ pub fn new_shared_tcp_socket(local_address: SocketAddr) -> Result<socket2::Socke
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!())?; .map_err(logthru_net!("failed to create TCP socket"))?;
if let Err(e) = socket.set_linger(None) { if let Err(e) = socket.set_linger(None) {
log_net!(error "Couldn't set TCP linger: {}", e); log_net!(error "Couldn't set TCP linger: {}", e);
} }
@ -100,9 +100,11 @@ pub fn new_shared_tcp_socket(local_address: SocketAddr) -> Result<socket2::Socke
} }
let socket2_addr = socket2::SockAddr::from(local_address); let socket2_addr = socket2::SockAddr::from(local_address);
if let Err(e) = socket.bind(&socket2_addr) { socket
log_net!(error "failed to bind TCP socket: {}", e); .bind(&socket2_addr)
} .map_err(|e| format!("failed to bind TCP socket: {}", e))?;
log_net!("created shared tcp socket on {:?}", &local_address);
Ok(socket) Ok(socket)
} }

View File

@ -134,7 +134,7 @@ impl RawTcpProtocolHandler {
} }
} }
pub async fn on_accept_async( async fn on_accept_async(
self, self,
stream: AsyncPeekStream, stream: AsyncPeekStream,
socket_addr: SocketAddr, socket_addr: SocketAddr,
@ -181,7 +181,12 @@ impl RawTcpProtocolHandler {
socket socket
.connect(&remote_socket2_addr) .connect(&remote_socket2_addr)
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!(error "addr={}", remote_socket_addr))?; .map_err(logthru_net!(error "local_address={} remote_addr={}", local_address, remote_socket_addr))?;
log_net!(
"tcp connect successful: local_address={} remote_addr={}",
local_address,
remote_socket_addr
);
let std_stream: std::net::TcpStream = socket.into(); let std_stream: std::net::TcpStream = socket.into();
let ts = TcpStream::from(std_stream); let ts = TcpStream::from(std_stream);
@ -189,7 +194,7 @@ impl RawTcpProtocolHandler {
let local_address = ts let local_address = ts
.local_addr() .local_addr()
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!())?; .map_err(logthru_net!("could not get local address from TCP stream"))?;
let ps = AsyncPeekStream::new(ts); let ps = AsyncPeekStream::new(ts);
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
SocketAddress::from_socket_addr(remote_socket_addr), SocketAddress::from_socket_addr(remote_socket_addr),
@ -225,7 +230,7 @@ impl RawTcpProtocolHandler {
let mut stream = TcpStream::connect(socket_addr) let mut stream = TcpStream::connect(socket_addr)
.await .await
.map_err(|e| format!("{}", e))?; .map_err(|e| format!("failed to connect TCP for unbound message: {}", e))?;
stream.write_all(&data).await.map_err(|e| format!("{}", e)) stream.write_all(&data).await.map_err(|e| format!("{}", e))
} }
} }

View File

@ -263,7 +263,7 @@ impl WebsocketProtocolHandler {
socket socket
.connect(&remote_socket2_addr) .connect(&remote_socket2_addr)
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!(error "addr={}", remote_socket_addr))?; .map_err(logthru_net!(error "local_address={} remote_socket_addr={}", local_address, remote_socket_addr))?;
let std_stream: std::net::TcpStream = socket.into(); let std_stream: std::net::TcpStream = socket.into();
let tcp_stream = TcpStream::from(std_stream); let tcp_stream = TcpStream::from(std_stream);

View File

@ -542,7 +542,7 @@ impl NetworkManager {
// Called by the RPC handler when we want to issue an direct receipt // Called by the RPC handler when we want to issue an direct receipt
pub async fn send_direct_receipt<B: AsRef<[u8]>>( pub async fn send_direct_receipt<B: AsRef<[u8]>>(
&self, &self,
dial_info: DialInfo, dial_info: &DialInfo,
rcpt_data: B, rcpt_data: B,
alternate_port: bool, alternate_port: bool,
) -> Result<(), String> { ) -> Result<(), String> {
@ -553,11 +553,11 @@ impl NetworkManager {
// Send receipt directly // Send receipt directly
if alternate_port { if alternate_port {
self.net() self.net()
.send_data_unbound_to_dial_info(&dial_info, rcpt_data.as_ref().to_vec()) .send_data_unbound_to_dial_info(dial_info, rcpt_data.as_ref().to_vec())
.await .await
} else { } else {
self.net() self.net()
.send_data_to_dial_info(&dial_info, rcpt_data.as_ref().to_vec()) .send_data_to_dial_info(dial_info, rcpt_data.as_ref().to_vec())
.await .await
} }
} }

View File

@ -945,9 +945,12 @@ impl RPCProcessor {
// Possibly from an alternate port // Possibly from an alternate port
let network_manager = self.network_manager(); let network_manager = self.network_manager();
network_manager network_manager
.send_direct_receipt(dial_info, rcpt_data, alternate_port) .send_direct_receipt(&dial_info, rcpt_data, alternate_port)
.await .await
.map_err(map_error_string!())?; .map_err(map_error_string!())
.map_err(
logthru_net!(error "failed to send direct receipt to dial info: {}, alternate_port={}", dial_info, alternate_port),
)?;
Ok(()) Ok(())
} }

View File

@ -15,7 +15,7 @@ pub enum BumpPortType {
pub fn tcp_port_available(addr: &SocketAddr) -> bool { pub fn tcp_port_available(addr: &SocketAddr) -> bool {
cfg_if! { cfg_if! {
if #[cfg(target_arch = "wasm32")] { if #[cfg(target_arch = "wasm32")] {
false true
} else { } else {
match TcpListener::bind(addr) { match TcpListener::bind(addr) {
Ok(_) => true, Ok(_) => true,
@ -28,7 +28,7 @@ pub fn tcp_port_available(addr: &SocketAddr) -> bool {
pub fn udp_port_available(addr: &SocketAddr) -> bool { pub fn udp_port_available(addr: &SocketAddr) -> bool {
cfg_if! { cfg_if! {
if #[cfg(target_arch = "wasm32")] { if #[cfg(target_arch = "wasm32")] {
false true
} else { } else {
match UdpSocket::bind(addr) { match UdpSocket::bind(addr) {
Ok(_) => true, Ok(_) => true,