Refactor WIP

This commit is contained in:
Manfred Karrer 2015-12-24 16:17:58 +01:00
parent cd631eb53b
commit addeb6e1ed
16 changed files with 244 additions and 153 deletions

View File

@ -31,8 +31,8 @@ public class Log {
private static SizeBasedTriggeringPolicy triggeringPolicy;
private static Logger logbackLogger;
public static void setup(String fileName, boolean releaseVersion) {
Log.PRINT_TRACE_METHOD = !releaseVersion;
public static void setup(String fileName, boolean useDetailedLogging) {
Log.PRINT_TRACE_METHOD = useDetailedLogging;
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
RollingFileAppender appender = new RollingFileAppender();
@ -48,7 +48,7 @@ public class Log {
rollingPolicy.start();
triggeringPolicy = new SizeBasedTriggeringPolicy();
triggeringPolicy.setMaxFileSize(releaseVersion ? "1MB" : "50MB");
triggeringPolicy.setMaxFileSize(useDetailedLogging ? "50MB" : "1MB");
triggeringPolicy.start();
PatternLayoutEncoder encoder = new PatternLayoutEncoder();
@ -62,7 +62,7 @@ public class Log {
appender.start();
logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
logbackLogger.setLevel(releaseVersion ? Level.DEBUG : Level.TRACE);
logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG);
logbackLogger.addAppender(appender);
}

View File

@ -99,7 +99,7 @@ public class BitsquareApp extends Application {
@Override
public void start(Stage primaryStage) throws IOException {
String logPath = Paths.get(env.getProperty(BitsquareEnvironment.APP_DATA_DIR_KEY), "bitsquare").toString();
Log.setup(logPath, IS_RELEASE_VERSION);
Log.setup(logPath, !IS_RELEASE_VERSION);
log.info("Log files under: " + logPath);
Version.printVersion();

View File

@ -67,7 +67,7 @@
</GridPane.margin>
</Label>
<TextField fx:id="onionAddress" GridPane.rowIndex="3" GridPane.columnIndex="1"
mouseTransparent="true" focusTraversable="false">
editable="false" focusTraversable="false">
<GridPane.margin>
<Insets top="50.0"/>
</GridPane.margin>
@ -75,7 +75,7 @@
<Label fx:id="authenticatedPeersLabel" text="Authenticated peers:" GridPane.rowIndex="4"/>
<TextArea fx:id="authenticatedPeersTextArea" GridPane.rowIndex="4" GridPane.columnIndex="1"
mouseTransparent="true" focusTraversable="false"/>
editable="false" focusTraversable="false"/>
<columnConstraints>
<ColumnConstraints hgrow="SOMETIMES" halignment="RIGHT" minWidth="200.0"/>

View File

@ -133,8 +133,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// peer group
peerGroup = new PeerGroup(networkNode);
peerGroup.addAuthenticationListener(this);
if (useLocalhost)
PeerGroup.setSimulateAuthTorNode(100);
// P2P network data storage
dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir);
@ -203,24 +201,27 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (peerGroup != null)
peerGroup.shutDown();
if (requestDataManager != null)
requestDataManager.shutDown();
if (networkNode != null)
networkNode.shutDown(() -> {
shutDownResultHandlers.stream().forEach(e -> e.run());
shutDownComplete = true;
});
} else {
if (shutDownComplete)
shutDownCompleteHandler.run();
else
shutDownResultHandlers.add(shutDownCompleteHandler);
log.debug("shutDown already in progress");
if (shutDownComplete) {
shutDownCompleteHandler.run();
} else {
shutDownResultHandlers.add(shutDownCompleteHandler);
}
}
}
/**
* Bootstrap sequence:
* Startup sequence:
* <p>
* Variant 1 (normal expected mode):
* onTorNodeReady -> requestDataManager.requestData()

View File

@ -387,6 +387,7 @@ public class Connection implements MessageListener {
public void reportIllegalRequest(IllegalRequest illegalRequest) {
Log.traceCall();
log.warn("We got reported an illegal request " + illegalRequest);
log.debug("connection={}" + this);
int violations;
if (illegalRequests.contains(illegalRequest))
violations = illegalRequests.get(illegalRequest);
@ -401,6 +402,7 @@ public class Connection implements MessageListener {
"violations={}\n" +
"illegalRequest={}\n" +
"illegalRequests={}", violations, illegalRequest, illegalRequests.toString());
log.debug("connection={}" + this);
shutDown(false);
} else {
illegalRequests.put(illegalRequest, ++violations);
@ -418,11 +420,13 @@ public class Connection implements MessageListener {
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
shutDownReason = ConnectionListener.Reason.TIMEOUT;
log.warn("TimeoutException at connection with port " + socket.getLocalPort());
log.debug("connection={}" + this);
} else if (e instanceof EOFException) {
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
} else {
shutDownReason = ConnectionListener.Reason.UNKNOWN;
log.warn("Exception at connection with port " + socket.getLocalPort());
log.debug("connection={}" + this);
e.printStackTrace();
}

View File

@ -277,16 +277,14 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Log.traceCall();
boolean newEntry = messageListeners.add(messageListener);
if (!newEntry)
log.warn("Try to add a messageListener which was already added.\nmessageListener={}\nmessageListeners={}"
, messageListener, messageListeners);
log.warn("Try to add a messageListener which was already added.");
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
boolean contained = messageListeners.remove(messageListener);
if (!contained)
log.warn("Try to remove a messageListener which was never added.\nmessageListener={}\nmessageListeners={}"
, messageListener, messageListeners);
log.warn("Try to remove a messageListener which was never added.");
}
@ -330,13 +328,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
// Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
.filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
}
private Optional<Connection> lookupInboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
// Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
.filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
}

View File

@ -22,7 +22,10 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
// Authentication protocol:
// client: send AuthenticationRequest to seedNode
@ -36,30 +39,37 @@ public class AuthenticationHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class);
private final NetworkNode networkNode;
private final PeerGroup peerGroup;
private final Address myAddress;
private final Address peerAddress;
private final Supplier<Set<ReportedPeer>> authenticatedAndReportedPeersSupplier;
private final BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer;
private final long startAuthTs;
private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
private long nonce;
private boolean stopped;
private Optional<SettableFuture<Connection>> resultFutureOptional;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress, Address peerAddress) {
public AuthenticationHandshake(NetworkNode networkNode,
Address myAddress,
Address peerAddress,
Supplier<Set<ReportedPeer>> authenticatedAndReportedPeersSupplier,
BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer) {
this.authenticatedAndReportedPeersSupplier = authenticatedAndReportedPeersSupplier;
this.addReportedPeersConsumer = addReportedPeersConsumer;
Log.traceCall("peerAddress " + peerAddress);
this.networkNode = networkNode;
this.peerGroup = peerGroup;
this.myAddress = myAddress;
this.peerAddress = peerAddress;
startAuthTs = System.currentTimeMillis();
stopped = false;
nonce = 0;
resultFutureOptional = Optional.empty();
networkNode.addMessageListener(this);
}
@ -72,7 +82,9 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got onMessage called. That must not happen.");
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. That must not happen.", peerAddress);
log.warn("message={}", message);
log.warn("connection={}", connection);
return;
}
@ -93,7 +105,7 @@ public class AuthenticationHandshake implements MessageListener {
if (verified) {
AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress,
authenticationChallenge.responderNonce,
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
new HashSet<>(authenticatedAndReportedPeersSupplier.get()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse);
log.trace("Sent GetPeersAuthRequest {} to {}", authenticationResponse, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@ -115,7 +127,7 @@ public class AuthenticationHandshake implements MessageListener {
});
// now we add the reported peers to our list
peerGroup.addToReportedPeers(authenticationChallenge.reportedPeers, connection);
addReportedPeersConsumer.accept(authenticationChallenge.reportedPeers, connection);
} else {
log.warn("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonce=" + nonce);
failed(new Exception("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonceMap=" + nonce));
@ -126,13 +138,13 @@ public class AuthenticationHandshake implements MessageListener {
log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.responderNonce;
if (verified) {
peerGroup.addToReportedPeers(authenticationResponse.reportedPeers, connection);
addReportedPeersConsumer.accept(authenticationResponse.reportedPeers, connection);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
} else {
log.warn("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce);
log.warn("Verification of nonce failed. authenticationResponse=" + authenticationResponse + " / nonce=" + nonce);
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce));
}
}
@ -150,7 +162,7 @@ public class AuthenticationHandshake implements MessageListener {
// Requesting peer
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got requestAuthentication called. That must not happen.");
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got requestAuthentication called. That must not happen.", peerAddress);
}
resultFutureOptional = Optional.of(SettableFuture.create());
@ -167,8 +179,8 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send AuthenticationRequest to " + peerAddress + " failed." +
"\nException:" + throwable.getMessage());
log.info("Send AuthenticationRequest to " + peerAddress + " failed. " +
"It might be that the peer went offline.\nException:" + throwable.getMessage());
failed(throwable);
}
});
@ -187,7 +199,9 @@ public class AuthenticationHandshake implements MessageListener {
// Responding peer
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got respondToAuthenticationRequest called. That must not happen.");
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got respondToAuthenticationRequest called. That must not happen.", peerAddress);
log.warn("authenticationRequest={}", authenticationRequest);
log.warn("connection={}", connection);
}
resultFutureOptional = Optional.of(SettableFuture.create());
@ -206,7 +220,7 @@ public class AuthenticationHandshake implements MessageListener {
AuthenticationChallenge authenticationChallenge = new AuthenticationChallenge(myAddress,
authenticationRequest.requesterNonce,
getAndSetNonce(),
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
new HashSet<>(authenticatedAndReportedPeersSupplier.get()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationChallenge);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -220,12 +234,12 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending AuthenticationResponse. " + throwable.getMessage());
log.warn("Failure at sending AuthenticationResponse. It might be that the peer went offline." + throwable.getMessage());
failed(throwable);
}
});
} else {
log.warn("AuthenticationHandshake already shut down before we could sent AuthenticationResponse. That might happen in rare cases.");
log.info("AuthenticationHandshake (peerAddress={}) already shut down before we could sent AuthenticationResponse. That might happen in rare cases.", peerAddress);
}
}, 1000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer
});
@ -238,10 +252,20 @@ public class AuthenticationHandshake implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void cancel() {
Log.traceCall();
failed(new CancelAuthenticationException());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getter
///////////////////////////////////////////////////////////////////////////////////////////
public Optional<SettableFuture<Connection>> getResultFutureOptional() {
return resultFutureOptional;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@ -274,25 +298,8 @@ public class AuthenticationHandshake implements MessageListener {
}
private void shutDown() {
Log.traceCall();
stopped = true;
Log.traceCall("peerAddress = " + peerAddress);
networkNode.removeMessageListener(this);
stopped = true;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AuthenticationHandshake)) return false;
AuthenticationHandshake that = (AuthenticationHandshake) o;
return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null);
}
@Override
public int hashCode() {
return peerAddress != null ? peerAddress.hashCode() : 0;
}
}

View File

@ -5,6 +5,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
@ -16,38 +17,47 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class MaintenanceManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class);
private Timer sendPingTimer;
private final PeerGroup peerGroup;
private static final int INACTIVITY_PERIOD_BEFORE_PING = 5 * 60 * 1000;
private final NetworkNode networkNode;
private final Supplier<Map<Address, Peer>> authenticatedPeersSupplier;
private final Consumer<Address> removePeerConsumer;
private Timer sendPingTimer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public MaintenanceManager(PeerGroup peerGroup, NetworkNode networkNode) {
this.peerGroup = peerGroup;
public MaintenanceManager(NetworkNode networkNode,
Supplier<Map<Address, Peer>> authenticatedPeersSupplier,
Consumer<Address> removePeerConsumer) {
this.networkNode = networkNode;
this.authenticatedPeersSupplier = authenticatedPeersSupplier;
this.removePeerConsumer = removePeerConsumer;
networkNode.addMessageListener(this);
startMaintenanceTimer();
}
public void shutDown() {
Log.traceCall();
if (sendPingTimer != null)
sendPingTimer.cancel();
networkNode.removeMessageListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -67,16 +77,16 @@ public class MaintenanceManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
connection.getPeerAddress().ifPresent(peerAddress -> peerGroup.removePeer(peerAddress));
connection.getPeerAddress().ifPresent(peerAddress -> removePeerConsumer.accept(peerAddress));
}
});
} else if (message instanceof PongMessage) {
connection.getPeerAddress().ifPresent(peerAddress -> {
Peer peer = peerGroup.getAuthenticatedPeers().get(peerAddress);
Peer peer = authenticatedPeersSupplier.get().get(peerAddress);
if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) {
log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + peerAddress);
peerGroup.removePeer(peer.address);
if (((PongMessage) message).nonce != peer.pingNonce) {
log.warn("PongMessage invalid: self/peer " + networkNode.getAddress() + "/" + peerAddress);
removePeerConsumer.accept(peer.address);
}
}
});
@ -95,15 +105,14 @@ public class MaintenanceManager implements MessageListener {
}, 5, 7, TimeUnit.MINUTES);
}
private void pingPeers() {
Set<Peer> connectedPeersList = new HashSet<>(peerGroup.getAuthenticatedPeers().values());
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeersSupplier.get().values());
if (!connectedPeersList.isEmpty()) {
Log.traceCall();
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PeerGroup.INACTIVITY_PERIOD_BEFORE_PING)
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > INACTIVITY_PERIOD_BEFORE_PING)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.pingNonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -113,7 +122,7 @@ public class MaintenanceManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage());
peerGroup.removePeer(e.address);
removePeerConsumer.accept(e.address);
}
});
}, 2, 4, TimeUnit.SECONDS));

View File

@ -12,16 +12,13 @@ public class Peer {
public final Connection connection;
public final Address address;
private final long pingNonce;
public final long pingNonce;
public Peer(Connection connection, Address address) {
this.connection = connection;
this.address = address;
pingNonce = new Random().nextLong();
}
public long getPingNonce() {
return pingNonce;
pingNonce = new Random().nextLong();
}
@Override

View File

@ -5,6 +5,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
@ -17,21 +18,40 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class PeerExchangeManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
private final PeerGroup peerGroup;
private final NetworkNode networkNode;
private final Supplier<Set<ReportedPeer>> authenticatedAndReportedPeersSupplier;
private final Supplier<Map<Address, Peer>> authenticatedPeersSupplier;
private final Consumer<Address> removePeerConsumer;
private final BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer;
private Timer getPeersTimer;
public PeerExchangeManager(PeerGroup peerGroup, NetworkNode networkNode) {
this.peerGroup = peerGroup;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public PeerExchangeManager(NetworkNode networkNode,
Supplier<Set<ReportedPeer>> authenticatedAndReportedPeersSupplier,
Supplier<Map<Address, Peer>> authenticatedPeersSupplier,
Consumer<Address> removePeerConsumer,
BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer) {
this.networkNode = networkNode;
this.authenticatedAndReportedPeersSupplier = authenticatedAndReportedPeersSupplier;
this.authenticatedPeersSupplier = authenticatedPeersSupplier;
this.removePeerConsumer = removePeerConsumer;
this.addReportedPeersConsumer = addReportedPeersConsumer;
networkNode.addMessageListener(this);
startGetPeersTimer();
@ -41,8 +61,11 @@ public class PeerExchangeManager implements MessageListener {
Log.traceCall();
if (getPeersTimer != null)
getPeersTimer.cancel();
networkNode.removeMessageListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -57,7 +80,7 @@ public class PeerExchangeManager implements MessageListener {
log.trace("Received peers: " + reportedPeers);
SettableFuture<Connection> future = networkNode.sendMessage(connection,
new GetPeersResponse(new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())));
new GetPeersResponse(new HashSet<>(authenticatedAndReportedPeersSupplier.get())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -67,16 +90,15 @@ public class PeerExchangeManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersResponse sending failed " + throwable.getMessage());
peerGroup.removePeer(getPeersRequestMessage.senderAddress);
removePeerConsumer.accept(getPeersRequestMessage.senderAddress);
}
});
peerGroup.addToReportedPeers(reportedPeers, connection);
addReportedPeersConsumer.accept(reportedPeers, connection);
} else if (message instanceof GetPeersResponse) {
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
HashSet<ReportedPeer> reportedPeers = getPeersResponse.reportedPeers;
log.trace("Received peers: " + reportedPeers);
peerGroup.addToReportedPeers(reportedPeers, connection);
addReportedPeersConsumer.accept(reportedPeers, connection);
}
}
}
@ -93,13 +115,13 @@ public class PeerExchangeManager implements MessageListener {
}
private void trySendGetPeersRequest() {
Set<Peer> connectedPeersList = new HashSet<>(peerGroup.getAuthenticatedPeers().values());
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeersSupplier.get().values());
if (!connectedPeersList.isEmpty()) {
Log.traceCall();
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(peerGroup.getMyAddress(), new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())));
new GetPeersRequest(networkNode.getAddress(), new HashSet<>(authenticatedAndReportedPeersSupplier.get())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -109,11 +131,10 @@ public class PeerExchangeManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
peerGroup.removePeer(e.address);
removePeerConsumer.accept(e.address);
}
});
}, 3, 5, TimeUnit.SECONDS));
}
}
}

View File

@ -27,11 +27,9 @@ import static com.google.common.base.Preconditions.checkArgument;
public class PeerGroup implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static int simulateAuthTorNode = 0;
public static void setSimulateAuthTorNode(int simulateAuthTorNode) {
PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private static int MAX_CONNECTIONS_LOW_PRIORITY;
private static int MAX_CONNECTIONS_NORMAL_PRIORITY;
@ -44,27 +42,26 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
static {
setMaxConnectionsLowPriority(8);
setMaxConnectionsLowPriority(18);
}
static final int INACTIVITY_PERIOD_BEFORE_PING = 5 * 60 * 1000;
private static final int MAX_REPORTED_PEERS = 1000;
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>();
public Map<Address, Peer> getAuthenticatedPeers() {
return authenticatedPeers;
}
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private final MaintenanceManager maintenanceManager;
private final PeerExchangeManager peerExchangeManager;
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private final List<Address> remainingSeedNodes = new ArrayList<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
///////////////////////////////////////////////////////////////////////////////////////////
@ -79,8 +76,14 @@ public class PeerGroup implements MessageListener, ConnectionListener {
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
maintenanceManager = new MaintenanceManager(this, networkNode);
peerExchangeManager = new PeerExchangeManager(this, networkNode);
maintenanceManager = new MaintenanceManager(networkNode,
() -> getAuthenticatedPeers(),
address -> removePeer(address));
peerExchangeManager = new PeerExchangeManager(networkNode,
() -> getAuthenticatedAndReportedPeers(),
() -> getAuthenticatedPeers(),
address -> removePeer(address),
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection));
}
@ -161,6 +164,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall();
maintenanceManager.shutDown();
peerExchangeManager.shutDown();
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
}
public void addAuthenticationListener(AuthenticationListener listener) {
@ -183,10 +189,18 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (!authenticatedPeers.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake;
if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We got an incoming AuthenticationRequest for the peerAddress ({})", peerAddress);
log.info("We got an incoming AuthenticationRequest for the peerAddress {}. " +
"We create an AuthenticationHandshake.", peerAddress);
log.trace("message={}", message);
log.trace("connection={}", connection);
// We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress);
authenticationHandshake = new AuthenticationHandshake(networkNode,
getMyAddress(),
peerAddress,
() -> getAuthenticatedAndReportedPeers(),
(newReportedPeers, connection1) -> addToReportedPeers(newReportedPeers, connection1)
);
authenticationHandshakes.put(peerAddress, authenticationHandshake);
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
} else {
@ -203,13 +217,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
rejectAuthenticationRequest(peerAddress);
} else {
log.info("We accept the authentication request but cancel our own request.");
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshake);
cancelOwnAuthenticationRequest(peerAddress);
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
}
}
} else {
log.warn("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
log.info("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
"with peerAddress {}.\n" +
"That might happen in some race conditions. We reject the request.", peerAddress);
rejectAuthenticationRequest(peerAddress);
@ -219,7 +233,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void processAuthenticationRejection(AuthenticationRejection message) {
Log.traceCall(message.toString());
Address peerAddress = message.senderAddress;
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress));
cancelOwnAuthenticationRequest(peerAddress);
}
private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection,
@ -243,10 +258,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
);
}
private void cancelOwnAuthenticationRequest(Address peerAddress, AuthenticationHandshake authenticationHandshake) {
private void cancelOwnAuthenticationRequest(Address peerAddress) {
Log.traceCall();
authenticationHandshake.cancel();
authenticationHandshakes.remove(peerAddress);
if (authenticationHandshakes.containsKey(peerAddress)) {
authenticationHandshakes.get(peerAddress).cancel();
authenticationHandshakes.remove(peerAddress);
}
}
private void rejectAuthenticationRequest(Address peerAddress) {
@ -347,6 +364,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to reported peers
@ -399,6 +421,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to peer used for direct messaging
///////////////////////////////////////////////////////////////////////////////////////////
@ -410,9 +433,32 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(peerAddress.getFullAddress());
if (authenticatedPeers.containsKey(peerAddress)) {
log.warn("We have that peer already authenticated. That should never happen.");
log.warn("We have that peer already authenticated. That should never happen. peerAddress={}", peerAddress);
if (completeHandler != null)
completeHandler.run();
} else if (authenticationHandshakes.containsKey(peerAddress)) {
log.info("We are in the process to authenticate to that peer. peerAddress={}", peerAddress);
Optional<SettableFuture<Connection>> resultFutureOptional = authenticationHandshakes.get(peerAddress).getResultFutureOptional();
if (resultFutureOptional.isPresent()) {
Futures.addCallback(resultFutureOptional.get(), new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (completeHandler != null)
completeHandler.run();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (faultHandler != null)
faultHandler.run();
}
});
} else {
log.warn("We are in the process to authenticate to that peer but the future object is not set. " +
"That should not happen. peerAddress={}", peerAddress);
if (faultHandler != null)
faultHandler.run();
}
} else {
log.info("We try to authenticate to peer {} for sending a private message.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() {
@ -446,7 +492,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void authenticate(Address peerAddress, FutureCallback<Connection> futureCallback) {
Log.traceCall(peerAddress.getFullAddress());
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress);
log.info("We create an AuthenticationHandshake to authenticate to peer {}.", peerAddress);
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode,
getMyAddress(),
peerAddress,
() -> getAuthenticatedAndReportedPeers(),
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection)
);
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> authenticationFuture = authenticationHandshake.requestAuthentication();
Futures.addCallback(authenticationFuture, futureCallback);
@ -519,9 +571,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
int allConnectionsSize = allConnections.size();
log.info("We have {} connections open. Lets remove the passive connections" +
" which have not been active recently.", allConnectionsSize);
if (size != allConnectionsSize)
if (size != allConnectionsSize) {
log.warn("authenticatedPeers.size()!=allConnections.size(). There is some inconsistency.");
log.debug("authenticatedPeers={}", authenticatedPeers);
log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections());
}
List<Connection> authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE)
@ -583,6 +637,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return all;
}
public Map<Address, Peer> getAuthenticatedPeers() {
return authenticatedPeers;
}
public boolean isInAuthenticationProcess(Address address) {
return authenticationHandshakes.containsKey(address);
}
@ -620,14 +678,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
});
this.reportedPeers.addAll(adjustedReportedPeers);
reportedPeers.addAll(adjustedReportedPeers);
purgeReportedPeersIfExceeds();
}
printReportedPeers();
}
// TODO unit test
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = reportedPeers.size();
@ -653,11 +710,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return networkNode.getAddress();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private ReportedPeer getAndRemoveRandomReportedPeer(List<ReportedPeer> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
@ -675,11 +727,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return reportedPeer;
}
private Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
public void printAllPeers() {
printAuthenticatedPeers();

View File

@ -61,6 +61,13 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
networkNode.addMessageListener(this);
}
public void shutDown() {
Log.traceCall();
networkNode.removeMessageListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -1,5 +1,6 @@
package io.bitsquare.p2p.seed;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.app.Log;
import io.bitsquare.app.Version;
import io.bitsquare.common.UserThread;
@ -97,14 +98,15 @@ public class SeedNode {
}
}
public void createAndStartP2PService(boolean releaseVersion) {
createAndStartP2PService(mySeedNodeAddress, useLocalhost, Version.NETWORK_ID, releaseVersion, progArgSeedNodes, null);
public void createAndStartP2PService(boolean useDetailedLogging) {
createAndStartP2PService(mySeedNodeAddress, useLocalhost, Version.NETWORK_ID, useDetailedLogging, progArgSeedNodes, null);
}
@VisibleForTesting
public void createAndStartP2PService(Address mySeedNodeAddress,
boolean useLocalhost,
int networkId,
boolean releaseVersion,
boolean useDetailedLogging,
@Nullable Set<Address> progArgSeedNodes,
@Nullable P2PServiceListener listener) {
Log.traceCall();
@ -113,7 +115,7 @@ public class SeedNode {
"Bitsquare_seed_node_" + String.valueOf(mySeedNodeAddress.getFullAddress().replace(":", "_")));
String logPath = Paths.get(appPath.toString(), "logs").toString();
Log.setup(logPath, releaseVersion);
Log.setup(logPath, useDetailedLogging);
log.info("Log files under: " + logPath);
SeedNodesRepository seedNodesRepository = new SeedNodesRepository();

View File

@ -80,7 +80,7 @@ public class TestUtils {
}
CountDownLatch latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new Address("localhost", port), useLocalhost, 2, false,
seedNode.createAndStartP2PService(new Address("localhost", port), useLocalhost, 2, true,
seedNodes, new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {

View File

@ -80,7 +80,7 @@ public class PeerGroupTest {
seedNodes.add(address);
seedNode1 = new SeedNode("test_dummy_dir");
latch = new CountDownLatch(2);
seedNode1.createAndStartP2PService(address, useLocalhost, 2, false,
seedNode1.createAndStartP2PService(address, useLocalhost, 2, true,
seedNodes, new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {
@ -128,7 +128,7 @@ public class PeerGroupTest {
latch = new CountDownLatch(6);
seedNode1 = new SeedNode("test_dummy_dir");
seedNode1.createAndStartP2PService(address1, useLocalhost, 2, false, seedNodes, new P2PServiceListener() {
seedNode1.createAndStartP2PService(address1, useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {
latch.countDown();
@ -163,7 +163,7 @@ public class PeerGroupTest {
Thread.sleep(500);
seedNode2 = new SeedNode("test_dummy_dir");
seedNode2.createAndStartP2PService(address2, useLocalhost, 2, false, seedNodes, new P2PServiceListener() {
seedNode2.createAndStartP2PService(address2, useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {
latch.countDown();
@ -396,7 +396,7 @@ public class PeerGroupTest {
SeedNode seedNode = new SeedNode("test_dummy_dir");
latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new Address("localhost", port), useLocalhost, 2, false, seedNodes, new P2PServiceListener() {
seedNode.createAndStartP2PService(new Address("localhost", port), useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {
latch.countDown();

View File

@ -8,7 +8,6 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@ -16,20 +15,18 @@ import java.util.concurrent.ThreadFactory;
public class SeedNodeMain {
private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class);
public static final boolean IS_RELEASE_VERSION = false;
public static final boolean USE_DETAILED_LOGGING = true;
private SeedNode seedNode;
private boolean stopped;
// args: myAddress (incl. port) useLocalhost seedNodes (separated with |)
// eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
// To stop enter: q
public static void main(String[] args) throws NoSuchAlgorithmException {
public static void main(String[] args) throws InterruptedException {
new SeedNodeMain(args);
}
public SeedNodeMain(String[] args) {
public SeedNodeMain(String[] args) throws InterruptedException {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("SeedNodeMain")
.setDaemon(true)
@ -54,7 +51,7 @@ public class SeedNodeMain {
try {
seedNode = new SeedNode(BitsquareEnvironment.defaultUserDataDir());
seedNode.processArgs(args);
seedNode.createAndStartP2PService(IS_RELEASE_VERSION);
seedNode.createAndStartP2PService(USE_DETAILED_LOGGING);
} catch (Throwable t) {
log.error("Executing task failed. " + t.getMessage());
t.printStackTrace();
@ -62,10 +59,7 @@ public class SeedNodeMain {
});
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
}
Thread.sleep(Long.MAX_VALUE);
}
}
}