Improve broadcast listener handling, add randomized delay for messages to all connected peers

This commit is contained in:
Manfred Karrer 2016-02-23 13:04:49 +01:00
parent 2de98b6af4
commit 0908c6d61f
13 changed files with 405 additions and 249 deletions

View File

@ -360,35 +360,39 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (!stopped) {
stopPeriodicRefreshOffersTimer();
openOffers.stream().forEach(openOffer -> {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
if (!stopped) {
log.debug("Successful added offer to P2P network");
// Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.)
if (periodicRefreshOffersTimer == null)
startPeriodicRefreshOffersTimer();
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call.");
}
},
errorMessage -> {
if (!stopped) {
log.error("Add offer to P2P network failed. " + errorMessage);
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onFault call.");
}
});
openOffer.setStorage(openOffersStorage);
});
openOffers.stream().forEach(openOffer ->
UserThread.runAfterRandomDelay(() ->
republishOffer(openOffer), 1, 1000, TimeUnit.MILLISECONDS));
} else {
log.warn("We have stopped already. We ignore that republishOffers call.");
}
}
private void republishOffer(OpenOffer openOffer) {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
if (!stopped) {
log.debug("Successful added offer to P2P network");
// Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.)
if (periodicRefreshOffersTimer == null)
startPeriodicRefreshOffersTimer();
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call.");
}
},
errorMessage -> {
if (!stopped) {
log.error("Add offer to P2P network failed. " + errorMessage);
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
} else {
log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onFault call.");
}
});
openOffer.setStorage(openOffersStorage);
}
private void startPeriodicRepublishOffersTimer() {
Log.traceCall();
stopped = false;
@ -403,7 +407,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
REPUBLISH_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
else
log.warn("periodicRepublishOffersTimer already stated");
log.trace("periodicRepublishOffersTimer already stated");
}
private void startPeriodicRefreshOffersTimer() {
@ -414,11 +418,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
openOffers.stream().forEach(openOffer -> {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
});
openOffers.stream().forEach(openOffer ->
UserThread.runAfterRandomDelay(() ->
refreshOffer(openOffer), 1, 5000, TimeUnit.MILLISECONDS));
} else {
log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call.");
}
@ -426,7 +428,13 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
REFRESH_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
else
log.warn("periodicRefreshOffersTimer already stated");
log.trace("periodicRefreshOffersTimer already stated");
}
private void refreshOffer(OpenOffer openOffer) {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
}
private void restart() {

View File

@ -204,8 +204,8 @@ public class MainViewModel implements ViewModel {
new Popup().warning("The application could not startup after 3 minutes.\n" +
"There might be some network connection problems or a unstable Tor path.\n\n" +
"Please restart and try again.")
.closeButtonText("Shut down")
.onClose(BitsquareApp.shutDownHandler::run)
.actionButtonText("Shut down and start again")
.onAction(BitsquareApp.shutDownHandler::run)
.show();
}, 3, TimeUnit.MINUTES);

View File

@ -42,7 +42,7 @@
<Label fx:id="bitcoinPeersLabel" text="Connected peers:" GridPane.rowIndex="2"/>
<TextArea fx:id="bitcoinPeersTextArea" GridPane.rowIndex="2" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
GridPane.vgrow="ALWAYS" editable="false" focusTraversable="false"/>
GridPane.vgrow="SOMETIMES" editable="false" focusTraversable="false"/>
<TitledGroupBg text="P2P network" GridPane.rowIndex="3" GridPane.rowSpan="5">
<padding>
@ -66,7 +66,8 @@
</TextField>
<Label fx:id="p2PPeersLabel" text="Connected peers:" GridPane.rowIndex="4"/>
<TableView fx:id="p2PPeerTable" GridPane.rowIndex="4" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS">
<TableView fx:id="p2PPeerTable" GridPane.rowIndex="4" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
GridPane.vgrow="ALWAYS">
<columns>
<TableColumn text="Onion address" fx:id="onionAddressColumn" minWidth="220">
<cellValueFactory>

View File

