Rename ReportedPeer to Peer

This commit is contained in:
Manfred Karrer 2016-02-26 18:24:00 +01:00
parent 586d9cdcfd
commit 6daf959f78
12 changed files with 157 additions and 113 deletions

View file

@ -48,7 +48,7 @@ public class Log {
rollingPolicy.start(); rollingPolicy.start();
triggeringPolicy = new SizeBasedTriggeringPolicy(); triggeringPolicy = new SizeBasedTriggeringPolicy();
triggeringPolicy.setMaxFileSize("1MB"); triggeringPolicy.setMaxFileSize("10MB");
triggeringPolicy.start(); triggeringPolicy.start();
PatternLayoutEncoder encoder = new PatternLayoutEncoder(); PatternLayoutEncoder encoder = new PatternLayoutEncoder();
@ -63,7 +63,7 @@ public class Log {
logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
//TODO for now use always trace //TODO for now use always trace
logbackLogger.setLevel(useDetailedLogging ? Level.INFO : Level.INFO); logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.INFO);
// logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG); // logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG);
logbackLogger.addAppender(appender); logbackLogger.addAppender(appender);
} }

View file

@ -4,6 +4,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.time.Duration; import java.time.Duration;
import java.util.UUID;
/** /**
* We simulate a global frame rate timer similar to FXTimer to avoid creation of threads for each timer call. * We simulate a global frame rate timer similar to FXTimer to avoid creation of threads for each timer call.
@ -16,12 +17,15 @@ public class FrameRateTimer implements Timer, Runnable {
private Runnable runnable; private Runnable runnable;
private long startTs; private long startTs;
private boolean isPeriodically; private boolean isPeriodically;
private String uid = UUID.randomUUID().toString();
private volatile boolean stopped;
public FrameRateTimer() { public FrameRateTimer() {
} }
@Override @Override
public void run() { public void run() {
if (!stopped) {
try { try {
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - startTs) >= interval) { if ((currentTimeMillis - startTs) >= interval) {
@ -38,6 +42,7 @@ public class FrameRateTimer implements Timer, Runnable {
throw t; throw t;
} }
} }
}
@Override @Override
public Timer runLater(Duration delay, Runnable runnable) { public Timer runLater(Duration delay, Runnable runnable) {
@ -60,6 +65,24 @@ public class FrameRateTimer implements Timer, Runnable {
@Override @Override
public void stop() { public void stop() {
stopped = true;
MasterTimer.removeListener(this); MasterTimer.removeListener(this);
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FrameRateTimer)) return false;
FrameRateTimer that = (FrameRateTimer) o;
return !(uid != null ? !uid.equals(that.uid) : that.uid != null);
}
@Override
public int hashCode() {
return uid != null ? uid.hashCode() : 0;
}
} }

View file

@ -10,15 +10,16 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class MasterTimer { public class MasterTimer {
private final static Logger log = LoggerFactory.getLogger(MasterTimer.class); private final static Logger log = LoggerFactory.getLogger(MasterTimer.class);
private static final java.util.Timer timer = new java.util.Timer(); private static final java.util.Timer timer = new java.util.Timer();
public static long FRAME_INTERVAL_MS = 16; // frame rate of 60 fps is about 16 ms but we don't need such a short interval, 100 ms should be good enough
public static final long FRAME_INTERVAL_MS = 100;
static { static {
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
listeners.stream().forEach(UserThread::execute); UserThread.execute(() -> listeners.stream().forEach(Runnable::run));
} }
}, FRAME_INTERVAL_MS, FRAME_INTERVAL_MS); // frame rate of 60 fps is about 16 ms }, FRAME_INTERVAL_MS, FRAME_INTERVAL_MS);
} }
private static Set<Runnable> listeners = new CopyOnWriteArraySet<>(); private static Set<Runnable> listeners = new CopyOnWriteArraySet<>();
@ -30,6 +31,4 @@ public class MasterTimer {
public static void removeListener(Runnable runnable) { public static void removeListener(Runnable runnable) {
listeners.remove(runnable); listeners.remove(runnable);
} }
} }

View file

@ -35,7 +35,7 @@ public class P2pNetworkListItem {
private final Statistic statistic; private final Statistic statistic;
private final Connection connection; private final Connection connection;
private final Subscription sentBytesSubscription, receivedBytesSubscription, onionAddressSubscription; private final Subscription sentBytesSubscription, receivedBytesSubscription, onionAddressSubscription, roundTripTimeSubscription;
private final Clock clock; private final Clock clock;
private final BSFormatter formatter; private final BSFormatter formatter;
@ -44,6 +44,9 @@ public class P2pNetworkListItem {
private final StringProperty receivedBytes = new SimpleStringProperty(); private final StringProperty receivedBytes = new SimpleStringProperty();
private final StringProperty peerType = new SimpleStringProperty(); private final StringProperty peerType = new SimpleStringProperty();
private final StringProperty connectionType = new SimpleStringProperty(); private final StringProperty connectionType = new SimpleStringProperty();
private final StringProperty roundTripTime = new SimpleStringProperty();
private final StringProperty onionAddress = new SimpleStringProperty(); private final StringProperty onionAddress = new SimpleStringProperty();
private final Clock.Listener listener; private final Clock.Listener listener;
@ -59,6 +62,8 @@ public class P2pNetworkListItem {
e -> receivedBytes.set(formatter.formatBytes((int) e))); e -> receivedBytes.set(formatter.formatBytes((int) e)));
onionAddressSubscription = EasyBind.subscribe(connection.peersNodeAddressProperty(), onionAddressSubscription = EasyBind.subscribe(connection.peersNodeAddressProperty(),
nodeAddress -> onionAddress.set(nodeAddress != null ? nodeAddress.getFullAddress() : "Not known yet")); nodeAddress -> onionAddress.set(nodeAddress != null ? nodeAddress.getFullAddress() : "Not known yet"));
roundTripTimeSubscription = EasyBind.subscribe(statistic.roundTripTimeProperty(),
roundTripTime -> this.roundTripTime.set(DurationFormatUtils.formatDuration((long) roundTripTime, "ss.SSS")));
listener = new Clock.Listener() { listener = new Clock.Listener() {
@Override @Override
@ -90,6 +95,7 @@ public class P2pNetworkListItem {
sentBytesSubscription.unsubscribe(); sentBytesSubscription.unsubscribe();
receivedBytesSubscription.unsubscribe(); receivedBytesSubscription.unsubscribe();
onionAddressSubscription.unsubscribe(); onionAddressSubscription.unsubscribe();
roundTripTimeSubscription.unsubscribe();
clock.removeListener(listener); clock.removeListener(listener);
} }
@ -158,4 +164,11 @@ public class P2pNetworkListItem {
return receivedBytes; return receivedBytes;
} }
public String getRoundTripTime() {
return roundTripTime.get();
}
public StringProperty roundTripTimeProperty() {
return roundTripTime;
}
} }

View file

@ -17,6 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -67,7 +68,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private Listener listener; private Listener listener;
private int numOfPeers; private int numOfPeers;
private Timer timeoutTimer; private Timer timeoutTimer;
private Set<String> broadcastQueue = new HashSet<>(); private Set<String> broadcastQueue = new CopyOnWriteArraySet<>();
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -99,30 +100,32 @@ public class BroadcastHandler implements PeerManager.Listener {
Log.traceCall("Sender=" + sender + "\n\t" + Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100)); "Message=" + StringUtils.abbreviate(message.toString(), 100));
Set<Connection> receivers = networkNode.getConfirmedConnections() Set<Connection> connectedPeers = networkNode.getConfirmedConnections()
.stream() .stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
if (!receivers.isEmpty()) { if (!connectedPeers.isEmpty()) {
numOfCompletedBroadcasts = 0; numOfCompletedBroadcasts = 0;
if (isDataOwner) { if (isDataOwner) {
// the data owner sends to all and immediately // the data owner sends to all and immediately
receivers.stream().forEach(connection -> sendToPeer(connection, message)); connectedPeers.stream().forEach(connection -> sendToPeer(connection, message));
numOfPeers = receivers.size(); numOfPeers = connectedPeers.size();
log.info("Broadcast message to {} peers.", numOfPeers); log.info("Broadcast message to {} peers.", numOfPeers);
} else { } else {
// for relay nodes we limit to 2 recipients and use a delay // for relay nodes we limit to 2 recipients and use a delay
List<Connection> list = new ArrayList<>(receivers); List<Connection> list = new ArrayList<>(connectedPeers);
Collections.shuffle(list); Collections.shuffle(list);
list = list.subList(0, Math.min(2, list.size())); int size = list.size();
if (size > 1)
list = list.subList(0, size / 2);
numOfPeers = list.size(); numOfPeers = list.size();
log.info("Broadcast message to {} peers.", numOfPeers); log.info("Broadcast message to {} peers.", numOfPeers);
list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() -> list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS)); sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS));
} }
long timeoutDelay = TIMEOUT_PER_PEER_SEC * receivers.size(); long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers;
timeoutTimer = UserThread.runAfter(() -> { timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec."; String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec.";
@ -145,6 +148,7 @@ public class BroadcastHandler implements PeerManager.Listener {
String errorMessage = "Message not broadcasted because we have stopped the handler already.\n\t" + String errorMessage = "Message not broadcasted because we have stopped the handler already.\n\t" +
"message = " + StringUtils.abbreviate(message.toString(), 100); "message = " + StringUtils.abbreviate(message.toString(), 100);
if (!stopped) { if (!stopped) {
if (!connection.isStopped()) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
log.trace("Broadcast message to " + nodeAddress + "."); log.trace("Broadcast message to " + nodeAddress + ".");
broadcastQueue.add(nodeAddress.getFullAddress()); broadcastQueue.add(nodeAddress.getFullAddress());
@ -189,6 +193,9 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
} }
}); });
} else {
onFault("Connection stopped already");
}
} else { } else {
onFault("stopped at sendToPeer: " + errorMessage); onFault("stopped at sendToPeer: " + errorMessage);
} }
@ -231,6 +238,8 @@ public class BroadcastHandler implements PeerManager.Listener {
} }
private void onFault(String errorMessage, boolean logWarning) { private void onFault(String errorMessage, boolean logWarning) {
cleanup();
if (logWarning) if (logWarning)
log.warn(errorMessage); log.warn(errorMessage);
else else
@ -242,7 +251,6 @@ public class BroadcastHandler implements PeerManager.Listener {
if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped)) if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped))
listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts);
cleanup();
resultHandler.onFault(this); resultHandler.onFault(this);
} }

View file

@ -6,7 +6,7 @@ import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.p2p.peers.peerexchange.Peer;
import io.bitsquare.storage.Storage; import io.bitsquare.storage.Storage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,10 +76,10 @@ public class PeerManager implements ConnectionListener {
private final NetworkNode networkNode; private final NetworkNode networkNode;
private Clock clock; private Clock clock;
private final Set<NodeAddress> seedNodeAddresses; private final Set<NodeAddress> seedNodeAddresses;
private final Storage<HashSet<ReportedPeer>> dbStorage; private final Storage<HashSet<Peer>> dbStorage;
private final HashSet<ReportedPeer> persistedPeers = new HashSet<>(); private final HashSet<Peer> persistedPeers = new HashSet<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>(); private final Set<Peer> reportedPeers = new HashSet<>();
private Timer checkMaxConnectionsTimer; private Timer checkMaxConnectionsTimer;
private final Clock.Listener listener; private final Clock.Listener listener;
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
@ -96,7 +96,7 @@ public class PeerManager implements ConnectionListener {
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
networkNode.addConnectionListener(this); networkNode.addConnectionListener(this);
dbStorage = new Storage<>(storageDir); dbStorage = new Storage<>(storageDir);
HashSet<ReportedPeer> persistedPeers = dbStorage.initAndGetPersisted("PersistedPeers"); HashSet<Peer> persistedPeers = dbStorage.initAndGetPersisted("PersistedPeers");
if (persistedPeers != null) { if (persistedPeers != null) {
log.info("We have persisted reported peers. persistedPeers.size()=" + persistedPeers.size()); log.info("We have persisted reported peers. persistedPeers.size()=" + persistedPeers.size());
this.persistedPeers.addAll(persistedPeers); this.persistedPeers.addAll(persistedPeers);
@ -310,18 +310,18 @@ public class PeerManager implements ConnectionListener {
// Reported peers // Reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private boolean removeReportedPeer(ReportedPeer reportedPeer) { private boolean removeReportedPeer(Peer reportedPeer) {
boolean contained = reportedPeers.remove(reportedPeer); boolean contained = reportedPeers.remove(reportedPeer);
printReportedPeers(); printReportedPeers();
return contained; return contained;
} }
@Nullable @Nullable
private ReportedPeer removeReportedPeer(NodeAddress nodeAddress) { private Peer removeReportedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> reportedPeerOptional = reportedPeers.stream() Optional<Peer> reportedPeerOptional = reportedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny(); .filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) { if (reportedPeerOptional.isPresent()) {
ReportedPeer reportedPeer = reportedPeerOptional.get(); Peer reportedPeer = reportedPeerOptional.get();
removeReportedPeer(reportedPeer); removeReportedPeer(reportedPeer);
return reportedPeer; return reportedPeer;
} else { } else {
@ -331,17 +331,17 @@ public class PeerManager implements ConnectionListener {
private void removeTooOldReportedPeers() { private void removeTooOldReportedPeers() {
Log.traceCall(); Log.traceCall();
Set<ReportedPeer> reportedPeersToRemove = reportedPeers.stream() Set<Peer> reportedPeersToRemove = reportedPeers.stream()
.filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE) .filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
reportedPeersToRemove.forEach(this::removeReportedPeer); reportedPeersToRemove.forEach(this::removeReportedPeer);
} }
public Set<ReportedPeer> getReportedPeers() { public Set<Peer> getReportedPeers() {
return reportedPeers; return reportedPeers;
} }
public void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) { public void addToReportedPeers(HashSet<Peer> reportedPeersToAdd, Connection connection) {
printNewReportedPeers(reportedPeersToAdd); printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules // We check if the reported msg is not violating our rules
@ -371,10 +371,10 @@ public class PeerManager implements ConnectionListener {
log.trace("We have already {} reported peers which exceeds our limit of {}." + log.trace("We have already {} reported peers which exceeds our limit of {}." +
"We remove random peers from the reported peers list.", size, limit); "We remove random peers from the reported peers list.", size, limit);
int diff = size - limit; int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(reportedPeers); List<Peer> list = new ArrayList<>(reportedPeers);
// we dont use sorting by lastActivityDate to keep it more random // we dont use sorting by lastActivityDate to keep it more random
for (int i = 0; i < diff; i++) { for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = list.remove(new Random().nextInt(list.size())); Peer toRemove = list.remove(new Random().nextInt(list.size()));
removeReportedPeer(toRemove); removeReportedPeer(toRemove);
} }
} else { } else {
@ -395,7 +395,7 @@ public class PeerManager implements ConnectionListener {
} }
} }
private void printNewReportedPeers(HashSet<ReportedPeer> reportedPeers) { private void printNewReportedPeers(HashSet<Peer> reportedPeers) {
if (printReportedPeersDetails) { if (printReportedPeersDetails) {
StringBuilder result = new StringBuilder("We received new reportedPeers:"); StringBuilder result = new StringBuilder("We received new reportedPeers:");
reportedPeers.stream().forEach(e -> result.append("\n\t").append(e)); reportedPeers.stream().forEach(e -> result.append("\n\t").append(e));
@ -409,7 +409,7 @@ public class PeerManager implements ConnectionListener {
// Persisted peers // Persisted peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private boolean removePersistedPeer(ReportedPeer persistedPeer) { private boolean removePersistedPeer(Peer persistedPeer) {
if (persistedPeers.contains(persistedPeer)) { if (persistedPeers.contains(persistedPeer)) {
persistedPeers.remove(persistedPeer); persistedPeers.remove(persistedPeer);
@ -423,18 +423,18 @@ public class PeerManager implements ConnectionListener {
} }
private boolean removePersistedPeer(NodeAddress nodeAddress) { private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress); Optional<Peer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get()); return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get());
} }
private Optional<ReportedPeer> getPersistedPeerOptional(NodeAddress nodeAddress) { private Optional<Peer> getPersistedPeerOptional(NodeAddress nodeAddress) {
return persistedPeers.stream() return persistedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny(); .filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
} }
private void removeTooOldPersistedPeers() { private void removeTooOldPersistedPeers() {
Log.traceCall(); Log.traceCall();
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream() Set<Peer> persistedPeersToRemove = persistedPeers.stream()
.filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE) .filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removePersistedPeer); persistedPeersToRemove.forEach(this::removePersistedPeer);
@ -448,10 +448,10 @@ public class PeerManager implements ConnectionListener {
log.trace("We have already {} persisted peers which exceeds our limit of {}." + log.trace("We have already {} persisted peers which exceeds our limit of {}." +
"We remove random peers from the persisted peers list.", size, limit); "We remove random peers from the persisted peers list.", size, limit);
int diff = size - limit; int diff = size - limit;
List<ReportedPeer> list = new ArrayList<>(persistedPeers); List<Peer> list = new ArrayList<>(persistedPeers);
// we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random // we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random
for (int i = 0; i < diff; i++) { for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = list.remove(new Random().nextInt(list.size())); Peer toRemove = list.remove(new Random().nextInt(list.size()));
removePersistedPeer(toRemove); removePersistedPeer(toRemove);
} }
} else { } else {
@ -459,7 +459,7 @@ public class PeerManager implements ConnectionListener {
} }
} }
public Set<ReportedPeer> getPersistedPeers() { public Set<Peer> getPersistedPeers() {
return persistedPeers; return persistedPeers;
} }
@ -472,7 +472,7 @@ public class PeerManager implements ConnectionListener {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
} }
public boolean isSeedNode(ReportedPeer reportedPeer) { public boolean isSeedNode(Peer reportedPeer) {
return seedNodeAddresses.contains(reportedPeer.nodeAddress); return seedNodeAddresses.contains(reportedPeer.nodeAddress);
} }
@ -484,7 +484,7 @@ public class PeerManager implements ConnectionListener {
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get()); return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
} }
public boolean isSelf(ReportedPeer reportedPeer) { public boolean isSelf(Peer reportedPeer) {
return isSelf(reportedPeer.nodeAddress); return isSelf(reportedPeer.nodeAddress);
} }
@ -492,7 +492,7 @@ public class PeerManager implements ConnectionListener {
return nodeAddress.equals(networkNode.getNodeAddress()); return nodeAddress.equals(networkNode.getNodeAddress());
} }
public boolean isConfirmed(ReportedPeer reportedPeer) { public boolean isConfirmed(Peer reportedPeer) {
return isConfirmed(reportedPeer.nodeAddress); return isConfirmed(reportedPeer.nodeAddress);
} }
@ -512,10 +512,10 @@ public class PeerManager implements ConnectionListener {
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) { public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
Log.traceCall("nodeAddress=" + nodeAddress); Log.traceCall("nodeAddress=" + nodeAddress);
boolean doRemovePersistedPeer = false; boolean doRemovePersistedPeer = false;
ReportedPeer reportedPeer = removeReportedPeer(nodeAddress); Peer reportedPeer = removeReportedPeer(nodeAddress);
Optional<ReportedPeer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress); Optional<Peer> persistedPeerOptional = getPersistedPeerOptional(nodeAddress);
if (persistedPeerOptional.isPresent()) { if (persistedPeerOptional.isPresent()) {
ReportedPeer persistedPeer = persistedPeerOptional.get(); Peer persistedPeer = persistedPeerOptional.get();
persistedPeer.increaseFailedConnectionAttempts(); persistedPeer.increaseFailedConnectionAttempts();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts(); doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
} }
@ -541,7 +541,7 @@ public class PeerManager implements ConnectionListener {
.ifPresent(connection -> connection.shutDown(closeConnectionReason)); .ifPresent(connection -> connection.shutDown(closeConnectionReason));
} }
public HashSet<ReportedPeer> getConnectedNonSeedNodeReportedPeers(NodeAddress excludedNodeAddress) { public HashSet<Peer> getConnectedNonSeedNodeReportedPeers(NodeAddress excludedNodeAddress) {
return new HashSet<>(getConnectedNonSeedNodeReportedPeers().stream() return new HashSet<>(getConnectedNonSeedNodeReportedPeers().stream()
.filter(e -> !e.nodeAddress.equals(excludedNodeAddress)) .filter(e -> !e.nodeAddress.equals(excludedNodeAddress))
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
@ -551,15 +551,15 @@ public class PeerManager implements ConnectionListener {
// Private // Private
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private Set<ReportedPeer> getConnectedReportedPeers() { private Set<Peer> getConnectedReportedPeers() {
// networkNode.getConfirmedConnections includes: // networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) // filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream() return networkNode.getConfirmedConnections().stream()
.map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get())) .map(c -> new Peer(c.getPeersNodeAddressOptional().get()))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
} }
private HashSet<ReportedPeer> getConnectedNonSeedNodeReportedPeers() { private HashSet<Peer> getConnectedNonSeedNodeReportedPeers() {
return new HashSet<>(getConnectedReportedPeers().stream() return new HashSet<>(getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e)) .filter(e -> !isSeedNode(e))
.collect(Collectors.toSet())); .collect(Collectors.toSet()));

View file

@ -8,7 +8,7 @@ import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.p2p.peers.peerexchange.Peer;
import io.bitsquare.p2p.storage.P2PDataStorage; import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -324,7 +324,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
} }
} }
private List<NodeAddress> getSortedNodeAddresses(Collection<ReportedPeer> collection) { private List<NodeAddress> getSortedNodeAddresses(Collection<Peer> collection) {
return collection.stream() return collection.stream()
.collect(Collectors.toList()) .collect(Collectors.toList())
.stream() .stream()

View file

@ -7,7 +7,7 @@ import io.bitsquare.p2p.NodeAddress;
import java.util.Date; import java.util.Date;
public final class ReportedPeer implements Payload, Persistable { public final class Peer implements Payload, Persistable {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5; private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5;
@ -16,7 +16,7 @@ public final class ReportedPeer implements Payload, Persistable {
public final Date date; public final Date date;
transient private int failedConnectionAttempts = 0; transient private int failedConnectionAttempts = 0;
public ReportedPeer(NodeAddress nodeAddress) { public Peer(NodeAddress nodeAddress) {
this.nodeAddress = nodeAddress; this.nodeAddress = nodeAddress;
this.date = new Date(); this.date = new Date();
} }
@ -32,9 +32,9 @@ public final class ReportedPeer implements Payload, Persistable {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (!(o instanceof ReportedPeer)) return false; if (!(o instanceof Peer)) return false;
ReportedPeer that = (ReportedPeer) o; Peer that = (Peer) o;
return !(nodeAddress != null ? !nodeAddress.equals(that.nodeAddress) : that.nodeAddress != null); return !(nodeAddress != null ? !nodeAddress.equals(that.nodeAddress) : that.nodeAddress != null);

View file

@ -89,10 +89,11 @@ class PeerExchangeHandler implements MessageListener {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
if (!stopped) { if (!stopped) {
if (!connection.getPeersNodeAddressOptional().isPresent()) { //TODO
/*if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress); connection.setPeersNodeAddress(nodeAddress);
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()"); log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
} }*/
PeerExchangeHandler.this.connection = connection; PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this); connection.addMessageListener(PeerExchangeHandler.this);

