From aecbf7ced91c8b9f5893f4dc26d6bc76bae3c906 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 26 Feb 2016 14:10:22 +0100 Subject: [PATCH] Add isOwner flag for broadcasting to all peers, remove popup for disconnects of rule violations, reduce max nr of network threads --- .../java/io/bitsquare/alert/AlertService.java | 10 ++-- .../arbitration/ArbitratorService.java | 4 +- .../java/io/bitsquare/trade/offer/Offer.java | 2 +- .../trade/offer/OfferBookService.java | 8 ++-- .../trade/offer/OpenOfferManager.java | 4 +- .../tasks/AddOfferToRemoteOfferBook.java | 3 -- .../tasks/BroadcastCreateOfferFeeTx.java | 7 +-- .../io/bitsquare/gui/main/MainViewModel.java | 16 ++----- .../java/io/bitsquare/p2p/P2PService.java | 22 ++++----- .../io/bitsquare/p2p/network/Connection.java | 12 ++--- .../io/bitsquare/p2p/network/NetworkNode.java | 2 +- .../bitsquare/p2p/peers/BroadcastHandler.java | 39 ++++++++-------- .../io/bitsquare/p2p/peers/Broadcaster.java | 5 +- .../io/bitsquare/p2p/peers/PeerManager.java | 8 ++-- .../peers/getdata/GetDataRequestHandler.java | 36 +++++++-------- .../p2p/peers/getdata/RequestDataHandler.java | 2 +- .../p2p/peers/getdata/RequestDataManager.java | 45 ++++++++++-------- .../bitsquare/p2p/storage/P2PDataStorage.java | 46 ++++++++----------- .../p2p/storage/ProtectedDataStorageTest.java | 12 ++--- 19 files changed, 130 insertions(+), 153 deletions(-) diff --git a/core/src/main/java/io/bitsquare/alert/AlertService.java b/core/src/main/java/io/bitsquare/alert/AlertService.java index 7d6b54df74..d0a13830c2 100644 --- a/core/src/main/java/io/bitsquare/alert/AlertService.java +++ b/core/src/main/java/io/bitsquare/alert/AlertService.java @@ -50,7 +50,7 @@ public class AlertService { } public void addAlertMessage(Alert alert, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) { - boolean result = p2PService.addData(alert); + boolean result = p2PService.addData(alert, true, true); if (result) { log.trace("Add alertMessage to network was successful. AlertMessage = " + alert); if (resultHandler != null) resultHandler.handleResult(); @@ -60,11 +60,13 @@ public class AlertService { } public void removeAlertMessage(Alert alert, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) { - if (p2PService.removeData(alert)) { + if (p2PService.removeData(alert, true)) { log.trace("Remove alertMessage from network was successful. AlertMessage = " + alert); - if (resultHandler != null) resultHandler.handleResult(); + if (resultHandler != null) + resultHandler.handleResult(); } else { - if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage("Remove alertMessage failed"); + if (errorMessageHandler != null) + errorMessageHandler.handleErrorMessage("Remove alertMessage failed"); } } diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java index e99d9f3d81..5c49c82e8c 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java @@ -59,7 +59,7 @@ public class ArbitratorService { public void addArbitrator(Arbitrator arbitrator, final ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { log.debug("addArbitrator arbitrator.hashCode() " + arbitrator.hashCode()); - boolean result = p2PService.addData(arbitrator); + boolean result = p2PService.addData(arbitrator, true, true); if (result) { log.trace("Add arbitrator to network was successful. Arbitrator.hashCode() = " + arbitrator.hashCode()); resultHandler.handleResult(); @@ -70,7 +70,7 @@ public class ArbitratorService { public void removeArbitrator(Arbitrator arbitrator, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { log.debug("removeArbitrator arbitrator.hashCode() " + arbitrator.hashCode()); - if (p2PService.removeData(arbitrator)) { + if (p2PService.removeData(arbitrator, true)) { log.trace("Remove arbitrator from network was successful. Arbitrator.hashCode() = " + arbitrator.hashCode()); resultHandler.handleResult(); } else { diff --git a/core/src/main/java/io/bitsquare/trade/offer/Offer.java b/core/src/main/java/io/bitsquare/trade/offer/Offer.java index 45a901ab9e..d4e6b8fd5f 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/Offer.java +++ b/core/src/main/java/io/bitsquare/trade/offer/Offer.java @@ -58,7 +58,7 @@ public final class Offer implements StoragePayload, RequiresOwnerIsOnlinePayload private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; @JsonExclude private static final Logger log = LoggerFactory.getLogger(Offer.class); - public static final long TTL = TimeUnit.SECONDS.toMillis(60); + public static final long TTL = TimeUnit.SECONDS.toMillis(4 * 60); public final static String TAC_OFFERER = "When placing that offer I accept that anyone who fulfills my conditions can " + "take that offer."; public static final String TAC_TAKER = "With taking the offer I commit to the trade conditions as defined."; diff --git a/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java b/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java index 79f02b5486..a75d8e1cdb 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java @@ -93,8 +93,8 @@ public class OfferBookService { doAddOffer(offer, resultHandler, errorMessageHandler, false); } - public void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean forceBroadcast) { - boolean result = p2PService.addData(offer, forceBroadcast); + private void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean forceBroadcast) { + boolean result = p2PService.addData(offer, forceBroadcast, true); if (result) { log.trace("Add offer to network was successful. Offer ID = " + offer.getId()); resultHandler.handleResult(); @@ -104,7 +104,7 @@ public class OfferBookService { } public void refreshOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - boolean result = p2PService.refreshTTL(offer); + boolean result = p2PService.refreshTTL(offer, true); if (result) { log.trace("Add offer to network was successful. Offer ID = " + offer.getId()); resultHandler.handleResult(); @@ -114,7 +114,7 @@ public class OfferBookService { } public void removeOffer(Offer offer, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) { - if (p2PService.removeData(offer)) { + if (p2PService.removeData(offer, true)) { log.trace("Remove offer from network was successful. Offer ID = " + offer.getId()); if (resultHandler != null) resultHandler.handleResult(); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 68bdc53bec..eb5da86f7c 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -33,7 +33,6 @@ import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.messaging.DecryptedDirectMessageListener; import io.bitsquare.p2p.messaging.SendDirectMessageListener; -import io.bitsquare.p2p.peers.BroadcastHandler; import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.storage.Storage; import io.bitsquare.trade.TradableList; @@ -63,7 +62,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private static final long RETRY_REPUBLISH_DELAY_SEC = Timer.STRESS_TEST ? 1 : 5; private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = Timer.STRESS_TEST ? 1 : 10; - private static final long REPUBLISH_INTERVAL_MS = Timer.STRESS_TEST ? 3000 : 10 * Offer.TTL; + private static final long REPUBLISH_INTERVAL_MS = Timer.STRESS_TEST ? 3000 : 2 * Offer.TTL; private static final long REFRESH_INTERVAL_MS = Timer.STRESS_TEST ? 1000 : (long) (Offer.TTL * 0.5); private final KeyRing keyRing; @@ -140,7 +139,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe log.info("remove all open offers at shutDown"); // we remove own offers from offerbook when we go offline // Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out - BroadcastHandler.useDelay(false); openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer())); if (completeHandler != null) diff --git a/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/AddOfferToRemoteOfferBook.java b/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/AddOfferToRemoteOfferBook.java index ec1947dcea..6d3954bb38 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/AddOfferToRemoteOfferBook.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/AddOfferToRemoteOfferBook.java @@ -19,7 +19,6 @@ package io.bitsquare.trade.protocol.placeoffer.tasks; import io.bitsquare.common.taskrunner.Task; import io.bitsquare.common.taskrunner.TaskRunner; -import io.bitsquare.p2p.peers.BroadcastHandler; import io.bitsquare.trade.protocol.placeoffer.PlaceOfferModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,11 +34,9 @@ public class AddOfferToRemoteOfferBook extends Task { protected void run() { try { runInterceptHook(); - BroadcastHandler.useDelay(false); model.offerBookService.addOffer(model.offer, () -> { model.offerAddedToOfferBook = true; - BroadcastHandler.useDelay(true); complete(); }, errorMessage -> { diff --git a/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/BroadcastCreateOfferFeeTx.java b/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/BroadcastCreateOfferFeeTx.java index c0006621c2..074ccab662 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/BroadcastCreateOfferFeeTx.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/placeoffer/tasks/BroadcastCreateOfferFeeTx.java @@ -22,7 +22,6 @@ import io.bitsquare.btc.AddressEntry; import io.bitsquare.btc.FeePolicy; import io.bitsquare.common.taskrunner.Task; import io.bitsquare.common.taskrunner.TaskRunner; -import io.bitsquare.p2p.peers.BroadcastHandler; import io.bitsquare.trade.offer.Offer; import io.bitsquare.trade.protocol.placeoffer.PlaceOfferModel; import org.bitcoinj.core.Coin; @@ -63,7 +62,6 @@ public class BroadcastCreateOfferFeeTx extends Task { // Tx malleability happened after broadcast. We first remove the malleable offer. // Then we publish the changed offer to the P2P network again after setting the new TxId. // Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out - BroadcastHandler.useDelay(false); model.offerBookService.removeOffer(model.offer, () -> { log.info("We store now the changed txID to the offer and add that again."); @@ -71,10 +69,7 @@ public class BroadcastCreateOfferFeeTx extends Task { model.offer.setOfferFeePaymentTxID(transaction.getHashAsString()); model.setTransaction(transaction); model.offerBookService.addOffer(model.offer, - () -> { - BroadcastHandler.useDelay(true); - complete(); - }, + () -> complete(), errorMessage -> { log.error("addOffer failed"); addOfferFailed = true; diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 7f6b997208..07353da7e1 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -207,17 +207,6 @@ public class MainViewModel implements ViewModel { .onAction(BitsquareApp.shutDownHandler::run) .show(); }, 3, TimeUnit.MINUTES); - - /*startupTimeout = FxTimer.runLater(Duration.ofMinutes(3), () -> { - log.warn("startupTimeout called"); - MainView.blur(); - new Popup().warning("The application could not startup after 3 minutes.\n" + - "There might be some network connection problems or a unstable Tor path.\n\n" + - "Please restart and try again.") - .closeButtonText("Shut down") - .onClose(BitsquareApp.shutDownHandler::run) - .show(); - });*/ } public void shutDown() { @@ -269,7 +258,8 @@ public class MainViewModel implements ViewModel { closeConnectionReason == CloseConnectionReason.RULE_VIOLATION) { log.warn("onDisconnect closeConnectionReason=" + closeConnectionReason); log.warn("onDisconnect connection=" + connection); - new Popup() + //TODO + /* new Popup() .warning("You got disconnected from a seed node.\n\n" + "Reason for getting disconnected: " + connection.getRuleViolation().name() + "\n\n" + "It might be that your installed version is not compatible with " + @@ -277,7 +267,7 @@ public class MainViewModel implements ViewModel { "Please check if you run the latest software version.\n" + "You can download the latest version of Bitsquare at:\n" + "https://github.com/bitsquare/bitsquare/releases") - .show(); + .show();*/ } } diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index a9c6f164e6..5c5a4c8c19 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -556,11 +556,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis public void onBroadcastFailed(String errorMessage) { } }; - boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener); + boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true, true); if (!result) { //TODO remove and add again with a delay to ensure the data will be broadcasted sendMailboxMessageListener.onFault("Data already exists in our local database"); - boolean removeResult = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress()); + boolean removeResult = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress(), true); log.debug("remove result=" + removeResult); } } catch (CryptoException e) { @@ -593,7 +593,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis expirableMailboxStoragePayload, optionalKeyRing.get().getSignatureKeyPair(), receiversPubKey); - p2PDataStorage.removeMailboxData(protectedMailboxStorageEntry, networkNode.getNodeAddress()); + p2PDataStorage.removeMailboxData(protectedMailboxStorageEntry, networkNode.getNodeAddress(), true); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); } @@ -616,17 +616,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // Data storage /////////////////////////////////////////////////////////////////////////////////////////// - public boolean addData(StoragePayload storagePayload) { - return addData(storagePayload, false); - } - - public boolean addData(StoragePayload storagePayload, boolean forceBroadcast) { + public boolean addData(StoragePayload storagePayload, boolean forceBroadcast, boolean isDataOwner) { Log.traceCall(); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); if (isBootstrapped()) { try { ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair()); - return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), forceBroadcast); + return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, forceBroadcast, isDataOwner); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); return false; @@ -636,13 +632,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - public boolean refreshTTL(StoragePayload storagePayload) { + public boolean refreshTTL(StoragePayload storagePayload, boolean isDataOwner) { Log.traceCall(); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); if (isBootstrapped()) { try { RefreshTTLMessage refreshTTLMessage = p2PDataStorage.getRefreshTTLMessage(storagePayload, optionalKeyRing.get().getSignatureKeyPair()); - return p2PDataStorage.refreshTTL(refreshTTLMessage, networkNode.getNodeAddress()); + return p2PDataStorage.refreshTTL(refreshTTLMessage, networkNode.getNodeAddress(), isDataOwner); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); return false; @@ -652,13 +648,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - public boolean removeData(StoragePayload storagePayload) { + public boolean removeData(StoragePayload storagePayload, boolean isDataOwner) { Log.traceCall(); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); if (isBootstrapped()) { try { ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair()); - return p2PDataStorage.remove(protectedStorageEntry, networkNode.getNodeAddress()); + return p2PDataStorage.remove(protectedStorageEntry, networkNode.getNodeAddress(), isDataOwner); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); return false; diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 5f1ae695af..19877f6501 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -60,8 +60,8 @@ public class Connection implements MessageListener { private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data //TODO decrease limits again after testing - private static final int MSG_THROTTLE_PER_SEC = 100; // With MAX_MSG_SIZE of 100kb results in bandwidth of 100 mbit/sec - private static final int MSG_THROTTLE_PER_10_SEC = 1000; // With MAX_MSG_SIZE of 100kb results in bandwidth of 1000 mbit/sec for 10 sec + private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec + private static final int MSG_THROTTLE_PER_10_SEC = 100; // With MAX_MSG_SIZE of 100kb results in bandwidth of 100 mbit/sec for 10 sec private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); public static int getMaxMsgSize() { @@ -235,8 +235,6 @@ public class Connection implements MessageListener { if (violated) { log.error("violatesThrottleLimit 1 "); log.error("elapsed " + (now - compareValue)); - log.error("now " + now); - log.error("compareValue " + compareValue); log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream() .map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.toString()) .collect(Collectors.toList()).toString()); @@ -252,8 +250,10 @@ public class Connection implements MessageListener { if (violated) { log.error("violatesThrottleLimit 2 "); - log.error("compareValue " + compareValue); - log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream().map(e -> e.second.toString() + "\n\t").toString()); + log.error("elapsed " + (now - compareValue)); + log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream() + .map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.toString()) + .collect(Collectors.toList()).toString()); } } // we limit to max 50 (MSG_THROTTLE_PER_10SEC) entries diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 6e367843a7..5b378a38f5 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -334,7 +334,7 @@ public abstract class NetworkNode implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// void createExecutorService() { - executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 50, 100, 2 * 60); + executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 15, 30, 60); } void startServer(ServerSocket serverSocket) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java index 0ff5351af1..372fb9d771 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java @@ -16,9 +16,7 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -29,13 +27,7 @@ public class BroadcastHandler implements PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class); private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 5 : 30; - private static final long DELAY_MS = Timer.STRESS_TEST ? 1000 : 2000; - private static boolean USE_DELAY; - - public static void useDelay(boolean useDelay) { - USE_DELAY = useDelay; - } - + private static final long DELAY_MS = Timer.STRESS_TEST ? 100 : 500; interface ResultHandler { void onCompleted(BroadcastHandler broadcastHandler); @@ -100,7 +92,7 @@ public class BroadcastHandler implements PeerManager.Listener { /////////////////////////////////////////////////////////////////////////////////////////// public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler, - @Nullable Listener listener) { + @Nullable Listener listener, boolean isDataOwner) { this.message = message; this.resultHandler = resultHandler; this.listener = listener; @@ -112,9 +104,23 @@ public class BroadcastHandler implements PeerManager.Listener { .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .collect(Collectors.toSet()); if (!receivers.isEmpty()) { - numOfPeers = receivers.size(); numOfCompletedBroadcasts = 0; - log.info("Broadcast message to {} peers.", numOfPeers); + + if (isDataOwner) { + // the data owner sends to all and immediately + receivers.stream().forEach(connection -> sendToPeer(connection, message)); + numOfPeers = receivers.size(); + log.info("Broadcast message to {} peers.", numOfPeers); + } else { + // for relay nodes we limit to 2 recipients and use a delay + List list = new ArrayList<>(receivers); + Collections.shuffle(list); + list = list.subList(0, Math.min(2, list.size())); + numOfPeers = list.size(); + log.info("Broadcast message to {} peers.", numOfPeers); + list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() -> + sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS)); + } long timeoutDelay = TIMEOUT_PER_PEER_SEC * receivers.size(); timeoutTimer = UserThread.runAfter(() -> { @@ -129,13 +135,6 @@ public class BroadcastHandler implements PeerManager.Listener { "broadcastQueue=" + broadcastQueue); onFault(errorMessage); }, timeoutDelay); - - if (USE_DELAY) { - receivers.stream().forEach(connection -> UserThread.runAfterRandomDelay(() -> - sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS)); - } else { - receivers.stream().forEach(connection -> sendToPeer(connection, message)); - } } else { onFault("Message not broadcasted because we have no available peers yet.\n\t" + "message = " + StringUtils.abbreviate(message.toString(), 100), false); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java index 08c393bef0..96676feff2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java @@ -39,12 +39,13 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { + public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener, boolean isDataOwner) { Log.traceCall("Sender=" + sender + "\n\t" + "Message=" + StringUtils.abbreviate(message.toString(), 100)); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager); - broadcastHandler.broadcast(message, sender, this, listener); + broadcastHandler.broadcast(message, sender, this, listener, isDataOwner); broadcastHandlers.add(broadcastHandler); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 2c51f0c9e4..7a93e98ef4 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -41,13 +41,13 @@ public class PeerManager implements ConnectionListener { public static void setMaxConnections(int maxConnections) { MAX_CONNECTIONS = maxConnections; MIN_CONNECTIONS = Math.max(1, maxConnections - 4); - MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 5; - MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 10; - MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 30; + MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 4; + MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 8; + MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 18; } static { - setMaxConnections(12); + setMaxConnections(10); } private static final int MAX_REPORTED_PEERS = 1000; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java index 221388c4fc..8f62f2a605 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java @@ -9,7 +9,6 @@ import io.bitsquare.common.UserThread; import io.bitsquare.p2p.network.CloseConnectionReason; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse; import io.bitsquare.p2p.storage.P2PDataStorage; @@ -20,8 +19,6 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Preconditions.checkArgument; - public class GetDataRequestHandler { private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class); @@ -44,19 +41,18 @@ public class GetDataRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// private final NetworkNode networkNode; - private final PeerManager peerManager; private P2PDataStorage dataStorage; private final Listener listener; private Timer timeoutTimer; + private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public GetDataRequestHandler(NetworkNode networkNode, PeerManager peerManager, P2PDataStorage dataStorage, Listener listener) { + public GetDataRequestHandler(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) { this.networkNode = networkNode; - this.peerManager = peerManager; this.dataStorage = dataStorage; this.listener = listener; } @@ -89,28 +85,32 @@ public class GetDataRequestHandler { } }); - checkArgument(timeoutTimer == null, "requestData must not be called twice."); - timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse + - " on connection:" + connection; - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection); - }, - TIME_OUT_SEC, TimeUnit.SECONDS); + if (timeoutTimer == null) { + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse + + " on connection:" + connection; + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection); + }, + TIME_OUT_SEC, TimeUnit.SECONDS); + } } - /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) { - log.info(errorMessage); - //peerManager.shutDownConnection(connection, closeConnectionReason); - cleanup(); - listener.onFault(errorMessage, connection); + if (!stopped) { + log.info(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason); + cleanup(); + listener.onFault(errorMessage, connection); + } else { + log.warn("We have already stopped (handleFault)"); + } } private void cleanup() { + stopped = true; if (timeoutTimer != null) { timeoutTimer.stop(); timeoutTimer = null; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index ae5f01bbd2..b7f4beb05c 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -157,7 +157,7 @@ public class RequestDataHandler implements MessageListener { "at that moment"); ((GetDataResponse) message).dataSet.stream() .forEach(protectedData -> dataStorage.add(protectedData, - connection.getPeersNodeAddressOptional().get())); + connection.getPeersNodeAddressOptional().get(), null, false, false)); cleanup(); listener.onComplete(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java index 09b5e573c6..a6b03b5d83 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java @@ -53,6 +53,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, private final Listener listener; private final Map handlerMap = new HashMap<>(); + private Map getDataRequestHandlers = new HashMap<>(); private Optional nodeAddressOfPreliminaryDataRequest = Optional.empty(); private Timer retryTimer; private boolean dataUpdateRequested; @@ -178,25 +179,33 @@ public class RequestDataManager implements MessageListener, ConnectionListener, if (peerManager.isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); - GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, peerManager, dataStorage, - new GetDataRequestHandler.Listener() { - @Override - public void onComplete() { - log.trace("requestDataHandshake completed.\n\tConnection={}", connection); - } - - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - if (!stopped) { - log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + - "ErrorMessage={}", connection, errorMessage); - peerManager.handleConnectionFault(connection); - } else { - log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); + final String uid = connection.getUid(); + if (!getDataRequestHandlers.containsKey(uid)) { + GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage, + new GetDataRequestHandler.Listener() { + @Override + public void onComplete() { + getDataRequestHandlers.remove(uid); + log.trace("requestDataHandshake completed.\n\tConnection={}", connection); } - } - }); - getDataRequestHandler.handle((GetDataRequest) message, connection); + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + getDataRequestHandlers.remove(uid); + if (!stopped) { + log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + + "ErrorMessage={}", connection, errorMessage); + peerManager.handleConnectionFault(connection); + } else { + log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); + } + } + }); + getDataRequestHandlers.put(uid, getDataRequestHandler); + getDataRequestHandler.handle((GetDataRequest) message, connection); + } else { + log.warn("We have already a GetDataRequestHandler for that connection started"); + } } else { log.warn("We have stopped already. We ignore that onMessage call."); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 945f833bfc..de965d3d3a 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -42,7 +42,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class); @VisibleForTesting - public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 30; + public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 60; private final Broadcaster broadcaster; private final Map map = new ConcurrentHashMap<>(); @@ -113,13 +113,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection); connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { if (message instanceof AddDataMessage) { - add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress); + add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false, false); } else if (message instanceof RemoveDataMessage) { - remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress); + remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress, false); } else if (message instanceof RemoveMailboxDataMessage) { - removeMailboxData(((RemoveMailboxDataMessage) message).protectedMailboxStorageEntry, peersNodeAddress); + removeMailboxData(((RemoveMailboxDataMessage) message).protectedMailboxStorageEntry, peersNodeAddress, false); } else if (message instanceof RefreshTTLMessage) { - refreshTTL((RefreshTTLMessage) message, peersNodeAddress); + refreshTTL((RefreshTTLMessage) message, peersNodeAddress, false); } }); } @@ -171,19 +171,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) { - return add(protectedStorageEntry, sender, null, false); - } - - public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean forceBroadcast) { - return add(protectedStorageEntry, sender, null, forceBroadcast); - } - - public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { - return add(protectedStorageEntry, sender, listener, false); - } - - public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, boolean forceBroadcast) { + public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener, boolean forceBroadcast, boolean isDataOwner) { Log.traceCall(); ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload()); @@ -209,7 +198,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { log.info("Data set after doAdd: size=" + map.values().size()); if (!containsKey || forceBroadcast) - broadcast(new AddDataMessage(protectedStorageEntry), sender, listener); + broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner); else log.trace("Not broadcasting data as we had it already in our map."); @@ -220,7 +209,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { return result; } - public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAddress sender) { + public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) { Log.traceCall(); byte[] hashOfDataAndSeqNr = refreshTTLMessage.hashOfDataAndSeqNr; @@ -235,7 +224,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { log.trace("We got that message with that seq nr already from another peer. We ignore that message."); return true; } else { - PublicKey ownerPubKey = ((StoragePayload) storedData.getStoragePayload()).getOwnerPubKey(); + PublicKey ownerPubKey = storedData.getStoragePayload().getOwnerPubKey(); boolean result = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature) && isSequenceNrValid(sequenceNumber, hashOfPayload) && checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload); @@ -256,7 +245,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { log.trace(sb.toString()); log.info("Data set after refreshTTL: size=" + map.values().size()); - broadcast(refreshTTLMessage, sender, null); + broadcast(refreshTTLMessage, sender, null, isDataOwner); } else { log.warn("Checks for refreshTTL failed"); } @@ -268,7 +257,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { } } - public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) { + public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) { Log.traceCall(); ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload()); boolean containsKey = map.containsKey(hashOfPayload); @@ -284,7 +273,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (result) { doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload); - broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null); + broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner); sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis())); storage.queueUpForSave(sequenceNumberMap, 100); @@ -294,7 +283,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { return result; } - public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender) { + public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) { Log.traceCall(); ByteArray hashOfData = getHashAsByteArray(protectedMailboxStorageEntry.getStoragePayload()); boolean containsKey = map.containsKey(hashOfData); @@ -310,7 +299,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (result) { doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData); - broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null); + broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner); sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.sequenceNumber, System.currentTimeMillis())); storage.queueUpForSave(sequenceNumberMap, 100); @@ -473,8 +462,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { } } - private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { - broadcaster.broadcast(message, sender, listener); + private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener, boolean isDataOwner) { + broadcaster.broadcast(message, sender, listener, isDataOwner); } private ByteArray getHashAsByteArray(ExpirablePayload data) { diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index b886178fe6..4ad925bec9 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -99,14 +99,14 @@ public class ProtectedDataStorageTest { //@Test public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.add(data, null)); + Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); Assert.assertEquals(1, dataStorage1.getMap().size()); int newSequenceNumber = data.sequenceNumber + 1; byte[] hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.getStoragePayload(), newSequenceNumber)); byte[] signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr); ProtectedStorageEntry dataToRemove = new ProtectedStorageEntry(data.getStoragePayload(), data.ownerPubKey, newSequenceNumber, signature); - Assert.assertTrue(dataStorage1.remove(dataToRemove, null)); + Assert.assertTrue(dataStorage1.remove(dataToRemove, null, true)); Assert.assertEquals(0, dataStorage1.getMap().size()); } @@ -115,7 +115,7 @@ public class ProtectedDataStorageTest { mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); log.debug("data.date " + data.timeStamp); - Assert.assertTrue(dataStorage1.add(data, null)); + Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); log.debug("test 1"); Assert.assertEquals(1, dataStorage1.getMap().size()); @@ -163,20 +163,20 @@ public class ProtectedDataStorageTest { public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.add(data, null)); + Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); Assert.assertEquals(1, dataStorage1.getMap().size()); Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 1"); Assert.assertEquals(1, dataStorage1.getMap().size()); RefreshTTLMessage refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null)); + Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null, true)); Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 2"); Assert.assertEquals(1, dataStorage1.getMap().size()); refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null)); + Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null, true)); Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 3"); Assert.assertEquals(1, dataStorage1.getMap().size());