Add throttle to outgoing messages, use delays when sending msg

This commit is contained in:
Manfred Karrer 2016-03-04 01:24:16 +01:00
parent afd88bc55f
commit 32066b15a7
15 changed files with 91 additions and 47 deletions

View File

@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.inject.Named;
import java.io.File;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -357,13 +358,23 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
///////////////////////////////////////////////////////////////////////////////////////////
private void republishOffers() {
Log.traceCall("Number of offer for republish: " + openOffers.size());
int size = openOffers.size();
final ArrayList<OpenOffer> openOffersList = new ArrayList<>(openOffers);
Log.traceCall("Number of offer for republish: " + size);
if (!stopped) {
stopPeriodicRefreshOffersTimer();
openOffers.stream().forEach(openOffer ->
UserThread.runAfterRandomDelay(() ->
republishOffer(openOffer), 1, 1000, TimeUnit.MILLISECONDS));
for (int i = 0; i < size; i++) {
// we delay to avoid reaching throttle limits
// roughly 1 offer per second
final int n = i;
final long minDelay = i * 500 + 1;
final long maxDelay = minDelay * 2 + 500;
UserThread.runAfterRandomDelay(() -> {
OpenOffer openOffer = openOffersList.get(n);
if (openOffers.contains(openOffer))
republishOffer(openOffer);
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
} else {
log.warn("We have stopped already. We ignore that republishOffers call.");
}
@ -418,10 +429,24 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
openOffers.stream().forEach(openOffer ->
UserThread.runAfterRandomDelay(() ->
refreshOffer(openOffer), 1, 5000, TimeUnit.MILLISECONDS));
int size = openOffers.size();
Log.traceCall("Number of offer for refresh: " + size);
//we clone our list as openOffers might change during our delayed call
final ArrayList<OpenOffer> openOffersList = new ArrayList<>(openOffers);
for (int i = 0; i < size; i++) {
// we delay to avoid reaching throttle limits
// roughly 1 offer per second
final int n = i;
final long minDelay = i * 500 + 1;
final long maxDelay = minDelay * 2 + 500;
UserThread.runAfterRandomDelay(() -> {
OpenOffer openOffer = openOffersList.get(n);
// we need to check if in the meantime the offer has been removed
if (openOffers.contains(openOffer))
refreshOffer(openOffer);
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
} else {
log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call.");
}

View File

@ -137,7 +137,6 @@ public class MainViewModel implements ViewModel {
private Timer checkNumberOfP2pNetworkPeersTimer;
private Timer startupTimeout;
private final Map<String, Subscription> disputeIsClosedSubscriptionsMap = new HashMap<>();
private Subscription downloadPercentageSubscription;
///////////////////////////////////////////////////////////////////////////////////////////
@ -187,6 +186,8 @@ public class MainViewModel implements ViewModel {
public void initializeAllServices() {
Log.traceCall();
UserThread.runAfter(() -> tacWindow.showIfNeeded(), 2);
BooleanProperty walletInitialized = initBitcoinWallet();
BooleanProperty p2pNetWorkReady = initP2PNetwork();
@ -447,8 +448,6 @@ public class MainViewModel implements ViewModel {
setupDevDummyPaymentAccount();
setupMarketPriceFeed();
tacWindow.showIfNeeded();
showAppScreen.set(true);
}

View File

@ -209,7 +209,7 @@ public class SeedWordsView extends ActivatableView<GridPane, Void> {
new Popup()
.warning("Your bitcoin wallet is encrypted.\n\n" +
"After restore, the wallet will no longer be encrypted and you must set a new password.")
.closeButtonText("I understand")
.closeButtonText("I got it")
.onClose(() -> doRestore()).show();
} else {
doRestore();

View File

@ -94,7 +94,7 @@ public class FundsView extends ActivatableViewAndModel<TabPane, Activatable> {
"traders.")
.closeButtonText("I want to learn more")
.onClose(() -> Utilities.openWebPage("https://bitsquare.io/faq"))
.actionButtonText("I understand")
.actionButtonText("I got it")
.onAction(() -> {
})
.dontShowAgainId(key, preferences)

View File

@ -162,8 +162,7 @@ class CreateOfferDataModel extends ActivatableDataModel {
}
});
} else {
// Simulate a bit of delay
UserThread.runAfter(() -> feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()), 1);
feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx());
}
}
};

View File

@ -263,7 +263,7 @@ public class CreateOfferView extends ActivatableViewAndModel<AnchorPane, CreateO
"It will be refunded to you after the trade has successfully completed.")
.closeButtonText("I want to learn more")
.onClose(() -> Utilities.openWebPage("https://bitsquare.io/faq#6"))
.actionButtonText("I understand")
.actionButtonText("I got it")
.onAction(() -> {
})
.dontShowAgainId(key, preferences)

View File

@ -198,8 +198,7 @@ class TakeOfferDataModel extends ActivatableDataModel {
}
});
} else {
// Simulate a bit of delay
UserThread.runAfter(() -> feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx()), 1);
feeFromFundingTxProperty.set(FeePolicy.getMinRequiredFeeForFundingTx());
}
}
};

