Add disconnect/awake handlers (WIP)

This commit is contained in:
Manfred Karrer 2016-02-20 12:04:07 +01:00
parent 3efca4101a
commit ae6b95eed8
17 changed files with 1029 additions and 736 deletions

View File

@ -1,19 +1,65 @@
package io.bitsquare.common;
public interface Clock {
void start();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
void stop();
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
void addListener(Listener listener);
public class Clock {
private static final Logger log = LoggerFactory.getLogger(Clock.class);
void removeListener(Listener listener);
public static final int IDLE_TOLERANCE = 5000;
interface Listener {
public interface Listener {
void onSecondTick();
void onMinuteTick();
void onMissedSecondTick(long missed);
}
private Timer timer;
private final List<Listener> listeners = new LinkedList<>();
private long counter = 0;
private long lastSecondTick;
public Clock() {
}
public void start() {
if (timer == null) {
lastSecondTick = System.currentTimeMillis();
timer = UserThread.runPeriodically(() -> {
listeners.stream().forEach(Listener::onSecondTick);
counter++;
if (counter >= 60) {
counter = 0;
listeners.stream().forEach(Listener::onMinuteTick);
}
long currentTimeMillis = System.currentTimeMillis();
long diff = currentTimeMillis - lastSecondTick;
if (diff > 1000)
listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000));
lastSecondTick = currentTimeMillis;
}, 1, TimeUnit.SECONDS);
}
}
public void stop() {
timer.stop();
timer = null;
counter = 0;
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
}

View File

@ -177,7 +177,7 @@ public class OpenOfferManager {
@Override
public void onMissedSecondTick(long missed) {
if (missed > 5000) {
if (missed > Clock.IDLE_TOLERANCE) {
log.error("We have been idle for {} sec", missed / 1000);
// We have been idle for at least 5 sec.

View File

@ -26,7 +26,6 @@ import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.KeyStorage;
import io.bitsquare.crypto.EncryptionServiceModule;
import io.bitsquare.gui.GuiModule;
import io.bitsquare.gui.common.UIClock;
import io.bitsquare.gui.common.view.CachingViewLoader;
import io.bitsquare.gui.main.intructions.InstructionCenter;
import io.bitsquare.gui.main.notifications.NotificationCenter;
@ -64,7 +63,7 @@ class BitsquareAppModule extends AppModule {
bind(Preferences.class).in(Singleton.class);
bind(NotificationCenter.class).in(Singleton.class);
bind(InstructionCenter.class).in(Singleton.class);
bind(Clock.class).to(UIClock.class).in(Singleton.class);
bind(Clock.class).in(Singleton.class);
File storageDir = new File(env.getRequiredProperty(Storage.DIR_KEY));
bind(File.class).annotatedWith(named(Storage.DIR_KEY)).toInstance(storageDir);

View File

@ -1,79 +0,0 @@
package io.bitsquare.gui.common;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class UIClock implements Clock {
private static final Logger log = LoggerFactory.getLogger(UIClock.class);
private Timer timer;
private final List<Listener> listeners = new LinkedList<>();
private long counter = 0;
private long lastSecondTick;
public UIClock() {
}
@Override
public void start() {
if (timer == null) {
lastSecondTick = System.currentTimeMillis();
timer = UserThread.runPeriodically(() -> {
listeners.stream().forEach(Listener::onSecondTick);
counter++;
if (counter >= 60) {
counter = 0;
listeners.stream().forEach(Listener::onMinuteTick);
}
long currentTimeMillis = System.currentTimeMillis();
long diff = currentTimeMillis - lastSecondTick;
if (diff > 1000)
listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000));
lastSecondTick = currentTimeMillis;
}, 1, TimeUnit.SECONDS);
/* timer = FxTimer.runPeriodically(Duration.ofSeconds(1), () -> {
listeners.stream().forEach(Listener::onSecondTick);
counter++;
if (counter >= 60) {
counter = 0;
listeners.stream().forEach(Listener::onMinuteTick);
}
long currentTimeMillis = System.currentTimeMillis();
long diff = currentTimeMillis - lastSecondTick;
if (diff > 1000)
listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000));
lastSecondTick = currentTimeMillis;
});*/
}
}
@Override
public void stop() {
timer.stop();
timer = null;
counter = 0;
}
@Override
public void addListener(Listener listener) {
listeners.add(listener);
}
@Override
public void removeListener(Listener listener) {
listeners.remove(listener);
}
}

View File

@ -8,6 +8,7 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.bitsquare.app.Log;
import io.bitsquare.app.ProgramArguments;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
@ -56,6 +57,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final SeedNodesRepository seedNodesRepository;
private final int port;
private final File torDir;
private Clock clock;
private final Optional<EncryptionService> optionalEncryptionService;
private final Optional<KeyRing> optionalKeyRing;
@ -99,11 +101,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Named(ProgramArguments.USE_LOCALHOST) boolean useLocalhost,
@Named(ProgramArguments.NETWORK_ID) int networkId,
@Named("storage.dir") File storageDir,
Clock clock,
@Nullable EncryptionService encryptionService,
@Nullable KeyRing keyRing) {
this.seedNodesRepository = seedNodesRepository;
this.port = port;
this.torDir = torDir;
this.clock = clock;
optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService);
optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing);
@ -126,7 +130,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
p2PDataStorage.addHashMapChangedListener(this);
Set<NodeAddress> seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId);
peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir);
peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir, clock);
requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this);
@ -247,11 +251,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall();
networkReadySubscription.unsubscribe();
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeAddressOfPreliminaryDataRequest();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
"seedNodeOfPreliminaryDataRequest must be present");
requestDataManager.requestUpdatesData();
requestDataManager.requestUpdateData();
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -267,7 +271,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onUpdatedDataReceived() {
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeAddressOfPreliminaryDataRequest();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
"seedNodeOfPreliminaryDataRequest must be present");
peerExchangeManager.requestReportedPeersFromSeedNodes(seedNodeOfPreliminaryDataRequest.get());