View file

@ -313,7 +313,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
} }
} }
private List<NodeAddress> getNodeAddresses(Collection<ReportedPeer> collection) { private List<NodeAddress> getNodeAddresses(Collection<Peer> collection) {
return collection.stream() return collection.stream()
.map(e -> e.nodeAddress) .map(e -> e.nodeAddress)
.collect(Collectors.toList()); .collect(Collectors.toList());

View file

@ -3,7 +3,7 @@ package io.bitsquare.p2p.peers.peerexchange.messages;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.p2p.peers.peerexchange.Peer;
import java.util.HashSet; import java.util.HashSet;
@ -15,9 +15,9 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
private final NodeAddress senderNodeAddress; private final NodeAddress senderNodeAddress;
public final int nonce; public final int nonce;
public final HashSet<ReportedPeer> reportedPeers; public final HashSet<Peer> reportedPeers;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<ReportedPeer> reportedPeers) { public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<Peer> reportedPeers) {
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetPeersRequest"); checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetPeersRequest");
this.senderNodeAddress = senderNodeAddress; this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce; this.nonce = nonce;

View file

@ -1,7 +1,7 @@
package io.bitsquare.p2p.peers.peerexchange.messages; package io.bitsquare.p2p.peers.peerexchange.messages;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.p2p.peers.peerexchange.Peer;
import java.util.HashSet; import java.util.HashSet;
@ -10,9 +10,9 @@ public final class GetPeersResponse extends PeerExchangeMessage {
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
public final int requestNonce; public final int requestNonce;
public final HashSet<ReportedPeer> reportedPeers; public final HashSet<Peer> reportedPeers;
public GetPeersResponse(int requestNonce, HashSet<ReportedPeer> reportedPeers) { public GetPeersResponse(int requestNonce, HashSet<Peer> reportedPeers) {
this.requestNonce = requestNonce; this.requestNonce = requestNonce;
this.reportedPeers = reportedPeers; this.reportedPeers = reportedPeers;
} }