@ -96,7 +96,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
GridPane.setMargin(p2PPeersLabel, new Insets(4, 0, 0, 0));
GridPane.setValignment(p2PPeersLabel, VPos.TOP);
bitcoinPeersTextArea.setPrefRowCount(12);
bitcoinPeersTextArea.setPrefRowCount(10);
netWorkComboBox.setItems(FXCollections.observableArrayList(BitcoinNetwork.values()));
netWorkComboBox.getSelectionModel().select(preferences.getBitcoinNetwork());
netWorkComboBox.setOnAction(e -> onSelectNetwork());
@ -142,11 +142,13 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
}
};
p2PPeerTable.setMinHeight(300);
p2PPeerTable.setColumnResizePolicy(TableView.CONSTRAINED_RESIZE_POLICY);
p2PPeerTable.setPlaceholder(new Label("No connections are available"));
p2PPeerTable.getSortOrder().add(creationDateColumn);
creationDateColumn.setSortType(TableColumn.SortType.ASCENDING);
//TODO sorting needs other NetworkStatisticListItem as columns type
/* creationDateColumn.setComparator((o1, o2) ->
o1.statistic.getCreationDate().compareTo(o2.statistic.getCreationDate()));

View File

@ -9,7 +9,6 @@ import com.google.inject.name.Named;
import io.bitsquare.app.Log;
import io.bitsquare.app.ProgramArguments;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.KeyRing;
@ -18,6 +17,7 @@ import io.bitsquare.crypto.DecryptedMsgWithPubKey;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.BroadcastHandler;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.RequestDataManager;
@ -27,13 +27,13 @@ import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import io.bitsquare.p2p.storage.messages.RefreshTTLMessage;
import io.bitsquare.p2p.storage.payload.MailboxStoragePayload;
import io.bitsquare.p2p.storage.payload.StoragePayload;
import io.bitsquare.p2p.storage.storageentry.ProtectedMailboxStorageEntry;
import io.bitsquare.p2p.storage.storageentry.ProtectedStorageEntry;
import javafx.beans.property.*;
import javafx.beans.value.ChangeListener;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.Subscription;
import org.fxmisc.easybind.monadic.MonadicBinding;
@ -82,10 +82,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
private ChangeListener<NodeAddress> connectionNodeAddressListener;
private Subscription networkReadySubscription;
private boolean isBootstrapped;
private ChangeListener<Number> numOfBroadcastsChangeListener;
private KeepAliveManager keepAliveManager;
@ -116,10 +114,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private void init(boolean useLocalhost, int networkId, File storageDir) {
connectionNodeAddressListener = (observable, oldValue, newValue) ->
UserThread.execute(() ->
numConnectedPeers.set(networkNode.getNodeAddressesOfConfirmedConnections().size()));
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
networkNode.addConnectionListener(this);
networkNode.addMessageListener(this);
@ -147,6 +141,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (newValue)
onNetworkReady();
});
numConnectedPeers.set(networkNode.getAllConnections().size());
}
@ -307,24 +303,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onConnection(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
connectionNodeAddressListener.changed(connection.peersNodeAddressProperty(), null,
connection.peersNodeAddressProperty().get());
} else {
connection.peersNodeAddressProperty().addListener(connectionNodeAddressListener);
}
numConnectedPeers.set(networkNode.getAllConnections().size());
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
connection.peersNodeAddressProperty().removeListener(connectionNodeAddressListener);
// We removed the listener after a delay to be sure the connection has been removed
// from the networkNode already.
UserThread.runAfter(() ->
connectionNodeAddressListener.changed(connection.peersNodeAddressProperty(), null,
connection.peersNodeAddressProperty().get())
, 1);
numConnectedPeers.set(networkNode.getAllConnections().size());
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1);
}
@Override
@ -548,53 +535,35 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
Timer sendMailboxMessageTimeoutTimer = UserThread.runAfter(() -> {
boolean result = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress());
log.debug("remove result=" + result);
sendMailboxMessageListener.onFault("A timeout occurred when trying to broadcast mailbox data.");
}, 30);
Broadcaster.Listener listener = message -> {
if (message instanceof AddDataMessage &&
((AddDataMessage) message).protectedStorageEntry.equals(protectedMailboxStorageEntry)) {
sendMailboxMessageListener.onStoredInMailbox();
sendMailboxMessageTimeoutTimer.stop();
BroadcastHandler.Listener listener = new BroadcastHandler.Listener() {
@Override
public void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts) {
}
};
broadcaster.addListener(listener);
if (numOfBroadcastsChangeListener != null) {
log.warn("numOfBroadcastsChangeListener should be null");
broadcaster.getNumOfBroadcastsProperty().removeListener(numOfBroadcastsChangeListener);
}
numOfBroadcastsChangeListener = (observable, oldValue, newValue) -> {
// We want to get at least 1 successful broadcast
if ((int) newValue > 0)
broadcaster.removeListener(listener);
if (numOfBroadcastsChangeListener != null) {
broadcaster.getNumOfBroadcastsProperty().removeListener(numOfBroadcastsChangeListener);
numOfBroadcastsChangeListener = null;
}
/* UserThread.execute(() -> {
if (numOfBroadcastsChangeListener != null) {
broadcaster.getNumOfBroadcastsProperty().removeListener(numOfBroadcastsChangeListener);
numOfBroadcastsChangeListener = null;
@Override
public void onBroadcastedToFirstPeer(BroadcastMessage message) {
if (message instanceof AddDataMessage &&
((AddDataMessage) message).protectedStorageEntry.equals(protectedMailboxStorageEntry)) {
sendMailboxMessageListener.onStoredInMailbox();
}
});*/
}
@Override
public void onBroadcastCompleted(BroadcastMessage message, int numOfCompletedBroadcasts, int numOfFailedBroadcasts) {
if (numOfCompletedBroadcasts == 0)
sendMailboxMessageListener.onFault("Broadcast completed without any successful broadcast");
}
@Override
public void onBroadcastFailed(String errorMessage) {
}
};
broadcaster.getNumOfBroadcastsProperty().addListener(numOfBroadcastsChangeListener);
boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress());
boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener);
if (!result) {
sendMailboxMessageTimeoutTimer.stop();
broadcaster.removeListener(listener);
if (numOfBroadcastsChangeListener != null)
broadcaster.getNumOfBroadcastsProperty().removeListener(numOfBroadcastsChangeListener);
//TODO remove and add again with a delay to ensure the data will be broadcasted
sendMailboxMessageListener.onFault("Data already exists in our local database");
boolean result2 = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress());
log.debug("remove result=" + result2);
boolean removeResult = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress());
log.debug("remove result=" + removeResult);
}
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");