View File

@ -1,6 +1,7 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.app.Log;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
@ -36,6 +37,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private static int MAX_CONNECTIONS_ABSOLUTE;
private final boolean printReportedPeersDetails = true;
private boolean lostAllConnections;
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
@ -54,7 +56,26 @@ public class PeerManager implements ConnectionListener, MessageListener {
private static final long MAX_AGE = TimeUnit.DAYS.toMillis(14); // max age for reported peers is 14 days
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onAllConnectionsLost();
void onNewConnectionAfterAllConnectionsLost();
void onAwakeFromStandby();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Instance fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private Clock clock;
private final Set<NodeAddress> seedNodeAddresses;
private final Storage<HashSet<ReportedPeer>> dbStorage;
@ -62,14 +83,16 @@ public class PeerManager implements ConnectionListener, MessageListener {
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private Timer checkMaxConnectionsTimer;
private final ChangeListener<NodeAddress> connectionNodeAddressListener;
private final Clock.Listener listener;
private final List<Listener> listeners = new LinkedList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public PeerManager(NetworkNode networkNode, Set<NodeAddress> seedNodeAddresses, File storageDir) {
public PeerManager(NetworkNode networkNode, Set<NodeAddress> seedNodeAddresses, File storageDir, Clock clock) {
this.networkNode = networkNode;
this.clock = clock;
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
networkNode.addConnectionListener(this);
dbStorage = new Storage<>(storageDir);
@ -89,12 +112,33 @@ public class PeerManager implements ConnectionListener, MessageListener {
checkMaxConnections(MAX_CONNECTIONS);
}, 3);
};
// we check if app was idle for more then 5 sec.
listener = new Clock.Listener() {
@Override
public void onSecondTick() {
}
@Override
public void onMinuteTick() {
}
@Override
public void onMissedSecondTick(long missed) {
if (missed > Clock.IDLE_TOLERANCE) {
log.error("We have been idle for {} sec", missed / 1000);
listeners.stream().forEach(Listener::onAwakeFromStandby);
}
}
};
clock.addListener(listener);
}
public void shutDown() {
Log.traceCall();
networkNode.removeConnectionListener(this);
clock.removeListener(listener);
stopCheckMaxConnectionsTimer();
}
@ -102,6 +146,14 @@ public class PeerManager implements ConnectionListener, MessageListener {
return MAX_CONNECTIONS_ABSOLUTE;
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -118,12 +170,21 @@ public class PeerManager implements ConnectionListener, MessageListener {
seedNodeAddresses.contains(peersNodeAddressOptional.get())) {
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
if (lostAllConnections) {
lostAllConnections = false;
listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost);
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections)
listeners.stream().forEach(Listener::onAllConnectionsLost);
}
@Override

View File

@ -0,0 +1,120 @@
package io.bitsquare.p2p.peers.getdata;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse;
import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class GetDataRequestHandler {
private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class);
private static final long TIME_OUT_SEC = 20;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onFault(String errorMessage, Connection connection);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final PeerManager peerManager;
private P2PDataStorage dataStorage;
private final Listener listener;
private Timer timeoutTimer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public GetDataRequestHandler(NetworkNode networkNode, PeerManager peerManager, P2PDataStorage dataStorage, Listener listener) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.dataStorage = dataStorage;
this.listener = listener;
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void handle(GetDataRequest getDataRequest, final Connection connection) {
Log.traceCall(getDataRequest + "\n\tconnection=" + connection);
GetDataResponse getDataResponse = new GetDataResponse(new HashSet<>(dataStorage.getMap().values()),
getDataRequest.getNonce());
SettableFuture<Connection> future = networkNode.sendMessage(connection, getDataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
cleanup();
listener.onComplete();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + connection +
" failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." +
"Exception: " + throwable.getMessage();
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection);
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
log.info(errorMessage);
peerManager.shutDownConnection(connection, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
}
private void cleanup() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -0,0 +1,175 @@
package io.bitsquare.p2p.peers.getdata;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse;
import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataHandler.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onFault(String errorMessage, @Nullable Connection connection);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final Listener listener;
private Timer timeoutTimer;
private final long nonce = new Random().nextLong();
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataHandler(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager,
Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.peerManager = peerManager;
this.listener = listener;
networkNode.addMessageListener(this);
}
public void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
if (!stopped) {
GetDataRequest getDataRequest;
if (networkNode.getNodeAddress() == null)
getDataRequest = new PreliminaryGetDataRequest(nonce);
else
getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce);
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("Send " + getDataRequest + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
},
10);
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof GetDataResponse) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
if (!stopped) {
GetDataResponse getDataResponse = (GetDataResponse) message;
if (getDataResponse.requestNonce == nonce) {
stopTimeoutTimer();
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment");
((GetDataResponse) message).dataSet.stream()
.forEach(protectedData -> dataStorage.add(protectedData,
connection.getPeersNodeAddressOptional().get()));
cleanup();
listener.onComplete();
} else {
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getDataResponse.requestNonce);
}
} else {
log.warn("We have stopped already. We ignore that onDataRequest call.");
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
peerManager.shutDownConnection(nodeAddress, closeConnectionReason);
listener.onFault(errorMessage, null);
}
}

