diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 8a4cbb0284..1e9ee0257a 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -15,6 +15,7 @@ import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.network.*; +import io.bitsquare.p2p.peers.AuthenticationListener; import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.peers.RequestDataManager; import io.bitsquare.p2p.seed.SeedNodesRepository; @@ -41,17 +42,15 @@ import java.util.concurrent.CopyOnWriteArraySet; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -public class P2PService implements SetupListener, MessageListener, ConnectionListener, HashMapChangedListener { +public class P2PService implements SetupListener, MessageListener, ConnectionListener, HashMapChangedListener, AuthenticationListener { private static final Logger log = LoggerFactory.getLogger(P2PService.class); private final SeedNodesRepository seedNodesRepository; private final int port; private final File torDir; private final boolean useLocalhost; - @Nullable - private final EncryptionService encryptionService; - @Nullable - private final KeyRing keyRing; + private final Optional optionalEncryptionService; + private final Optional optionalKeyRing; // set in init private NetworkNode networkNode; @@ -96,8 +95,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis this.port = port; this.torDir = torDir; this.useLocalhost = useLocalhost; - this.encryptionService = encryptionService; - this.keyRing = keyRing; + + optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService); + optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing); dbStorage = new Storage<>(storageDir); @@ -131,6 +131,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // peer group peerGroup = new PeerGroup(networkNode); + peerGroup.addAuthenticationListener(this); if (useLocalhost) PeerGroup.setSimulateAuthTorNode(200); @@ -138,7 +139,23 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis dataStorage = new P2PDataStorage(peerGroup, storageDir); dataStorage.addHashMapChangedListener(this); + // Request initial data manager + requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() { + @Override + public void onNoSeedNodeAvailable() { + p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); + } + @Override + public void onDataReceived(Address seedNode) { + connectedSeedNode = seedNode; + requestingDataCompleted.set(true); + p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted()); + } + }); + peerGroup.addAuthenticationListener(requestDataManager); + + // Test multiple states to check when we are ready for authenticateSeedNode readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated, (hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated) -> hiddenServicePublished && requestingDataCompleted && !firstPeerAuthenticated); @@ -157,7 +174,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis public void startAsSeedNode(Address mySeedNodeAddress, @Nullable P2PServiceListener listener) { Log.traceCall(); + + // we remove ourselves from the list of seed nodes seedNodeAddresses.remove(mySeedNodeAddress); + start(listener); } @@ -199,33 +219,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } + /** + * Bootstrap sequence: + *

+ * Variant 1 (normal expected mode): + * onTorNodeReady -> requestDataManager.requestData() + * RequestDataManager.Listener.onDataReceived && onHiddenServicePublished -> authenticateSeedNode() + * RequestDataManager.onPeerAddressAuthenticated -> RequestDataManager.requestDataFromAuthenticatedSeedNode() + *

+ * Variant 2 (no seed node available): + * onTorNodeReady -> requestDataManager.requestData + * RequestDataManager.Listener.onNoSeedNodeAvailable && onHiddenServicePublished -> retry after 20-30 until + * seed node is available and data can be retrieved + * RequestDataManager.Listener.onDataReceived && onHiddenServicePublished -> authenticateSeedNode() + * RequestDataManager.onPeerAddressAuthenticated -> RequestDataManager.requestDataFromAuthenticatedSeedNode() + */ + /////////////////////////////////////////////////////////////////////////////////////////// // SetupListener implementation /////////////////////////////////////////////////////////////////////////////////////////// - @Override public void onTorNodeReady() { Log.traceCall(); - p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); - - // 1. Step: As soon we have the tor node ready (hidden service still not available) we request the - // data set from a random seed node. - requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() { - @Override - public void onNoSeedNodeAvailable() { - // 2b. or 3b Step: If no seed node available we keep trying again after a random pause - p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); - } - - @Override - public void onDataReceived(Address seedNode) { - // 2a. or 3a Step: We received initial data set - connectedSeedNode = seedNode; - requestingDataCompleted.set(true); - p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted()); - } - }); requestDataManager.requestData(seedNodeAddresses); + p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); } @Override @@ -240,7 +257,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis dbStorage.queueUpForSave(myOnionAddress); } - // 3. (or 2.). Step: Hidden service is published hiddenServicePublished.set(true); p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished()); @@ -252,16 +268,55 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)); } - // 4. Step: hiddenServicePublished and requestingDataCompleted. We start authenticate to the connected seed node. private void authenticateSeedNode() { Log.traceCall(); checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null"); peerGroup.authenticateSeedNode(connectedSeedNode); } - // 5. Step: in RequestDataManager (after authentication to first seed node we request again the data) + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); + if (connection.isAuthenticated()) + authenticatedPeerAddresses.remove(connection.getPeerAddress()); + + numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); + } + + @Override + public void onError(Throwable throwable) { + } + /////////////////////////////////////////////////////////////////////////////////////////// + // AuthenticationListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); + checkArgument(peerAddress.equals(connection.getPeerAddress()), + "peerAddress must match connection.getPeerAddress()"); + authenticatedPeerAddresses.add(peerAddress); + + if (!firstPeerAuthenticated.get()) { + firstPeerAuthenticated.set(true); + p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); + } + + numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); + } + + /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -271,11 +326,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis if (message instanceof SealedAndSignedMessage) { Log.traceCall(message.toString()); // Seed nodes don't have set the encryptionService - if (encryptionService != null) { + if (optionalEncryptionService.isPresent()) { try { SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; if (verifyAddressPrefixHash(sealedAndSignedMessage)) { - DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify( + DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify( sealedAndSignedMessage.sealedAndSigned); // We set connectionType to that connection to avoid that is get closed when @@ -298,46 +353,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } - /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onConnection(Connection connection) { - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - checkArgument(peerAddress.equals(connection.getPeerAddress()), - "peerAddress must match connection.getPeerAddress()"); - authenticatedPeerAddresses.add(peerAddress); - - if (!firstPeerAuthenticated.get()) { - firstPeerAuthenticated.set(true); - p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); - } - - numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - if (connection.isAuthenticated()) - authenticatedPeerAddresses.remove(connection.getPeerAddress()); - - numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); - } - - @Override - public void onError(Throwable throwable) { - } - - /////////////////////////////////////////////////////////////////////////////////////////// // HashMapChangedListener implementation /////////////////////////////////////////////////////////////////////////////////////////// + @Override public void onAdded(ProtectedData entry) { if (entry instanceof ProtectedMailboxData) @@ -370,27 +389,26 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) { Log.traceCall(); - if (encryptionService != null) { - try { - SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getAddressPrefixHash()); - SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - sendMailMessageListener.onArrived(); - } + checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen."); + try { + SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( + optionalEncryptionService.get().encryptAndSign(pubKeyRing, message), peerAddress.getAddressPrefixHash()); + SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + sendMailMessageListener.onArrived(); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - throwable.printStackTrace(); - sendMailMessageListener.onFault(); - } - }); - } catch (CryptoException e) { - e.printStackTrace(); - sendMailMessageListener.onFault(); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + sendMailMessageListener.onFault(); + } + }); + } catch (CryptoException e) { + e.printStackTrace(); + sendMailMessageListener.onFault(); } } @@ -400,7 +418,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis /////////////////////////////////////////////////////////////////////////////////////////// private void processProtectedMailboxData(ProtectedMailboxData mailboxData) { - if (encryptionService != null) { + // Seed nodes don't have set the encryptionService + if (optionalEncryptionService.isPresent()) { Log.traceCall(); ExpirablePayload expirablePayload = mailboxData.expirablePayload; if (expirablePayload instanceof ExpirableMailboxPayload) { @@ -408,7 +427,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage; if (verifyAddressPrefixHash(sealedAndSignedMessage)) { try { - DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify( + DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify( sealedAndSignedMessage.sealedAndSigned); if (decryptedMsgWithPubKey.message instanceof MailboxMessage) { MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message; @@ -439,7 +458,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { Log.traceCall(); checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); - checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer"); + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); + checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer"); checkAuthentication(); if (authenticatedPeerAddresses.contains(peerAddress)) { @@ -458,40 +478,86 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing, MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { Log.traceCall(); - if (encryptionService != null) { - try { - SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getAddressPrefixHash()); - SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.trace("SendEncryptedMailboxMessage onSuccess"); - sendMailboxMessageListener.onArrived(); - } + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); + checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen."); + try { + SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( + optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message), peerAddress.getAddressPrefixHash()); + SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.trace("SendEncryptedMailboxMessage onSuccess"); + sendMailboxMessageListener.onArrived(); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.trace("SendEncryptedMailboxMessage onFailure"); - log.debug(throwable.toString()); - log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox."); - log.trace("create MailboxEntry with peerAddress " + peerAddress); - PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); - addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage, - keyRing.getSignatureKeyPair().getPublic(), - receiverStoragePublicKey), - receiverStoragePublicKey); - sendMailboxMessageListener.onStoredInMailbox(); - } - }); - } catch (CryptoException e) { - log.error("sendEncryptedMessage failed"); - e.printStackTrace(); - sendMailboxMessageListener.onFault(); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + log.trace("SendEncryptedMailboxMessage onFailure"); + log.debug(throwable.toString()); + log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox."); + log.trace("create MailboxEntry with peerAddress " + peerAddress); + PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); + addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage, + optionalKeyRing.get().getSignatureKeyPair().getPublic(), + receiverStoragePublicKey), + receiverStoragePublicKey); + sendMailboxMessageListener.onStoredInMailbox(); + } + }); + } catch (CryptoException e) { + log.error("sendEncryptedMessage failed"); + e.printStackTrace(); + sendMailboxMessageListener.onFault(); } } + private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { + Log.traceCall(); + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); + checkAuthentication(); + try { + ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr( + expirableMailboxPayload, + optionalKeyRing.get().getSignatureKeyPair(), + receiversPublicKey); + dataStorage.add(protectedMailboxData, networkNode.getAddress()); + } catch (CryptoException e) { + log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); + } + } + + public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { + Log.traceCall(); + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); + checkAuthentication(); + if (mailboxMap.containsKey(decryptedMsgWithPubKey)) { + ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey); + if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) { + ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload; + PublicKey receiversPubKey = mailboxData.receiversPubKey; + checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()), + "receiversPubKey is not matching with our key. That must not happen."); + try { + ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr( + expirableMailboxPayload, + optionalKeyRing.get().getSignatureKeyPair(), + receiversPubKey); + dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress()); + } catch (CryptoException e) { + log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); + } + + mailboxMap.remove(decryptedMsgWithPubKey); + log.trace("Removed successfully decryptedMsgWithPubKey."); + } + } else { + log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." + + "\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // Data storage /////////////////////////////////////////////////////////////////////////////////////////// @@ -508,10 +574,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private boolean doAddData(ExpirablePayload expirablePayload, boolean rePublish) { Log.traceCall(); + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); checkAuthentication(); - try { - ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair()); + ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair()); if (rePublish) return dataStorage.rePublish(protectedData, networkNode.getAddress()); else @@ -522,27 +588,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { - Log.traceCall(); - checkAuthentication(); - - try { - ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr( - expirableMailboxPayload, - keyRing.getSignatureKeyPair(), - receiversPublicKey); - dataStorage.add(protectedMailboxData, networkNode.getAddress()); - } catch (CryptoException e) { - log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); - } - } - public boolean removeData(ExpirablePayload expirablePayload) { Log.traceCall(); + checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); checkAuthentication(); - try { - ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair()); + ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair()); return dataStorage.remove(protectedData, networkNode.getAddress()); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); @@ -550,40 +601,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { - Log.traceCall(); - checkAuthentication(); - - if (mailboxMap.containsKey(decryptedMsgWithPubKey)) { - ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey); - if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) { - ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload; - PublicKey receiversPubKey = mailboxData.receiversPubKey; - checkArgument(receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()), - "receiversPubKey is not matching with our key. That must not happen."); - try { - ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr( - expirableMailboxPayload, - keyRing.getSignatureKeyPair(), - receiversPubKey); - dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress()); - } catch (CryptoException e) { - log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); - } - - mailboxMap.remove(decryptedMsgWithPubKey); - log.trace("Removed successfully decryptedMsgWithPubKey."); - } - } else { - log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." + - "\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap); - } - } - - public Map getDataMap() { - return dataStorage.getMap(); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Listeners @@ -646,15 +663,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis return numAuthenticatedPeers; } + public Map getDataMap() { + return dataStorage.getMap(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// - private HashSet getDataSet() { - return new HashSet<>(getDataMap().values()); - } - private boolean verifyAddressPrefixHash(SealedAndSignedMessage sealedAndSignedMessage) { if (myOnionAddress != null) { byte[] blurredAddressHash = myOnionAddress.getAddressPrefixHash(); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 8552846586..014c496f66 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -120,8 +120,6 @@ public class Connection implements MessageListener { Log.traceCall(); this.peerAddress = peerAddress; isAuthenticated = true; - if (!stopped) - connectionListener.onPeerAddressAuthenticated(peerAddress, connection); } public void setConnectionPriority(ConnectionPriority connectionPriority) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java index aed7d2fd9f..9ff93f6a6b 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java @@ -1,8 +1,6 @@ package io.bitsquare.p2p.network; -import io.bitsquare.p2p.Address; - public interface ConnectionListener { enum Reason { SOCKET_CLOSED, @@ -15,8 +13,6 @@ public interface ConnectionListener { void onConnection(Connection connection); - void onPeerAddressAuthenticated(Address peerAddress, Connection connection); - void onDisconnect(Reason reason, Connection connection); void onError(Throwable throwable); diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 0629e8ae01..5da0dfb149 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -230,14 +230,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener connectionListeners.stream().forEach(e -> e.onConnection(connection)); } - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - log.trace("onAuthenticationComplete peerAddress/connection: " + peerAddress + " / " + connection); - - connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); - } - @Override public void onDisconnect(Reason reason, Connection connection) { Log.traceCall(); @@ -320,12 +312,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener NetworkNode.this.onConnection(connection); } - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); - } - @Override public void onDisconnect(Reason reason, Connection connection) { Log.traceCall(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java new file mode 100644 index 0000000000..7c65ff3ecb --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java @@ -0,0 +1,8 @@ +package io.bitsquare.p2p.peers; + +import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.network.Connection; + +public interface AuthenticationListener { + void onPeerAddressAuthenticated(Address peerAddress, Connection connection); +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index 3abcb9a900..b806940e02 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -18,6 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -40,19 +41,18 @@ public class PeerGroup implements MessageListener, ConnectionListener { MAX_CONNECTIONS_LOW_PRIO = maxConnectionsLowPrio; } - private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; + private static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000; private static final int MAX_REPORTED_PEERS = 1000; private final NetworkNode networkNode; - private Set

seedNodeAddresses; - + private final CopyOnWriteArraySet authenticationListeners = new CopyOnWriteArraySet<>(); private final Map authenticatedPeers = new HashMap<>(); private final Set reportedPeers = new HashSet<>(); private final Map authenticationHandshakes = new HashMap<>(); + private Timer sendPingTimer; + private Timer getPeersTimer; - private Timer sendPingTimer = new Timer(); - private Timer getPeersTimer = new Timer(); - + private Set
seedNodeAddresses; private boolean shutDownInProgress; @@ -72,10 +72,29 @@ public class PeerGroup implements MessageListener, ConnectionListener { startGetPeersTimer(); } - public void setSeedNodeAddresses(Set
seedNodeAddresses) { - this.seedNodeAddresses = seedNodeAddresses; + public void addAuthenticationListener(AuthenticationListener listener) { + authenticationListeners.add(listener); } + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + log.debug("onDisconnect connection=" + connection + " / reason=" + reason); + removePeer(connection.getPeerAddress()); + } + + @Override + public void onError(Throwable throwable) { + } + + /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -89,33 +108,14 @@ public class PeerGroup implements MessageListener, ConnectionListener { } - /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onConnection(Connection connection) { - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - log.debug("onDisconnect connection=" + connection + " / reason=" + reason); - removePeer(connection.getPeerAddress()); - } - - @Override - public void onError(Throwable throwable) { - } - - /////////////////////////////////////////////////////////////////////////////////////////// // API /////////////////////////////////////////////////////////////////////////////////////////// + public void setSeedNodeAddresses(Set
seedNodeAddresses) { + this.seedNodeAddresses = seedNodeAddresses; + } + public void broadcast(DataBroadcastMessage message, @Nullable Address sender) { Log.traceCall("Sender " + sender + ". Message " + message.toString()); if (authenticatedPeers.values().size() > 0) { @@ -442,6 +442,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { addAuthenticatedPeer(new Peer(connection)); connection.setAuthenticated(peerAddress, connection); + authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); } private void addAuthenticatedPeer(Peer peer) { @@ -455,6 +456,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { printAuthenticatedPeers(); } + /////////////////////////////////////////////////////////////////////////////////////////// // Maintenance /////////////////////////////////////////////////////////////////////////////////////////// @@ -542,7 +544,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { if (!connectedPeersList.isEmpty()) { Log.traceCall(); connectedPeersList.stream() - .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) + .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > INACTIVITY_PERIOD_BEFORE_PING) .forEach(e -> UserThread.runAfterRandomDelay(() -> { SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); Futures.addCallback(future, new FutureCallback() { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index 53fc28e93a..9279164e82 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -8,7 +8,6 @@ import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; -import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.storage.P2PDataStorage; @@ -25,7 +24,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; -public class RequestDataManager implements MessageListener, ConnectionListener { +public class RequestDataManager implements MessageListener, AuthenticationListener { private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); /////////////////////////////////////////////////////////////////////////////////////////// @@ -56,7 +55,6 @@ public class RequestDataManager implements MessageListener, ConnectionListener { this.listener = listener; networkNode.addMessageListener(this); - networkNode.addConnectionListener(this); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -129,13 +127,11 @@ public class RequestDataManager implements MessageListener, ConnectionListener { listener.onDataReceived(connectedSeedNodeAddress); } } - /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - @Override - public void onConnection(Connection connection) { - } + + /////////////////////////////////////////////////////////////////////////////////////////// + // AuthenticationListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { @@ -143,13 +139,6 @@ public class RequestDataManager implements MessageListener, ConnectionListener { requestDataFromAuthenticatedSeedNode(peerAddress, connection); } - @Override - public void onDisconnect(Reason reason, Connection connection) { - } - - @Override - public void onError(Throwable throwable) { - } // 5. Step after authentication to first seed node we request again the data private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {