P2P network improvements

This commit is contained in:
Manfred Karrer 2016-02-26 00:10:44 +01:00
parent f0d727e345
commit 4bc8c70a83
19 changed files with 214 additions and 153 deletions

View File

@ -28,26 +28,22 @@ public class Version {
// The version nr. for the objects sent over the network. A change will break the serialization of old objects.
// If objects are used for both network and database the network version is applied.
public static final long P2P_NETWORK_VERSION = 2;
public static final int P2P_NETWORK_VERSION = 2;
// The version nr. of the serialized data stored to disc. A change will break the serialization of old objects.
public static final long LOCAL_DB_VERSION = 2;
public static final int LOCAL_DB_VERSION = 2;
// The version nr. of the current protocol. The offer holds that version.
// A taker will check the version of the offers to see if his version is compatible.
public static final long TRADE_PROTOCOL_VERSION = 1;
public static final int TRADE_PROTOCOL_VERSION = 1;
private static int p2pMessageVersion;
public static int getP2PMessageVersion() {
// TODO investigate why a changed NETWORK_PROTOCOL_VERSION for the serialized objects does not trigger
// reliable a disconnect., but java serialisation should be replaced anyway, so using one existing field
// for the version is fine.
// BTC_NETWORK_ID is 0, 1 or 2, we use for changes at NETWORK_PROTOCOL_VERSION a multiplication with 10
// to avoid conflicts:
// E.g. btc BTC_NETWORK_ID=1, NETWORK_PROTOCOL_VERSION=1 -> getNetworkId()=2;
// BTC_NETWORK_ID=0, NETWORK_PROTOCOL_VERSION=2 -> getNetworkId()=2; -> wrong
return BTC_NETWORK_ID + 10 * (int) P2P_NETWORK_VERSION;
return p2pMessageVersion;
}
// The version for the bitcoin network (Mainnet = 0, TestNet = 1, Regtest = 2)
@ -55,6 +51,12 @@ public class Version {
public static void setBtcNetworkId(int btcNetworkId) {
BTC_NETWORK_ID = btcNetworkId;
// BTC_NETWORK_ID is 0, 1 or 2, we use for changes at NETWORK_PROTOCOL_VERSION a multiplication with 10
// to avoid conflicts:
// E.g. btc BTC_NETWORK_ID=1, NETWORK_PROTOCOL_VERSION=1 -> getNetworkId()=2;
// BTC_NETWORK_ID=0, NETWORK_PROTOCOL_VERSION=2 -> getNetworkId()=2; -> wrong
p2pMessageVersion = BTC_NETWORK_ID + 10 * P2P_NETWORK_VERSION;
}
public static int getBtcNetworkId() {

View File

@ -74,8 +74,8 @@ public class UserThread {
return getTimer().runLater(Duration.ofMillis(timeUnit.toMillis(delay)), runnable);
}
public static Timer runPeriodically(Runnable runnable, long interval) {
return UserThread.runPeriodically(runnable, interval, TimeUnit.SECONDS);
public static Timer runPeriodically(Runnable runnable, long intervalInSec) {
return UserThread.runPeriodically(runnable, intervalInSec, TimeUnit.SECONDS);
}
public static Timer runPeriodically(Runnable runnable, long interval, TimeUnit timeUnit) {

View File

@ -97,8 +97,8 @@ public final class PubKeyRing implements Payload {
@Override
public String toString() {
return "PubKeyRing{" +
"signaturePubKey.hashCode()=" + signaturePubKey.hashCode() +
", encryptionPubKey.hashCode()=" + encryptionPubKey.hashCode() +
"signaturePubKey.hashCode()=" + (signaturePubKey != null ? signaturePubKey.hashCode() : "") +
", encryptionPubKey.hashCode()=" + (encryptionPubKey != null ? encryptionPubKey.hashCode() : "") +
'}';
}

View File

@ -4,6 +4,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import io.bitsquare.app.Log;
import io.bitsquare.btc.pricefeed.providers.BitcoinAveragePriceProvider;
import io.bitsquare.btc.pricefeed.providers.PoloniexPriceProvider;
import io.bitsquare.btc.pricefeed.providers.PriceProvider;
@ -74,6 +75,7 @@ public class MarketPriceFeed {
this.faultHandler = faultHandler;
requestAllPrices(fiatPriceProvider, () -> {
log.trace("requestAllPrices result");
applyPrice();
UserThread.runPeriodically(() -> requestPrice(fiatPriceProvider), PERIOD_FIAT_SEC);
});
@ -144,7 +146,7 @@ public class MarketPriceFeed {
}
private void requestPrice(PriceProvider provider) {
//Log.traceCall();
Log.traceCall();
GetPriceRequest getPriceRequest = new GetPriceRequest();
SettableFuture<MarketPrice> future = getPriceRequest.requestPrice(currencyCode, provider);
Futures.addCallback(future, new FutureCallback<MarketPrice>() {
@ -163,7 +165,7 @@ public class MarketPriceFeed {
}
private void requestAllPrices(PriceProvider provider, @Nullable Runnable resultHandler) {
// Log.traceCall();
Log.traceCall();
GetPriceRequest getPriceRequest = new GetPriceRequest();
SettableFuture<Map<String, MarketPrice>> future = getPriceRequest.requestAllPrices(provider);
Futures.addCallback(future, new FutureCallback<Map<String, MarketPrice>>() {

View File

@ -74,7 +74,7 @@ public class PoloniexPriceProvider implements PriceProvider {
@Override
public String toString() {
return "BitcoinAveragePriceProvider{" +
return "PoloniexPriceProvider{" +
'}';
}
}

View File

@ -33,6 +33,7 @@ import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.DecryptedDirectMessageListener;
import io.bitsquare.p2p.messaging.SendDirectMessageListener;
import io.bitsquare.p2p.peers.BroadcastHandler;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.storage.Storage;
import io.bitsquare.trade.TradableList;
@ -138,6 +139,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
log.info("remove all open offers at shutDown");
// we remove own offers from offerbook when we go offline
// Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out
BroadcastHandler.setDelayMs(1);
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
if (completeHandler != null)

View File

@ -267,6 +267,8 @@ public class MainViewModel implements ViewModel {
// Other disconnects might be caused by peers running an older version
if (connection.getPeerType() == Connection.PeerType.SEED_NODE &&
closeConnectionReason == CloseConnectionReason.RULE_VIOLATION) {
log.warn("onDisconnect closeConnectionReason=" + closeConnectionReason);
log.warn("onDisconnect connection=" + connection);
new Popup()
.warning("You got disconnected from a seed node.\n\n" +
"Reason for getting disconnected: " + connection.getRuleViolation().name() + "\n\n" +

View File

@ -9,7 +9,6 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.messaging.PrefixedSealedAndSignedMessage;
import io.bitsquare.p2p.network.messages.CloseConnectionMessage;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
@ -31,6 +30,8 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@ -77,6 +78,7 @@ public class Connection implements MessageListener {
private final String portInfo;
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
private final ReentrantLock objectOutputStreamLock = new ReentrantLock(true);
// holder of state shared between InputHandler and Connection
private final SharedModel sharedModel;
private final Statistic statistic;
@ -88,11 +90,6 @@ public class Connection implements MessageListener {
// mutable data, set from other threads but not changed internally.
private Optional<NodeAddress> peersNodeAddressOptional = Optional.empty();
private volatile boolean stopped;
//TODO got java.util.zip.DataFormatException: invalid distance too far back
// java.util.zip.DataFormatException: invalid literal/lengths set
// use GZIPInputStream but problems with blocking
private final boolean useCompression = false;
private PeerType peerType;
private final ObjectProperty<NodeAddress> peersNodeAddressProperty = new SimpleObjectProperty<>();
private final List<Tuple2<Long, Serializable>> messageTimeStamps = new ArrayList<>();
@ -134,9 +131,8 @@ public class Connection implements MessageListener {
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// We create a thread for handling inputStream data
inputHandler = new InputHandler(sharedModel, objectInputStream, portInfo, this, useCompression);
inputHandler = new InputHandler(sharedModel, objectInputStream, portInfo, this);
singleThreadExecutor.submit(inputHandler);
} catch (IOException e) {
sharedModel.handleConnectionException(e);
@ -180,23 +176,11 @@ public class Connection implements MessageListener {
peersNodeAddress, uid, StringUtils.abbreviate(message.toString(), 100), size);
}
Object objectToWrite;
//noinspection ConstantConditions
if (useCompression) {
byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message);
// log.trace("Write object uncompressed data size: " + messageAsBytes.length);
@SuppressWarnings("UnnecessaryLocalVariable") byte[] compressed = Utils.compress(message);
//log.trace("Write object compressed data size: " + compressed.length);
objectToWrite = compressed;
} else {
// log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length);
objectToWrite = message;
}
if (!stopped) {
objectOutputStream.writeObject(objectToWrite);
objectOutputStreamLock.lock();
objectOutputStream.writeObject(message);
objectOutputStream.flush();
statistic.addSentBytes(size);
statistic.addSentMessage(message);
@ -207,6 +191,13 @@ public class Connection implements MessageListener {
} catch (IOException e) {
// an exception lead to a shutdown
sharedModel.handleConnectionException(e);
} catch (Throwable t) {
log.error(t.getMessage());
t.printStackTrace();
sharedModel.handleConnectionException(t);
} finally {
if (objectOutputStreamLock.isLocked())
objectOutputStreamLock.unlock();
}
} else {
log.debug("called sendMessage but was already stopped");
@ -227,8 +218,8 @@ public class Connection implements MessageListener {
}
@SuppressWarnings("unused")
public void reportIllegalRequest(RuleViolation ruleViolation) {
sharedModel.reportInvalidRequest(ruleViolation);
public boolean reportIllegalRequest(RuleViolation ruleViolation) {
return sharedModel.reportInvalidRequest(ruleViolation);
}
private boolean violatesThrottleLimit(Serializable serializable) {
@ -242,8 +233,12 @@ public class Connection implements MessageListener {
violated = now - compareValue < TimeUnit.SECONDS.toMillis(1);
if (violated) {
log.error("violatesThrottleLimit 1 ");
log.error("elapsed " + (now - compareValue));
log.error("now " + now);
log.error("compareValue " + compareValue);
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream().map(e -> e.second.toString() + "\n\t").toString());
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream()
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.toString())
.collect(Collectors.toList()).toString());
}
}
@ -342,6 +337,7 @@ public class Connection implements MessageListener {
return statistic;
}
///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown
///////////////////////////////////////////////////////////////////////////////////////////
@ -453,7 +449,6 @@ public class Connection implements MessageListener {
", uid='" + uid + '\'' +
", sharedSpace=" + sharedModel.toString() +
", stopped=" + stopped +
", useCompression=" + useCompression +
'}';
}
@ -483,7 +478,7 @@ public class Connection implements MessageListener {
this.socket = socket;
}
public void reportInvalidRequest(RuleViolation ruleViolation) {
public boolean reportInvalidRequest(RuleViolation ruleViolation) {
log.warn("We got reported an corrupt request " + ruleViolation + "\n\tconnection=" + this);
int numRuleViolations;
if (ruleViolations.contains(ruleViolation))
@ -502,8 +497,9 @@ public class Connection implements MessageListener {
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), connection);
this.ruleViolation = ruleViolation;
shutDown(CloseConnectionReason.RULE_VIOLATION);
return true;
} else {
ruleViolations.put(ruleViolation, ++numRuleViolations);
return false;
}
}
@ -577,12 +573,10 @@ public class Connection implements MessageListener {
private final ObjectInputStream objectInputStream;
private final String portInfo;
private final MessageListener messageListener;
private final boolean useCompression;
private volatile boolean stopped;
public InputHandler(SharedModel sharedModel, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) {
this.useCompression = useCompression;
public InputHandler(SharedModel sharedModel, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener) {
this.sharedModel = sharedModel;
this.objectInputStream = objectInputStream;
this.portInfo = portInfo;
@ -633,40 +627,21 @@ public class Connection implements MessageListener {
}
if (size > getMaxMsgSize()) {
reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
return;
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
}
Serializable serializable;
if (useCompression) {
if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
size = compressedObjectAsBytes.length;
//log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes);
} else {
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return;
}
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return;
}
}
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression
if (size > getMaxMsgSize()) {
reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return;
}
if (sharedModel.connection.violatesThrottleLimit(serializable)) {
reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED);
return;
if (reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
return;
}
if (!(serializable instanceof Message)) {
@ -680,6 +655,9 @@ public class Connection implements MessageListener {
connection.statistic.addReceivedMessage(message);
if (message.getMessageVersion() != Version.getP2PMessageVersion()) {
log.warn("message.getMessageVersion()=" + message.getMessageVersion());
log.warn("message=" + message);
log.warn("Version.getP2PMessageVersion()=" + Version.getP2PMessageVersion());
reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
return;
}
@ -745,9 +723,11 @@ public class Connection implements MessageListener {
}
}
private void reportInvalidRequest(RuleViolation ruleViolation) {
sharedModel.reportInvalidRequest(ruleViolation);
stop();
private boolean reportInvalidRequest(RuleViolation ruleViolation) {
boolean causedShutDown = sharedModel.reportInvalidRequest(ruleViolation);
if (causedShutDown)
stop();
return causedShutDown;
}
@Override

View File

@ -117,22 +117,24 @@ public abstract class NetworkNode implements MessageListener {
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
outBoundConnections.add((OutboundConnection) connection);
printOutBoundConnections();
connectionListeners.stream().forEach(e -> e.onConnection(connection));
if (!connection.isStopped()) {
outBoundConnections.add((OutboundConnection) connection);
printOutBoundConnections();
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.trace("onDisconnect connectionListener\n\tconnection={}" + connection);
printOutBoundConnections();
outBoundConnections.remove(connection);
printOutBoundConnections();
connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
@Override
public void onError(Throwable throwable) {
log.error("new OutboundConnection.ConnectionListener.onError " + throwable.getMessage());
connectionListeners.stream().forEach(e -> e.onError(throwable));
}
}, peersNodeAddress);
@ -341,9 +343,11 @@ public abstract class NetworkNode implements MessageListener {
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
inBoundConnections.add((InboundConnection) connection);
printInboundConnections();
connectionListeners.stream().forEach(e -> e.onConnection(connection));
if (!connection.isStopped()) {
inBoundConnections.add((InboundConnection) connection);
printInboundConnections();
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
}
@Override
@ -356,6 +360,7 @@ public abstract class NetworkNode implements MessageListener {
@Override
public void onError(Throwable throwable) {
log.error("server.ConnectionListener.onError " + throwable.getMessage());
connectionListeners.stream().forEach(e -> e.onError(throwable));
}
});

View File

@ -16,14 +16,25 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class BroadcastHandler implements PeerManager.Listener {
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class);
private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 2 : 20;
private static final long DELAY_MS = Timer.STRESS_TEST ? 10 : 300;
private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 5 : 30;
public static void setDelayMs(long delayMs) {
DELAY_MS = delayMs;
}
private static long DELAY_MS = Timer.STRESS_TEST ? 1000 : 2000;
interface ResultHandler {
void onCompleted(BroadcastHandler broadcastHandler);
@ -31,6 +42,11 @@ public class BroadcastHandler implements PeerManager.Listener {
void onFault(BroadcastHandler broadcastHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts);
@ -41,6 +57,11 @@ public class BroadcastHandler implements PeerManager.Listener {
void onBroadcastFailed(String errorMessage);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Instance fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
public final String uid;
private PeerManager peerManager;
@ -53,6 +74,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private Listener listener;
private int numOfPeers;
private Timer timeoutTimer;
private Set<String> broadcastQueue = new HashSet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -76,28 +98,42 @@ public class BroadcastHandler implements PeerManager.Listener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler, @Nullable Listener listener) {
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler,
@Nullable Listener listener) {
this.message = message;
this.resultHandler = resultHandler;
this.listener = listener;
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
Set<Connection> receivers = networkNode.getConfirmedConnections();
Set<Connection> receivers = networkNode.getConfirmedConnections()
.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.collect(Collectors.toSet());
if (!receivers.isEmpty()) {
timeoutTimer = UserThread.runAfter(() ->
onFault("Timeout: Broadcast did not complete after " + TIMEOUT_PER_PEER_SEC + " sec."), TIMEOUT_PER_PEER_SEC * receivers.size());
numOfPeers = receivers.size();
numOfCompletedBroadcasts = 0;
log.info("Broadcast message to {} peers.", numOfPeers);
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), 1, DELAY_MS, TimeUnit.MILLISECONDS));
long timeoutDelay = TIMEOUT_PER_PEER_SEC * receivers.size();
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec.";
log.warn(errorMessage + "\n\t" +
"numOfPeers=" + numOfPeers + "\n\t" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" +
"numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" +
"numOfFailedBroadcasts=" + numOfFailedBroadcasts + "\n\t" +
"broadcastQueue.size()=" + broadcastQueue.size() + "\n\t" +
"broadcastQueue=" + broadcastQueue);
onFault(errorMessage);
}, timeoutDelay);
receivers.stream().forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS));
} else {
onFault("Message not broadcasted because we have no available peers yet.\n\t" +
"That should never happen as broadcast should not be called in such cases.\n" +
"message = " + StringUtils.abbreviate(message.toString(), 100));
"message = " + StringUtils.abbreviate(message.toString(), 100), false);
}
}
@ -107,11 +143,13 @@ public class BroadcastHandler implements PeerManager.Listener {
if (!stopped) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " + nodeAddress + ".");
broadcastQueue.add(nodeAddress.getFullAddress());
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
broadcastQueue.remove(nodeAddress.getFullAddress());
if (!stopped) {
log.trace("Broadcast to " + nodeAddress + " succeeded.");
@ -136,6 +174,7 @@ public class BroadcastHandler implements PeerManager.Listener {
@Override
public void onFailure(@NotNull Throwable throwable) {
numOfFailedBroadcasts++;
broadcastQueue.remove(nodeAddress.getFullAddress());
if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
@ -184,7 +223,15 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void onFault(String errorMessage) {
log.warn(errorMessage);
onFault(errorMessage, true);
}
private void onFault(String errorMessage, boolean logWarning) {
if (logWarning)
log.warn(errorMessage);
else
log.trace(errorMessage);
if (listener != null)
listener.onBroadcastFailed(errorMessage);

View File

@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -45,7 +46,7 @@ public class PeerManager implements ConnectionListener {
}
static {
setMaxConnections(6);
setMaxConnections(12);
}
private static final int MAX_REPORTED_PEERS = 1000;
@ -80,7 +81,7 @@ public class PeerManager implements ConnectionListener {
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private Timer checkMaxConnectionsTimer;
private final Clock.Listener listener;
private final List<Listener> listeners = new LinkedList<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private boolean stopped;
@ -187,9 +188,8 @@ public class PeerManager implements ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
private void doHouseKeeping() {
log.trace("Peers before doHouseKeeping");
printConnectedPeers();
if (checkMaxConnectionsTimer == null)
if (checkMaxConnectionsTimer == null) {
printConnectedPeers();
checkMaxConnectionsTimer = UserThread.runAfter(() -> {
stopCheckMaxConnectionsTimer();
if (!stopped) {
@ -203,8 +203,7 @@ public class PeerManager implements ConnectionListener {
}
}, CHECK_MAX_CONN_DELAY_SEC);
log.trace("Peers after doHouseKeeping");
printConnectedPeers();
}
}
private boolean checkMaxConnections(int limit) {
@ -256,7 +255,8 @@ public class PeerManager implements ConnectionListener {
log.info("Candidates.size() for shut down=" + candidates.size());
Connection connection = candidates.remove(0);
log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString());
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> checkMaxConnections(limit));
if (!connection.isStopped())
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> checkMaxConnections(limit));
return true;
} else {
log.warn("No candidates found to remove (That case should not be possible as we use in the " +
@ -276,7 +276,7 @@ public class PeerManager implements ConnectionListener {
.filter(connection -> !connection.hasPeersNodeAddress())
.forEach(connection -> UserThread.runAfter(() -> {
// We give 30 seconds delay and check again if still no address is set
if (!connection.hasPeersNodeAddress()) {
if (!connection.hasPeersNodeAddress() && !connection.isStopped()) {
log.info("We close the connection as the peer address is still unknown.\n\t" +
"connection=" + connection);
connection.shutDown(CloseConnectionReason.UNKNOWN_PEER_ADDRESS);
@ -315,6 +315,7 @@ public class PeerManager implements ConnectionListener {
return contained;
}
@Nullable
private ReportedPeer removeReportedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> reportedPeerOptional = reportedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
@ -421,11 +422,15 @@ public class PeerManager implements ConnectionListener {
}
private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> persistedPeerOptional = persistedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
Optional<ReportedPeer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get());
}
private Optional<ReportedPeer> getPersistedPeerOptional(NodeAddress nodeAddress) {
return persistedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
}
private void removeTooOldPersistedPeers() {
Log.traceCall();
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream()
@ -505,8 +510,17 @@ public class PeerManager implements ConnectionListener {
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
Log.traceCall("nodeAddress=" + nodeAddress);
boolean doRemovePersistedPeer = false;
ReportedPeer reportedPeer = removeReportedPeer(nodeAddress);
if (connection != null && connection.getRuleViolation() != null)
Optional<ReportedPeer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
if (persistedPeerOptional.isPresent()) {
ReportedPeer persistedPeer = persistedPeerOptional.get();
persistedPeer.increaseFailedConnectionAttempts();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
}
doRemovePersistedPeer = doRemovePersistedPeer || (connection != null && connection.getRuleViolation() != null);
if (doRemovePersistedPeer)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();

View File

@ -32,7 +32,7 @@ public class RequestDataHandler implements MessageListener {
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -120,19 +120,20 @@ public class RequestDataHandler implements MessageListener {
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIME_OUT_SEC);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIME_OUT_SEC);
}
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}

View File

@ -242,6 +242,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
log.trace("requestDataHandshake with outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", nodeAddress, errorMessage);
peerManager.handleConnectionFault(nodeAddress);
handlerMap.remove(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {

View File

@ -66,8 +66,6 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
stopped = false;
keepAlive();
}, INTERVAL_SEC);
else
log.warn("keepAliveTimer already running");
}
@ -97,7 +95,6 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
// peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}

View File

@ -4,17 +4,15 @@ import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
public abstract class KeepAliveMessage implements Message {
private final int messageVersion = Version.getP2PMessageVersion();
@Override
public int getMessageVersion() {
return messageVersion;
return Version.getP2PMessageVersion();
}
@Override
public String toString() {
return "KeepAliveMessage{" +
"messageVersion=" + messageVersion +
"messageVersion=" + getMessageVersion() +
'}';
}
}

View File

@ -23,8 +23,6 @@ import javax.annotation.Nullable;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
class PeerExchangeHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class);
@ -118,21 +116,22 @@ class PeerExchangeHandler implements MessageListener {
}
});
checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
} else {
log.trace("We have stopped that handler already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
} else {
log.trace("We have stopped that handler already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
} else {
log.warn("My node address is still null at sendGetPeersRequest. We ignore that call.");
log.debug("My node address is still null at sendGetPeersRequest. We ignore that call.");
}
} else {
log.trace("We have stopped that handler already. We ignore that sendGetPeersRequest call.");

View File

@ -196,6 +196,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"nodeAddress={}", errorMessage, nodeAddress);
peerManager.handleConnectionFault(nodeAddress);
handlerMap.remove(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
if (!peerManager.hasSufficientConnections()) {
@ -295,8 +296,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
if (periodicTimer == null)
periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers,
REQUEST_PERIODICALLY_INTERVAL_SEC, TimeUnit.SECONDS);
else
log.warn("periodicTimer already started");
}
private void restart() {

View File

@ -10,15 +10,25 @@ import java.util.Date;
public final class ReportedPeer implements Payload, Persistable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5;
public final NodeAddress nodeAddress;
public final Date date;
transient private int failedConnectionAttempts = 0;
public ReportedPeer(NodeAddress nodeAddress) {
this.nodeAddress = nodeAddress;
this.date = new Date();
}
public void increaseFailedConnectionAttempts() {
this.failedConnectionAttempts++;
}
public boolean tooManyFailedConnectionAttempts() {
return failedConnectionAttempts >= MAX_FAILED_CONNECTION_ATTEMPTS;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -89,7 +89,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
ByteArray hashOfPayload = entry.getKey();
ProtectedStorageEntry protectedStorageEntry = map.get(hashOfPayload);
toRemoveSet.add(protectedStorageEntry);
log.warn("We found an expired data entry. We remove the protectedData:\n\t" + protectedStorageEntry);
log.info("We found an expired data entry. We remove the protectedData:\n\t" + protectedStorageEntry);
map.remove(hashOfPayload);
});
@ -254,7 +254,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
log.info("Data set after refreshTTL: size=" + map.values().size());
broadcast(refreshTTLMessage, sender, null);
} else {
@ -388,15 +388,16 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
log.info("Data set after doRemoveProtectedExpirableData: size=" + map.values().size());
}
private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) {
if (sequenceNumberMap.containsKey(hashOfData)) {
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr;
if (newSequenceNumber < storedSequenceNumber) {
log.warn("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
log.info("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
return false;
} else {
return true;