View File

@ -1,207 +0,0 @@
package io.bitsquare.p2p.peers.getdata;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse;
import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataHandshake.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onFault(String errorMessage, @Nullable Connection connection);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final Listener listener;
private Timer timeoutTimer;
private final long nonce = new Random().nextLong();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataHandshake(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager,
Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.peerManager = peerManager;
this.listener = listener;
networkNode.addMessageListener(this);
}
public void shutDown() {
Log.traceCall();
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
GetDataRequest getDataRequest;
if (networkNode.getNodeAddress() == null)
getDataRequest = new PreliminaryGetDataRequest(nonce);
else
getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce);
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("Send " + getDataRequest + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown();
listener.onFault(errorMessage, null);
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandshake=" +
RequestDataHandshake.this);
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown();
listener.onFault(errorMessage, null);
},
10);
}
public void onDataRequest(Message message, final Connection connection) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
GetDataResponse getDataResponse = new GetDataResponse(new HashSet<>(dataStorage.getMap().values()),
((GetDataRequest) message).getNonce());
SettableFuture<Connection> future = networkNode.sendMessage(connection, getDataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
shutDown();
listener.onComplete();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + connection +
" failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." +
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown();
listener.onFault(errorMessage, connection);
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse +
" on connection:" + connection;
log.info(errorMessage + " / RequestDataHandshake=" +
RequestDataHandshake.this);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown();
listener.onFault(errorMessage, connection);
},
10, TimeUnit.SECONDS);
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof GetDataResponse) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
GetDataResponse getDataResponse = (GetDataResponse) message;
if (getDataResponse.requestNonce == nonce) {
stopTimeoutTimer();
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"RequestDataHandshake.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment");
((GetDataResponse) message).dataSet.stream()
.forEach(protectedData -> dataStorage.add(protectedData,
connection.getPeersNodeAddressOptional().get()));
shutDown();
listener.onComplete();
} else {
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
"connection was closed).\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getDataResponse.requestNonce);
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -5,9 +5,7 @@ import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer;
@ -17,12 +15,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener {
public class RequestDataManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
private static final long RETRY_DELAY_SEC = 10;
@ -55,11 +52,11 @@ public class RequestDataManager implements MessageListener {
private final Collection<NodeAddress> seedNodeAddresses;
private final Listener listener;
private final Map<NodeAddress, RequestDataHandshake> requestDataHandshakeMap = new HashMap<>();
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
private Timer requestDataTimer;
private final Map<NodeAddress, RequestDataHandler> requestDataHandlerMap = new HashMap<>();
private Optional<NodeAddress> nodeAddressOfPreliminaryDataRequest = Optional.empty();
private Timer retryTimer;
private boolean dataUpdateRequested;
private boolean shutDownInProgress;
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
@ -76,14 +73,16 @@ public class RequestDataManager implements MessageListener {
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
networkNode.addMessageListener(this);
peerManager.addListener(this);
}
public void shutDown() {
Log.traceCall();
shutDownInProgress = true;
stopRequestDataTimer();
stopped = true;
stopRetryTimer();
networkNode.removeMessageListener(this);
requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
peerManager.removeListener(this);
requestDataHandlerMap.values().stream().forEach(RequestDataHandler::cleanup);
}
@ -100,19 +99,73 @@ public class RequestDataManager implements MessageListener {
requestData(nextCandidate, nodeAddresses);
}
public void requestUpdatesData() {
public void requestUpdateData() {
Log.traceCall();
checkArgument(nodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present");
checkArgument(nodeAddressOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present");
dataUpdateRequested = true;
List<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingNodeAddresses);
NodeAddress candidate = nodeOfPreliminaryDataRequest.get();
NodeAddress candidate = nodeAddressOfPreliminaryDataRequest.get();
remainingNodeAddresses.remove(candidate);
requestData(candidate, remainingNodeAddresses);
}
public Optional<NodeAddress> getNodeOfPreliminaryDataRequest() {
return nodeOfPreliminaryDataRequest;
public Optional<NodeAddress> getNodeAddressOfPreliminaryDataRequest() {
return nodeAddressOfPreliminaryDataRequest;
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// clean up in case we could not clean up at disconnect
closeRequestDataHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
closeRequestDataHandler(connection);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllRequestDataHandlers();
stopRetryTimer();
stopped = true;
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllRequestDataHandlers();
stopped = false;
retryAfterDelay();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllRequestDataHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
retryAfterDelay();
}
@ -124,124 +177,147 @@ public class RequestDataManager implements MessageListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof GetDataRequest) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
new RequestDataHandshake.Listener() {
@Override
public void onComplete() {
log.trace("requestDataHandshake of inbound connection complete.\n\tConnection={}",
connection);
}
if (!stopped) {
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of inbound connection failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
}
});
requestDataHandshake.onDataRequest(message, connection);
GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, peerManager, dataStorage,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("requestDataHandshake completed.\n\tConnection={}",
connection);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
}
});
getDataRequestHandler.handle((GetDataRequest) message, connection);
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
// RequestData
///////////////////////////////////////////////////////////////////////////////////////////
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
if (!requestDataHandshakeMap.containsKey(nodeAddress)) {
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
new RequestDataHandshake.Listener() {
@Override
public void onComplete() {
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
nodeAddress);
stopRequestDataTimer();
if (!stopped) {
if (!requestDataHandlerMap.containsKey(nodeAddress)) {
RequestDataHandler requestDataHandler = new RequestDataHandler(networkNode, dataStorage, peerManager,
new RequestDataHandler.Listener() {
@Override
public void onComplete() {
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
nodeAddress);
stopRetryTimer();
// need to remove before listeners are notified as they cause the update call
requestDataHandshakeMap.remove(nodeAddress);
// need to remove before listeners are notified as they cause the update call
requestDataHandlerMap.remove(nodeAddress);
// 1. We get a response from requestPreliminaryData
if (!nodeOfPreliminaryDataRequest.isPresent()) {
nodeOfPreliminaryDataRequest = Optional.of(nodeAddress);
listener.onPreliminaryDataReceived();
// 1. We get a response from requestPreliminaryData
if (!nodeAddressOfPreliminaryDataRequest.isPresent()) {
nodeAddressOfPreliminaryDataRequest = Optional.of(nodeAddress);
listener.onPreliminaryDataReceived();
}
// 2. Later we get a response from requestUpdatesData
if (dataUpdateRequested) {
dataUpdateRequested = false;
listener.onUpdatedDataReceived();
}
listener.onDataReceived();
}
// 2. Later we get a response from requestUpdatesData
if (dataUpdateRequested) {
dataUpdateRequested = false;
listener.onUpdatedDataReceived();
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", nodeAddress, errorMessage);
listener.onDataReceived();
}
requestDataHandlerMap.remove(nodeAddress);
peerManager.handleConnectionFault(nodeAddress, connection);
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", nodeAddress, errorMessage);
if (!stopped) {
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
remainingNodeAddresses.remove(nextCandidate);
requestData(nextCandidate, remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n\t" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
peerManager.handleConnectionFault(nodeAddress, connection);
// Notify listeners
if (!nodeAddressOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
}
if (!shutDownInProgress) {
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
remainingNodeAddresses.remove(nextCandidate);
requestData(nextCandidate, remainingNodeAddresses);
retryAfterDelay();
}
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n\t" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
// try again after a pause
stopRequestDataTimer();
requestDataTimer = UserThread.runAfter(() -> {
log.trace("requestDataAfterDelayTimer called");
// We want to keep it sorted but avoid duplicates
// We don't filter out already established connections for seed nodes as it might be that
// we got from the other seed node contacted but we still have not requested the initial
// data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(list);
list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list));
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty.");
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestData(nextCandidate, list);
},
RETRY_DELAY_SEC, TimeUnit.SECONDS);
}
requestDataHandshakeMap.remove(nodeAddress);
// Notify listeners
if (!nodeOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
log.warn("We have stopped already. We ignore that requestData.onFault call.");
}
}
}
});
requestDataHandshakeMap.put(nodeAddress, requestDataHandshake);
requestDataHandshake.requestData(nodeAddress);
});
requestDataHandlerMap.put(nodeAddress, requestDataHandler);
requestDataHandler.requestData(nodeAddress);
} else {
log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress);
}
} else {
log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress);
log.warn("We have stopped already. We ignore that requestData call.");
}
}
// sorted by most recent lastActivityDate
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
return set.stream()
.filter(e -> !list.contains(e.nodeAddress) &&
!peerManager.isSeedNode(e) &&
!peerManager.isSelf(e))
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private void retryAfterDelay() {
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
log.trace("retryTimer called");
stopRetryTimer();
// We create a new list of candidates
// 1. shuffled seedNodes
// 2. reported peers sorted by last activity date
// 3. Add as last persisted peers sorted by last activity date
List<NodeAddress> list = getFilteredList(new ArrayList<>(seedNodeAddresses), new ArrayList<>());
Collections.shuffle(list);
List<NodeAddress> filteredReportedPeers = getFilteredNonSeedNodeList(getSortedNodeAddresses(peerManager.getReportedPeers()), list);
list.addAll(filteredReportedPeers);
List<NodeAddress> filteredPersistedPeers = getFilteredNonSeedNodeList(getSortedNodeAddresses(peerManager.getPersistedPeers()), list);
list.addAll(filteredPersistedPeers);
checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty.");
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestData(nextCandidate, list);
},
RETRY_DELAY_SEC);
}
}
private List<NodeAddress> getSortedNodeAddresses(Collection<ReportedPeer> collection) {
return collection.stream()
.collect(Collectors.toList())
.stream()
.sorted((o1, o2) -> o2.date.compareTo(o1.date))
@ -249,10 +325,37 @@ public class RequestDataManager implements MessageListener {
.collect(Collectors.toList());
}
private void stopRequestDataTimer() {
if (requestDataTimer != null) {
requestDataTimer.stop();
requestDataTimer = null;
private List<NodeAddress> getFilteredList(Collection<NodeAddress> collection, List<NodeAddress> list) {
return collection.stream()
.filter(e -> !list.contains(e) &&
!peerManager.isSelf(e))
.collect(Collectors.toList());
}
private List<NodeAddress> getFilteredNonSeedNodeList(Collection<NodeAddress> collection, List<NodeAddress> list) {
return getFilteredList(collection, list).stream()
.filter(e -> !peerManager.isSeedNode(e))
.collect(Collectors.toList());
}
private void stopRetryTimer() {
if (retryTimer != null) {
retryTimer.stop();
retryTimer = null;
}
}
private void closeRequestDataHandler(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
requestDataHandlerMap.get(nodeAddress).cleanup();
requestDataHandlerMap.remove(nodeAddress);
}
}
private void closeAllRequestDataHandlers() {
requestDataHandlerMap.values().stream().forEach(RequestDataHandler::cleanup);
requestDataHandlerMap.clear();
}
}

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
@ -23,9 +22,6 @@ import java.util.Random;
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
@Nullable
private Connection connection;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
@ -34,9 +30,7 @@ class KeepAliveHandler implements MessageListener {
public interface Listener {
void onComplete();
void onFault(String errorMessage, Connection connection);
void onFault(String errorMessage, NodeAddress nodeAddress);
void onFault(String errorMessage);
}
@ -48,6 +42,9 @@ class KeepAliveHandler implements MessageListener {
private final PeerManager peerManager;
private final Listener listener;
private final int nonce = new Random().nextInt();
@Nullable
private Connection connection;
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
@ -61,6 +58,7 @@ class KeepAliveHandler implements MessageListener {
}
public void cleanup() {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
}
@ -70,58 +68,33 @@ class KeepAliveHandler implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void sendPing(Connection connection) {
Log.traceCall("connection=" + connection + " / this=" + this);
this.connection = connection;
connection.addMessageListener(this);
Ping ping = new Ping(nonce);
SettableFuture<Connection> future = networkNode.sendMessage(connection, ping);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send " + ping + " to " + connection + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending ping to " + connection +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
listener.onFault(errorMessage, connection);
}
});
}
public void sendPing(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
Ping ping = new Ping(nonce);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, ping);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (connection != null) {
KeepAliveHandler.this.connection = connection;
connection.addMessageListener(KeepAliveHandler.this);
if (!stopped) {
this.connection = connection;
connection.addMessageListener(this);
Ping ping = new Ping(nonce);
SettableFuture<Connection> future = networkNode.sendMessage(connection, ping);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send " + ping + " to " + connection + " succeeded.");
}
log.trace("Send " + ping + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending ping to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
listener.onFault(errorMessage, nodeAddress);
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending ping to " + connection +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
listener.onFault(errorMessage);
}
});
} else {
log.warn("We have stopped already. We ignore that sendPing call.");
}
}
@ -133,16 +106,19 @@ class KeepAliveHandler implements MessageListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof Pong) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
Pong pong = (Pong) message;
if (pong.requestNonce == nonce) {
cleanup();
listener.onComplete();
if (!stopped) {
Pong pong = (Pong) message;
if (pong.requestNonce == nonce) {
cleanup();
listener.onComplete();
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, pong.requestNonce);
}
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, pong.requestNonce);
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
}