View File

@ -108,6 +108,10 @@ public class Statistic {
return lastActivityTimestamp;
}
public long getLastActivityAge() {
return System.currentTimeMillis() - lastActivityTimestamp;
}
public int getSentBytes() {
return sentBytes.get();
}

View File

@ -0,0 +1,233 @@
package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class BroadcastHandler implements ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class);
private static final long TIMEOUT_SEC = 60;
interface ResultHandler {
void onCompleted(BroadcastHandler broadcastHandler);
void onFault(BroadcastHandler broadcastHandler);
}
public interface Listener {
void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts);
void onBroadcastedToFirstPeer(BroadcastMessage message);
void onBroadcastCompleted(BroadcastMessage message, int numOfCompletedBroadcasts, int numOfFailedBroadcasts);
void onBroadcastFailed(String errorMessage);
}
private final NetworkNode networkNode;
public final String uid;
private PeerManager peerManager;
private boolean stopped = false;
private int numOfCompletedBroadcasts = 0;
private int numOfFailedBroadcasts = 0;
private BroadcastMessage message;
private ResultHandler resultHandler;
@Nullable
private Listener listener;
private int numOfPeers;
private Timer timeoutTimer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) {
this.networkNode = networkNode;
this.peerManager = peerManager;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
uid = UUID.randomUUID().toString();
}
public void cancel() {
stopped = true;
onFault("Broadcast canceled.");
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler, @Nullable Listener listener) {
this.message = message;
this.resultHandler = resultHandler;
this.listener = listener;
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
timeoutTimer = UserThread.runAfter(() ->
onFault("Timeout: Broadcast did not complete after " + TIMEOUT_SEC + " sec."), TIMEOUT_SEC);
Set<Connection> receivers = networkNode.getConfirmedConnections();
if (!receivers.isEmpty()) {
numOfPeers = receivers.size();
numOfCompletedBroadcasts = 0;
log.info("Broadcast message to {} peers.", numOfPeers);
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), 1, 500, TimeUnit.MILLISECONDS));
} else {
String errorMessage = "Message not broadcasted because we have no available peers yet.\n\t" +
"That should never happen as broadcast should not be called in such cases.\n" +
"message = " + StringUtils.abbreviate(message.toString(), 100);
onFault(errorMessage);
}
}
private void sendToPeer(Connection connection, BroadcastMessage message) {
String errorMessage = "Message not broadcasted because we have stopped the handler already.\n\t" +
"message = " + StringUtils.abbreviate(message.toString(), 100);
if (!stopped) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " + nodeAddress + ".");
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
if (!stopped) {
log.trace("Broadcast to " + nodeAddress + " succeeded.");
if (listener != null)
listener.onBroadcasted(message, numOfCompletedBroadcasts);
if (listener != null && numOfCompletedBroadcasts == 1)
listener.onBroadcastedToFirstPeer(message);
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers) {
if (listener != null)
listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts);
cleanup();
resultHandler.onCompleted(BroadcastHandler.this);
}
} else {
log.warn("stopped at onSuccess: " + errorMessage);
onFault(errorMessage);
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
numOfFailedBroadcasts++;
if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
} else {
log.warn("stopped at onFailure: " + errorMessage);
onFault(errorMessage);
}
}
});
} else {
log.warn("stopped at sendToPeer: " + errorMessage);
onFault(errorMessage);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
stopped = false;
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
stopped = true;
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
stopped = false;
}
@Override
public void onAwakeFromStandby() {
if (!networkNode.getAllConnections().isEmpty())
stopped = false;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof BroadcastHandler)) return false;
BroadcastHandler that = (BroadcastHandler) o;
return !(uid != null ? !uid.equals(that.uid) : that.uid != null);
}
@Override
public int hashCode() {
return uid != null ? uid.hashCode() : 0;
}
private void onFault(String errorMessage) {
log.warn(errorMessage);
if (listener != null)
listener.onBroadcastFailed(errorMessage);
if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped))
listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts);
cleanup();
resultHandler.onFault(this);
}
private void cleanup() {
stopped = true;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -1,19 +1,10 @@
package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.SimpleIntegerProperty;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,19 +12,12 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class Broadcaster implements ConnectionListener, PeerManager.Listener {
public class Broadcaster implements BroadcastHandler.ResultHandler {
private static final Logger log = LoggerFactory.getLogger(Broadcaster.class);
public interface Listener {
void onBroadcasted(BroadcastMessage message);
}
private final NetworkNode networkNode;
private PeerManager peerManager;
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
private boolean stopped = false;
private final IntegerProperty numOfBroadcasts = new SimpleIntegerProperty(0);
private Set<BroadcastHandler> broadcastHandlers = new CopyOnWriteArraySet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -43,14 +27,11 @@ public class Broadcaster implements ConnectionListener, PeerManager.Listener {
public Broadcaster(NetworkNode networkNode, PeerManager peerManager) {
this.networkNode = networkNode;
this.peerManager = peerManager;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
}
public void shutDown() {
stopped = true;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
broadcastHandlers.stream().forEach(BroadcastHandler::cancel);
broadcastHandlers.clear();
}
@ -58,98 +39,27 @@ public class Broadcaster implements ConnectionListener, PeerManager.Listener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender) {
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
numOfBroadcasts.set(0);
Set<Connection> receivers = networkNode.getConfirmedConnections();
if (!receivers.isEmpty()) {
log.info("Broadcast message to {} peers.", receivers.size());
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " + nodeAddress + ".");
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
//log.trace("Broadcast to " + nodeAddress + " succeeded.");
numOfBroadcasts.set(numOfBroadcasts.get() + 1);
listeners.stream().forEach(listener -> listener.onBroadcasted(message));
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
}
});
});
} else {
log.warn("Message not broadcasted because we have no available peers yet.\n\t" +
"That should never happen as broadcast should not be called in such cases.\n" +
"message = {}", StringUtils.abbreviate(message.toString(), 100));
}
}
public IntegerProperty getNumOfBroadcastsProperty() {
return numOfBroadcasts;
}
// That listener gets immediately removed after the handler is called
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager);
broadcastHandler.broadcast(message, sender, this, listener);
broadcastHandlers.add(broadcastHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
// BroadcastHandler.ResultHandler implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
stopped = false;
public void onCompleted(BroadcastHandler broadcastHandler) {
broadcastHandlers.remove(broadcastHandler);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
stopped = true;
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
stopped = false;
}
@Override
public void onAwakeFromStandby() {
if (!networkNode.getAllConnections().isEmpty())
stopped = false;
public void onFault(BroadcastHandler broadcastHandler) {
broadcastHandlers.remove(broadcastHandler);
}
}

View File

@ -4,6 +4,8 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
@ -17,9 +19,11 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Random;
import java.util.concurrent.TimeUnit;
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
private Timer delayTimer;
///////////////////////////////////////////////////////////////////////////////////////////
@ -65,7 +69,11 @@ class KeepAliveHandler implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void sendPing(Connection connection) {
public void sendPingAfterRandomDelay(Connection connection) {
delayTimer = UserThread.runAfterRandomDelay(() -> sendPing(connection), 1, 5000, TimeUnit.MILLISECONDS);
}
private void sendPing(Connection connection) {
Log.traceCall("connection=" + connection + " / this=" + this);
if (!stopped) {
Ping ping = new Ping(nonce);
@ -132,5 +140,10 @@ class KeepAliveHandler implements MessageListener {
stopped = true;
if (connection != null)
connection.removeMessageListener(this);
if (delayTimer != null) {
delayTimer.stop();
delayTimer = null;
}
}
}

View File

@ -22,7 +22,8 @@ import java.util.Random;
public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
private static final int INTERVAL_SEC = new Random().nextInt(5) + 20;
private static final long LAST_ACTIVITY_AGE_MILLIS = INTERVAL_SEC / 2;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -168,7 +169,8 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
if (!stopped) {
Log.traceCall();
networkNode.getConfirmedConnections().stream()
.filter(connection -> connection instanceof OutboundConnection)
.filter(connection -> connection instanceof OutboundConnection &&
connection.getStatistic().getLastActivityAge() > LAST_ACTIVITY_AGE_MILLIS)
.forEach(connection -> {
final String uid = connection.getUid();
if (!handlerMap.containsKey(uid)) {
@ -184,7 +186,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
}
});
handlerMap.put(uid, keepAliveHandler);
keepAliveHandler.sendPing(connection);
keepAliveHandler.sendPingAfterRandomDelay(connection);
} else {
log.warn("Connection with id {} has not completed and is still in our map. " +
"We will try to ping that peer at the next schedule.", uid);

View File

@ -53,6 +53,7 @@ class PeerExchangeHandler implements MessageListener {
private Timer timeoutTimer;
private Connection connection;
private boolean stopped;
private Timer delayTimer;
///////////////////////////////////////////////////////////////////////////////////////////
@ -75,7 +76,11 @@ class PeerExchangeHandler implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void sendGetPeersRequest(NodeAddress nodeAddress) {
public void sendGetPeersRequestAfterRandomDelay(NodeAddress nodeAddress) {
delayTimer = UserThread.runAfterRandomDelay(() -> sendGetPeersRequest(nodeAddress), 1, 3000, TimeUnit.MILLISECONDS);
}
private void sendGetPeersRequest(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
if (!stopped) {
if (networkNode.getNodeAddress() != null) {
@ -189,6 +194,11 @@ class PeerExchangeHandler implements MessageListener {
timeoutTimer.stop();
timeoutTimer = null;
}
if (delayTimer != null) {
delayTimer.stop();
delayTimer = null;
}
}
}

View File

@ -227,7 +227,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
}
});
handlerMap.put(nodeAddress, peerExchangeHandler);
peerExchangeHandler.sendGetPeersRequest(nodeAddress);
peerExchangeHandler.sendGetPeersRequestAfterRandomDelay(nodeAddress);
} else {
log.trace("We have started already a peerExchangeHandler. " +
"We ignore that call. nodeAddress=" + nodeAddress);

View File

@ -13,6 +13,7 @@ import io.bitsquare.common.wire.Payload;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.BroadcastHandler;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.storage.messages.*;
import io.bitsquare.p2p.storage.payload.ExpirablePayload;
@ -171,10 +172,18 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) {
return add(protectedStorageEntry, sender, false);
return add(protectedStorageEntry, sender, null, false);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean forceBroadcast) {
return add(protectedStorageEntry, sender, null, forceBroadcast);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
return add(protectedStorageEntry, sender, listener, false);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, boolean forceBroadcast) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload());
@ -200,7 +209,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.info("Data set after doAdd: size=" + map.values().size());
if (!containsKey || forceBroadcast)
broadcast(new AddDataMessage(protectedStorageEntry), sender);
broadcast(new AddDataMessage(protectedStorageEntry), sender, listener);
else
log.trace("Not broadcasting data as we had it already in our map.");
@ -222,41 +231,36 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (map.containsKey(hashOfPayload)) {
ProtectedStorageEntry storedData = map.get(hashOfPayload);
if (storedData.getStoragePayload() instanceof StoragePayload) {
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
PublicKey ownerPubKey = ((StoragePayload) storedData.getStoragePayload()).getOwnerPubKey();
boolean result = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature) &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload);
if (result) {
log.info("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.updateTimeStamp();
storedData.updateSequenceNumber(sequenceNumber);
storedData.updateSignature(signature);
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after refreshTTL (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
broadcast(refreshTTLMessage, sender);
} else {
log.warn("Checks for refreshTTL failed");
}
return result;
}
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
log.error("storedData.expirablePayload NOT instanceof StoragePayload. That must not happen.");
return false;
PublicKey ownerPubKey = ((StoragePayload) storedData.getStoragePayload()).getOwnerPubKey();
boolean result = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature) &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload);
if (result) {
log.info("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.updateTimeStamp();
storedData.updateSequenceNumber(sequenceNumber);
storedData.updateSignature(signature);
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after refreshTTL (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
broadcast(refreshTTLMessage, sender, null);
} else {
log.warn("Checks for refreshTTL failed");
}
return result;
}
} else {
log.warn("We don't have data for that refresh message in our map.");
@ -280,7 +284,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (result) {
doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null);
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
@ -299,14 +303,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
boolean result = containsKey
&& checkPublicKeys(protectedMailboxStorageEntry, false)
&& isSequenceNrValid(protectedMailboxStorageEntry.sequenceNumber, hashOfData)
&& protectedMailboxStorageEntry.receiversPubKey.equals(protectedMailboxStorageEntry.receiversPubKey) // at remove both keys are the same (only receiver is able to remove data)
&& protectedMailboxStorageEntry.getMailboxStoragePayload().receiverPubKeyForRemoveOperation.equals(protectedMailboxStorageEntry.receiversPubKey) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxStorageEntry)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.receiversPubKey, hashOfData);
if (result) {
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null);
sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
@ -350,7 +354,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
public ProtectedMailboxStorageEntry getMailboxDataWithSignedSeqNr(MailboxStoragePayload expirableMailboxStoragePayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
ByteArray hashOfData = getHashAsByteArray(expirableMailboxStoragePayload);
int sequenceNumber;
@ -468,8 +472,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender) {
broadcaster.broadcast(message, sender);
private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
broadcaster.broadcast(message, sender, listener);
}
private ByteArray getHashAsByteArray(ExpirablePayload data) {