Fix missing sequence nr and signature updates at refresh offers

This commit is contained in:
Manfred Karrer 2016-02-22 16:18:14 +01:00
parent ca20de64d9
commit bb6334f6a0
24 changed files with 720 additions and 506 deletions

View File

@ -80,7 +80,7 @@ public class Storage<T extends Serializable> {
public T initAndGetPersisted(String fileName) {
this.fileName = fileName;
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600);
fileManager = new FileManager<>(dir, storageFile, 300);
return getPersisted();
}

View File

@ -17,15 +17,14 @@
package io.bitsquare.arbitration;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.bitsquare.app.ProgramArguments;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
@ -47,7 +46,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -57,10 +55,14 @@ import static org.bitcoinj.core.Utils.HEX;
public class ArbitratorManager {
private static final Logger log = LoggerFactory.getLogger(ArbitratorManager.class);
private final KeyRing keyRing;
private final ArbitratorService arbitratorService;
private final User user;
private final ObservableMap<NodeAddress, Arbitrator> arbitratorsObservableMap = FXCollections.observableHashMap();
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private static final long REPUBLISH_MILLIS = Arbitrator.TTL / 2;
private static final long RETRY_REPUBLISH_SEC = 5;
private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee";
// Keys for invited arbitrators in bootstrapping phase (before registration is open to anyone and security payment is implemented)
// For testing purpose here is a private key so anyone can setup an arbitrator for now.
@ -87,10 +89,24 @@ public class ArbitratorManager {
"0274f772a98d23e7a0251ab30d7121897b5aebd11a2f1e45ab654aa57503173245",
"036d8a1dfcb406886037d2381da006358722823e1940acc2598c844bbc0fd1026f"
));
private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee";
///////////////////////////////////////////////////////////////////////////////////////////
// Instance fields
///////////////////////////////////////////////////////////////////////////////////////////
private final KeyRing keyRing;
private final ArbitratorService arbitratorService;
private final User user;
private final ObservableMap<NodeAddress, Arbitrator> arbitratorsObservableMap = FXCollections.observableHashMap();
private final boolean isDevTest;
private BootstrapListener bootstrapListener;
private ScheduledThreadPoolExecutor republishArbitratorExecutor;
private Timer republishArbitratorTimer, retryRepublishArbitratorTimer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public ArbitratorManager(@Named(ProgramArguments.DEV_TEST) boolean isDevTest, KeyRing keyRing, ArbitratorService arbitratorService, User user) {
@ -113,19 +129,26 @@ public class ArbitratorManager {
}
public void shutDown() {
if (republishArbitratorExecutor != null)
MoreExecutors.shutdownAndAwaitTermination(republishArbitratorExecutor, 500, TimeUnit.MILLISECONDS);
stopRepublishArbitratorTimer();
stopRetryRepublishArbitratorTimer();
if (bootstrapListener != null)
arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onAllServicesInitialized() {
if (user.getRegisteredArbitrator() != null) {
P2PService p2PService = arbitratorService.getP2PService();
if (!p2PService.isBootstrapped()) {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapComplete() {
republishArbitrator();
ArbitratorManager.this.onBootstrapComplete();
}
};
p2PService.addP2PServiceListener(bootstrapListener);
@ -133,29 +156,13 @@ public class ArbitratorManager {
} else {
republishArbitrator();
}
// re-publish periodically
republishArbitratorExecutor = Utilities.getScheduledThreadPoolExecutor("republishArbitrator", 1, 5, 5);
long delay = Arbitrator.TTL / 2;
republishArbitratorExecutor.scheduleAtFixedRate(this::republishArbitrator, delay, delay, TimeUnit.MILLISECONDS);
}
republishArbitratorTimer = UserThread.runPeriodically(this::republishArbitrator, REPUBLISH_MILLIS, TimeUnit.MILLISECONDS);
applyArbitrators();
}
private void republishArbitrator() {
if (bootstrapListener != null)
arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener);
Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
if (registeredArbitrator != null) {
addArbitrator(registeredArbitrator,
this::applyArbitrators,
log::error
);
}
}
public void applyArbitrators() {
Map<NodeAddress, Arbitrator> map = arbitratorService.getArbitrators();
log.trace("Arbitrators . size=" + map.values().size());
@ -222,18 +229,6 @@ public class ArbitratorManager {
return key.signMessage(keyToSignAsHex);
}
private boolean verifySignature(PublicKey storageSignaturePubKey, byte[] registrationPubKey, String signature) {
String keyToSignAsHex = Utils.HEX.encode(storageSignaturePubKey.getEncoded());
try {
ECKey key = ECKey.fromPublicOnly(registrationPubKey);
key.verifyMessage(keyToSignAsHex, signature);
return true;
} catch (SignatureException e) {
log.warn("verifySignature failed");
return false;
}
}
@Nullable
public ECKey getRegistrationKey(String privKeyBigIntString) {
try {
@ -246,4 +241,61 @@ public class ArbitratorManager {
public boolean isPublicKeyInList(String pubKeyAsHex) {
return isDevTest && pubKeyAsHex.equals(publicKeyForTesting) || publicKeys.contains(pubKeyAsHex);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void onBootstrapComplete() {
if (bootstrapListener != null) {
arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener);
bootstrapListener = null;
}
republishArbitrator();
}
private void republishArbitrator() {
Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
if (registeredArbitrator != null) {
addArbitrator(registeredArbitrator,
this::applyArbitrators,
errorMessage -> {
if (retryRepublishArbitratorTimer == null)
retryRepublishArbitratorTimer = UserThread.runPeriodically(() -> {
stopRetryRepublishArbitratorTimer();
republishArbitrator();
}, RETRY_REPUBLISH_SEC);
}
);
}
}
private boolean verifySignature(PublicKey storageSignaturePubKey, byte[] registrationPubKey, String signature) {
String keyToSignAsHex = Utils.HEX.encode(storageSignaturePubKey.getEncoded());
try {
ECKey key = ECKey.fromPublicOnly(registrationPubKey);
key.verifyMessage(keyToSignAsHex, signature);
return true;
} catch (SignatureException e) {
log.warn("verifySignature failed");
return false;
}
}
private void stopRetryRepublishArbitratorTimer() {
if (retryRepublishArbitratorTimer != null) {
retryRepublishArbitratorTimer.stop();
retryRepublishArbitratorTimer = null;
}
}
private void stopRepublishArbitratorTimer() {
if (republishArbitratorTimer != null) {
republishArbitratorTimer.stop();
republishArbitratorTimer = null;
}
}
}

View File

@ -54,8 +54,8 @@ public final class Offer implements StoragePayload, RequiresOwnerIsOnlinePayload
@JsonExclude
private static final Logger log = LoggerFactory.getLogger(Offer.class);
public static final long TTL = TimeUnit.SECONDS.toMillis(60);
// public static final long TTL = TimeUnit.SECONDS.toMillis(10); //TODO
// public static final long TTL = TimeUnit.SECONDS.toMillis(60);
public static final long TTL = TimeUnit.SECONDS.toMillis(10); //TODO
public final static String TAC_OFFERER = "When placing that offer I accept that anyone who fulfills my conditions can " +
"take that offer.";

View File

@ -60,7 +60,10 @@ import static io.bitsquare.util.Validator.nonEmptyStringOf;
public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener {
private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class);
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 5;
private static final long RETRY_REPUBLISH_DELAY_SEC = 5;
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10;
private static final long REPUBLISH_INTERVAL_MILLIS = 10 * Offer.TTL;
private static final long REFRESH_INTERVAL_MILLIS = (long) (Offer.TTL * 0.5);
private final KeyRing keyRing;
private final User user;
@ -73,7 +76,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private final TradableList<OpenOffer> openOffers;
private final Storage<TradableList<OpenOffer>> openOffersStorage;
private boolean stopped;
private Timer periodicRepublishOffersTimer, periodicRefreshOffersTimer, republishOffersTimer;
private Timer periodicRepublishOffersTimer, periodicRefreshOffersTimer, retryRepublishOffersTimer;
private BootstrapListener bootstrapListener;
@ -131,6 +134,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer();
stopPeriodicRepublishOffersTimer();
stopRetryRepublishOffersTimer();
log.info("remove all open offers at shutDown");
// we remove own offers from offerbook when we go offline
@ -167,10 +171,21 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// Republish means we send the complete offer object
republishOffers();
startRepublishOffersThread();
startPeriodicRepublishOffersTimer();
// Refresh is started once we get a success from republish
// We republish after a bit as it might be that our connected node still has the offer in the data map
// but other peers have it already removed because of expired TTL.
// Those other not directly connected peers would not get the broadcast of the new offer, as the first
// connected peer (seed node) does nto broadcast if it has the data in the map.
// To update quickly to the whole network we repeat the republishOffers call after a few seconds when we
// are better connected to the network. There is no guarantee that all peers will receive it but we have
// also our periodic timer, so after that longer interval the offer should be available to all peers.
if (retryRepublishOffersTimer == null)
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC);
p2PService.getPeerManager().addListener(this);
}
@ -184,90 +199,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopped = true;
stopPeriodicRefreshOffersTimer();
stopPeriodicRepublishOffersTimer();
stopRetryRepublishOffersTimer();
restart();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
stopped = false;
restart();
}
@Override
public void onAwakeFromStandby() {
stopped = false;
if (!p2PService.getNetworkNode().getAllConnections().isEmpty())
restart();
}
///////////////////////////////////////////////////////////////////////////////////////////
// RepublishOffers, refreshOffers
///////////////////////////////////////////////////////////////////////////////////////////
private void startRepublishOffersThread() {
stopped = false;
if (periodicRepublishOffersTimer == null)
periodicRepublishOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::republishOffers,
Offer.TTL * 10,
TimeUnit.MILLISECONDS);
}
private void republishOffers() {
Log.traceCall("Number of offer for republish: " + openOffers.size());
if (!stopped) {
stopPeriodicRefreshOffersTimer();
for (OpenOffer openOffer : openOffers) {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
log.debug("Successful added offer to P2P network");
// Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.)
startRefreshOffersThread();
},
errorMessage -> {
//TODO handle with retry
log.error("Add offer to P2P network failed. " + errorMessage);
stopRepublishOffersTimer();
republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_DELAY_AFTER_ALL_CON_LOST_SEC);
});
openOffer.setStorage(openOffersStorage);
}
} else {
log.warn("We have stopped already. We ignore that republishOffers call.");
}
}
private void startRefreshOffersThread() {
stopped = false;
// refresh sufficiently before offer would expire
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::refreshOffers,
(long) (Offer.TTL * 0.5),
TimeUnit.MILLISECONDS);
}
private void refreshOffers() {
if (!stopped) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
for (OpenOffer openOffer : openOffers) {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
}
} else {
log.warn("We have stopped already. We ignore that refreshOffers call.");
}
}
private void restart() {
startRepublishOffersThread();
startRefreshOffersThread();
if (republishOffersTimer == null) {
stopped = false;
republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@ -282,6 +232,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
openOffers.add(openOffer);
openOffersStorage.queueUpForSave();
resultHandler.handleResult(transaction);
if (!stopped) {
startPeriodicRepublishOffersTimer();
startPeriodicRefreshOffersTimer();
} else {
log.warn("We have stopped already. We ignore that placeOfferProtocol.placeOffer.onResult call.");
}
}
);
placeOfferProtocol.placeOffer();
@ -395,6 +351,97 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
///////////////////////////////////////////////////////////////////////////////////////////
// RepublishOffers, refreshOffers
///////////////////////////////////////////////////////////////////////////////////////////
private void republishOffers() {
Log.traceCall("Number of offer for republish: " + openOffers.size());
if (!stopped) {
stopPeriodicRefreshOffersTimer();
openOffers.stream().forEach(openOffer -> {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
if (!stopped) {
log.debug("Successful added offer to P2P network");
// Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.)
if (periodicRefreshOffersTimer == null)
startPeriodicRefreshOffersTimer();
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call.");
}
},
errorMessage -> {
if (!stopped) {
log.error("Add offer to P2P network failed. " + errorMessage);
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onFault call.");
}
});
openOffer.setStorage(openOffersStorage);
});
} else {
log.warn("We have stopped already. We ignore that republishOffers call.");
}
}
private void startPeriodicRepublishOffersTimer() {
Log.traceCall();
stopped = false;
if (periodicRepublishOffersTimer == null)
periodicRepublishOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
republishOffers();
} else {
log.warn("We have stopped already. We ignore that periodicRepublishOffersTimer.run call.");
}
},
REPUBLISH_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
else
log.warn("periodicRepublishOffersTimer already stated");
}
private void startPeriodicRefreshOffersTimer() {
Log.traceCall();
stopped = false;
// refresh sufficiently before offer would expire
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
openOffers.stream().forEach(openOffer -> {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
});
} else {
log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call.");
}
},
REFRESH_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
else
log.warn("periodicRefreshOffersTimer already stated");
}
private void restart() {
Log.traceCall();
if (retryRepublishOffersTimer == null)
retryRepublishOffersTimer = UserThread.runAfter(() -> {
stopped = false;
stopRetryRepublishOffersTimer();
republishOffers();
}, RETRY_REPUBLISH_DELAY_SEC);
startPeriodicRepublishOffersTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@ -413,10 +460,10 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
}
private void stopRepublishOffersTimer() {
if (republishOffersTimer != null) {
republishOffersTimer.stop();
republishOffersTimer = null;
private void stopRetryRepublishOffersTimer() {
if (retryRepublishOffersTimer != null) {
retryRepublishOffersTimer.stop();
retryRepublishOffersTimer = null;
}
}
}

View File

@ -696,6 +696,8 @@ public class CreateOfferView extends ActivatableViewAndModel<AnchorPane, CreateO
createOfferButton.setVisible(false);
createOfferButton.setOnAction(e -> onPlaceOffer());
createOfferButton.setMinHeight(40);
createOfferButton.setPadding(new Insets(0, 20, 0, 20));
placeOfferSpinner = placeOfferTuple.second;
placeOfferSpinner.setPrefSize(18, 18);
placeOfferSpinnerInfoLabel = placeOfferTuple.third;

View File

@ -579,6 +579,8 @@ public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOffer
takeOfferButton.setVisible(false);
takeOfferButton.setOnAction(e -> onTakeOffer());
takeOfferButton.setMinHeight(40);
takeOfferButton.setPadding(new Insets(0, 20, 0, 20));
takeOfferSpinner = takeOfferTuple.second;
takeOfferSpinner.setPrefSize(18, 18);
takeOfferSpinnerInfoLabel = takeOfferTuple.third;

View File

@ -124,14 +124,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.addConnectionListener(this);
networkNode.addMessageListener(this);
broadcaster = new Broadcaster(networkNode);
p2PDataStorage = new P2PDataStorage(broadcaster, networkNode, storageDir);
p2PDataStorage.addHashMapChangedListener(this);
Set<NodeAddress> seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId);
peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir, clock);
broadcaster = new Broadcaster(networkNode, peerManager);
p2PDataStorage = new P2PDataStorage(broadcaster, networkNode, storageDir);
p2PDataStorage.addHashMapChangedListener(this);
requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this);
peerExchangeManager = new PeerExchangeManager(networkNode, peerManager, seedNodeAddresses);
@ -175,6 +175,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (peerManager != null)
peerManager.shutDown();
if (broadcaster != null)
broadcaster.shutDown();
if (requestDataManager != null)
requestDataManager.shutDown();
@ -224,7 +227,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall();
requestDataManager.requestPreliminaryData();
keepAliveManager.start();
keepAliveManager.restart();
p2pServiceListeners.stream().forEach(SetupListener::onTorNodeReady);
}