View File

@ -2,13 +2,11 @@ package io.bitsquare.p2p.peers.keepalive;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.keepalive.messages.Ping;
@ -19,10 +17,8 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class KeepAliveManager implements MessageListener, ConnectionListener {
public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
//private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
@ -31,9 +27,9 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
private final NetworkNode networkNode;
private final PeerManager peerManager;
private ScheduledThreadPoolExecutor executor;
private final Map<String, KeepAliveHandler> maintenanceHandlerMap = new HashMap<>();
private boolean shutDownInProgress;
private boolean stopped;
private Timer keepAliveTimer;
///////////////////////////////////////////////////////////////////////////////////////////
@ -46,18 +42,20 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
peerManager.addListener(this);
}
public void shutDown() {
Log.traceCall();
shutDownInProgress = true;
stopped = true;
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
maintenanceHandlerMap.values().stream().forEach(KeepAliveHandler::cleanup);
peerManager.removeListener(this);
if (executor != null)
MoreExecutors.shutdownAndAwaitTermination(executor, 100, TimeUnit.MILLISECONDS);
closeAllMaintenanceHandlers();
stopKeepAliveTimer();
}
@ -66,11 +64,9 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void start() {
if (executor == null) {
executor = Utilities.getScheduledThreadPoolExecutor("KeepAliveManager", 1, 2, 5);
executor.scheduleAtFixedRate(() -> UserThread.execute(this::keepAlive),
INTERVAL_SEC, INTERVAL_SEC, TimeUnit.SECONDS);
}
stopped = false;
if (keepAliveTimer == null)
keepAliveTimer = UserThread.runPeriodically(this::keepAlive, INTERVAL_SEC);
}
@ -82,26 +78,29 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof Ping) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
if (!stopped) {
Ping ping = (Ping) message;
Pong pong = new Pong(ping.nonce);
SettableFuture<Connection> future = networkNode.sendMessage(connection, pong);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Pong sent successfully");
}
Ping ping = (Ping) message;
Pong pong = new Pong(ping.nonce);
SettableFuture<Connection> future = networkNode.sendMessage(connection, pong);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Pong sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending pong to " + connection +
" failed. That is expected if the peer is offline. pong=" + pong + "." +
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending pong to " + connection +
" failed. That is expected if the peer is offline. pong=" + pong + "." +
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
}
});
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
@ -112,15 +111,15 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// clean up in case we could not clean up at disconnect
if (connection.getPeersNodeAddressOptional().isPresent())
maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
closeMaintenanceHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent())
maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
Log.traceCall();
closeMaintenanceHandler(connection);
}
@Override
@ -128,58 +127,92 @@ public class KeepAliveManager implements MessageListener, ConnectionListener {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllMaintenanceHandlers();
stopKeepAliveTimer();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllMaintenanceHandlers();
start();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllMaintenanceHandlers();
if (!networkNode.getAllConnections().isEmpty())
start();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void keepAlive() {
Log.traceCall();
if (!shutDownInProgress) {
if (!stopped) {
Log.traceCall();
networkNode.getConfirmedConnections().stream()
.filter(connection -> connection instanceof OutboundConnection)
.forEach(connection -> {
if (!maintenanceHandlerMap.containsKey(getKey(connection))) {
final String uid = connection.getUid();
if (!maintenanceHandlerMap.containsKey(uid)) {
KeepAliveHandler keepAliveHandler = new KeepAliveHandler(networkNode, peerManager, new KeepAliveHandler.Listener() {
@Override
public void onComplete() {
maintenanceHandlerMap.remove(getKey(connection));
maintenanceHandlerMap.remove(uid);
}
@Override
public void onFault(String errorMessage, Connection connection) {
maintenanceHandlerMap.remove(getKey(connection));
}
@Override
public void onFault(String errorMessage, NodeAddress nodeAddress) {
maintenanceHandlerMap.remove(nodeAddress.getFullAddress());
public void onFault(String errorMessage) {
maintenanceHandlerMap.remove(uid);
}
});
maintenanceHandlerMap.put(getKey(connection), keepAliveHandler);
maintenanceHandlerMap.put(uid, keepAliveHandler);
keepAliveHandler.sendPing(connection);
} else {
log.warn("Connection with id {} has not completed and is still in our map. " +
"We will try to ping that peer at the next schedule.", getKey(connection));
"We will try to ping that peer at the next schedule.", uid);
}
});
int size = maintenanceHandlerMap.size();
log.info("maintenanceHandlerMap size=" + size);
if (size > peerManager.getMaxConnections())
log.warn("Seems we don't clean up out map correctly.\n" +
log.warn("Seems we didn't clean up out map correctly.\n" +
"maintenanceHandlerMap size={}, peerManager.getMaxConnections()={}", size, peerManager.getMaxConnections());
} else {
log.warn("We have stopped already. We ignore that keepAlive call.");
}
}
private String getKey(Connection connection) {
private void closeMaintenanceHandler(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
return connection.getPeersNodeAddressOptional().get().getFullAddress();
} else {
// TODO not sure if that can be the case, but handle it otherwise we get an exception
log.warn("!connection.getPeersNodeAddressOptional().isPresent(). That should not happen.");
return "null";
String uid = connection.getUid();
maintenanceHandlerMap.get(uid).cleanup();
maintenanceHandlerMap.remove(uid);
}
}
private void closeAllMaintenanceHandlers() {
maintenanceHandlerMap.values().stream().forEach(KeepAliveHandler::cleanup);
maintenanceHandlerMap.clear();
}
private void stopKeepAliveTimer() {
stopped = true;
if (keepAliveTimer != null) {
keepAliveTimer.stop();
keepAliveTimer = null;
}
}
}

View File

@ -107,10 +107,10 @@ class GetPeersRequestHandler {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, CloseConnectionReason sendMsgFailure, Connection connection) {
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
// TODO retry
cleanup();
peerManager.shutDownConnection(connection, sendMsgFailure);
peerManager.shutDownConnection(connection, closeConnectionReason);
listener.onFault(errorMessage, connection);
}

View File

@ -52,6 +52,7 @@ class PeerExchangeHandler implements MessageListener {
private final int nonce = new Random().nextInt();
private Timer timeoutTimer;
private Connection connection;
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
@ -65,6 +66,7 @@ class PeerExchangeHandler implements MessageListener {
}
public void cleanup() {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
@ -81,43 +83,47 @@ class PeerExchangeHandler implements MessageListener {
public void sendGetPeersRequest(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress));
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress);
//TODO remove setPeersNodeAddress if never needed
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
if (!stopped) {
if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress));
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress);
//TODO remove setPeersNodeAddress if never needed
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
}
PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this);
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
}
PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this);
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress);
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress);
}
});
checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
} else {
log.warn("My node address is still null at sendGetPeersRequest. We ignore that call.");
}
} else {
log.trace("My node address is still null at sendGetPeersRequest. We ignore that call.");
log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest call.");
}
}
@ -128,23 +134,24 @@ class PeerExchangeHandler implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof GetPeersResponse) {
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
// Check if the response is for our request
if (getPeersResponse.requestNonce == nonce) {
if (!stopped) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
Log.traceCall("this=" + this);
peerManager.addToReportedPeers(getPeersResponse.reportedPeers, connection);
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
cleanup();
listener.onComplete();
// Check if the response is for our request
if (getPeersResponse.requestNonce == nonce) {
peerManager.addToReportedPeers(getPeersResponse.reportedPeers, connection);
cleanup();
listener.onComplete();
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getPeersResponse.requestNonce);
}
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, getPeersResponse.requestNonce);
log.warn("We have stopped that handler already. We ignore that onMessage call.");
}
}
}