View File

@ -392,7 +392,7 @@ public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOffer
"It will be refunded to you after the trade has successfully completed.")
.closeButtonText("I want to learn more")
.onClose(() -> Utilities.openWebPage("https://bitsquare.io/faq#6"))
.actionButtonText("I understand")
.actionButtonText("I got it")
.onAction(() -> {
})
.dontShowAgainId(key, preferences)

View File

@ -253,7 +253,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
break;
case NOT_AVAILABLE:
if (takeOfferRequested)
offerWarning.set("Take offer request failed because offer is not available anymore. " +
offerWarning.set("Take offer request failed because the offer is not available anymore. " +
"Maybe another trader has taken the offer in the meantime.");
else
offerWarning.set("You cannot take that offer because the offer was already taken by another trader.");

View File

@ -32,7 +32,7 @@ import javax.inject.Inject;
public class Transitions {
public final static int DEFAULT_DURATION = 400;
public final static int DEFAULT_DURATION = 600;
private final Preferences preferences;
private Timeline removeBlurTimeLine;

View File

@ -98,7 +98,8 @@ public class Connection implements MessageListener {
private final ObjectProperty<NodeAddress> peersNodeAddressProperty = new SimpleObjectProperty<>();
private final List<Tuple2<Long, Serializable>> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private volatile long lastSendTimeStamp = 0;
;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -161,6 +162,16 @@ public class Connection implements MessageListener {
public void sendMessage(Message message) {
if (!stopped) {
try {
Log.traceCall();
// Throttle outgoing messages
if (System.currentTimeMillis() - lastSendTimeStamp < 20) {
log.info("We got 2 sendMessage requests in less then 20 ms. We set the thread to sleep " +
"for 50 ms to avoid that we flood our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, System.currentTimeMillis(), (System.currentTimeMillis() - lastSendTimeStamp));
Thread.sleep(50);
}
lastSendTimeStamp = System.currentTimeMillis();
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
int size = ByteArrayUtils.objectToByteArray(message).length;

View File

@ -100,30 +100,28 @@ public class BroadcastHandler implements PeerManager.Listener {
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
Set<Connection> connectedPeers = networkNode.getConfirmedConnections()
Set<Connection> connectedPeersSet = networkNode.getConfirmedConnections()
.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.collect(Collectors.toSet());
if (!connectedPeers.isEmpty()) {
if (!connectedPeersSet.isEmpty()) {
numOfCompletedBroadcasts = 0;
if (isDataOwner) {
// the data owner sends to all and immediately
connectedPeers.stream().forEach(connection -> sendToPeer(connection, message));
numOfPeers = connectedPeers.size();
log.info("Broadcast message to all {} connected peers.", numOfPeers);
} else {
// for relay nodes we limit to 2 recipients and use a delay
List<Connection> list = new ArrayList<>(connectedPeers);
Collections.shuffle(list);
int size = list.size();
// We want min. 2 nodes
if (size > 3)
list = list.subList(0, size / 2);
numOfPeers = list.size();
log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeers.size());
list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS));
List<Connection> connectedPeersList = new ArrayList<>(connectedPeersSet);
Collections.shuffle(connectedPeersList);
numOfPeers = connectedPeersList.size();
int factor = 1;
if (!isDataOwner) {
// for not data owner (relay nodes) we send to max. 4 nodes and use a longer delay
numOfPeers = Math.min(4, connectedPeersList.size());
factor = 2;
}
log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeersSet.size());
for (int i = 0; i < numOfPeers; i++) {
final long minDelay = i * 50 * factor + 1;
final long maxDelay = minDelay * 2 + 50 * factor;
final Connection connection = connectedPeersList.get(i);
UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers;

View File

@ -18,12 +18,16 @@ import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse;
import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.storageentry.ProtectedStorageEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
@ -155,9 +159,18 @@ public class RequestDataHandler implements MessageListener {
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " +
"at that moment");
((GetDataResponse) message).dataSet.stream()
.forEach(protectedData -> dataStorage.add(protectedData,
connection.getPeersNodeAddressOptional().get(), null, false));
final List<ProtectedStorageEntry> dataList = new ArrayList<>(((GetDataResponse) message).dataSet);
final NodeAddress sender = connection.getPeersNodeAddressOptional().get();
for (int i = 0; i < dataList.size(); i++) {
// roughly 3-6 sec for 100 entries
final long minDelay = i * 30 + 1;
final long maxDelay = minDelay * 2 + 30;
final ProtectedStorageEntry protectedData = dataList.get(i);
// TODO questionable if it is needed to relay the data to our peers
UserThread.runAfterRandomDelay(() -> dataStorage.add(protectedData, sender, null, false),
minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
cleanup();
listener.onComplete();

View File

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 5000;
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 10_000;
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -27,7 +27,7 @@ class PeerExchangeHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class);
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 3000;
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 1000;
///////////////////////////////////////////////////////////////////////////////////////////