Add AuthenticationListener

This commit is contained in:
Manfred Karrer 2015-12-22 10:39:59 +01:00
parent 2d518f9a16
commit 0d751eb561
7 changed files with 243 additions and 247 deletions

View File

@ -15,6 +15,7 @@ import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.AuthenticationListener;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.peers.RequestDataManager;
import io.bitsquare.p2p.seed.SeedNodesRepository;
@ -41,17 +42,15 @@ import java.util.concurrent.CopyOnWriteArraySet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService implements SetupListener, MessageListener, ConnectionListener, HashMapChangedListener {
public class P2PService implements SetupListener, MessageListener, ConnectionListener, HashMapChangedListener, AuthenticationListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
private final SeedNodesRepository seedNodesRepository;
private final int port;
private final File torDir;
private final boolean useLocalhost;
@Nullable
private final EncryptionService encryptionService;
@Nullable
private final KeyRing keyRing;
private final Optional<EncryptionService> optionalEncryptionService;
private final Optional<KeyRing> optionalKeyRing;
// set in init
private NetworkNode networkNode;
@ -96,8 +95,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.port = port;
this.torDir = torDir;
this.useLocalhost = useLocalhost;
this.encryptionService = encryptionService;
this.keyRing = keyRing;
optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService);
optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing);
dbStorage = new Storage<>(storageDir);
@ -131,6 +131,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// peer group
peerGroup = new PeerGroup(networkNode);
peerGroup.addAuthenticationListener(this);
if (useLocalhost)
PeerGroup.setSimulateAuthTorNode(200);
@ -138,7 +139,23 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
dataStorage = new P2PDataStorage(peerGroup, storageDir);
dataStorage.addHashMapChangedListener(this);
// Request initial data manager
requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() {
@Override
public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
}
@Override
public void onDataReceived(Address seedNode) {
connectedSeedNode = seedNode;
requestingDataCompleted.set(true);
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
}
});
peerGroup.addAuthenticationListener(requestDataManager);
// Test multiple states to check when we are ready for authenticateSeedNode
readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated,
(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated)
-> hiddenServicePublished && requestingDataCompleted && !firstPeerAuthenticated);
@ -157,7 +174,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void startAsSeedNode(Address mySeedNodeAddress, @Nullable P2PServiceListener listener) {
Log.traceCall();
// we remove ourselves from the list of seed nodes
seedNodeAddresses.remove(mySeedNodeAddress);
start(listener);
}
@ -199,33 +219,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
/**
* Bootstrap sequence:
* <p>
* Variant 1 (normal expected mode):
* onTorNodeReady -> requestDataManager.requestData()
* RequestDataManager.Listener.onDataReceived && onHiddenServicePublished -> authenticateSeedNode()
* RequestDataManager.onPeerAddressAuthenticated -> RequestDataManager.requestDataFromAuthenticatedSeedNode()
* <p>
* Variant 2 (no seed node available):
* onTorNodeReady -> requestDataManager.requestData
* RequestDataManager.Listener.onNoSeedNodeAvailable && onHiddenServicePublished -> retry after 20-30 until
* seed node is available and data can be retrieved
* RequestDataManager.Listener.onDataReceived && onHiddenServicePublished -> authenticateSeedNode()
* RequestDataManager.onPeerAddressAuthenticated -> RequestDataManager.requestDataFromAuthenticatedSeedNode()
*/
///////////////////////////////////////////////////////////////////////////////////////////
// SetupListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onTorNodeReady() {
Log.traceCall();
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
// 1. Step: As soon we have the tor node ready (hidden service still not available) we request the
// data set from a random seed node.
requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() {
@Override
public void onNoSeedNodeAvailable() {
// 2b. or 3b Step: If no seed node available we keep trying again after a random pause
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
}
@Override
public void onDataReceived(Address seedNode) {
// 2a. or 3a Step: We received initial data set
connectedSeedNode = seedNode;
requestingDataCompleted.set(true);
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
}
});
requestDataManager.requestData(seedNodeAddresses);
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
}
@Override
@ -240,7 +257,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
dbStorage.queueUpForSave(myOnionAddress);
}
// 3. (or 2.). Step: Hidden service is published
hiddenServicePublished.set(true);
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
@ -252,16 +268,55 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
}
// 4. Step: hiddenServicePublished and requestingDataCompleted. We start authenticate to the connected seed node.
private void authenticateSeedNode() {
Log.traceCall();
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
peerGroup.authenticateSeedNode(connectedSeedNode);
}
// 5. Step: in RequestDataManager (after authentication to first seed node we request again the data)
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
if (connection.isAuthenticated())
authenticatedPeerAddresses.remove(connection.getPeerAddress());
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// AuthenticationListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
if (!firstPeerAuthenticated.get()) {
firstPeerAuthenticated.set(true);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
}
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -271,11 +326,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (message instanceof SealedAndSignedMessage) {
Log.traceCall(message.toString());
// Seed nodes don't have set the encryptionService
if (encryptionService != null) {
if (optionalEncryptionService.isPresent()) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
// We set connectionType to that connection to avoid that is get closed when
@ -298,46 +353,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
if (!firstPeerAuthenticated.get()) {
firstPeerAuthenticated.set(true);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
}
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
if (connection.isAuthenticated())
authenticatedPeerAddresses.remove(connection.getPeerAddress());
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedData entry) {
if (entry instanceof ProtectedMailboxData)
@ -370,27 +389,26 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendMailMessageListener.onArrived();
}
checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen.");
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
optionalEncryptionService.get().encryptAndSign(pubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendMailMessageListener.onArrived();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
sendMailMessageListener.onFault();
}
});
} catch (CryptoException e) {
e.printStackTrace();
sendMailMessageListener.onFault();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
sendMailMessageListener.onFault();
}
});
} catch (CryptoException e) {
e.printStackTrace();
sendMailMessageListener.onFault();
}
}
@ -400,7 +418,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
private void processProtectedMailboxData(ProtectedMailboxData mailboxData) {
if (encryptionService != null) {
// Seed nodes don't have set the encryptionService
if (optionalEncryptionService.isPresent()) {
Log.traceCall();
ExpirablePayload expirablePayload = mailboxData.expirablePayload;
if (expirablePayload instanceof ExpirableMailboxPayload) {
@ -408,7 +427,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage;
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
try {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
if (decryptedMsgWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message;
@ -439,7 +458,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
checkAuthentication();
if (authenticatedPeerAddresses.contains(peerAddress)) {
@ -458,40 +478,86 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
sendMailboxMessageListener.onArrived();
}
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen.");
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
sendMailboxMessageListener.onArrived();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peerAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage,
keyRing.getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
sendMailboxMessageListener.onStoredInMailbox();
}
});
} catch (CryptoException e) {
log.error("sendEncryptedMessage failed");
e.printStackTrace();
sendMailboxMessageListener.onFault();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peerAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage,
optionalKeyRing.get().getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
sendMailboxMessageListener.onStoredInMailbox();
}
});
} catch (CryptoException e) {
log.error("sendEncryptedMessage failed");
e.printStackTrace();
sendMailboxMessageListener.onFault();
}
}
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
dataStorage.add(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
}
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
PublicKey receiversPubKey = mailboxData.receiversPubKey;
checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPubKey);
dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully decryptedMsgWithPubKey.");
}
} else {
log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
"\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Data storage
///////////////////////////////////////////////////////////////////////////////////////////
@ -508,10 +574,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private boolean doAddData(ExpirablePayload expirablePayload, boolean rePublish) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair());
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
if (rePublish)
return dataStorage.rePublish(protectedData, networkNode.getAddress());
else
@ -522,27 +588,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkAuthentication();
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
keyRing.getSignatureKeyPair(),
receiversPublicKey);
dataStorage.add(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
}
public boolean removeData(ExpirablePayload expirablePayload) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair());
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
return dataStorage.remove(protectedData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
@ -550,40 +601,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
Log.traceCall();
checkAuthentication();
if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
PublicKey receiversPubKey = mailboxData.receiversPubKey;
checkArgument(receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
keyRing.getSignatureKeyPair(),
receiversPubKey);
dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully decryptedMsgWithPubKey.");
}
} else {
log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
"\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
}
}
public Map<ByteArray, ProtectedData> getDataMap() {
return dataStorage.getMap();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
@ -646,15 +663,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return numAuthenticatedPeers;
}
public Map<ByteArray, ProtectedData> getDataMap() {
return dataStorage.getMap();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private HashSet<ProtectedData> getDataSet() {
return new HashSet<>(getDataMap().values());
}
private boolean verifyAddressPrefixHash(SealedAndSignedMessage sealedAndSignedMessage) {
if (myOnionAddress != null) {
byte[] blurredAddressHash = myOnionAddress.getAddressPrefixHash();

View File

@ -120,8 +120,6 @@ public class Connection implements MessageListener {
Log.traceCall();
this.peerAddress = peerAddress;
isAuthenticated = true;
if (!stopped)
connectionListener.onPeerAddressAuthenticated(peerAddress, connection);
}
public void setConnectionPriority(ConnectionPriority connectionPriority) {

View File

@ -1,8 +1,6 @@
package io.bitsquare.p2p.network;
import io.bitsquare.p2p.Address;
public interface ConnectionListener {
enum Reason {
SOCKET_CLOSED,
@ -15,8 +13,6 @@ public interface ConnectionListener {
void onConnection(Connection connection);
void onPeerAddressAuthenticated(Address peerAddress, Connection connection);
void onDisconnect(Reason reason, Connection connection);
void onError(Throwable throwable);

View File

@ -230,14 +230,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
log.trace("onAuthenticationComplete peerAddress/connection: " + peerAddress + " / " + connection);
connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
@ -320,12 +312,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();

View File

@ -0,0 +1,8 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
public interface AuthenticationListener {
void onPeerAddressAuthenticated(Address peerAddress, Connection connection);
}

View File

@ -18,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -40,19 +41,18 @@ public class PeerGroup implements MessageListener, ConnectionListener {
MAX_CONNECTIONS_LOW_PRIO = maxConnectionsLowPrio;
}
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
private static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000;
private static final int MAX_REPORTED_PEERS = 1000;
private final NetworkNode networkNode;
private Set<Address> seedNodeAddresses;
private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private Timer sendPingTimer;
private Timer getPeersTimer;
private Timer sendPingTimer = new Timer();
private Timer getPeersTimer = new Timer();
private Set<Address> seedNodeAddresses;
private boolean shutDownInProgress;
@ -72,10 +72,29 @@ public class PeerGroup implements MessageListener, ConnectionListener {
startGetPeersTimer();
}
public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) {
this.seedNodeAddresses = seedNodeAddresses;
public void addAuthenticationListener(AuthenticationListener listener) {
authenticationListeners.add(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
removePeer(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -89,33 +108,14 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
removePeer(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) {
this.seedNodeAddresses = seedNodeAddresses;
}
public void broadcast(DataBroadcastMessage message, @Nullable Address sender) {
Log.traceCall("Sender " + sender + ". Message " + message.toString());
if (authenticatedPeers.values().size() > 0) {
@ -442,6 +442,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
addAuthenticatedPeer(new Peer(connection));
connection.setAuthenticated(peerAddress, connection);
authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
}
private void addAuthenticatedPeer(Peer peer) {
@ -455,6 +456,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
printAuthenticatedPeers();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Maintenance
///////////////////////////////////////////////////////////////////////////////////////////
@ -542,7 +544,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (!connectedPeersList.isEmpty()) {
Log.traceCall();
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > INACTIVITY_PERIOD_BEFORE_PING)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {

View File

@ -8,7 +8,6 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.P2PDataStorage;
@ -25,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener, ConnectionListener {
public class RequestDataManager implements MessageListener, AuthenticationListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
@ -56,7 +55,6 @@ public class RequestDataManager implements MessageListener, ConnectionListener {
this.listener = listener;
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -129,13 +127,11 @@ public class RequestDataManager implements MessageListener, ConnectionListener {
listener.onDataReceived(connectedSeedNodeAddress);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// AuthenticationListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
@ -143,13 +139,6 @@ public class RequestDataManager implements MessageListener, ConnectionListener {
requestDataFromAuthenticatedSeedNode(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
// 5. Step after authentication to first seed node we request again the data
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {