From 32066b15a7cd9285cdd990980cc085ef4da5de92 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 4 Mar 2016 01:24:16 +0100 Subject: [PATCH] Add throttle to outgoing messages, use delays when sending msg --- .../trade/offer/OpenOfferManager.java | 43 +++++++++++++++---- .../io/bitsquare/gui/main/MainViewModel.java | 5 +-- .../content/seedwords/SeedWordsView.java | 2 +- .../bitsquare/gui/main/funds/FundsView.java | 2 +- .../createoffer/CreateOfferDataModel.java | 3 +- .../offer/createoffer/CreateOfferView.java | 2 +- .../offer/takeoffer/TakeOfferDataModel.java | 3 +- .../main/offer/takeoffer/TakeOfferView.java | 2 +- .../offer/takeoffer/TakeOfferViewModel.java | 2 +- .../io/bitsquare/gui/util/Transitions.java | 2 +- .../io/bitsquare/p2p/network/Connection.java | 13 +++++- .../bitsquare/p2p/peers/BroadcastHandler.java | 36 ++++++++-------- .../p2p/peers/getdata/RequestDataHandler.java | 19 ++++++-- .../p2p/peers/keepalive/KeepAliveHandler.java | 2 +- .../peerexchange/PeerExchangeHandler.java | 2 +- 15 files changed, 91 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index f1ee08eed7..59d1e39cc5 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.inject.Named; import java.io.File; +import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -357,13 +358,23 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe /////////////////////////////////////////////////////////////////////////////////////////// private void republishOffers() { - Log.traceCall("Number of offer for republish: " + openOffers.size()); + int size = openOffers.size(); + final ArrayList openOffersList = new ArrayList<>(openOffers); + Log.traceCall("Number of offer for republish: " + size); if (!stopped) { stopPeriodicRefreshOffersTimer(); - - openOffers.stream().forEach(openOffer -> - UserThread.runAfterRandomDelay(() -> - republishOffer(openOffer), 1, 1000, TimeUnit.MILLISECONDS)); + for (int i = 0; i < size; i++) { + // we delay to avoid reaching throttle limits + // roughly 1 offer per second + final int n = i; + final long minDelay = i * 500 + 1; + final long maxDelay = minDelay * 2 + 500; + UserThread.runAfterRandomDelay(() -> { + OpenOffer openOffer = openOffersList.get(n); + if (openOffers.contains(openOffer)) + republishOffer(openOffer); + }, minDelay, maxDelay, TimeUnit.MILLISECONDS); + } } else { log.warn("We have stopped already. We ignore that republishOffers call."); } @@ -418,10 +429,24 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe if (periodicRefreshOffersTimer == null) periodicRefreshOffersTimer = UserThread.runPeriodically(() -> { if (!stopped) { - Log.traceCall("Number of offer for refresh: " + openOffers.size()); - openOffers.stream().forEach(openOffer -> - UserThread.runAfterRandomDelay(() -> - refreshOffer(openOffer), 1, 5000, TimeUnit.MILLISECONDS)); + int size = openOffers.size(); + Log.traceCall("Number of offer for refresh: " + size); + + //we clone our list as openOffers might change during our delayed call + final ArrayList openOffersList = new ArrayList<>(openOffers); + for (int i = 0; i < size; i++) { + // we delay to avoid reaching throttle limits + // roughly 1 offer per second + final int n = i; + final long minDelay = i * 500 + 1; + final long maxDelay = minDelay * 2 + 500; + UserThread.runAfterRandomDelay(() -> { + OpenOffer openOffer = openOffersList.get(n); + // we need to check if in the meantime the offer has been removed + if (openOffers.contains(openOffer)) + refreshOffer(openOffer); + }, minDelay, maxDelay, TimeUnit.MILLISECONDS); + } } else { log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call."); } diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 79479092e2..942d0089d3 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -137,7 +137,6 @@ public class MainViewModel implements ViewModel { private Timer checkNumberOfP2pNetworkPeersTimer; private Timer startupTimeout; private final Map disputeIsClosedSubscriptionsMap = new HashMap<>(); - private Subscription downloadPercentageSubscription; /////////////////////////////////////////////////////////////////////////////////////////// @@ -187,6 +186,8 @@ public class MainViewModel implements ViewModel { public void initializeAllServices() { Log.traceCall(); + UserThread.runAfter(() -> tacWindow.showIfNeeded(), 2); + BooleanProperty walletInitialized = initBitcoinWallet(); BooleanProperty p2pNetWorkReady = initP2PNetwork(); @@ -447,8 +448,6 @@ public class MainViewModel implements ViewModel { setupDevDummyPaymentAccount(); setupMarketPriceFeed(); - tacWindow.showIfNeeded(); - showAppScreen.set(true); } diff --git a/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java b/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java index a90f8a33d8..606a07db16 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java @@ -209,7 +209,7 @@ public class SeedWordsView extends ActivatableView { new Popup() .warning("Your bitcoin wallet is encrypted.\n\n" + "After restore, the wallet will no longer be encrypted and you must set a new password.") - .closeButtonText("I understand") + .closeButtonText("I got it") .onClose(() -> doRestore()).show(); } else { doRestore(); diff --git a/gui/src/main/java/io/bitsquare/gui/main/funds/FundsView.java b/gui/src/main/java/io/bitsquare/gui/main/funds/FundsView.java index 63b3ecbadd..967e1dbbda 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/funds/FundsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/funds/FundsView.java @@ -94,7 +94,7 @@ public class FundsView extends ActivatableViewAndModel { "traders.") .closeButtonText("I want to learn more") .onClose(() -> Utilities.openWebPage("https://bitsquare.io/faq")) - .actionButtonText("I understand") + .actionButtonText("I got it") .onAction(() -> { }) .dontShowAgainId(key, preferences) diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferDataModel.java b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferDataModel.java index 63ca9ccf04..56db1b9952 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferDataModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferDataModel.java @@ -162,8 +162,7 @@ class CreateOfferDataModel extends ActivatableDataModel { } }); } else { - // Simulate a bit of delay - UserThread.runAfter(() -> feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()), 1); + feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()); } } }; diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java index bb425025dd..d6f4d1074a 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java @@ -263,7 +263,7 @@ public class CreateOfferView extends ActivatableViewAndModel Utilities.openWebPage("https://bitsquare.io/faq#6")) - .actionButtonText("I understand") + .actionButtonText("I got it") .onAction(() -> { }) .dontShowAgainId(key, preferences) diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferDataModel.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferDataModel.java index 67ba53800f..8f54b7167f 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferDataModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferDataModel.java @@ -198,8 +198,7 @@ class TakeOfferDataModel extends ActivatableDataModel { } }); } else { - // Simulate a bit of delay - UserThread.runAfter(() -> feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()), 1); + feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()); } } }; diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java index f2af9526aa..65c6506ed4 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java @@ -392,7 +392,7 @@ public class TakeOfferView extends ActivatableViewAndModel Utilities.openWebPage("https://bitsquare.io/faq#6")) - .actionButtonText("I understand") + .actionButtonText("I got it") .onAction(() -> { }) .dontShowAgainId(key, preferences) diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java index 1b97fc998f..8afb250c10 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java @@ -253,7 +253,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel im break; case NOT_AVAILABLE: if (takeOfferRequested) - offerWarning.set("Take offer request failed because offer is not available anymore. " + + offerWarning.set("Take offer request failed because the offer is not available anymore. " + "Maybe another trader has taken the offer in the meantime."); else offerWarning.set("You cannot take that offer because the offer was already taken by another trader."); diff --git a/gui/src/main/java/io/bitsquare/gui/util/Transitions.java b/gui/src/main/java/io/bitsquare/gui/util/Transitions.java index c59a13675e..f7b3dd9a5b 100644 --- a/gui/src/main/java/io/bitsquare/gui/util/Transitions.java +++ b/gui/src/main/java/io/bitsquare/gui/util/Transitions.java @@ -32,7 +32,7 @@ import javax.inject.Inject; public class Transitions { - public final static int DEFAULT_DURATION = 400; + public final static int DEFAULT_DURATION = 600; private final Preferences preferences; private Timeline removeBlurTimeLine; diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 2e629646ef..9ffbc76e18 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -98,7 +98,8 @@ public class Connection implements MessageListener { private final ObjectProperty peersNodeAddressProperty = new SimpleObjectProperty<>(); private final List> messageTimeStamps = new ArrayList<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); - + private volatile long lastSendTimeStamp = 0; + ; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -161,6 +162,16 @@ public class Connection implements MessageListener { public void sendMessage(Message message) { if (!stopped) { try { + Log.traceCall(); + // Throttle outgoing messages + if (System.currentTimeMillis() - lastSendTimeStamp < 20) { + log.info("We got 2 sendMessage requests in less then 20 ms. We set the thread to sleep " + + "for 50 ms to avoid that we flood our peer. lastSendTimeStamp={}, now={}, elapsed={}", + lastSendTimeStamp, System.currentTimeMillis(), (System.currentTimeMillis() - lastSendTimeStamp)); + Thread.sleep(50); + } + + lastSendTimeStamp = System.currentTimeMillis(); String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null"; int size = ByteArrayUtils.objectToByteArray(message).length; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java index 9a0d6bdcac..bdf8592791 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java @@ -100,30 +100,28 @@ public class BroadcastHandler implements PeerManager.Listener { Log.traceCall("Sender=" + sender + "\n\t" + "Message=" + StringUtils.abbreviate(message.toString(), 100)); - Set connectedPeers = networkNode.getConfirmedConnections() + Set connectedPeersSet = networkNode.getConfirmedConnections() .stream() .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .collect(Collectors.toSet()); - if (!connectedPeers.isEmpty()) { + if (!connectedPeersSet.isEmpty()) { numOfCompletedBroadcasts = 0; - if (isDataOwner) { - // the data owner sends to all and immediately - connectedPeers.stream().forEach(connection -> sendToPeer(connection, message)); - numOfPeers = connectedPeers.size(); - log.info("Broadcast message to all {} connected peers.", numOfPeers); - } else { - // for relay nodes we limit to 2 recipients and use a delay - List list = new ArrayList<>(connectedPeers); - Collections.shuffle(list); - int size = list.size(); - // We want min. 2 nodes - if (size > 3) - list = list.subList(0, size / 2); - numOfPeers = list.size(); - log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeers.size()); - list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() -> - sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS)); + List connectedPeersList = new ArrayList<>(connectedPeersSet); + Collections.shuffle(connectedPeersList); + numOfPeers = connectedPeersList.size(); + int factor = 1; + if (!isDataOwner) { + // for not data owner (relay nodes) we send to max. 4 nodes and use a longer delay + numOfPeers = Math.min(4, connectedPeersList.size()); + factor = 2; + } + log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeersSet.size()); + for (int i = 0; i < numOfPeers; i++) { + final long minDelay = i * 50 * factor + 1; + final long maxDelay = minDelay * 2 + 50 * factor; + final Connection connection = connectedPeersList.get(i); + UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS); } long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index a81c1020af..3b021f81a8 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -18,12 +18,16 @@ import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse; import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest; import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest; import io.bitsquare.p2p.storage.P2PDataStorage; +import io.bitsquare.p2p.storage.storageentry.ProtectedStorageEntry; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Random; +import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; @@ -155,9 +159,18 @@ public class RequestDataHandler implements MessageListener { checkArgument(connection.getPeersNodeAddressOptional().isPresent(), "RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + "at that moment"); - ((GetDataResponse) message).dataSet.stream() - .forEach(protectedData -> dataStorage.add(protectedData, - connection.getPeersNodeAddressOptional().get(), null, false)); + + final List dataList = new ArrayList<>(((GetDataResponse) message).dataSet); + final NodeAddress sender = connection.getPeersNodeAddressOptional().get(); + for (int i = 0; i < dataList.size(); i++) { + // roughly 3-6 sec for 100 entries + final long minDelay = i * 30 + 1; + final long maxDelay = minDelay * 2 + 30; + final ProtectedStorageEntry protectedData = dataList.get(i); + // TODO questionable if it is needed to relay the data to our peers + UserThread.runAfterRandomDelay(() -> dataStorage.add(protectedData, sender, null, false), + minDelay, maxDelay, TimeUnit.MILLISECONDS); + } cleanup(); listener.onComplete(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java index 5aef9d1ccb..341e373842 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; class KeepAliveHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class); - private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 5000; + private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 10_000; /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java index 319efc2af2..8cf25d0b77 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -27,7 +27,7 @@ class PeerExchangeHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class); private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20; - private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 3000; + private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 1000; ///////////////////////////////////////////////////////////////////////////////////////////