From a1993a5d9af18529fa111e824846415039a62378 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 5 Nov 2015 21:59:30 +0100 Subject: [PATCH] Renaming --- .../io/bitsquare/p2p/seed/SeedNodeMain.java | 3 + .../java/io/bitsquare/app/BitsquareApp.java | 3 + .../java/io/bitsquare/p2p/P2PService.java | 36 +- .../io/bitsquare/p2p/network/Connection.java | 2 +- .../AuthenticationListener.java | 4 +- .../bitsquare/p2p/{routing => peer}/Peer.java | 2 +- .../java/io/bitsquare/p2p/peer/PeerGroup.java | 650 ++++++++++++++++++ .../PeerListener.java} | 4 +- .../messages/AuthenticationMessage.java | 2 +- .../messages/ChallengeMessage.java | 2 +- .../messages/GetPeersMessage.java | 2 +- .../messages/MaintenanceMessage.java | 2 +- .../messages/PeersMessage.java | 2 +- .../messages/PingMessage.java | 2 +- .../messages/PongMessage.java | 2 +- .../RequestAuthenticationMessage.java | 2 +- .../ProtectedExpirableDataStorage.java | 14 +- .../java/io/bitsquare/p2p/P2PServiceTest.java | 4 +- .../p2p/network/LocalhostNetworkNodeTest.java | 2 +- .../{RoutingTest.java => PeerGroupTest.java} | 44 +- .../p2p/storage/ProtectedDataStorageTest.java | 12 +- 21 files changed, 727 insertions(+), 69 deletions(-) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/AuthenticationListener.java (74%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/Peer.java (97%) create mode 100644 network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java rename network/src/main/java/io/bitsquare/p2p/{routing/RoutingListener.java => peer/PeerListener.java} (79%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/AuthenticationMessage.java (68%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/ChallengeMessage.java (95%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/GetPeersMessage.java (95%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/MaintenanceMessage.java (67%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/PeersMessage.java (94%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/PingMessage.java (92%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/PongMessage.java (92%) rename network/src/main/java/io/bitsquare/p2p/{routing => peer}/messages/RequestAuthenticationMessage.java (94%) rename network/src/test/java/io/bitsquare/p2p/routing/{RoutingTest.java => PeerGroupTest.java} (88%) diff --git a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 855189fb44..4686920397 100644 --- a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p.seed; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Utilities; +import org.bitcoinj.crypto.DRMWorkaround; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +27,8 @@ public class SeedNodeMain { // eg. 4444 true localhost:7777 localhost:8888 // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { + + DRMWorkaround.maybeDisableExportControls(); seedNodeMain = new SeedNodeMain(args); } diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java index 5fe5f9eee3..78bd192fe1 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java @@ -54,6 +54,7 @@ import javafx.scene.layout.StackPane; import javafx.stage.Modality; import javafx.stage.Stage; import javafx.stage.StageStyle; +import org.bitcoinj.crypto.DRMWorkaround; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.controlsfx.dialog.Dialogs; import org.reactfx.EventStreams; @@ -115,6 +116,8 @@ public class BitsquareApp extends Application { Thread.setDefaultUncaughtExceptionHandler(handler); Thread.currentThread().setUncaughtExceptionHandler(handler); + DRMWorkaround.maybeDisableExportControls(); + Security.addProvider(new BouncyCastleProvider()); try { diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 4d567214db..d2cfd39a93 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -16,9 +16,9 @@ import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.routing.Peer; -import io.bitsquare.p2p.routing.Routing; -import io.bitsquare.p2p.routing.RoutingListener; +import io.bitsquare.p2p.peer.Peer; +import io.bitsquare.p2p.peer.PeerGroup; +import io.bitsquare.p2p.peer.PeerListener; import io.bitsquare.p2p.seed.SeedNodesRepository; import io.bitsquare.p2p.storage.HashMapChangedListener; import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage; @@ -60,7 +60,7 @@ public class P2PService { private final NetworkStatistics networkStatistics; private NetworkNode networkNode; - private Routing routing; + private PeerGroup peerGroup; private ProtectedExpirableDataStorage dataStorage; private final List decryptedMailListeners = new CopyOnWriteArrayList<>(); private final List decryptedMailboxListeners = new CopyOnWriteArrayList<>(); @@ -106,7 +106,7 @@ public class P2PService { } private void init() { - // network layer + // network if (useLocalhost) { networkNode = new LocalhostNetworkNode(port); seedNodeAddresses = seedNodesRepository.getLocalhostSeedNodeAddresses(); @@ -115,12 +115,12 @@ public class P2PService { seedNodeAddresses = seedNodesRepository.getTorSeedNodeAddresses(); } - // routing layer - routing = new Routing(networkNode, seedNodeAddresses); - if (useLocalhost) Routing.setSimulateAuthTorNode(2 * 1000); + // peer group + peerGroup = new PeerGroup(networkNode, seedNodeAddresses); + if (useLocalhost) PeerGroup.setSimulateAuthTorNode(2 * 1000); - // storage layer - dataStorage = new ProtectedExpirableDataStorage(routing, storageDir); + // storage + dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir); // Listeners @@ -236,7 +236,7 @@ public class P2PService { } }); - routing.addRoutingListener(new RoutingListener() { + peerGroup.addPeerListener(new PeerListener() { @Override public void onFirstPeerAdded(Peer peer) { log.trace("onFirstPeer " + peer.toString()); @@ -306,8 +306,8 @@ public class P2PService { if (dataStorage != null) dataStorage.shutDown(); - if (routing != null) - routing.shutDown(); + if (peerGroup != null) + peerGroup.shutDown(); if (networkNode != null) networkNode.shutDown(() -> { @@ -351,7 +351,7 @@ public class P2PService { throw new AuthenticationException("You must be authenticated before sending direct messages."); if (!authenticatedPeerAddresses.contains(peerAddress)) - routing.authenticateToPeer(peerAddress, + peerGroup.authenticateToPeer(peerAddress, () -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener), () -> UserThread.execute(() -> sendMailMessageListener.onFault())); else @@ -395,7 +395,7 @@ public class P2PService { if (authenticatedPeerAddresses.contains(peerAddress)) { trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener); } else { - routing.authenticateToPeer(peerAddress, + peerGroup.authenticateToPeer(peerAddress, () -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener), () -> { log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox."); @@ -548,8 +548,8 @@ public class P2PService { return networkNode; } - public Routing getRouting() { - return routing; + public PeerGroup getPeerGroup() { + return peerGroup; } public Address getAddress() { @@ -644,7 +644,7 @@ public class P2PService { checkArgument(networkNode.getAddress() != null, "Address must be set when we are authenticated"); connectedSeedNodes.remove(networkNode.getAddress()); - routing.startAuthentication(connectedSeedNodes); + peerGroup.startAuthentication(connectedSeedNodes); } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 44142ccad2..2f3e062975 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -34,7 +34,7 @@ public class Connection { private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data private static final int MAX_ILLEGAL_REQUESTS = 5; - private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min. + private static final int SOCKET_TIMEOUT = 60 * 1000; // 1 min. private InputHandler inputHandler; private boolean isAuthenticated; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java similarity index 74% rename from network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java rename to network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java index 08e4faa0d0..2c1810158d 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java @@ -1,9 +1,9 @@ -package io.bitsquare.p2p.routing; +package io.bitsquare.p2p.peer; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; -public abstract class AuthenticationListener implements RoutingListener { +public abstract class AuthenticationListener implements PeerListener { public void onFirstPeerAdded(Peer peer) { } diff --git a/network/src/main/java/io/bitsquare/p2p/routing/Peer.java b/network/src/main/java/io/bitsquare/p2p/peer/Peer.java similarity index 97% rename from network/src/main/java/io/bitsquare/p2p/routing/Peer.java rename to network/src/main/java/io/bitsquare/p2p/peer/Peer.java index cb7b7a3141..8e63bda9b1 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/Peer.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/Peer.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing; +package io.bitsquare.p2p.peer; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java new file mode 100644 index 0000000000..5fcf477260 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java @@ -0,0 +1,650 @@ +package io.bitsquare.p2p.peer; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; +import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.network.*; +import io.bitsquare.p2p.peer.messages.*; +import io.bitsquare.p2p.storage.messages.BroadcastMessage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class PeerGroup { + private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); + + private static int simulateAuthTorNode = 0; + + public static void setSimulateAuthTorNode(int simulateAuthTorNode) { + PeerGroup.simulateAuthTorNode = simulateAuthTorNode; + } + + private static int MAX_CONNECTIONS = 8; + private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. + private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; + private long startAuthTs; + + public static void setMaxConnections(int maxConnections) { + MAX_CONNECTIONS = maxConnections; + } + + private final NetworkNode networkNode; + private final List
seedNodes; + private final Map nonceMap = new ConcurrentHashMap<>(); + private final List peerListeners = new CopyOnWriteArrayList<>(); + private final Map authenticatedPeers = new ConcurrentHashMap<>(); + private final Set
reportedPeerAddresses = new CopyOnWriteArraySet<>(); + private final Map authenticationCompleteHandlers = new ConcurrentHashMap<>(); + private final Timer maintenanceTimer = new Timer(); + private volatile boolean shutDownInProgress; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public PeerGroup(final NetworkNode networkNode, List
seeds) { + this.networkNode = networkNode; + + // We copy it as we remove ourselves later from the list if we are a seed node + this.seedNodes = new CopyOnWriteArrayList<>(seeds); + + init(networkNode); + } + + private void init(NetworkNode networkNode) { + networkNode.addMessageListener((message, connection) -> { + if (message instanceof AuthenticationMessage) + processAuthenticationMessage((AuthenticationMessage) message, connection); + else if (message instanceof MaintenanceMessage) + processMaintenanceMessage((MaintenanceMessage) message, connection); + }); + + networkNode.addConnectionListener(new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + // only removes authenticated nodes + if (connection.isAuthenticated()) + removePeer(connection.getPeerAddress()); + } + + @Override + public void onError(Throwable throwable) { + } + }); + + networkNode.addSetupListener(new SetupListener() { + @Override + public void onTorNodeReady() { + } + + @Override + public void onHiddenServiceReady() { + // remove ourselves in case we are a seed node + Address myAddress = getAddress(); + if (myAddress != null) + seedNodes.remove(myAddress); + } + + @Override + public void onSetupFailed(Throwable throwable) { + } + }); + + maintenanceTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000)); + try { + UserThread.execute(() -> { + disconnectOldConnections(); + pingPeers(); + }); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL); + } + + private void disconnectOldConnections() { + List authenticatedConnections = networkNode.getAllConnections().stream() + .filter(e -> e.isAuthenticated()) + .collect(Collectors.toList()); + if (authenticatedConnections.size() > MAX_CONNECTIONS) { + authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate())); + log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); + Connection connection = authenticatedConnections.remove(0); + log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); + + connection.shutDown(() -> Utilities.runTimerTask(() -> { + Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000)); + disconnectOldConnections(); + }, 1, TimeUnit.MILLISECONDS)); + } + } + + private void pingPeers() { + log.trace("pingPeers"); + List connectedPeersList = new ArrayList<>(authenticatedPeers.values()); + connectedPeersList.stream() + .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) + .forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> { + Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000)); + SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PingMessage sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PingMessage sending failed " + throwable.getMessage()); + removePeer(e.address); + } + }); + }, 5, 10)); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void shutDown() { + if (!shutDownInProgress) { + shutDownInProgress = true; + if (maintenanceTimer != null) + maintenanceTimer.cancel(); + } + } + + public void broadcast(BroadcastMessage message, @Nullable Address sender) { + log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers."); + log.trace("message = " + message); + printConnectedPeersMap(); + + authenticatedPeers.values().stream() + .filter(e -> !e.address.equals(sender)) + .forEach(peer -> { + log.trace("Broadcast message from " + getAddress() + " to " + peer.address + "."); + SettableFuture future = networkNode.sendMessage(peer.address, message); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Broadcast failed. " + throwable.getMessage()); + removePeer(peer.address); + } + }); + }); + } + + public void addMessageListener(MessageListener messageListener) { + networkNode.addMessageListener(messageListener); + } + + public void removeMessageListener(MessageListener messageListener) { + networkNode.removeMessageListener(messageListener); + } + + public void addPeerListener(PeerListener peerListener) { + peerListeners.add(peerListener); + } + + public void removePeerListener(PeerListener peerListener) { + peerListeners.remove(peerListener); + } + + public Map getAuthenticatedPeers() { + return authenticatedPeers; + } + + // Use ArrayList not List as we need it serializable + public ArrayList
getAllPeerAddresses() { + ArrayList
allPeerAddresses = new ArrayList<>(reportedPeerAddresses); + allPeerAddresses.addAll(authenticatedPeers.values().stream() + .map(e -> e.address).collect(Collectors.toList())); + // remove own address and seed nodes + allPeerAddresses.remove(getAddress()); + return allPeerAddresses; + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Authentication + /////////////////////////////////////////////////////////////////////////////////////////// + + // authentication example: + // node2 -> node1 RequestAuthenticationMessage + // node1: close connection + // node1 -> node2 ChallengeMessage on new connection + // node2: authentication to node1 done if nonce ok + // node2 -> node1 GetPeersMessage + // node1: authentication to node2 done if nonce ok + // node1 -> node2 PeersMessage + + public void startAuthentication(Set
connectedSeedNodes) { + connectedSeedNodes.forEach(connectedSeedNode -> { + sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); + }); + } + + private void sendRequestAuthenticationMessage(final List
remainingSeedNodes, final Address address) { + log.info("We try to authenticate to a random seed node. " + address); + startAuthTs = System.currentTimeMillis(); + final boolean[] alreadyConnected = {false}; + authenticatedPeers.values().stream().forEach(e -> { + remainingSeedNodes.remove(e.address); + if (address.equals(e.address)) + alreadyConnected[0] = true; + }); + if (!alreadyConnected[0]) { + long nonce = addToMapAndGetNonce(address); + SettableFuture future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("send RequestAuthenticationMessage to " + address + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Send RequestAuthenticationMessage to " + address + " failed. Exception:" + throwable.getMessage()); + log.trace("We try to authenticate to another random seed nodes of that list: " + remainingSeedNodes); + getNextSeedNode(remainingSeedNodes); + } + }); + } else { + getNextSeedNode(remainingSeedNodes); + } + } + + private void getNextSeedNode(List
remainingSeedNodes) { + List
remainingSeedNodeAddresses = new CopyOnWriteArrayList<>(remainingSeedNodes); + + Address myAddress = getAddress(); + if (myAddress != null) + remainingSeedNodeAddresses.remove(myAddress); + + if (!remainingSeedNodeAddresses.isEmpty()) { + Collections.shuffle(remainingSeedNodeAddresses); + Address address = remainingSeedNodeAddresses.remove(0); + sendRequestAuthenticationMessage(remainingSeedNodeAddresses, address); + } else { + log.info("No other seed node found. That is expected for the first seed node."); + } + } + + + private void processAuthenticationMessage(AuthenticationMessage message, Connection connection) { + log.trace("processAuthenticationMessage " + message + " from " + connection.getPeerAddress() + " at " + getAddress()); + if (message instanceof RequestAuthenticationMessage) { + RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message; + Address peerAddress = requestAuthenticationMessage.address; + log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); + + connection.shutDown(() -> Utilities.runTimerTask(() -> { + Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000)); + // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to + // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) + log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); + long nonce = addToMapAndGetNonce(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.debug("onSuccess sending ChallengeMessage"); + } + + @Override + public void onFailure(Throwable throwable) { + log.warn("onFailure sending ChallengeMessage. We try again."); + SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.debug("onSuccess sending 2. ChallengeMessage"); + } + + @Override + public void onFailure(Throwable throwable) { + log.warn("onFailure sending ChallengeMessage. We give up."); + } + }); + } + }); + }, + 100 + simulateAuthTorNode, + TimeUnit.MILLISECONDS)); + } else if (message instanceof ChallengeMessage) { + ChallengeMessage challengeMessage = (ChallengeMessage) message; + Address peerAddress = challengeMessage.address; + log.trace("ChallengeMessage from " + peerAddress + " at " + getAddress()); + HashMap tempNonceMap = new HashMap<>(nonceMap); + boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress); + if (verified) { + connection.setPeerAddress(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("GetPeersMessage sending failed " + throwable.getMessage()); + removePeer(peerAddress); + } + }); + } else { + log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap); + } + } else if (message instanceof GetPeersMessage) { + GetPeersMessage getPeersMessage = (GetPeersMessage) message; + Address peerAddress = getPeersMessage.address; + log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress()); + boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress); + if (verified) { + setAuthenticated(connection, peerAddress); + purgeReportedPeers(); + SettableFuture future = networkNode.sendMessage(peerAddress, + new PeersMessage(getAddress(), getAllPeerAddresses())); + log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress() + + " with allPeers=" + getAllPeerAddresses()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PeersMessage sending failed " + throwable.getMessage()); + removePeer(peerAddress); + } + }); + + // now we add the reported peers to our own set + ArrayList
peerAddresses = ((GetPeersMessage) message).peerAddresses; + log.trace("Received peers: " + peerAddresses); + // remove ourselves + addToReportedPeers(peerAddresses, connection); + } + } else if (message instanceof PeersMessage) { + PeersMessage peersMessage = (PeersMessage) message; + Address peerAddress = peersMessage.address; + log.trace("PeersMessage from " + peerAddress + " at " + getAddress()); + ArrayList
peerAddresses = peersMessage.peerAddresses; + log.trace("Received peers: " + peerAddresses); + // remove ourselves + addToReportedPeers(peerAddresses, connection); + + // we wait until the handshake is completed before setting the authenticate flag + // authentication at both sides of the connection + + log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + + " authenticated (" + connection.getObjectId() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + setAuthenticated(connection, peerAddress); + + Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress()); + if (authenticationCompleteHandler != null) + authenticationCompleteHandler.run(); + + authenticateToNextRandomPeer(); + } + } + + private void addToReportedPeers(ArrayList
peerAddresses, Connection connection) { + log.trace("addToReportedPeers"); + // we disconnect misbehaving nodes trying to send too many peers + // reported peers include the peers connected peers which is normally max. 8 but we give some headroom + // for safety + if (peerAddresses.size() > 1100) { + connection.shutDown(); + } else { + peerAddresses.remove(getAddress()); + reportedPeerAddresses.addAll(peerAddresses); + purgeReportedPeers(); + } + } + + private void purgeReportedPeers() { + log.trace("purgeReportedPeers"); + int all = getAllPeerAddresses().size(); + if (all > 1000) { + int diff = all - 100; + List
list = getNotConnectedPeerAddresses(); + for (int i = 0; i < diff; i++) { + Address toRemove = list.remove(new Random().nextInt(list.size())); + reportedPeerAddresses.remove(toRemove); + } + } + } + + private List
getNotConnectedPeerAddresses() { + ArrayList
list = new ArrayList<>(getAllPeerAddresses()); + log.debug("## getNotConnectedPeerAddresses "); + log.debug("## reportedPeersList=" + list); + authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); + log.debug("## connectedPeers=" + authenticatedPeers); + log.debug("## reportedPeersList=" + list); + return list; + } + + private void authenticateToNextRandomPeer() { + Utilities.runTimerTaskWithRandomDelay(() -> { + Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000)); + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress(); + if (randomNotConnectedPeerAddress != null) { + log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress); + authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer()); + } else { + log.info("No more peers available for connecting."); + } + } else { + log.info("We have already enough connections."); + } + }, 200, 400, TimeUnit.MILLISECONDS); + } + + public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) { + startAuthTs = System.currentTimeMillis(); + + if (authenticationCompleteHandler != null) + authenticationCompleteHandlers.put(address, authenticationCompleteHandler); + + long nonce = addToMapAndGetNonce(address); + SettableFuture future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.debug("send RequestAuthenticationMessage succeeded"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("send IdMessage failed. " + throwable.getMessage()); + removePeer(address); + if (faultHandler != null) faultHandler.run(); + } + }); + } + + private long addToMapAndGetNonce(Address peerAddress) { + long nonce = new Random().nextLong(); + while (nonce == 0) { + nonce = new Random().nextLong(); + } + log.trace("addToMapAndGetNonce nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); + nonceMap.put(peerAddress, nonce); + return nonce; + } + + private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) { + log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); + Long nonce = nonceMap.remove(peerAddress); + return nonce != null && nonce == peersNonce; + } + + private void setAuthenticated(Connection connection, Address peerAddress) { + log.info("\n\n############################################################\n" + + "We are authenticated to:" + + "\nconnection=" + connection + + "\nmyAddress=" + getAddress() + + "\npeerAddress= " + peerAddress + + "\n############################################################\n"); + + connection.setAuthenticated(peerAddress, connection); + + Peer peer = new Peer(connection); + addAuthenticatedPeer(peerAddress, peer); + + peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); + + log.debug("\n### setAuthenticated post connection " + connection); + } + + private Address getRandomNotConnectedPeerAddress() { + List
list = getNotConnectedPeerAddresses(); + if (list.size() > 0) { + Collections.shuffle(list); + return list.get(0); + } else { + return null; + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Maintenance + /////////////////////////////////////////////////////////////////////////////////////////// + + private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) { + log.debug("Received message " + message + " at " + getAddress() + " from " + connection.getPeerAddress()); + if (message instanceof PingMessage) { + SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PongMessage sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PongMessage sending failed " + throwable.getMessage()); + removePeer(connection.getPeerAddress()); + } + }); + } else if (message instanceof PongMessage) { + Peer peer = authenticatedPeers.get(connection.getPeerAddress()); + if (peer != null) { + if (((PongMessage) message).nonce != peer.getPingNonce()) { + removePeer(peer.address); + log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress()); + } + } + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Peers + /////////////////////////////////////////////////////////////////////////////////////////// + + private void removePeer(@Nullable Address peerAddress) { + reportedPeerAddresses.remove(peerAddress); + + Peer disconnectedPeer; + disconnectedPeer = authenticatedPeers.remove(peerAddress); + + if (disconnectedPeer != null) + UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress))); + + log.trace("removePeer [post]"); + printConnectedPeersMap(); + printReportedPeersMap(); + + log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); + nonceMap.remove(peerAddress); + } + + private void addAuthenticatedPeer(Address address, Peer peer) { + boolean firstPeerAdded; + authenticatedPeers.put(address, peer); + firstPeerAdded = authenticatedPeers.size() == 1; + + UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer))); + + if (firstPeerAdded) + UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstPeerAdded(peer))); + + if (authenticatedPeers.size() > MAX_CONNECTIONS) + disconnectOldConnections(); + + log.trace("addConnectedPeer [post]"); + printConnectedPeersMap(); + } + + private Address getAddress() { + return networkNode.getAddress(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Utils + /////////////////////////////////////////////////////////////////////////////////////////// + + public void printConnectedPeersMap() { + StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":"); + authenticatedPeers.values().stream().forEach(e -> { + result.append("\n\t" + e.address); + }); + result.append("\n"); + log.info(result.toString()); + } + + public void printReportedPeersMap() { + StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":"); + reportedPeerAddresses.stream().forEach(e -> { + result.append("\n\t" + e); + }); + result.append("\n"); + log.info(result.toString()); + } + + private String getObjectId() { + return super.toString().split("@")[1].toString(); + } + +} diff --git a/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java b/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java similarity index 79% rename from network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java rename to network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java index d3d8840729..fe2b8e4605 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java @@ -1,9 +1,9 @@ -package io.bitsquare.p2p.routing; +package io.bitsquare.p2p.peer; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; -public interface RoutingListener { +public interface PeerListener { void onFirstPeerAdded(Peer peer); void onPeerAdded(Peer peer); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/AuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java similarity index 68% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/AuthenticationMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java index 8448e6240e..141a3ff722 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/AuthenticationMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.p2p.Message; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/ChallengeMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java similarity index 95% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/ChallengeMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java index 2de51fe625..1cfacfe15a 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/ChallengeMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java similarity index 95% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java index 9412c8b97b..e5eb02f891 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/MaintenanceMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java similarity index 67% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/MaintenanceMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java index 16c087fc9d..e5455d4bba 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/MaintenanceMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.p2p.Message; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java similarity index 94% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java index 8d05a33044..cc687234f7 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/PingMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java similarity index 92% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/PingMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java index e4143aa635..8da67acc79 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/PingMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/PongMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java similarity index 92% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/PongMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java index 93a1181047..a5ded72c8e 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/PongMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/RequestAuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java similarity index 94% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/RequestAuthenticationMessage.java rename to network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java index 8b7ec3b17b..ca16aff52a 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/RequestAuthenticationMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.routing.messages; +package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index 458862a673..5af6fa4ad3 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -8,7 +8,7 @@ import io.bitsquare.common.crypto.Sig; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.IllegalRequest; import io.bitsquare.p2p.network.MessageListener; -import io.bitsquare.p2p.routing.Routing; +import io.bitsquare.p2p.peer.PeerGroup; import io.bitsquare.p2p.storage.data.*; import io.bitsquare.p2p.storage.messages.*; import io.bitsquare.storage.Storage; @@ -30,7 +30,7 @@ public class ProtectedExpirableDataStorage { @VisibleForTesting public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000; - private final Routing routing; + private final PeerGroup peerGroup; private final Map map = new ConcurrentHashMap<>(); private final List hashMapChangedListeners = new CopyOnWriteArrayList<>(); private ConcurrentHashMap sequenceNumberMap = new ConcurrentHashMap<>(); @@ -44,8 +44,8 @@ public class ProtectedExpirableDataStorage { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public ProtectedExpirableDataStorage(Routing routing, File storageDir) { - this.routing = routing; + public ProtectedExpirableDataStorage(PeerGroup peerGroup, File storageDir) { + this.peerGroup = peerGroup; storage = new Storage<>(storageDir); @@ -103,7 +103,7 @@ public class ProtectedExpirableDataStorage { if (!shutDownInProgress) { shutDownInProgress = true; timer.cancel(); - routing.shutDown(); + peerGroup.shutDown(); } } @@ -231,7 +231,7 @@ public class ProtectedExpirableDataStorage { } public void addMessageListener(MessageListener messageListener) { - routing.addMessageListener(messageListener); + peerGroup.addMessageListener(messageListener); } @@ -324,7 +324,7 @@ public class ProtectedExpirableDataStorage { private void broadcast(BroadcastMessage message, @Nullable Address sender) { if (authenticated) { - routing.broadcast(message, sender); + peerGroup.broadcast(message, sender); log.trace("Broadcast message " + message); } else { log.trace("Broadcast not allowed because we are not authenticated yet. That is normal after received AllDataMessage at startup."); diff --git a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java index e85fd41b61..4691b67d56 100644 --- a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java +++ b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java @@ -8,7 +8,7 @@ import io.bitsquare.p2p.messaging.MailboxMessage; import io.bitsquare.p2p.messaging.SendMailboxMessageListener; import io.bitsquare.p2p.mocks.MockMailboxMessage; import io.bitsquare.p2p.network.LocalhostNetworkNode; -import io.bitsquare.p2p.routing.Routing; +import io.bitsquare.p2p.peer.PeerGroup; import io.bitsquare.p2p.seed.SeedNode; import io.bitsquare.p2p.storage.data.DataAndSeqNr; import io.bitsquare.p2p.storage.data.ProtectedData; @@ -57,7 +57,7 @@ public class P2PServiceTest { LocalhostNetworkNode.setSimulateTorDelayTorNode(10); LocalhostNetworkNode.setSimulateTorDelayHiddenService(100); - Routing.setMaxConnections(8); + PeerGroup.setMaxConnections(8); keyRing1 = new KeyRing(new KeyStorage(dir1)); keyRing2 = new KeyRing(new KeyStorage(dir2)); diff --git a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java index 8476d81df9..602f25f3a3 100644 --- a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java +++ b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java @@ -1,7 +1,7 @@ package io.bitsquare.p2p.network; import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.routing.messages.RequestAuthenticationMessage; +import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.junit.Before; import org.junit.Ignore; diff --git a/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java similarity index 88% rename from network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java rename to network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java index aa7f01e6af..52947b2c00 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java @@ -6,6 +6,8 @@ import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.P2PServiceListener; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.LocalhostNetworkNode; +import io.bitsquare.p2p.peer.AuthenticationListener; +import io.bitsquare.p2p.peer.PeerGroup; import io.bitsquare.p2p.seed.SeedNode; import org.junit.*; import org.slf4j.Logger; @@ -20,8 +22,8 @@ import java.util.concurrent.CountDownLatch; // need to define seed node addresses first before using tor version @Ignore -public class RoutingTest { - private static final Logger log = LoggerFactory.getLogger(RoutingTest.class); +public class PeerGroupTest { + private static final Logger log = LoggerFactory.getLogger(PeerGroupTest.class); boolean useLocalhost = true; private CountDownLatch latch; @@ -33,7 +35,7 @@ public class RoutingTest { public void setup() throws InterruptedException { LocalhostNetworkNode.setSimulateTorDelayTorNode(50); LocalhostNetworkNode.setSimulateTorDelayHiddenService(8); - Routing.setMaxConnections(100); + PeerGroup.setMaxConnections(100); seedNodes = new ArrayList<>(); if (useLocalhost) { @@ -107,7 +109,7 @@ public class RoutingTest { P2PService p2PService1 = seedNode1.getP2PService(); latch.await(); Thread.sleep(500); - Assert.assertEquals(0, p2PService1.getRouting().getAllPeerAddresses().size()); + Assert.assertEquals(0, p2PService1.getPeerGroup().getAllPeerAddresses().size()); } @Test @@ -180,8 +182,8 @@ public class RoutingTest { }); P2PService p2PService2 = seedNode2.getP2PService(); latch.await(); - Assert.assertEquals(1, p2PService1.getRouting().getAllPeerAddresses().size()); - Assert.assertEquals(1, p2PService2.getRouting().getAllPeerAddresses().size()); + Assert.assertEquals(1, p2PService1.getPeerGroup().getAllPeerAddresses().size()); + Assert.assertEquals(1, p2PService2.getPeerGroup().getAllPeerAddresses().size()); } // @Test @@ -214,7 +216,7 @@ public class RoutingTest { latch1.countDown(); } }; - seedNode1.getP2PService().getRouting().addRoutingListener(routingListener1); + seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1); AuthenticationListener routingListener2 = new AuthenticationListener() { @Override @@ -223,10 +225,10 @@ public class RoutingTest { latch1.countDown(); } }; - seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2); + seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2); latch1.await(); - seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1); - seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2); + seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1); + seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2); // wait until Peers msg finished Thread.sleep(sleepTime); @@ -236,21 +238,21 @@ public class RoutingTest { // authentication from seedNode3 to seedNode2, then from seedNode2 to seedNode3 SeedNode seedNode3 = getAndStartSeedNode(8003); CountDownLatch latch2 = new CountDownLatch(3); - seedNode1.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() { + seedNode1.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() { @Override public void onConnectionAuthenticated(Connection connection) { log.debug("onConnectionAuthenticated " + connection); latch2.countDown(); } }); - seedNode2.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() { + seedNode2.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() { @Override public void onConnectionAuthenticated(Connection connection) { log.debug("onConnectionAuthenticated " + connection); latch2.countDown(); } }); - seedNode3.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() { + seedNode3.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() { @Override public void onConnectionAuthenticated(Connection connection) { log.debug("onConnectionAuthenticated " + connection); @@ -295,7 +297,7 @@ public class RoutingTest { latch1.countDown(); } }; - seedNode1.getP2PService().getRouting().addRoutingListener(routingListener1); + seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1); AuthenticationListener routingListener2 = new AuthenticationListener() { @Override @@ -304,13 +306,13 @@ public class RoutingTest { latch1.countDown(); } }; - seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2); + seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2); latch1.await(); // shut down node 2 Thread.sleep(sleepTime); - seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1); - seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2); + seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1); + seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2); CountDownLatch shutDownLatch1 = new CountDownLatch(1); seedNode2.shutDown(() -> shutDownLatch1.countDown()); shutDownLatch1.await(); @@ -325,7 +327,7 @@ public class RoutingTest { latch3.countDown(); } }; - seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2); + seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2); latch3.await(); Thread.sleep(sleepTime); @@ -347,7 +349,7 @@ public class RoutingTest { latch = new CountDownLatch(i * 2); authentications += (i * 2); - node.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() { + node.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() { @Override public void onConnectionAuthenticated(Connection connection) { log.debug("onConnectionAuthenticated " + connection); @@ -364,8 +366,8 @@ public class RoutingTest { // total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20 // total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46 for (int i = 0; i < length; i++) { - nodes[i].getP2PService().getRouting().printConnectedPeersMap(); - nodes[i].getP2PService().getRouting().printReportedPeersMap(); + nodes[i].getP2PService().getPeerGroup().printConnectedPeersMap(); + nodes[i].getP2PService().getPeerGroup().printReportedPeersMap(); } CountDownLatch shutDownLatch = new CountDownLatch(length); diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index e691ef5f89..a481ed54c2 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -9,7 +9,7 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.TestUtils; import io.bitsquare.p2p.mocks.MockMessage; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.routing.Routing; +import io.bitsquare.p2p.peer.PeerGroup; import io.bitsquare.p2p.storage.data.DataAndSeqNr; import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload; import io.bitsquare.p2p.storage.data.ProtectedData; @@ -36,7 +36,7 @@ public class ProtectedDataStorageTest { boolean useClearNet = true; private ArrayList
seedNodes = new ArrayList<>(); private NetworkNode networkNode1; - private Routing routing1; + private PeerGroup peerGroup1; private EncryptionService encryptionService1, encryptionService2; private ProtectedExpirableDataStorage dataStorage1; private KeyPair storageSignatureKeyPair1, storageSignatureKeyPair2; @@ -64,8 +64,8 @@ public class ProtectedDataStorageTest { storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair(); encryptionService1 = new EncryptionService(keyRing1); networkNode1 = TestUtils.getAndStartSeedNode(8001, encryptionService1, keyRing1, useClearNet, seedNodes).getP2PService().getNetworkNode(); - routing1 = new Routing(networkNode1, seedNodes); - dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy")); + peerGroup1 = new PeerGroup(networkNode1, seedNodes); + dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy")); // for mailbox keyRing2 = new KeyRing(new KeyStorage(dir2)); @@ -80,7 +80,7 @@ public class ProtectedDataStorageTest { public void tearDown() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException { Thread.sleep(sleepTime); if (dataStorage1 != null) dataStorage1.shutDown(); - if (routing1 != null) routing1.shutDown(); + if (peerGroup1 != null) peerGroup1.shutDown(); if (networkNode1 != null) { CountDownLatch shutDownLatch = new CountDownLatch(1); @@ -107,7 +107,7 @@ public class ProtectedDataStorageTest { public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10; // CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here - dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy")); + dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy")); mockData.ttl = 50; ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);