View File

@ -60,9 +60,9 @@ public class Connection implements MessageListener {
private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data
private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec
private static final int MSG_THROTTLE_PER_10_SEC = 50; // With MAX_MSG_SIZE of 100kb results in bandwidth of 5 mbit/sec for 10 sec
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
//private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
//TODO
// private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(30);
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;
@ -358,6 +358,7 @@ public class Connection implements MessageListener {
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\ncloseConnectionReason=" + closeConnectionReason
+ "\nuid=" + uid
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");

View File

@ -5,7 +5,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.messages.DataBroadcastMessage;
import javafx.beans.property.IntegerProperty;
@ -19,7 +21,7 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class Broadcaster {
public class Broadcaster implements ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(Broadcaster.class);
@ -28,15 +30,35 @@ public class Broadcaster {
}
private final NetworkNode networkNode;
private PeerManager peerManager;
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
private boolean stopped = false;
private final IntegerProperty numOfBroadcasts = new SimpleIntegerProperty(0);
public Broadcaster(NetworkNode networkNode) {
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public Broadcaster(NetworkNode networkNode, PeerManager peerManager) {
this.networkNode = networkNode;
this.peerManager = peerManager;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
}
public void shutDown() {
stopped = true;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(DataBroadcastMessage message, @Nullable NodeAddress sender) {
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
@ -48,21 +70,28 @@ public class Broadcaster {
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " +
nodeAddress + ".");
log.trace("Broadcast message to " + nodeAddress + ".");
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast to " + nodeAddress + " succeeded.");
numOfBroadcasts.set(numOfBroadcasts.get() + 1);
listeners.stream().forEach(listener -> listener.onBroadcasted(message));
if (!stopped) {
//log.trace("Broadcast to " + nodeAddress + " succeeded.");
numOfBroadcasts.set(numOfBroadcasts.get() + 1);
listeners.stream().forEach(listener -> listener.onBroadcasted(message));
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
}
});
});
@ -85,4 +114,43 @@ public class Broadcaster {
public void removeListener(Listener listener) {
listeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
stopped = false;
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
stopped = true;
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
stopped = false;
}
@Override
public void onAwakeFromStandby() {
if (!networkNode.getAllConnections().isEmpty())
stopped = false;
}
}

View File

@ -4,10 +4,8 @@ import io.bitsquare.app.Log;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer;
import io.bitsquare.storage.Storage;
import javafx.beans.value.ChangeListener;
@ -20,15 +18,15 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
public class PeerManager implements ConnectionListener, MessageListener {
public class PeerManager implements ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private static final long CHECK_MAX_CONN_DELAY_SEC = 3;
private static int MAX_CONNECTIONS;
private static int MIN_CONNECTIONS;
private static int MAX_CONNECTIONS_PEER;
@ -85,6 +83,8 @@ public class PeerManager implements ConnectionListener, MessageListener {
private final ChangeListener<NodeAddress> connectionNodeAddressListener;
private final Clock.Listener listener;
private final List<Listener> listeners = new LinkedList<>();
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -107,10 +107,14 @@ public class PeerManager implements ConnectionListener, MessageListener {
printConnectedPeers();
if (checkMaxConnectionsTimer == null && newValue != null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> {
removeTooOldReportedPeers();
removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS);
}, 3);
if (!stopped) {
removeTooOldReportedPeers();
removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS);
} else {
log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call.");
}
}, CHECK_MAX_CONN_DELAY_SEC);
};
// we check if app was idle for more then 5 sec.
@ -126,7 +130,8 @@ public class PeerManager implements ConnectionListener, MessageListener {
@Override
public void onMissedSecondTick(long missed) {
if (missed > Clock.IDLE_TOLERANCE) {
log.error("We have been idle for {} sec", missed / 1000);
log.warn("We have been in standby mode for {} sec", missed / 1000);
stopped = false;
listeners.stream().forEach(Listener::onAwakeFromStandby);
}
}
@ -142,6 +147,11 @@ public class PeerManager implements ConnectionListener, MessageListener {
stopCheckMaxConnectionsTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public int getMaxConnections() {
return MAX_CONNECTIONS_ABSOLUTE;
}
@ -154,6 +164,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
listeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -161,18 +172,13 @@ public class PeerManager implements ConnectionListener, MessageListener {
@Override
public void onConnection(Connection connection) {
connection.getNodeAddressProperty().addListener(connectionNodeAddressListener);
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
// OutboundConnection always know their peers address
// A seed node get only InboundConnection from other seed nodes with getData or peer exchange,
// never direct messages, so we need to check for PeerType.SEED_NODE at onMessage
if (connection instanceof OutboundConnection &&
peersNodeAddressOptional.isPresent() &&
seedNodeAddresses.contains(peersNodeAddressOptional.get())) {
if (isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost);
}
}
@ -183,8 +189,10 @@ public class PeerManager implements ConnectionListener, MessageListener {
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections)
if (lostAllConnections) {
stopped = true;
listeners.stream().forEach(Listener::onAllConnectionsLost);
}
}
@Override
@ -193,25 +201,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
//TODO move to RequestDataManager
@Override
public void onMessage(Message message, Connection connection) {
// In case a seed node connects to another seed node we get his address at the DataRequest triggered from
// RequestDataManager.updateDataFromConnectedSeedNode
if (message instanceof GetUpdatedDataRequest) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
if (isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Check seed node connections
// Check max connections
///////////////////////////////////////////////////////////////////////////////////////////
private boolean checkMaxConnections(int limit) {
@ -222,7 +212,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
int size = allConnections.size();
log.info("We have {} connections open. Our limit is {}", size, limit);
if (size > limit) {
// Only InboundConnection, and PEER type connections
log.info("We have too many connections open.\n\t" +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
@ -235,7 +224,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
"MAX_CONNECTIONS_PEER limit of {}", MAX_CONNECTIONS_PEER);
if (size > MAX_CONNECTIONS_PEER) {
log.info("Lets try to remove ANY connection of type PEER.");
// Only PEER type connections
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
@ -245,7 +233,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
"MAX_CONNECTIONS_NON_DIRECT limit of {}", MAX_CONNECTIONS_NON_DIRECT);
if (size > MAX_CONNECTIONS_NON_DIRECT) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
// All connections except DIRECT_MSG_PEER
candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.collect(Collectors.toList());
@ -255,9 +242,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
"MAX_CONNECTIONS_ABSOLUTE limit of {}", MAX_CONNECTIONS_ABSOLUTE);
if (size > MAX_CONNECTIONS_ABSOLUTE) {
log.info("Lets try to remove any connection.");
// All connections
candidates = allConnections.stream()
.collect(Collectors.toList());
candidates = allConnections.stream().collect(Collectors.toList());
}
}
}
@ -286,9 +271,9 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void removeSuperfluousSeedNodes() {
Log.traceCall();
Set<Connection> allConnections = networkNode.getAllConnections();
if (allConnections.size() > MAX_CONNECTIONS_PEER) {
List<Connection> candidates = allConnections.stream()
Set<Connection> connections = networkNode.getConfirmedConnections();
if (hasSufficientConnections()) {
List<Connection> candidates = connections.stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
@ -318,7 +303,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) {
ReportedPeer reportedPeer = reportedPeerOptional.get();
reportedPeers.remove(reportedPeer);
removeReportedPeer(reportedPeer);
return reportedPeer;
} else {
return null;
@ -338,7 +323,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
public void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) {
printReportedPeers(reportedPeersToAdd);
printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + PeerManager.MAX_CONNECTIONS_ABSOLUTE + 10)) {
@ -359,6 +344,25 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
}
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = reportedPeers.size();
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE;
if (size > limit) {
log.trace("We have already {} reported peers which exceeds our limit of {}." +
"We remove random peers from the reported peers list.", size, limit);
int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(reportedPeers);
// we dont use sorting by lastActivityDate to keep it more random
for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = list.remove(new Random().nextInt(list.size()));
removeReportedPeer(toRemove);
}
} else {
log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
}
}
private void printReportedPeers() {
if (!reportedPeers.isEmpty()) {
if (printReportedPeersDetails) {
@ -372,25 +376,26 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
}
private void printReportedPeers(HashSet<ReportedPeer> reportedPeers) {
private void printNewReportedPeers(HashSet<ReportedPeer> reportedPeers) {
if (printReportedPeersDetails) {
StringBuilder result = new StringBuilder("We received now reportedPeers:");
StringBuilder result = new StringBuilder("We received new reportedPeers:");
reportedPeers.stream().forEach(e -> result.append("\n\t").append(e));
log.info(result.toString());
}
log.info("Number of new arrived reported peers: {}", reportedPeers.size());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Persisted peers
///////////////////////////////////////////////////////////////////////////////////////////
private boolean removePersistedPeer(ReportedPeer reportedPeer) {
if (persistedPeers.contains(reportedPeer)) {
persistedPeers.remove(reportedPeer);
private boolean removePersistedPeer(ReportedPeer persistedPeer) {
if (persistedPeers.contains(persistedPeer)) {
persistedPeers.remove(persistedPeer);
if (dbStorage != null)
dbStorage.queueUpForSave(persistedPeers, 5000);
dbStorage.queueUpForSave(persistedPeers, 2000);
return true;
} else {
@ -401,12 +406,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> persistedPeerOptional = persistedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
persistedPeerOptional.ifPresent(persistedPeer -> {
persistedPeers.remove(persistedPeer);
if (dbStorage != null)
dbStorage.queueUpForSave(persistedPeers, 5000);
});
return persistedPeerOptional.isPresent();
return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get());
}
private void removeTooOldPersistedPeers() {
@ -417,6 +417,24 @@ public class PeerManager implements ConnectionListener, MessageListener {
persistedPeersToRemove.forEach(this::removePersistedPeer);
}
private void purgePersistedPeersIfExceeds() {
Log.traceCall();
int size = persistedPeers.size();
int limit = MAX_PERSISTED_PEERS;
if (size > limit) {
log.trace("We have already {} persisted peers which exceeds our limit of {}." +
"We remove random peers from the persisted peers list.", size, limit);
int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(persistedPeers);
// we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random
for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = list.remove(new Random().nextInt(list.size()));
removePersistedPeer(toRemove);
}
} else {
log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_PERSISTED_PEERS);
}
}
public Set<ReportedPeer> getPersistedPeers() {
return persistedPeers;
@ -431,32 +449,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
Log.traceCall("nodeAddress=" + nodeAddress);
ReportedPeer reportedPeer = removeReportedPeer(nodeAddress);
if (connection != null && connection.getRuleViolation() != null) {
removePersistedPeer(nodeAddress);
} else {
if (reportedPeer != null) {
removePersistedPeer(nodeAddress);
persistedPeers.add(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000);
removeTooOldPersistedPeers();
}
}
}
/* public Set<ReportedPeer> getConnectedAndReportedPeers() {
Set<ReportedPeer> result = new HashSet<>(reportedPeers);
result.addAll(getConnectedPeers());
return result;
}*/
public boolean isSeedNode(ReportedPeer reportedPeer) {
return seedNodeAddresses.contains(reportedPeer.nodeAddress);
}
@ -486,6 +478,23 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress) {
handleConnectionFault(nodeAddress, null);
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
Log.traceCall("nodeAddress=" + nodeAddress);
ReportedPeer reportedPeer = removeReportedPeer(nodeAddress);
if (connection != null && connection.getRuleViolation() != null)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();
}
public void shutDownConnection(Connection connection, CloseConnectionReason closeConnectionReason) {
if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
connection.shutDown(closeConnectionReason);
@ -500,54 +509,17 @@ public class PeerManager implements ConnectionListener, MessageListener {
.ifPresent(connection -> connection.shutDown(closeConnectionReason));
}
public HashSet<ReportedPeer> getConnectedNonSeedNodeReportedPeers(NodeAddress excludedNodeAddress) {
return new HashSet<>(getConnectedNonSeedNodeReportedPeers().stream()
.filter(e -> !e.nodeAddress.equals(excludedNodeAddress))
.collect(Collectors.toSet()));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = getReportedPeers().size();
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE;
if (size > limit) {
log.trace("We have already {} reported peers which exceeds our limit of {}." +
"We remove random peers from the reported peers list.", size, limit);
int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(getReportedPeers());
// we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random
for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list);
removeReportedPeer(toRemove);
}
} else {
log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
}
}
private void purgePersistedPeersIfExceeds() {
Log.traceCall();
int size = getPersistedPeers().size();
int limit = MAX_PERSISTED_PEERS - MAX_CONNECTIONS_ABSOLUTE;
if (size > limit) {
log.trace("We have already {} persisted peers which exceeds our limit of {}." +
"We remove random peers from the persisted peers list.", size, limit);
int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(getReportedPeers());
// we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random
for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list);
removePersistedPeer(toRemove);
}
} else {
log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_PERSISTED_PEERS);
}
}
private ReportedPeer getAndRemoveRandomReportedPeer(List<ReportedPeer> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
}
private Set<ReportedPeer> getConnectedPeers() {
private Set<ReportedPeer> getConnectedReportedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream()
@ -555,18 +527,12 @@ public class PeerManager implements ConnectionListener, MessageListener {
.collect(Collectors.toSet());
}
private HashSet<ReportedPeer> getConnectedPeersNonSeedNodes() {
return new HashSet<>(getConnectedPeers().stream()
private HashSet<ReportedPeer> getConnectedNonSeedNodeReportedPeers() {
return new HashSet<>(getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e))
.collect(Collectors.toSet()));
}
public HashSet<ReportedPeer> getConnectedPeersNonSeedNodes(NodeAddress excludedNodeAddress) {
return new HashSet<>(getConnectedPeersNonSeedNodes().stream()
.filter(e -> !e.nodeAddress.equals(excludedNodeAddress))
.collect(Collectors.toSet()));
}
private void stopCheckMaxConnectionsTimer() {
if (checkMaxConnectionsTimer != null) {
checkMaxConnectionsTimer.stop();

View File

@ -50,7 +50,7 @@ public class RequestDataHandler implements MessageListener {
private final PeerManager peerManager;
private final Listener listener;
private Timer timeoutTimer;
private final long nonce = new Random().nextLong();
private final int nonce = new Random().nextInt();
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
@ -67,11 +67,8 @@ public class RequestDataHandler implements MessageListener {
networkNode.addMessageListener(this);
}
public void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
public void cancel() {
cleanup();
}
@ -99,21 +96,29 @@ public class RequestDataHandler implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
if (!stopped) {
String errorMessage = "Sending getDataRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.warn("We have stopped already. We ignore that requestData.onFailure call.");
}
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call.");
}
},
10);
} else {
@ -160,16 +165,25 @@ public class RequestDataHandler implements MessageListener {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
peerManager.shutDownConnection(nodeAddress, closeConnectionReason);
peerManager.handleConnectionFault(nodeAddress);
listener.onFault(errorMessage, null);
}
private void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
stopTimeoutTimer();
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
peerManager.shutDownConnection(nodeAddress, closeConnectionReason);
listener.onFault(errorMessage, null);
}
}

View File

@ -82,7 +82,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
stopRetryTimer();
networkNode.removeMessageListener(this);
peerManager.removeListener(this);
handlerMap.values().stream().forEach(RequestDataHandler::cleanup);
closeAllHandlers();
}
@ -122,14 +122,12 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// clean up in case we could not clean up at disconnect
closeRequestDataHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
closeRequestDataHandler(connection);
closeHandler(connection);
}
@Override
@ -144,28 +142,27 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllRequestDataHandlers();
closeAllHandlers();
stopRetryTimer();
stopped = true;
restart();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllRequestDataHandlers();
closeAllHandlers();
stopped = false;
retryAfterDelay();
restart();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllRequestDataHandlers();
closeAllHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
retryAfterDelay();
restart();
}
@ -185,22 +182,25 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("requestDataHandshake completed.\n\tConnection={}",
connection);
log.trace("requestDataHandshake completed.\n\tConnection={}", connection);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getDataRequestHandler.handle((GetDataRequest) message, connection);
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -239,37 +239,32 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" +
log.trace("requestDataHandshake with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", nodeAddress, errorMessage);
handlerMap.remove(nodeAddress);
peerManager.handleConnectionFault(nodeAddress, connection);
if (!stopped) {
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
remainingNodeAddresses.remove(nextCandidate);
requestData(nextCandidate, remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n\t" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
// Notify listeners
if (!nodeAddressOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
}
retryAfterDelay();
}
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
remainingNodeAddresses.remove(nextCandidate);
requestData(nextCandidate, remainingNodeAddresses);
} else {
log.warn("We have stopped already. We ignore that requestData.onFault call.");
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n\t" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
// Notify listeners
if (!nodeAddressOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
}
restart();
}
}
});
@ -288,10 +283,13 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private void retryAfterDelay() {
private void restart() {
Log.traceCall();
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
log.trace("retryTimer called");
stopped = false;
stopRetryTimer();
// We create a new list of candidates
@ -345,19 +343,21 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
}
private void closeRequestDataHandler(Connection connection) {
private void closeHandler(Connection connection) {
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
NodeAddress nodeAddress = peersNodeAddressOptional.get();
if (handlerMap.containsKey(nodeAddress)) {
handlerMap.get(nodeAddress).cleanup();
handlerMap.get(nodeAddress).cancel();
handlerMap.remove(nodeAddress);
}
} else {
log.trace("closeRequestDataHandler: nodeAddress not set in connection " + connection);
}
}
private void closeAllRequestDataHandlers() {
handlerMap.values().stream().forEach(RequestDataHandler::cleanup);
private void closeAllHandlers() {
handlerMap.values().stream().forEach(RequestDataHandler::cancel);
handlerMap.clear();
}

View File

@ -3,5 +3,5 @@ package io.bitsquare.p2p.peers.getdata.messages;
import io.bitsquare.p2p.Message;
public interface GetDataRequest extends Message {
long getNonce();
int getNonce();
}

View File

@ -12,9 +12,9 @@ public final class GetDataResponse implements Message {
private final int messageVersion = Version.getP2PMessageVersion();
public final HashSet<ProtectedData> dataSet;
public final long requestNonce;
public final int requestNonce;
public GetDataResponse(HashSet<ProtectedData> dataSet, long requestNonce) {
public GetDataResponse(HashSet<ProtectedData> dataSet, int requestNonce) {
this.dataSet = dataSet;
this.requestNonce = requestNonce;
}

View File

@ -10,15 +10,15 @@ public final class GetUpdatedDataRequest implements SendersNodeAddressMessage, G
private final int messageVersion = Version.getP2PMessageVersion();
private final NodeAddress senderNodeAddress;
private final long nonce;
private final int nonce;
public GetUpdatedDataRequest(NodeAddress senderNodeAddress, long nonce) {
public GetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce) {
this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce;
}
@Override
public long getNonce() {
public int getNonce() {
return nonce;
}

View File

@ -8,14 +8,14 @@ public final class PreliminaryGetDataRequest implements AnonymousMessage, GetDat
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final int messageVersion = Version.getP2PMessageVersion();
private final long nonce;
private final int nonce;
public PreliminaryGetDataRequest(long nonce) {
public PreliminaryGetDataRequest(int nonce) {
this.nonce = nonce;
}
@Override
public long getNonce() {
public int getNonce() {
return nonce;
}

View File

@ -57,10 +57,8 @@ class KeepAliveHandler implements MessageListener {
this.listener = listener;
}
public void cleanup() {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
public void cancel() {
cleanup();
}
@ -83,13 +81,18 @@ class KeepAliveHandler implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending ping to " + connection +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
listener.onFault(errorMessage);
if (!stopped) {
String errorMessage = "Sending ping to " + connection +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
peerManager.handleConnectionFault(connection);
listener.onFault(errorMessage);
} else {
log.warn("We have stopped already. We ignore that sendPing.onFailure call.");
}
}
});
} else {
@ -121,4 +124,10 @@ class KeepAliveHandler implements MessageListener {
}
}
}
private void cleanup() {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
}
}

View File

@ -17,14 +17,13 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
//private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
//TODO
// private static final int INTERVAL_SEC = 5;
private static final int INTERVAL_SEC = 5;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -49,13 +48,10 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
public void shutDown() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
closeAllMaintenanceHandlers();
closeAllHandlers();
stopKeepAliveTimer();
}
@ -64,10 +60,14 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void start() {
stopped = false;
public void restart() {
if (keepAliveTimer == null)
keepAliveTimer = UserThread.runPeriodically(this::keepAlive, INTERVAL_SEC);
keepAliveTimer = UserThread.runPeriodically(() -> {
stopped = false;
keepAlive();
}, INTERVAL_SEC);
else
log.warn("keepAliveTimer already running");
}
@ -91,12 +91,16 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending pong to " + connection +
" failed. That is expected if the peer is offline. pong=" + pong + "." +
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
if (!stopped) {
String errorMessage = "Sending pong to " + connection +
" failed. That is expected if the peer is offline. pong=" + pong + "." +
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
}
});
} else {
@ -113,14 +117,12 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// clean up in case we could not clean up at disconnect
closeMaintenanceHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
closeMaintenanceHandler(connection);
closeHandler(connection);
}
@Override
@ -135,23 +137,27 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllMaintenanceHandlers();
closeAllHandlers();
stopKeepAliveTimer();
stopped = true;
restart();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllMaintenanceHandlers();
start();
closeAllHandlers();
stopped = false;
restart();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllMaintenanceHandlers();
closeAllHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
start();
restart();
}
@ -204,16 +210,16 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
}
}
private void closeMaintenanceHandler(Connection connection) {
private void closeHandler(Connection connection) {
String uid = connection.getUid();
if (handlerMap.containsKey(uid)) {
handlerMap.get(uid).cleanup();
handlerMap.get(uid).cancel();
handlerMap.remove(uid);
}
}
private void closeAllMaintenanceHandlers() {
handlerMap.values().stream().forEach(KeepAliveHandler::cleanup);
private void closeAllHandlers() {
handlerMap.values().stream().forEach(KeepAliveHandler::cancel);
handlerMap.clear();
}

View File

@ -68,7 +68,7 @@ class GetPeersRequestHandler {
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"The peers address must have been already set at the moment");
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce,
peerManager.getConnectedPeersNonSeedNodes(connection.getPeersNodeAddressOptional().get()));
peerManager.getConnectedNonSeedNodeReportedPeers(connection.getPeersNodeAddressOptional().get()));
SettableFuture<Connection> future = networkNode.sendMessage(connection,
getPeersResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {

View File

@ -65,15 +65,9 @@ class PeerExchangeHandler implements MessageListener {
this.listener = listener;
}
public void cleanup() {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
public void cancel() {
Log.traceCall();
cleanup();
}
@ -85,38 +79,50 @@ class PeerExchangeHandler implements MessageListener {
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
if (!stopped) {
if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress));
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedNonSeedNodeReportedPeers(nodeAddress));
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress);
//TODO remove setPeersNodeAddress if never needed
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
if (!stopped) {
if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress);
//TODO remove setPeersNodeAddress if never needed
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
}
PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this);
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
} else {
log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest.onSuccess call.");
}
PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this);
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress);
if (!stopped) {
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress);
} else {
log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest.onFailure call.");
}
}
});
checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
} else {
log.warn("We have stopped that handler already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
} else {
@ -162,6 +168,7 @@ class PeerExchangeHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, CloseConnectionReason sendMsgFailure, NodeAddress nodeAddress) {
Log.traceCall();
// TODO retry
cleanup();
if (connection == null)
@ -169,6 +176,20 @@ class PeerExchangeHandler implements MessageListener {
else
peerManager.shutDownConnection(connection, sendMsgFailure);
peerManager.handleConnectionFault(nodeAddress, connection);
listener.onFault(errorMessage, connection);
}
private void cleanup() {
Log.traceCall();
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -59,7 +59,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
stopPeriodicTimer();
stopRetryTimer();
closeAllPeerExchangeHandlers();
closeAllHandlers();
}
@ -85,14 +85,12 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// clean up in case we could not clean up at disconnect
closePeerExchangeHandler(connection);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
closePeerExchangeHandler(connection);
closeHandler(connection);
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
@ -115,24 +113,26 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
@Override
public void onAllConnectionsLost() {
Log.traceCall();
closeAllPeerExchangeHandlers();
closeAllHandlers();
stopPeriodicTimer();
stopRetryTimer();
stopped = true;
restart();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
Log.traceCall();
closeAllPeerExchangeHandlers();
closeAllHandlers();
stopped = false;
restart();
}
@Override
public void onAwakeFromStandby() {
Log.traceCall();
closeAllPeerExchangeHandlers();
closeAllHandlers();
stopped = false;
if (!networkNode.getAllConnections().isEmpty())
restart();
}
@ -197,30 +197,32 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
"nodeAddress={}", errorMessage, nodeAddress);
handlerMap.remove(nodeAddress);
peerManager.handleConnectionFault(nodeAddress, connection);
if (!stopped) {
if (!remainingNodeAddresses.isEmpty()) {
if (!peerManager.hasSufficientConnections()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
} else {
// That path will rarely be reached
log.info("We have already sufficient connections.");
}
if (!remainingNodeAddresses.isEmpty()) {
if (!peerManager.hasSufficientConnections()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n\t" +
"We will try again after a pause.");
if (retryTimer == null)
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path");
// That path will rarely be reached
log.info("We have already sufficient connections.");
}
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n\t" +
"We will try again after a pause.");
if (retryTimer == null)
retryTimer = UserThread.runAfter(() -> {
if (!stopped) {
log.trace("retryTimer called from requestReportedPeers code path");
stopRetryTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
}
} else {
stopRetryTimer();
log.warn("We have stopped already. We ignore that retryTimer.run call.");
}
}, RETRY_DELAY_SEC);
}
}
});
@ -266,9 +268,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
log.info("No more peers are available for requestReportedPeers. We will try again after a pause.");
if (retryTimer == null)
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path");
stopRetryTimer();
requestWithAvailablePeers();
if (!stopped) {
log.trace("retryTimer called from requestWithAvailablePeers code path");
stopRetryTimer();
requestWithAvailablePeers();
} else {
stopRetryTimer();
log.warn("We have stopped already. We ignore that retryTimer.run call.");
}
}, RETRY_DELAY_SEC);
}
} else {
@ -289,6 +296,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
if (periodicTimer == null)
periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers,
REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES);
else
log.warn("periodicTimer already started");
}
private void restart() {
@ -296,10 +305,13 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
if (retryTimer == null) {
retryTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from onNewConnectionAfterAllConnectionsLost");
stopped = false;
log.trace("retryTimer called from restart");
stopRetryTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC);
} else {
log.warn("retryTimer already started");
}
}
@ -338,19 +350,23 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
}
}
private void closePeerExchangeHandler(Connection connection) {
private void closeHandler(Connection connection) {
Log.traceCall();
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
NodeAddress nodeAddress = peersNodeAddressOptional.get();
if (handlerMap.containsKey(nodeAddress)) {
handlerMap.get(nodeAddress).cleanup();
handlerMap.get(nodeAddress).cancel();
handlerMap.remove(nodeAddress);
}
} else {
log.warn("closeHandler: nodeAddress not set in connection " + connection);
}
}
private void closeAllPeerExchangeHandlers() {
handlerMap.values().stream().forEach(PeerExchangeHandler::cleanup);
private void closeAllHandlers() {
Log.traceCall();
handlerMap.values().stream().forEach(PeerExchangeHandler::cancel);
handlerMap.clear();
}
}

