Refactor WIP

This commit is contained in:
Manfred Karrer 2015-12-22 11:29:59 +01:00
parent 0d751eb561
commit 873402d941
5 changed files with 27 additions and 36 deletions

View file

@ -186,7 +186,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (listener != null) if (listener != null)
addP2PServiceListener(listener); addP2PServiceListener(listener);
peerGroup.setSeedNodeAddresses(seedNodeAddresses);
networkNode.start(this); networkNode.start(this);
} }
@ -271,7 +270,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void authenticateSeedNode() { private void authenticateSeedNode() {
Log.traceCall(); Log.traceCall();
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null"); checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
peerGroup.authenticateSeedNode(connectedSeedNode); peerGroup.authenticateSeedNode(connectedSeedNode, seedNodeAddresses);
} }

View file

@ -116,7 +116,7 @@ public class Connection implements MessageListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Called form UserThread // Called form UserThread
public void setAuthenticated(Address peerAddress, Connection connection) { public void setAuthenticated(Address peerAddress) {
Log.traceCall(); Log.traceCall();
this.peerAddress = peerAddress; this.peerAddress = peerAddress;
isAuthenticated = true; isAuthenticated = true;

View file

@ -96,7 +96,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
ListenableFuture<Connection> future = executorService.submit(() -> { ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress); Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress);
try { try {
Socket socket = createSocket(peerAddress); // can take a while when using tor // can take a while when using tor
Socket socket = createSocket(peerAddress);
if (timeoutOccurred[0]) if (timeoutOccurred[0])
throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress); throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress);
@ -112,8 +113,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
+ "\nmessage=" + message + "\nmessage=" + message
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
// can take a while when using tor
newConnection.sendMessage(message); newConnection.sendMessage(message);
return newConnection; // can take a while when using tor return newConnection;
} catch (Throwable throwable) { } catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) {
throwable.printStackTrace(); throwable.printStackTrace();

View file

@ -5,7 +5,6 @@ import io.bitsquare.p2p.network.Connection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random; import java.util.Random;
public class Peer { public class Peer {
@ -21,15 +20,6 @@ public class Peer {
pingNonce = new Random().nextLong(); pingNonce = new Random().nextLong();
} }
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
try {
in.defaultReadObject();
pingNonce = new Random().nextLong();
} catch (Throwable t) {
log.trace("Cannot be deserialized." + t.getMessage());
}
}
public long getPingNonce() { public long getPingNonce() {
return pingNonce; return pingNonce;
} }

View file

@ -53,7 +53,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private Timer getPeersTimer; private Timer getPeersTimer;
private Set<Address> seedNodeAddresses; private Set<Address> seedNodeAddresses;
private boolean shutDownInProgress;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -72,9 +71,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
startGetPeersTimer(); startGetPeersTimer();
} }
public void addAuthenticationListener(AuthenticationListener listener) {
authenticationListeners.add(listener);
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation // ConnectionListener implementation
@ -112,10 +108,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) {
this.seedNodeAddresses = seedNodeAddresses;
}
public void broadcast(DataBroadcastMessage message, @Nullable Address sender) { public void broadcast(DataBroadcastMessage message, @Nullable Address sender) {
Log.traceCall("Sender " + sender + ". Message " + message.toString()); Log.traceCall("Sender " + sender + ". Message " + message.toString());
if (authenticatedPeers.values().size() > 0) { if (authenticatedPeers.values().size() > 0) {
@ -140,18 +132,22 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}); });
}); });
} else { } else {
log.trace("Message not broadcasted because we are not authenticated yet. " + log.trace("Message not broadcasted because we have no authenticated peers yet. " +
"That is expected at startup.\nmessage = {}", message); "That is expected at startup.\nmessage = {}", message);
} }
} }
public void shutDown() { public void shutDown() {
Log.traceCall(); Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
if (sendPingTimer != null) if (sendPingTimer != null)
sendPingTimer.cancel(); sendPingTimer.cancel();
if (getPeersTimer != null)
getPeersTimer.cancel();
} }
public void addAuthenticationListener(AuthenticationListener listener) {
authenticationListeners.add(listener);
} }
@ -203,10 +199,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Authentication to seed node // Authentication to seed node
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// After HS is published or after a retry from a successful GetDataRequest if no seed nodes have been available initially // Normal case: Called after HS is published
public void authenticateSeedNode(Address peerAddress) { // Special case: No seed nodes have been available initially. We retried until we succeed with a GetDataRequest.
// Then authenticateSeedNode gets called
public void authenticateSeedNode(Address seedNodeAddress, Set<Address> seedNodeAddresses) {
Log.traceCall(); Log.traceCall();
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true); this.seedNodeAddresses = seedNodeAddresses;
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), seedNodeAddress, true);
} }
// First we try to connect to 1 seed node. // First we try to connect to 1 seed node.
@ -216,7 +215,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// //
// After connection is authenticated, we try to connect to any reported peer as long we have not // After connection is authenticated, we try to connect to any reported peer as long we have not
// reached our max connection size. // reached our max connection size.
private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean connectToReportedAfterSuccess) { private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean connectToReportedPeersAfterSuccess) {
Log.traceCall(peerAddress.getFullAddress()); Log.traceCall(peerAddress.getFullAddress());
if (!authenticationHandshakes.containsKey(peerAddress)) { if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress); AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress);
@ -226,7 +225,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
setAuthenticated(connection, peerAddress); setAuthenticated(connection, peerAddress);
if (connectToReportedAfterSuccess) { if (connectToReportedPeersAfterSuccess) {
if (getAuthenticatedPeers().size() < MAX_CONNECTIONS_LOW_PRIO) { if (getAuthenticatedPeers().size() < MAX_CONNECTIONS_LOW_PRIO) {
log.info("We still don't have enough connections. Lets try the reported peers."); log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers(true); authenticateToRemainingReportedPeers(true);
@ -441,7 +440,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
+ "\n############################################################\n"); + "\n############################################################\n");
addAuthenticatedPeer(new Peer(connection)); addAuthenticatedPeer(new Peer(connection));
connection.setAuthenticated(peerAddress, connection); connection.setAuthenticated(peerAddress);
authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
} }
@ -765,10 +764,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (disconnectedPeer != null) if (disconnectedPeer != null)
printAuthenticatedPeers(); printAuthenticatedPeers();
} }
//TODO call removeReportedPeer
} }
private Address getMyAddress() { private Address getMyAddress() {
// Log.traceCall();
return networkNode.getAddress(); return networkNode.getAddress();
} }