View File

@ -1,10 +1,8 @@
package io.bitsquare.p2p.peers.peerexchange;
import com.google.common.util.concurrent.MoreExecutors;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
@ -15,17 +13,16 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class PeerExchangeManager implements MessageListener, ConnectionListener {
public class PeerExchangeManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
private static final long RETRY_DELAY_SEC = 60;
private static final long RETRY_DELAY_SEC = 10;
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 3;
private static final long REQUEST_PERIODICALLY_INTERVAL_MINUTES = 10;
@ -33,9 +30,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager;
private final Set<NodeAddress> seedNodeAddresses;
private final Map<NodeAddress, PeerExchangeHandler> peerExchangeHandlerMap = new HashMap<>();
private Timer connectToMorePeersTimer;
private boolean shutDownInProgress;
private ScheduledThreadPoolExecutor executor;
private Timer retryTimer, periodicTimer;
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
@ -50,18 +46,20 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
peerManager.addListener(this);
}
public void shutDown() {
Log.traceCall();
shutDownInProgress = true;
stopped = true;
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
stopConnectToMorePeersTimer();
peerExchangeHandlerMap.values().stream().forEach(PeerExchangeHandler::cleanup);
peerManager.removeListener(this);
if (executor != null)
MoreExecutors.shutdownAndAwaitTermination(executor, 100, TimeUnit.MILLISECONDS);
stopPeriodicTimer();
stopRetryTimer();
closeAllPeerExchangeHandlers();
}
@ -76,39 +74,32 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Collections.shuffle(remainingNodeAddresses);
requestReportedPeers(nodeAddress, remainingNodeAddresses);
if (executor == null) {
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 2, 5);
executor.scheduleAtFixedRate(() -> UserThread.execute(this::requestAgain),
REQUEST_PERIODICALLY_INTERVAL_MINUTES, REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES);
}
startPeriodicTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent())
peerExchangeHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
Log.traceCall();
// clean up in case we could not clean up at disconnect
closePeerExchangeHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent())
peerExchangeHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
boolean lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections || connectToMorePeersTimer == null) {
long delaySec = lostAllConnections ? RETRY_DELAY_AFTER_ALL_CON_LOST_SEC : RETRY_DELAY_SEC;
if (lostAllConnections && connectToMorePeersTimer != null)
connectToMorePeersTimer.stop();
connectToMorePeersTimer = UserThread.runAfter(() -> {
Log.traceCall();
closePeerExchangeHandler(connection);
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from onDisconnect code path");
stopConnectToMorePeersTimer();
stopRetryTimer();
requestWithAvailablePeers();
}, delaySec);
}, RETRY_DELAY_SEC);
}
}
@ -117,6 +108,36 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllPeerExchangeHandlers();
stopPeriodicTimer();
stopRetryTimer();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllPeerExchangeHandlers();
restart();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllPeerExchangeHandlers();
if (!networkNode.getAllConnections().isEmpty())
restart();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -125,26 +146,29 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public void onMessage(Message message, Connection connection) {
if (message instanceof GetPeersRequest) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
if (!stopped) {
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
GetPeersRequestHandler getPeersRequestHandler = new GetPeersRequestHandler(networkNode,
peerManager,
new GetPeersRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("PeerExchangeHandshake completed.\n\tConnection={}", connection);
}
GetPeersRequestHandler getPeersRequestHandler = new GetPeersRequestHandler(networkNode,
peerManager,
new GetPeersRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("PeerExchangeHandshake of inbound connection complete.\n\tConnection={}", connection);
}
@Override
public void onFault(String errorMessage, Connection connection) {
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"connection={}", errorMessage, connection);
peerManager.handleConnectionFault(connection);
}
});
getPeersRequestHandler.handle((GetPeersRequest) message, connection);
@Override
public void onFault(String errorMessage, Connection connection) {
log.trace("PeerExchangeHandshake failed.\n\terrorMessage={}\n\t" +
"connection={}", errorMessage, connection);
peerManager.handleConnectionFault(connection);
}
});
getPeersRequestHandler.handle((GetPeersRequest) message, connection);
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
@ -155,122 +179,130 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress);
if (!peerExchangeHandlerMap.containsKey(nodeAddress)) {
PeerExchangeHandler peerExchangeHandler = new PeerExchangeHandler(networkNode,
peerManager,
new PeerExchangeHandler.Listener() {
@Override
public void onComplete() {
log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress);
peerExchangeHandlerMap.remove(nodeAddress);
requestWithAvailablePeers();
}
if (!stopped) {
if (!peerExchangeHandlerMap.containsKey(nodeAddress)) {
PeerExchangeHandler peerExchangeHandler = new PeerExchangeHandler(networkNode,
peerManager,
new PeerExchangeHandler.Listener() {
@Override
public void onComplete() {
log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress);
peerExchangeHandlerMap.remove(nodeAddress);
requestWithAvailablePeers();
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"nodeAddress={}", errorMessage, nodeAddress);
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"nodeAddress={}", errorMessage, nodeAddress);
peerExchangeHandlerMap.remove(nodeAddress);
peerManager.handleConnectionFault(nodeAddress, connection);
if (!shutDownInProgress) {
if (!remainingNodeAddresses.isEmpty()) {
if (!peerManager.hasSufficientConnections()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
peerExchangeHandlerMap.remove(nodeAddress);
peerManager.handleConnectionFault(nodeAddress, connection);
if (!stopped) {
if (!remainingNodeAddresses.isEmpty()) {
if (!peerManager.hasSufficientConnections()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
} else {
// That path will rarely be reached
log.info("We have already sufficient connections.");
}
} else {
// That path will rarely be reached
log.info("We have already sufficient connections.");
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n\t" +
"We will try again after a pause.");
if (retryTimer == null)
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path");
stopRetryTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
}
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n\t" +
"We will try again after a pause.");
if (connectToMorePeersTimer == null)
connectToMorePeersTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path");
stopConnectToMorePeersTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
}
}
}
});
peerExchangeHandlerMap.put(nodeAddress, peerExchangeHandler);
peerExchangeHandler.sendGetPeersRequest(nodeAddress);
});
peerExchangeHandlerMap.put(nodeAddress, peerExchangeHandler);
peerExchangeHandler.sendGetPeersRequest(nodeAddress);
} else {
//TODO check when that happens
log.warn("We have started already a peerExchangeHandler. " +
"We ignore that call. nodeAddress=" + nodeAddress);
}
} else {
//TODO check when that happens
log.warn("We have started already a peerExchangeHandshake. " +
"We ignore that call. " +
"nodeAddress=" + nodeAddress);
log.warn("We have stopped already. We ignore that requestReportedPeers call.");
}
}
private void requestWithAvailablePeers() {
Log.traceCall();
if (!stopped) {
if (!peerManager.hasSufficientConnections()) {
// We create a new list of not connected candidates
// 1. shuffled reported peers
// 2. shuffled persisted peers
// 3. Add as last shuffled seedNodes (least priority)
List<NodeAddress> list = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getReportedPeers()), new ArrayList<>());
Collections.shuffle(list);
if (!peerManager.hasSufficientConnections()) {
// We create a new list of not connected candidates
// 1. reported shuffled peers
// 2. persisted shuffled peers
// 3. Add as last shuffled seedNodes (least priority)
List<NodeAddress> list = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getReportedPeers()), new ArrayList<>());
Collections.shuffle(list);
List<NodeAddress> filteredPersistedPeers = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getPersistedPeers()), list);
Collections.shuffle(filteredPersistedPeers);
list.addAll(filteredPersistedPeers);
List<NodeAddress> filteredPersistedPeers = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getPersistedPeers()), list);
Collections.shuffle(filteredPersistedPeers);
list.addAll(filteredPersistedPeers);
List<NodeAddress> filteredSeedNodeAddresses = getFilteredList(new ArrayList<>(seedNodeAddresses), list);
Collections.shuffle(filteredSeedNodeAddresses);
list.addAll(filteredSeedNodeAddresses);
List<NodeAddress> filteredSeedNodeAddresses = getFilteredList(new ArrayList<>(seedNodeAddresses), list);
Collections.shuffle(filteredSeedNodeAddresses);
list.addAll(filteredSeedNodeAddresses);
log.info("Number of peers in list for connectToMorePeers: {}", list.size());
log.trace("Filtered connectToMorePeers list: list=" + list);
if (!list.isEmpty()) {
// Dont shuffle as we want the seed nodes at the last entries
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestReportedPeers(nextCandidate, list);
log.info("Number of peers in list for connectToMorePeers: {}", list.size());
log.trace("Filtered connectToMorePeers list: list=" + list);
if (!list.isEmpty()) {
// Dont shuffle as we want the seed nodes at the last entries
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestReportedPeers(nextCandidate, list);
} else {
log.info("No more peers are available for requestReportedPeers. We will try again after a pause.");
if (retryTimer == null)
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path");
stopRetryTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
}
} else {
log.info("No more peers are available for requestReportedPeers. We will try again after a pause.");
if (connectToMorePeersTimer == null)
connectToMorePeersTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path");
stopConnectToMorePeersTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
log.info("We have already sufficient connections.");
}
} else {
log.info("We have already sufficient connections.");
log.warn("We have stopped already. We ignore that requestWithAvailablePeers call.");
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Maintenance
///////////////////////////////////////////////////////////////////////////////////////////
private void requestAgain() {
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestAgain");
Set<NodeAddress> candidates = new HashSet<>(getNodeAddresses(peerManager.getReportedPeers()));
candidates.addAll(getNodeAddresses(peerManager.getPersistedPeers()));
candidates.addAll(seedNodeAddresses);
candidates.remove(networkNode.getNodeAddress());
ArrayList<NodeAddress> list = new ArrayList<>(candidates);
Collections.shuffle(list);
NodeAddress candidate = list.remove(0);
requestReportedPeers(candidate, list);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private void startPeriodicTimer() {
stopped = false;
if (periodicTimer == null)
periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers,
REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES);
}
private void restart() {
startPeriodicTimer();
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from onNewConnectionAfterAllConnectionsLost");
stopRetryTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC);
}
}
private List<NodeAddress> getNodeAddresses(Collection<ReportedPeer> collection) {
return collection.stream()
.map(e -> e.nodeAddress)
@ -291,10 +323,31 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
.collect(Collectors.toList());
}
private void stopConnectToMorePeersTimer() {
if (connectToMorePeersTimer != null) {
connectToMorePeersTimer.stop();
connectToMorePeersTimer = null;
private void stopPeriodicTimer() {
stopped = true;
if (periodicTimer != null) {
periodicTimer.stop();
periodicTimer = null;
}
}
private void stopRetryTimer() {
if (retryTimer != null) {
retryTimer.stop();
retryTimer = null;
}
}
private void closePeerExchangeHandler(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
peerExchangeHandlerMap.get(nodeAddress).cleanup();
peerExchangeHandlerMap.remove(nodeAddress);
}
}
private void closeAllPeerExchangeHandlers() {
peerExchangeHandlerMap.values().stream().forEach(PeerExchangeHandler::cleanup);
peerExchangeHandlerMap.clear();
}
}

View File

@ -3,6 +3,7 @@ package io.bitsquare.p2p.seed;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.app.Log;
import io.bitsquare.app.Version;
import io.bitsquare.common.Clock;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
@ -137,7 +138,7 @@ public class SeedNode {
log.info("Created torDir at " + torDir.getAbsolutePath());
seedNodesRepository.setNodeAddressToExclude(mySeedNodeAddress);
seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, null, null);
seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, new Clock(), null, null);
seedNodeP2PService.start(listener);
}

View File

@ -1,5 +1,6 @@
package io.bitsquare.p2p;
import io.bitsquare.common.Clock;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.seed.SeedNode;
@ -130,7 +131,7 @@ public class TestUtils {
}
P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost,
2, new File("dummy"), encryptionService, keyRing);
2, new File("dummy"), new Clock(), encryptionService, keyRing);
p2PService.start(new P2PServiceListener() {
@Override
public void onRequestingDataCompleted() {