View File

@ -1,15 +1,14 @@
package io.bitsquare.p2p.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import io.bitsquare.app.Log;
import io.bitsquare.app.Version;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.Hash;
import io.bitsquare.common.crypto.Sig;
import io.bitsquare.common.persistance.Persistable;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.common.wire.Payload;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
@ -30,7 +29,6 @@ import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Run in UserThread
@ -39,15 +37,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
@VisibleForTesting
//public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.HOURS.toMillis(30);
// public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.SECONDS.toMillis(5);//TODO
public static int CHECK_TTL_INTERVAL_SEC = 5;//TODO
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedData> map = new ConcurrentHashMap<>();
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private Timer removeExpiredEntriesTimer;
private HashMap<ByteArray, MapValue> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage;
private final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -60,21 +57,25 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
networkNode.addConnectionListener(this);
storage = new Storage<>(storageDir);
removeExpiredEntriesExecutor = Utilities.getScheduledThreadPoolExecutor("removeExpiredEntries", 1, 10, 5);
HashMap<ByteArray, MapValue> persisted = storage.initAndGetPersisted("SequenceNumberMap");
if (persisted != null)
sequenceNumberMap = getPurgedSequenceNumberMap(persisted);
}
public void shutDown() {
if (removeExpiredEntriesTimer != null)
removeExpiredEntriesTimer.stop();
}
public void onBootstrapComplete() {
removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(() -> {
removeExpiredEntriesTimer = UserThread.runPeriodically(() -> {
log.trace("removeExpiredEntries");
// The moment when an object becomes expired will not be synchronous in the network and we could
// get add messages after the object has expired. To avoid repeated additions of already expired
// object when we get it sent from new peers, we dont remove the sequence number from the map.
// That way an ADD message for an already expired data will fail because the sequence number
// is equal and not larger.
// is equal and not larger as expected.
Map<ByteArray, ProtectedData> temp = new HashMap<>(map);
Set<ProtectedData> toRemoveSet = new HashSet<>();
temp.entrySet().stream()
@ -83,7 +84,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
ByteArray hashOfPayload = entry.getKey();
ProtectedData protectedData = map.get(hashOfPayload);
toRemoveSet.add(protectedData);
log.error("remove protectedData:\n\t" + protectedData);
log.warn("We found an expired data entry. We remove the protectedData:\n\t" + protectedData);
map.remove(hashOfPayload);
});
@ -93,8 +94,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (sequenceNumberMap.size() > 1000)
sequenceNumberMap = getPurgedSequenceNumberMap(sequenceNumberMap);
}), CHECK_TTL_INTERVAL_MILLIS, CHECK_TTL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}, CHECK_TTL_INTERVAL_SEC);
}
@ -166,11 +166,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
Log.traceCall();
MoreExecutors.shutdownAndAwaitTermination(removeExpiredEntriesExecutor, 500, TimeUnit.MILLISECONDS);
}
public boolean add(ProtectedData protectedData, @Nullable NodeAddress sender) {
return add(protectedData, sender, false);
}
@ -190,13 +185,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (result) {
map.put(hashOfPayload, protectedData);
// Republished data have a larger sequence number. We set the rePublish flag to enable broadcasting
// even we had the data with the old seq nr. already
if (sequenceNumberMap.containsKey(hashOfPayload) &&
protectedData.sequenceNumber > sequenceNumberMap.get(hashOfPayload).sequenceNr)
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
log.error("sequenceNumberMap queueUpForSave protectedData.sequenceNumber " + protectedData.sequenceNumber);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after doAdd (truncated)");
@ -230,7 +221,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (storedData.expirablePayload instanceof StoragePayload) {
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.warn("We got that message with that seq nr already from another peer. We ignore that message.");
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
PublicKey ownerPubKey = ((StoragePayload) storedData.expirablePayload).getOwnerPubKey();
@ -239,11 +230,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload);
if (result) {
log.error("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
log.info("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.refreshDate();
storedData.updateSequenceNumber(sequenceNumber);
storedData.updateSignature(signature);
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
storage.queueUpForSave(sequenceNumberMap, 100);
log.error("sequenceNumberMap queueUpForSave sequenceNumber " + sequenceNumber);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after refreshTTL (truncated)");
@ -286,7 +280,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
broadcast(new RemoveDataMessage(protectedData), sender);
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
storage.queueUpForSave(sequenceNumberMap, 100);
} else {
log.debug("remove failed");
}
@ -311,7 +305,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender);
sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
storage.queueUpForSave(sequenceNumberMap, 100);
} else {
log.debug("removeMailboxData failed");
}
@ -332,6 +326,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
else
sequenceNumber = 0;
log.error("getProtectedData sequenceNumber " + sequenceNumber);
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(payload, sequenceNumber));
byte[] signature = Sig.sign(ownerStoragePubKey.getPrivate(), hashOfDataAndSeqNr);
return new ProtectedData(payload, payload.getTTL(), ownerStoragePubKey.getPublic(), sequenceNumber, signature);
@ -346,6 +342,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
else
sequenceNumber = 0;
log.error("getRefreshTTLMessage sequenceNumber " + sequenceNumber);
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(payload, sequenceNumber));
byte[] signature = Sig.sign(ownerStoragePubKey.getPrivate(), hashOfDataAndSeqNr);
return new RefreshTTLMessage(hashOfDataAndSeqNr, signature, hashOfPayload.bytes, sequenceNumber);
@ -393,7 +391,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (sequenceNumberMap.containsKey(hashOfData)) {
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr;
if (newSequenceNumber < storedSequenceNumber) {
log.warn("Sequence number is invalid. newSequenceNumber="
log.warn("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
return false;
} else {

View File

@ -19,8 +19,8 @@ public class ProtectedData implements Payload {
transient public long ttl;
public final PublicKey ownerPubKey;
public final int sequenceNumber;
public final byte[] signature;
public int sequenceNumber;
public byte[] signature;
@VisibleForTesting
transient public Date date;
@ -49,6 +49,14 @@ public class ProtectedData implements Payload {
date = new Date();
}
public void updateSequenceNumber(int sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
public void updateSignature(byte[] signature) {
this.signature = signature;
}
public boolean isExpired() {
return (new Date().getTime() - date.getTime()) > ttl;
}
@ -64,4 +72,5 @@ public class ProtectedData implements Payload {
", signature.hashCode()=" + Arrays.toString(signature).hashCode() +
'}';
}
}

View File

@ -56,7 +56,7 @@ public class ProtectedDataStorageTest {
dir2.mkdir();
UserThread.setExecutor(Executors.newSingleThreadExecutor());
P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS = 500;
P2PDataStorage.CHECK_TTL_INTERVAL_SEC = 500;
keyRing1 = new KeyRing(new KeyStorage(dir1));
@ -112,7 +112,7 @@ public class ProtectedDataStorageTest {
// @Test
public void testTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5);
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
log.debug("data.date " + data.date);
log.debug("data.date " + data.date.getTime());
@ -120,11 +120,11 @@ public class ProtectedDataStorageTest {
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 2);
log.debug("test 3 removed");
Assert.assertEquals(0, dataStorage1.getMap().size());
}
@ -162,31 +162,31 @@ public class ProtectedDataStorageTest {
*/
@Test
public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5);
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
RefreshTTLMessage refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 3");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 4");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 2);
log.debug("test 5 removed");
Assert.assertEquals(0, dataStorage1.getMap().size());
}