mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-07-28 09:24:15 -04:00
extract authentication to class, map to user thread
This commit is contained in:
parent
f788778f3c
commit
e3cdad4299
27 changed files with 914 additions and 695 deletions
|
@ -18,10 +18,17 @@
|
||||||
package io.bitsquare.common;
|
package io.bitsquare.common;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class UserThread {
|
public class UserThread {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(UserThread.class);
|
||||||
|
|
||||||
public static Executor getExecutor() {
|
public static Executor getExecutor() {
|
||||||
return executor;
|
return executor;
|
||||||
|
@ -41,4 +48,34 @@ public class UserThread {
|
||||||
public static void execute(Runnable command) {
|
public static void execute(Runnable command) {
|
||||||
UserThread.executor.execute(command);
|
UserThread.executor.execute(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) {
|
||||||
|
return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timer runAfterRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) {
|
||||||
|
return UserThread.runAfter(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timer runAfter(Runnable runnable, long delayInSec) {
|
||||||
|
return UserThread.runAfter(runnable, delayInSec, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timer runAfter(Runnable runnable, long delay, TimeUnit timeUnit) {
|
||||||
|
Timer timer = new Timer();
|
||||||
|
timer.schedule(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
|
||||||
|
try {
|
||||||
|
UserThread.execute(() -> runnable.run());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
t.printStackTrace();
|
||||||
|
log.error("Executing timerTask failed. " + t.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, timeUnit.toMillis(delay));
|
||||||
|
return timer;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,9 +35,6 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
@ -83,36 +80,6 @@ public class Utilities {
|
||||||
return threadPoolExecutor;
|
return threadPoolExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay) {
|
|
||||||
return runTimerTaskWithRandomDelay(runnable, minDelay, maxDelay, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) {
|
|
||||||
return runTimerTask(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Timer runTimerTask(Runnable runnable, long delay) {
|
|
||||||
return runTimerTask(runnable, delay, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Timer runTimerTask(Runnable runnable, long delay, TimeUnit timeUnit) {
|
|
||||||
Timer timer = new Timer();
|
|
||||||
timer.schedule(new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
|
|
||||||
try {
|
|
||||||
runnable.run();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
t.printStackTrace();
|
|
||||||
log.error("Executing timerTask failed. " + t.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, timeUnit.convert(delay, timeUnit));
|
|
||||||
return timer;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static boolean isUnix() {
|
public static boolean isUnix() {
|
||||||
return isOSX() || isLinux() || getOSName().contains("freebsd");
|
return isOSX() || isLinux() || getOSName().contains("freebsd");
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,7 +119,7 @@ public class ArbitratorManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -121,7 +121,7 @@ public class DisputeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class TradeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -134,7 +134,7 @@ public class OpenOfferManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -249,7 +249,7 @@ class MainViewModel implements ViewModel {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
p2pNetworkInfoFooter.set("Tor hidden service available.");
|
p2pNetworkInfoFooter.set("Tor hidden service available.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package io.bitsquare.p2p;
|
package io.bitsquare.p2p;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
@ -12,7 +11,6 @@ import io.bitsquare.common.crypto.CryptoException;
|
||||||
import io.bitsquare.common.crypto.KeyRing;
|
import io.bitsquare.common.crypto.KeyRing;
|
||||||
import io.bitsquare.common.crypto.PubKeyRing;
|
import io.bitsquare.common.crypto.PubKeyRing;
|
||||||
import io.bitsquare.common.crypto.SealedAndSigned;
|
import io.bitsquare.common.crypto.SealedAndSigned;
|
||||||
import io.bitsquare.common.util.Utilities;
|
|
||||||
import io.bitsquare.crypto.EncryptionService;
|
import io.bitsquare.crypto.EncryptionService;
|
||||||
import io.bitsquare.crypto.SealedAndSignedMessage;
|
import io.bitsquare.crypto.SealedAndSignedMessage;
|
||||||
import io.bitsquare.p2p.messaging.*;
|
import io.bitsquare.p2p.messaging.*;
|
||||||
|
@ -27,8 +25,13 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
|
||||||
import io.bitsquare.p2p.storage.data.ExpirablePayload;
|
import io.bitsquare.p2p.storage.data.ExpirablePayload;
|
||||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||||
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
|
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
|
||||||
import io.bitsquare.p2p.storage.messages.DataSetMessage;
|
import io.bitsquare.p2p.storage.messages.AllDataMessage;
|
||||||
import io.bitsquare.p2p.storage.messages.GetDataSetMessage;
|
import io.bitsquare.p2p.storage.messages.GetAllDataMessage;
|
||||||
|
import javafx.beans.property.BooleanProperty;
|
||||||
|
import javafx.beans.property.SimpleBooleanProperty;
|
||||||
|
import org.fxmisc.easybind.EasyBind;
|
||||||
|
import org.fxmisc.easybind.monadic.MonadicBinding;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -46,7 +49,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
/**
|
/**
|
||||||
* Represents our node in the P2P network
|
* Represents our node in the P2P network
|
||||||
*/
|
*/
|
||||||
public class P2PService {
|
public class P2PService implements SetupListener {
|
||||||
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
|
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
|
||||||
|
|
||||||
private final SeedNodesRepository seedNodesRepository;
|
private final SeedNodesRepository seedNodesRepository;
|
||||||
|
@ -55,7 +58,6 @@ public class P2PService {
|
||||||
private final boolean useLocalhost;
|
private final boolean useLocalhost;
|
||||||
@Nullable
|
@Nullable
|
||||||
private final EncryptionService encryptionService;
|
private final EncryptionService encryptionService;
|
||||||
private SetupListener setupListener;
|
|
||||||
private KeyRing keyRing;
|
private KeyRing keyRing;
|
||||||
private final File storageDir;
|
private final File storageDir;
|
||||||
private final NetworkStatistics networkStatistics;
|
private final NetworkStatistics networkStatistics;
|
||||||
|
@ -68,18 +70,15 @@ public class P2PService {
|
||||||
private final CopyOnWriteArraySet<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
|
private final CopyOnWriteArraySet<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
|
||||||
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new ConcurrentHashMap<>();
|
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new ConcurrentHashMap<>();
|
||||||
private volatile boolean shutDownInProgress;
|
private volatile boolean shutDownInProgress;
|
||||||
private Set<Address> seedNodeAddresses;
|
private Address connectedSeedNode;
|
||||||
private Set<Address> connectedSeedNodes = new HashSet<>();
|
|
||||||
private Set<Address> authenticatedPeerAddresses = new HashSet<>();
|
private Set<Address> authenticatedPeerAddresses = new HashSet<>();
|
||||||
private boolean authenticatedToFirstPeer;
|
|
||||||
private boolean allDataReceived;
|
|
||||||
public boolean authenticated;
|
|
||||||
private boolean shutDownComplete;
|
private boolean shutDownComplete;
|
||||||
private CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
|
private CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
|
||||||
private final CopyOnWriteArraySet<Long> getDataSetMessageNonceList = new CopyOnWriteArraySet<>();
|
private BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
|
||||||
private boolean allSeedNodesRequested;
|
private BooleanProperty allDataLoaded = new SimpleBooleanProperty();
|
||||||
private Timer sendGetAllDataMessageTimer;
|
private BooleanProperty authenticated = new SimpleBooleanProperty();
|
||||||
private volatile boolean hiddenServiceReady;
|
private MonadicBinding<Boolean> readyForAuthentication;
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Constructor
|
// Constructor
|
||||||
|
@ -108,6 +107,7 @@ public class P2PService {
|
||||||
|
|
||||||
private void init() {
|
private void init() {
|
||||||
// network
|
// network
|
||||||
|
Set<Address> seedNodeAddresses;
|
||||||
if (useLocalhost) {
|
if (useLocalhost) {
|
||||||
networkNode = new LocalhostNetworkNode(port);
|
networkNode = new LocalhostNetworkNode(port);
|
||||||
seedNodeAddresses = seedNodesRepository.getLocalhostSeedNodeAddresses();
|
seedNodeAddresses = seedNodesRepository.getLocalhostSeedNodeAddresses();
|
||||||
|
@ -124,31 +124,6 @@ public class P2PService {
|
||||||
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
|
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
|
||||||
|
|
||||||
|
|
||||||
// Listeners
|
|
||||||
setupListener = new SetupListener() {
|
|
||||||
@Override
|
|
||||||
public void onTorNodeReady() {
|
|
||||||
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()));
|
|
||||||
|
|
||||||
// we don't know yet our own address so we can not filter that from the
|
|
||||||
// seedNodeAddresses in case we are a seed node
|
|
||||||
sendGetAllDataMessage(seedNodeAddresses);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onHiddenServiceReady() {
|
|
||||||
hiddenServiceReady = true;
|
|
||||||
tryStartAuthentication();
|
|
||||||
|
|
||||||
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServiceReady()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSetupFailed(Throwable throwable) {
|
|
||||||
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
networkNode.addConnectionListener(new ConnectionListener() {
|
networkNode.addConnectionListener(new ConnectionListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onConnection(Connection connection) {
|
public void onConnection(Connection connection) {
|
||||||
|
@ -156,30 +131,11 @@ public class P2PService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
|
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
|
||||||
checkArgument(peerAddress.equals(connection.getPeerAddress()));
|
checkArgument(peerAddress.equals(connection.getPeerAddress()),
|
||||||
|
"peerAddress must match connection.getPeerAddress()");
|
||||||
authenticatedPeerAddresses.add(peerAddress);
|
authenticatedPeerAddresses.add(peerAddress);
|
||||||
|
authenticated.set(true);
|
||||||
|
|
||||||
if (!authenticatedToFirstPeer) {
|
|
||||||
authenticatedToFirstPeer = true;
|
|
||||||
|
|
||||||
SettableFuture<Connection> future = sendMessage(peerAddress,
|
|
||||||
new GetDataSetMessage(addToListAndGetNonce()));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
|
||||||
log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peerAddress + " succeeded.");
|
|
||||||
connectedSeedNodes.add(peerAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable throwable) {
|
|
||||||
log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peerAddress + " failed. " +
|
|
||||||
"Exception:" + throwable.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
P2PService.this.authenticated = true;
|
|
||||||
dataStorage.setAuthenticated(true);
|
dataStorage.setAuthenticated(true);
|
||||||
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated()));
|
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated()));
|
||||||
}
|
}
|
||||||
|
@ -197,30 +153,24 @@ public class P2PService {
|
||||||
});
|
});
|
||||||
|
|
||||||
networkNode.addMessageListener((message, connection) -> {
|
networkNode.addMessageListener((message, connection) -> {
|
||||||
if (message instanceof GetDataSetMessage) {
|
if (message instanceof GetAllDataMessage) {
|
||||||
log.trace("Received GetDataSetMessage: " + message);
|
log.trace("Received GetDataSetMessage: " + message);
|
||||||
|
networkNode.sendMessage(connection, new AllDataMessage(getDataSet()));
|
||||||
// we only reply if we did not get the message form ourselves (in case we are a seed node)
|
} else if (message instanceof AllDataMessage) {
|
||||||
if (!getDataSetMessageNonceList.contains(((GetDataSetMessage) message).nonce)) {
|
AllDataMessage allDataMessage = (AllDataMessage) message;
|
||||||
networkNode.sendMessage(connection, new DataSetMessage(getHashSet()));
|
HashSet<ProtectedData> set = allDataMessage.set;
|
||||||
} else {
|
if (!set.isEmpty()) {
|
||||||
connection.shutDown(() -> {
|
|
||||||
if (allSeedNodesRequested) dataReceived();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else if (message instanceof DataSetMessage) {
|
|
||||||
DataSetMessage dataSetMessage = (DataSetMessage) message;
|
|
||||||
StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n");
|
StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n");
|
||||||
dataSetMessage.set.stream().forEach(e -> sb.append(e.toString() + "\n"));
|
set.stream().forEach(e -> sb.append(e.toString() + "\n"));
|
||||||
sb.append("\n");
|
sb.append("\n");
|
||||||
log.trace(sb.toString());
|
log.trace(sb.toString());
|
||||||
// we keep that connection open as the bootstrapping peer will use that for the authentication
|
// we keep that connection open as the bootstrapping peer will use that for the authentication
|
||||||
|
|
||||||
// as we are not authenticated yet the data adding will not be broadcasted
|
// as we are not authenticated yet the data adding will not be broadcasted
|
||||||
HashSet<ProtectedData> set = dataSetMessage.set;
|
|
||||||
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
|
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
|
||||||
|
} else {
|
||||||
dataReceived();
|
log.trace("Received DataSetMessage: Empty data set");
|
||||||
|
}
|
||||||
|
allDataLoaded();
|
||||||
} else if (message instanceof SealedAndSignedMessage) {
|
} else if (message instanceof SealedAndSignedMessage) {
|
||||||
if (encryptionService != null) {
|
if (encryptionService != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -239,23 +189,22 @@ public class P2PService {
|
||||||
|
|
||||||
peerGroup.addPeerListener(new PeerListener() {
|
peerGroup.addPeerListener(new PeerListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onFirstPeerAdded(Peer peer) {
|
public void onFirstAuthenticatePeer(Peer peer) {
|
||||||
log.trace("onFirstPeer " + peer.toString());
|
log.trace("onFirstAuthenticatePeer " + peer);
|
||||||
|
sendGetAllDataMessageAfterAuthentication(peer);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPeerAdded(Peer peer) {
|
public void onPeerAdded(Peer peer) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPeerRemoved(Address address) {
|
public void onPeerRemoved(Address address) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConnectionAuthenticated(Connection connection) {
|
public void onConnectionAuthenticated(Connection connection) {
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -270,6 +219,115 @@ public class P2PService {
|
||||||
public void onRemoved(ProtectedData entry) {
|
public void onRemoved(ProtectedData entry) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
readyForAuthentication = EasyBind.combine(hiddenServicePublished, allDataLoaded, authenticated,
|
||||||
|
(a, b, c) -> a && b && !c);
|
||||||
|
readyForAuthentication.subscribe((observable, oldValue, newValue) -> {
|
||||||
|
// we need to have both the initial data delivered and the hidden service published before we
|
||||||
|
// bootstrap and authenticate to other nodes.
|
||||||
|
if (newValue)
|
||||||
|
authenticateSeedNode();
|
||||||
|
});
|
||||||
|
|
||||||
|
allDataLoaded.addListener((observable, oldValue, newValue) -> {
|
||||||
|
if (newValue)
|
||||||
|
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// SetupListener implementation
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTorNodeReady() {
|
||||||
|
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()));
|
||||||
|
|
||||||
|
// 1. Step: As soon we have the tor node ready (hidden service still not available) we request the
|
||||||
|
// data set from a random seed node.
|
||||||
|
sendGetAllDataMessage(peerGroup.getSeedNodeAddresses());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onHiddenServicePublished() {
|
||||||
|
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
|
||||||
|
|
||||||
|
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished()));
|
||||||
|
|
||||||
|
// 3. (or 2.). Step: Hidden service is published
|
||||||
|
hiddenServicePublished.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSetupFailed(Throwable throwable) {
|
||||||
|
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendGetAllDataMessage(Collection<Address> seedNodeAddresses) {
|
||||||
|
if (!seedNodeAddresses.isEmpty()) {
|
||||||
|
log.trace("sendGetAllDataMessage");
|
||||||
|
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||||
|
Collections.shuffle(remainingSeedNodeAddresses);
|
||||||
|
Address candidate = remainingSeedNodeAddresses.remove(0);
|
||||||
|
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
|
||||||
|
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetAllDataMessage());
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
|
||||||
|
connectedSeedNode = candidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable throwable) {
|
||||||
|
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
|
||||||
|
"That is expected if other seed nodes are offline." +
|
||||||
|
"\nException:" + throwable.getMessage());
|
||||||
|
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
|
||||||
|
sendGetAllDataMessage(remainingSeedNodeAddresses);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
log.info("There is no seed node available for requesting data. That is expected for the first seed node.");
|
||||||
|
allDataLoaded();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void allDataLoaded() {
|
||||||
|
// 2. (or 3.) Step: We got all data loaded
|
||||||
|
if (!allDataLoaded.get()) {
|
||||||
|
log.trace("allDataLoaded");
|
||||||
|
allDataLoaded.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node.
|
||||||
|
private void authenticateSeedNode() {
|
||||||
|
if (connectedSeedNode != null) {
|
||||||
|
log.trace("authenticateSeedNode");
|
||||||
|
peerGroup.authenticateSeedNode(connectedSeedNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Step:
|
||||||
|
private void sendGetAllDataMessageAfterAuthentication(final Peer peer) {
|
||||||
|
log.trace("sendGetDataSetMessageAfterAuthentication");
|
||||||
|
// After authentication we request again data as we might have missed pushed data in the meantime
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new GetAllDataMessage());
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " succeeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " failed. " +
|
||||||
|
"Exception:" + throwable.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -277,12 +335,10 @@ public class P2PService {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
// startup sequence
|
// used by seed nodes to exclude themselves form list
|
||||||
// networkNode.start
|
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
|
||||||
// SetupListener.onTorNodeReady: sendGetAllDataMessage
|
peerGroup.removeMySeedNodeAddressFromList(mySeedNodeAddress);
|
||||||
// SetupListener.onHiddenServiceReady: tryStartAuthentication
|
}
|
||||||
// if hiddenServiceReady && allDataReceived) routing.startAuthentication
|
|
||||||
// ConnectionListener.onPeerAddressAuthenticated
|
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
start(null);
|
start(null);
|
||||||
|
@ -292,7 +348,7 @@ public class P2PService {
|
||||||
if (listener != null)
|
if (listener != null)
|
||||||
addP2PServiceListener(listener);
|
addP2PServiceListener(listener);
|
||||||
|
|
||||||
networkNode.start(setupListener);
|
networkNode.start(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutDown(Runnable shutDownCompleteHandler) {
|
public void shutDown(Runnable shutDownCompleteHandler) {
|
||||||
|
@ -301,9 +357,6 @@ public class P2PService {
|
||||||
|
|
||||||
shutDownResultHandlers.add(shutDownCompleteHandler);
|
shutDownResultHandlers.add(shutDownCompleteHandler);
|
||||||
|
|
||||||
if (sendGetAllDataMessageTimer != null)
|
|
||||||
sendGetAllDataMessageTimer.cancel();
|
|
||||||
|
|
||||||
if (dataStorage != null)
|
if (dataStorage != null)
|
||||||
dataStorage.shutDown();
|
dataStorage.shutDown();
|
||||||
|
|
||||||
|
@ -324,21 +377,6 @@ public class P2PService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAuthenticated() {
|
|
||||||
return authenticated;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
|
|
||||||
log.trace("removeEntryFromMailbox");
|
|
||||||
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
|
|
||||||
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
|
|
||||||
checkArgument(mailboxData.receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
|
|
||||||
"mailboxData.receiversPubKey is not matching with our key. That must not happen.");
|
|
||||||
removeMailboxData((ExpirableMailboxPayload) mailboxData.expirablePayload, mailboxData.receiversPubKey);
|
|
||||||
mailboxMap.remove(decryptedMsgWithPubKey);
|
|
||||||
log.trace("Removed successfully protectedExpirableData.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Messaging
|
// Messaging
|
||||||
|
@ -347,9 +385,7 @@ public class P2PService {
|
||||||
public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
|
public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
|
||||||
SendMailMessageListener sendMailMessageListener) {
|
SendMailMessageListener sendMailMessageListener) {
|
||||||
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)");
|
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)");
|
||||||
|
checkAuthentication();
|
||||||
if (!authenticatedToFirstPeer)
|
|
||||||
throw new AuthenticationException("You must be authenticated before sending direct messages.");
|
|
||||||
|
|
||||||
if (!authenticatedPeerAddresses.contains(peerAddress))
|
if (!authenticatedPeerAddresses.contains(peerAddress))
|
||||||
peerGroup.authenticateToPeer(peerAddress,
|
peerGroup.authenticateToPeer(peerAddress,
|
||||||
|
@ -365,7 +401,7 @@ public class P2PService {
|
||||||
try {
|
try {
|
||||||
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
|
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
|
||||||
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress);
|
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress);
|
||||||
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
@ -389,9 +425,7 @@ public class P2PService {
|
||||||
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
|
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
|
||||||
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
|
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
|
||||||
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
|
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
|
||||||
|
checkAuthentication();
|
||||||
if (!authenticatedToFirstPeer)
|
|
||||||
throw new AuthenticationException("You must be authenticated before sending direct messages.");
|
|
||||||
|
|
||||||
if (authenticatedPeerAddresses.contains(peerAddress)) {
|
if (authenticatedPeerAddresses.contains(peerAddress)) {
|
||||||
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
|
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
|
||||||
|
@ -411,7 +445,7 @@ public class P2PService {
|
||||||
try {
|
try {
|
||||||
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
|
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
|
||||||
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress);
|
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress);
|
||||||
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
@ -443,12 +477,11 @@ public class P2PService {
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// ProtectedData
|
// Data storage
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public boolean addData(ExpirablePayload expirablePayload) {
|
public boolean addData(ExpirablePayload expirablePayload) {
|
||||||
if (!authenticatedToFirstPeer)
|
checkAuthentication();
|
||||||
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return dataStorage.add(dataStorage.getDataWithSignedSeqNr(expirablePayload,
|
return dataStorage.add(dataStorage.getDataWithSignedSeqNr(expirablePayload,
|
||||||
|
@ -460,8 +493,7 @@ public class P2PService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
|
public boolean addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
|
||||||
if (!authenticatedToFirstPeer)
|
checkAuthentication();
|
||||||
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
|
return dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
|
||||||
|
@ -473,8 +505,8 @@ public class P2PService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean removeData(ExpirablePayload expirablePayload) {
|
public boolean removeData(ExpirablePayload expirablePayload) {
|
||||||
if (!authenticatedToFirstPeer)
|
checkAuthentication();
|
||||||
throw new AuthenticationException("You must be authenticated before removing data from the P2P network.");
|
|
||||||
try {
|
try {
|
||||||
return dataStorage.remove(dataStorage.getDataWithSignedSeqNr(expirablePayload,
|
return dataStorage.remove(dataStorage.getDataWithSignedSeqNr(expirablePayload,
|
||||||
keyRing.getSignatureKeyPair()), networkNode.getAddress());
|
keyRing.getSignatureKeyPair()), networkNode.getAddress());
|
||||||
|
@ -484,9 +516,22 @@ public class P2PService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
|
||||||
|
checkAuthentication();
|
||||||
|
|
||||||
|
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
|
||||||
|
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
|
||||||
|
checkArgument(mailboxData.receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
|
||||||
|
"mailboxData.receiversPubKey is not matching with our key. That must not happen.");
|
||||||
|
removeMailboxData((ExpirableMailboxPayload) mailboxData.expirablePayload, mailboxData.receiversPubKey);
|
||||||
|
mailboxMap.remove(decryptedMsgWithPubKey);
|
||||||
|
log.trace("Removed successfully protectedExpirableData.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
|
public boolean removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
|
||||||
if (!authenticatedToFirstPeer)
|
checkAuthentication();
|
||||||
throw new AuthenticationException("You must be authenticated before removing data from the P2P network.");
|
|
||||||
try {
|
try {
|
||||||
return dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
|
return dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
|
||||||
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
|
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
|
||||||
|
@ -541,10 +586,15 @@ public class P2PService {
|
||||||
dataStorage.addHashMapChangedListener(hashMapChangedListener);
|
dataStorage.addHashMapChangedListener(hashMapChangedListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Getters
|
// Getters
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public boolean isAuthenticated() {
|
||||||
|
return authenticated.get();
|
||||||
|
}
|
||||||
|
|
||||||
public NetworkNode getNetworkNode() {
|
public NetworkNode getNetworkNode() {
|
||||||
return networkNode;
|
return networkNode;
|
||||||
}
|
}
|
||||||
|
@ -566,94 +616,7 @@ public class P2PService {
|
||||||
// Private
|
// Private
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private void sendGetAllDataMessage(Set<Address> seedNodeAddresses) {
|
private HashSet<ProtectedData> getDataSet() {
|
||||||
Address networkNodeAddress = networkNode.getAddress();
|
|
||||||
if (networkNodeAddress != null)
|
|
||||||
seedNodeAddresses.remove(networkNodeAddress);
|
|
||||||
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
|
||||||
|
|
||||||
if (!seedNodeAddresses.isEmpty()) {
|
|
||||||
Collections.shuffle(remainingSeedNodeAddresses);
|
|
||||||
Address candidate = remainingSeedNodeAddresses.remove(0);
|
|
||||||
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
|
|
||||||
|
|
||||||
// we use a nonce to see if we are sending to ourselves in case we are a seed node
|
|
||||||
// we don't know our own onion address at that moment so we cannot filter seed nodes
|
|
||||||
SettableFuture<Connection> future = sendMessage(candidate, new GetDataSetMessage(addToListAndGetNonce()));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
|
||||||
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
|
|
||||||
connectedSeedNodes.add(candidate);
|
|
||||||
|
|
||||||
// we try to connect to 2 seed nodes
|
|
||||||
if (connectedSeedNodes.size() < 2 && !remainingSeedNodeAddresses.isEmpty()) {
|
|
||||||
// give a random pause of 1-3 sec. before using the next
|
|
||||||
|
|
||||||
if (sendGetAllDataMessageTimer != null) sendGetAllDataMessageTimer.cancel();
|
|
||||||
sendGetAllDataMessageTimer = Utilities.runTimerTaskWithRandomDelay(() -> {
|
|
||||||
Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000));
|
|
||||||
try {
|
|
||||||
UserThread.execute(() -> sendGetAllDataMessage(Sets.newHashSet(remainingSeedNodeAddresses)));
|
|
||||||
} catch (Throwable t) {
|
|
||||||
t.printStackTrace();
|
|
||||||
log.error("Executing task failed. " + t.getMessage());
|
|
||||||
}
|
|
||||||
}, 1, 3);
|
|
||||||
} else {
|
|
||||||
allSeedNodesRequested = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable throwable) {
|
|
||||||
log.info("Send GetAllDataMessage to " + candidate + " failed. Exception:" + throwable.getMessage());
|
|
||||||
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
|
|
||||||
UserThread.execute(() -> sendGetAllDataMessage(Sets.newHashSet(remainingSeedNodeAddresses)));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
log.info("There is no seed node available for requesting data. That is expected for the first seed node.");
|
|
||||||
dataReceived();
|
|
||||||
allSeedNodesRequested = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private long addToListAndGetNonce() {
|
|
||||||
long nonce = new Random().nextLong();
|
|
||||||
while (nonce == 0) {
|
|
||||||
nonce = new Random().nextLong();
|
|
||||||
}
|
|
||||||
getDataSetMessageNonceList.add(nonce);
|
|
||||||
return nonce;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void dataReceived() {
|
|
||||||
if (!allDataReceived) {
|
|
||||||
allDataReceived = true;
|
|
||||||
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived()));
|
|
||||||
|
|
||||||
tryStartAuthentication();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void tryStartAuthentication() {
|
|
||||||
// we need to have both the initial data delivered and the hidden service published before we
|
|
||||||
// bootstrap and authenticate to other nodes
|
|
||||||
if (allDataReceived && hiddenServiceReady) {
|
|
||||||
// we remove ourselves in case we are a seed node
|
|
||||||
checkArgument(networkNode.getAddress() != null, "Address must be set when we are authenticated");
|
|
||||||
connectedSeedNodes.remove(networkNode.getAddress());
|
|
||||||
|
|
||||||
peerGroup.startAuthentication(connectedSeedNodes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private SettableFuture<Connection> sendMessage(Address peerAddress, Message message) {
|
|
||||||
return networkNode.sendMessage(peerAddress, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
private HashSet<ProtectedData> getHashSet() {
|
|
||||||
return new HashSet<>(dataStorage.getMap().values());
|
return new HashSet<>(dataStorage.getMap().values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -670,14 +633,6 @@ public class P2PService {
|
||||||
Address senderAddress = mailboxMessage.getSenderAddress();
|
Address senderAddress = mailboxMessage.getSenderAddress();
|
||||||
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
|
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
|
||||||
|
|
||||||
log.trace("mailboxData.publicKey " + mailboxData.ownerStoragePubKey.hashCode());
|
|
||||||
log.trace("keyRing.getStorageSignatureKeyPair().getPublic() "
|
|
||||||
+ keyRing.getSignatureKeyPair().getPublic().hashCode());
|
|
||||||
log.trace("keyRing.getMsgSignatureKeyPair().getPublic() "
|
|
||||||
+ keyRing.getSignatureKeyPair().getPublic().hashCode());
|
|
||||||
log.trace("keyRing.getMsgEncryptionKeyPair().getPublic() "
|
|
||||||
+ keyRing.getEncryptionKeyPair().getPublic().hashCode());
|
|
||||||
|
|
||||||
|
|
||||||
mailboxMap.put(decryptedMsgWithPubKey, mailboxData);
|
mailboxMap.put(decryptedMsgWithPubKey, mailboxData);
|
||||||
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress="
|
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress="
|
||||||
|
@ -691,4 +646,9 @@ public class P2PService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAuthentication() {
|
||||||
|
if (authenticatedPeerAddresses.isEmpty())
|
||||||
|
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class Connection {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public synchronized void setAuthenticated(Address peerAddress, Connection connection) {
|
public void setAuthenticated(Address peerAddress, Connection connection) {
|
||||||
this.peerAddress = peerAddress;
|
this.peerAddress = peerAddress;
|
||||||
isAuthenticated = true;
|
isAuthenticated = true;
|
||||||
UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection));
|
UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection));
|
||||||
|
@ -149,7 +149,7 @@ public class Connection {
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public synchronized Address getPeerAddress() {
|
public Address getPeerAddress() {
|
||||||
return peerAddress;
|
return peerAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +157,7 @@ public class Connection {
|
||||||
return sharedSpace.getLastActivityDate();
|
return sharedSpace.getLastActivityDate();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean isAuthenticated() {
|
public boolean isAuthenticated() {
|
||||||
return isAuthenticated;
|
return isAuthenticated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,11 +321,11 @@ public class Connection {
|
||||||
this.useCompression = useCompression;
|
this.useCompression = useCompression;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void updateLastActivityDate() {
|
public void updateLastActivityDate() {
|
||||||
lastActivityDate = new Date();
|
lastActivityDate = new Date();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized Date getLastActivityDate() {
|
public Date getLastActivityDate() {
|
||||||
return lastActivityDate;
|
return lastActivityDate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,8 @@ import java.util.function.Consumer;
|
||||||
public class LocalhostNetworkNode extends NetworkNode {
|
public class LocalhostNetworkNode extends NetworkNode {
|
||||||
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
|
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
|
||||||
|
|
||||||
private static int simulateTorDelayTorNode = 1 * 1000;
|
private static int simulateTorDelayTorNode = 1 * 100;
|
||||||
private static int simulateTorDelayHiddenService = 2 * 1000;
|
private static int simulateTorDelayHiddenService = 2 * 100;
|
||||||
private Address address;
|
private Address address;
|
||||||
|
|
||||||
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
|
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
|
||||||
|
@ -69,7 +69,7 @@ public class LocalhostNetworkNode extends NetworkNode {
|
||||||
address = new Address("localhost", port);
|
address = new Address("localhost", port);
|
||||||
|
|
||||||
|
|
||||||
setupListeners.stream().forEach(e -> e.onHiddenServiceReady());
|
setupListeners.stream().forEach(e -> e.onHiddenServicePublished());
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,6 +125,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||||
ListenableFuture<Connection> future = executorService.submit(() -> {
|
ListenableFuture<Connection> future = executorService.submit(() -> {
|
||||||
Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId());
|
Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId());
|
||||||
try {
|
try {
|
||||||
|
log.debug("## connection.sendMessage");
|
||||||
connection.sendMessage(message);
|
connection.sendMessage(message);
|
||||||
return connection;
|
return connection;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
@ -134,10 +135,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
|
||||||
final SettableFuture<Connection> resultFuture = SettableFuture.create();
|
final SettableFuture<Connection> resultFuture = SettableFuture.create();
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
public void onSuccess(Connection connection) {
|
public void onSuccess(Connection connection) {
|
||||||
|
log.debug("## connection.sendMessage onSuccess");
|
||||||
UserThread.execute(() -> resultFuture.set(connection));
|
UserThread.execute(() -> resultFuture.set(connection));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.debug("## connection.sendMessage onFailure");
|
||||||
UserThread.execute(() -> resultFuture.setException(throwable));
|
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -5,7 +5,7 @@ public interface SetupListener {
|
||||||
|
|
||||||
void onTorNodeReady();
|
void onTorNodeReady();
|
||||||
|
|
||||||
void onHiddenServiceReady();
|
void onHiddenServicePublished();
|
||||||
|
|
||||||
void onSetupFailed(Throwable throwable);
|
void onSetupFailed(Throwable throwable);
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
|
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
|
||||||
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
|
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
|
||||||
import io.bitsquare.common.UserThread;
|
import io.bitsquare.common.UserThread;
|
||||||
import io.bitsquare.common.util.Utilities;
|
|
||||||
import io.bitsquare.p2p.Address;
|
import io.bitsquare.p2p.Address;
|
||||||
import io.nucleo.net.HiddenServiceDescriptor;
|
import io.nucleo.net.HiddenServiceDescriptor;
|
||||||
import io.nucleo.net.TorNode;
|
import io.nucleo.net.TorNode;
|
||||||
|
@ -79,12 +78,8 @@ public class TorNetworkNode extends NetworkNode {
|
||||||
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
|
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
|
||||||
|
|
||||||
startServer(hiddenServiceDescriptor.getServerSocket());
|
startServer(hiddenServiceDescriptor.getServerSocket());
|
||||||
Runnable task = () -> {
|
UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()),
|
||||||
Thread.currentThread().setName("DelayNotifySetupListenersTimer-" + new Random().nextInt(1000));
|
500, TimeUnit.MILLISECONDS);
|
||||||
setupListeners.stream()
|
|
||||||
.forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady()));
|
|
||||||
};
|
|
||||||
Utilities.runTimerTask(task, 500, TimeUnit.MILLISECONDS);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -102,8 +97,7 @@ public class TorNetworkNode extends NetworkNode {
|
||||||
log.info("Shutdown TorNetworkNode");
|
log.info("Shutdown TorNetworkNode");
|
||||||
this.shutDownCompleteHandler = shutDownCompleteHandler;
|
this.shutDownCompleteHandler = shutDownCompleteHandler;
|
||||||
|
|
||||||
shutDownTimeoutTimer = Utilities.runTimerTask(() -> {
|
shutDownTimeoutTimer = UserThread.runAfter(() -> {
|
||||||
Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000));
|
|
||||||
log.error("A timeout occurred at shutDown");
|
log.error("A timeout occurred at shutDown");
|
||||||
shutDownExecutorService();
|
shutDownExecutorService();
|
||||||
}, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS);
|
}, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS);
|
||||||
|
@ -176,8 +170,7 @@ public class TorNetworkNode extends NetworkNode {
|
||||||
private void restartTor() {
|
private void restartTor() {
|
||||||
restartCounter++;
|
restartCounter++;
|
||||||
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
|
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
|
||||||
shutDown(() -> Utilities.runTimerTask(() -> {
|
shutDown(() -> UserThread.runAfter(() -> {
|
||||||
Thread.currentThread().setName("RestartTorTimer-" + new Random().nextInt(1000));
|
|
||||||
log.warn("We restart tor as starting tor failed.");
|
log.warn("We restart tor as starting tor failed.");
|
||||||
start(null);
|
start(null);
|
||||||
}, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS));
|
}, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS));
|
||||||
|
|
|
@ -0,0 +1,255 @@
|
||||||
|
package io.bitsquare.p2p.peer;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import io.bitsquare.common.UserThread;
|
||||||
|
import io.bitsquare.common.util.Tuple2;
|
||||||
|
import io.bitsquare.p2p.Address;
|
||||||
|
import io.bitsquare.p2p.network.Connection;
|
||||||
|
import io.bitsquare.p2p.network.NetworkNode;
|
||||||
|
import io.bitsquare.p2p.peer.messages.ChallengeMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.GetPeersMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.PeersMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
|
// authentication example:
|
||||||
|
// node2 -> node1 RequestAuthenticationMessage
|
||||||
|
// node1: close connection
|
||||||
|
// node1 -> node2 ChallengeMessage on new connection
|
||||||
|
// node2: authentication to node1 done if nonce ok
|
||||||
|
// node2 -> node1 GetPeersMessage
|
||||||
|
// node1: authentication to node2 done if nonce ok
|
||||||
|
// node1 -> node2 PeersMessage
|
||||||
|
|
||||||
|
public class AuthenticationHandshake {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class);
|
||||||
|
|
||||||
|
private final NetworkNode networkNode;
|
||||||
|
private final PeerGroup peerGroup;
|
||||||
|
private final Address myAddress;
|
||||||
|
|
||||||
|
private SettableFuture<Connection> resultFuture;
|
||||||
|
private long startAuthTs;
|
||||||
|
private long nonce = 0;
|
||||||
|
|
||||||
|
public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) {
|
||||||
|
this.networkNode = networkNode;
|
||||||
|
this.peerGroup = peerGroup;
|
||||||
|
this.myAddress = myAddress;
|
||||||
|
|
||||||
|
setupMessageListener();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SettableFuture<Connection> requestAuthenticationToPeer(Address peerAddress) {
|
||||||
|
// Requesting peer
|
||||||
|
resultFuture = SettableFuture.create();
|
||||||
|
startAuthTs = System.currentTimeMillis();
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce()));
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
|
||||||
|
"\nException:" + throwable.getMessage());
|
||||||
|
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return resultFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SettableFuture<Connection> requestAuthentication(Set<Address> remainingAddresses, Address peerAddress) {
|
||||||
|
log.info("requestAuthentication " + this);
|
||||||
|
log.info("remainingAddresses " + remainingAddresses);
|
||||||
|
log.info("peerAddress " + peerAddress);
|
||||||
|
// Requesting peer
|
||||||
|
resultFuture = SettableFuture.create();
|
||||||
|
startAuthTs = System.currentTimeMillis();
|
||||||
|
remainingAddresses.remove(peerAddress);
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce()));
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
|
||||||
|
"\nThat is expected if seed nodes are offline." +
|
||||||
|
"\nException:" + throwable.getMessage());
|
||||||
|
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingAddresses);
|
||||||
|
authenticateToNextRandomPeer(remainingAddresses);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return resultFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public SettableFuture<Connection> processAuthenticationRequest(RequestAuthenticationMessage requestAuthenticationMessage, Connection connection) {
|
||||||
|
// Responding peer
|
||||||
|
resultFuture = SettableFuture.create();
|
||||||
|
startAuthTs = System.currentTimeMillis();
|
||||||
|
|
||||||
|
Address peerAddress = requestAuthenticationMessage.address;
|
||||||
|
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
|
||||||
|
connection.shutDown(() -> UserThread.runAfter(() -> {
|
||||||
|
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
|
||||||
|
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
|
||||||
|
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
|
||||||
|
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(myAddress, requestAuthenticationMessage.nonce, getAndSetNonce()));
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.debug("onSuccess sending ChallengeMessage");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable throwable) {
|
||||||
|
log.warn("onFailure sending ChallengeMessage.");
|
||||||
|
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
100 + PeerGroup.simulateAuthTorNode,
|
||||||
|
TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
return resultFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupMessageListener() {
|
||||||
|
networkNode.addMessageListener((message, connection) -> {
|
||||||
|
if (message instanceof ChallengeMessage) {
|
||||||
|
// Requesting peer
|
||||||
|
ChallengeMessage challengeMessage = (ChallengeMessage) message;
|
||||||
|
Address peerAddress = challengeMessage.address;
|
||||||
|
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
|
||||||
|
log.trace("challengeMessage" + challengeMessage);
|
||||||
|
// HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
|
||||||
|
boolean verified = nonce != 0 && nonce == challengeMessage.requesterNonce;
|
||||||
|
if (verified) {
|
||||||
|
connection.setPeerAddress(peerAddress);
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
||||||
|
new GetPeersMessage(myAddress, challengeMessage.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.info("GetPeersMessage sending failed " + throwable.getMessage());
|
||||||
|
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
log.warn("verify nonce failed. challengeMessage=" + challengeMessage + " / nonce=" + nonce);
|
||||||
|
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + challengeMessage + " / nonceMap=" + nonce)));
|
||||||
|
}
|
||||||
|
} else if (message instanceof GetPeersMessage) {
|
||||||
|
// Responding peer
|
||||||
|
GetPeersMessage getPeersMessage = (GetPeersMessage) message;
|
||||||
|
Address peerAddress = getPeersMessage.address;
|
||||||
|
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
|
||||||
|
boolean verified = nonce != 0 && nonce == getPeersMessage.challengerNonce;
|
||||||
|
if (verified) {
|
||||||
|
// we add the reported peers to our own set
|
||||||
|
HashSet<Address> peerAddresses = ((GetPeersMessage) message).peerAddresses;
|
||||||
|
log.trace("Received peers: " + peerAddresses);
|
||||||
|
peerGroup.addToReportedPeers(peerAddresses, connection);
|
||||||
|
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
||||||
|
new PeersMessage(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
|
||||||
|
log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress
|
||||||
|
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
log.info("PeersMessage sending failed " + throwable.getMessage());
|
||||||
|
UserThread.execute(() -> resultFuture.setException(throwable));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
|
||||||
|
+ " authenticated (" + connection.getObjectId() + "). Took "
|
||||||
|
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
|
||||||
|
|
||||||
|
UserThread.execute(() -> resultFuture.set(connection));
|
||||||
|
} else {
|
||||||
|
log.warn("verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce);
|
||||||
|
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce)));
|
||||||
|
}
|
||||||
|
} else if (message instanceof PeersMessage) {
|
||||||
|
// Requesting peer
|
||||||
|
PeersMessage peersMessage = (PeersMessage) message;
|
||||||
|
Address peerAddress = peersMessage.address;
|
||||||
|
log.trace("PeersMessage from " + peerAddress + " at " + myAddress);
|
||||||
|
HashSet<Address> peerAddresses = peersMessage.peerAddresses;
|
||||||
|
log.trace("Received peers: " + peerAddresses);
|
||||||
|
peerGroup.addToReportedPeers(peerAddresses, connection);
|
||||||
|
|
||||||
|
// we wait until the handshake is completed before setting the authenticate flag
|
||||||
|
// authentication at both sides of the connection
|
||||||
|
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
|
||||||
|
+ " authenticated (" + connection.getObjectId() + "). Took "
|
||||||
|
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
|
||||||
|
|
||||||
|
UserThread.execute(() -> resultFuture.set(connection));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void authenticateToNextRandomPeer(Set<Address> remainingAddresses) {
|
||||||
|
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomAddressAndRemainingSet(remainingAddresses);
|
||||||
|
if (tupleOptional.isPresent()) {
|
||||||
|
Tuple2<Address, Set<Address>> tuple = tupleOptional.get();
|
||||||
|
requestAuthentication(tuple.second, tuple.first);
|
||||||
|
} else {
|
||||||
|
log.info("No other seed node found. That is expected for the first seed node.");
|
||||||
|
UserThread.execute(() -> resultFuture.set(null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Tuple2<Address, Set<Address>>> getRandomAddressAndRemainingSet(Set<Address> addresses) {
|
||||||
|
if (!addresses.isEmpty()) {
|
||||||
|
List<Address> list = new ArrayList<>(addresses);
|
||||||
|
Collections.shuffle(list);
|
||||||
|
Address address = list.remove(0);
|
||||||
|
return Optional.of(new Tuple2<>(address, Sets.newHashSet(list)));
|
||||||
|
} else {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getAndSetNonce() {
|
||||||
|
nonce = new Random().nextLong();
|
||||||
|
while (nonce == 0)
|
||||||
|
nonce = getAndSetNonce();
|
||||||
|
|
||||||
|
return nonce;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -4,7 +4,7 @@ import io.bitsquare.p2p.Address;
|
||||||
import io.bitsquare.p2p.network.Connection;
|
import io.bitsquare.p2p.network.Connection;
|
||||||
|
|
||||||
public abstract class AuthenticationListener implements PeerListener {
|
public abstract class AuthenticationListener implements PeerListener {
|
||||||
public void onFirstPeerAdded(Peer peer) {
|
public void onFirstAuthenticatePeer(Peer peer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onPeerAdded(Peer peer) {
|
public void onPeerAdded(Peer peer) {
|
||||||
|
|
|
@ -1,14 +1,18 @@
|
||||||
package io.bitsquare.p2p.peer;
|
package io.bitsquare.p2p.peer;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import io.bitsquare.common.UserThread;
|
import io.bitsquare.common.UserThread;
|
||||||
import io.bitsquare.common.util.Utilities;
|
|
||||||
import io.bitsquare.p2p.Address;
|
import io.bitsquare.p2p.Address;
|
||||||
import io.bitsquare.p2p.network.*;
|
import io.bitsquare.p2p.network.Connection;
|
||||||
import io.bitsquare.p2p.peer.messages.*;
|
import io.bitsquare.p2p.network.ConnectionListener;
|
||||||
|
import io.bitsquare.p2p.network.MessageListener;
|
||||||
|
import io.bitsquare.p2p.network.NetworkNode;
|
||||||
|
import io.bitsquare.p2p.peer.messages.MaintenanceMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.PingMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.PongMessage;
|
||||||
|
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
|
||||||
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
|
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
@ -21,10 +25,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
|
||||||
|
|
||||||
public class PeerGroup {
|
public class PeerGroup {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
|
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
|
||||||
|
|
||||||
private static int simulateAuthTorNode = 0;
|
static int simulateAuthTorNode = 0;
|
||||||
|
|
||||||
public static void setSimulateAuthTorNode(int simulateAuthTorNode) {
|
public static void setSimulateAuthTorNode(int simulateAuthTorNode) {
|
||||||
PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
|
PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
|
||||||
|
@ -33,19 +40,18 @@ public class PeerGroup {
|
||||||
private static int MAX_CONNECTIONS = 8;
|
private static int MAX_CONNECTIONS = 8;
|
||||||
private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
|
private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
|
||||||
private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
|
private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
|
||||||
private long startAuthTs;
|
|
||||||
|
|
||||||
public static void setMaxConnections(int maxConnections) {
|
public static void setMaxConnections(int maxConnections) {
|
||||||
MAX_CONNECTIONS = maxConnections;
|
MAX_CONNECTIONS = maxConnections;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final NetworkNode networkNode;
|
private final NetworkNode networkNode;
|
||||||
private final CopyOnWriteArraySet<Address> seedNodes;
|
|
||||||
private final ConcurrentHashMap<Address, Long> nonceMap = new ConcurrentHashMap<>();
|
|
||||||
|
private final Set<Address> seedNodeAddresses;
|
||||||
private final CopyOnWriteArraySet<PeerListener> peerListeners = new CopyOnWriteArraySet<>();
|
private final CopyOnWriteArraySet<PeerListener> peerListeners = new CopyOnWriteArraySet<>();
|
||||||
private final ConcurrentHashMap<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
|
||||||
private final CopyOnWriteArraySet<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
|
private final CopyOnWriteArraySet<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
|
||||||
private final ConcurrentHashMap<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>();
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final Timer maintenanceTimer = new Timer();
|
private final Timer maintenanceTimer = new Timer();
|
||||||
|
@ -58,20 +64,39 @@ public class PeerGroup {
|
||||||
|
|
||||||
public PeerGroup(final NetworkNode networkNode, Set<Address> seeds) {
|
public PeerGroup(final NetworkNode networkNode, Set<Address> seeds) {
|
||||||
this.networkNode = networkNode;
|
this.networkNode = networkNode;
|
||||||
|
this.seedNodeAddresses = seeds;
|
||||||
// We copy it as we remove ourselves later from the list if we are a seed node
|
|
||||||
this.seedNodes = new CopyOnWriteArraySet<>(seeds);
|
|
||||||
|
|
||||||
init(networkNode);
|
init(networkNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init(NetworkNode networkNode) {
|
private void init(NetworkNode networkNode) {
|
||||||
networkNode.addMessageListener((message, connection) -> {
|
networkNode.addMessageListener((message, connection) -> {
|
||||||
if (message instanceof AuthenticationMessage)
|
if (message instanceof MaintenanceMessage)
|
||||||
processAuthenticationMessage((AuthenticationMessage) message, connection);
|
|
||||||
else if (message instanceof MaintenanceMessage)
|
|
||||||
processMaintenanceMessage((MaintenanceMessage) message, connection);
|
processMaintenanceMessage((MaintenanceMessage) message, connection);
|
||||||
|
else if (message instanceof RequestAuthenticationMessage) {
|
||||||
|
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
|
||||||
|
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest((RequestAuthenticationMessage) message, connection);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
if (connection != null) {
|
||||||
|
UserThread.execute(() -> {
|
||||||
|
setAuthenticated(connection, connection.getPeerAddress());
|
||||||
|
purgeReportedPeers();
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
|
||||||
|
UserThread.execute(() -> removePeer(connection.getPeerAddress()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
networkNode.addConnectionListener(new ConnectionListener() {
|
networkNode.addConnectionListener(new ConnectionListener() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -96,24 +121,6 @@ public class PeerGroup {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
networkNode.addSetupListener(new SetupListener() {
|
|
||||||
@Override
|
|
||||||
public void onTorNodeReady() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onHiddenServiceReady() {
|
|
||||||
// remove ourselves in case we are a seed node
|
|
||||||
Address myAddress = getAddress();
|
|
||||||
if (myAddress != null)
|
|
||||||
seedNodes.remove(myAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSetupFailed(Throwable throwable) {
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
|
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -140,11 +147,7 @@ public class PeerGroup {
|
||||||
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
|
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
|
||||||
Connection connection = authenticatedConnections.remove(0);
|
Connection connection = authenticatedConnections.remove(0);
|
||||||
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
|
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
|
||||||
|
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS));
|
||||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
|
||||||
Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000));
|
|
||||||
disconnectOldConnections();
|
|
||||||
}, 1, TimeUnit.MILLISECONDS));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,8 +156,7 @@ public class PeerGroup {
|
||||||
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
|
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
|
||||||
connectedPeersList.stream()
|
connectedPeersList.stream()
|
||||||
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
|
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
|
||||||
.forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> {
|
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
|
||||||
Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000));
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
|
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -176,6 +178,10 @@ public class PeerGroup {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
|
||||||
|
seedNodeAddresses.remove(mySeedNodeAddress);
|
||||||
|
}
|
||||||
|
|
||||||
public void shutDown() {
|
public void shutDown() {
|
||||||
if (!shutDownInProgress) {
|
if (!shutDownInProgress) {
|
||||||
shutDownInProgress = true;
|
shutDownInProgress = true;
|
||||||
|
@ -189,15 +195,16 @@ public class PeerGroup {
|
||||||
log.trace("message = " + message);
|
log.trace("message = " + message);
|
||||||
printConnectedPeersMap();
|
printConnectedPeersMap();
|
||||||
|
|
||||||
|
// TODO add randomized timing?
|
||||||
authenticatedPeers.values().stream()
|
authenticatedPeers.values().stream()
|
||||||
.filter(e -> !e.address.equals(sender))
|
.filter(e -> !e.address.equals(sender))
|
||||||
.forEach(peer -> {
|
.forEach(peer -> {
|
||||||
log.trace("Broadcast message from " + getAddress() + " to " + peer.address + ".");
|
log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + ".");
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
|
SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Connection connection) {
|
public void onSuccess(Connection connection) {
|
||||||
log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded.");
|
log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -209,6 +216,270 @@ public class PeerGroup {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Authentication to seed node
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void authenticateSeedNode(Address peerAddress) {
|
||||||
|
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
|
||||||
|
checkArgument(!authenticatedPeers.containsKey(peerAddress),
|
||||||
|
"We have that peer already authenticated. That must never happen.");
|
||||||
|
|
||||||
|
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
|
||||||
|
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
if (connection != null) {
|
||||||
|
setAuthenticated(connection, peerAddress);
|
||||||
|
if (continueOnSuccess) {
|
||||||
|
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||||
|
log.info("We still don't have enough connections. Lets try the reported peers.");
|
||||||
|
authenticateToAnyReportedPeer();
|
||||||
|
} else {
|
||||||
|
log.info("We have already enough connections.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("We have already tried all reported peers and seed nodes. " +
|
||||||
|
"We stop bootstrapping now, but will repeat after an while.");
|
||||||
|
UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
|
||||||
|
removePeer(peerAddress);
|
||||||
|
|
||||||
|
// If we fail we try again with the remaining set
|
||||||
|
remainingAddresses.remove(peerAddress);
|
||||||
|
List<Address> list = new ArrayList<>(remainingAddresses);
|
||||||
|
removeAuthenticatedPeersFromList(list);
|
||||||
|
if (!list.isEmpty()) {
|
||||||
|
Address item = getAndRemoveRandomItem(list);
|
||||||
|
log.info("We try to build an authenticated connection to a seed node. " + item);
|
||||||
|
authenticateToSeedNode(remainingAddresses, item, true);
|
||||||
|
} else {
|
||||||
|
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
|
||||||
|
authenticateToAnyReportedPeer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void authenticateToAnyReportedPeer() {
|
||||||
|
// after we have at least one seed node we try to get reported peers connected
|
||||||
|
List<Address> list = new ArrayList<>(reportedPeerAddresses);
|
||||||
|
removeAuthenticatedPeersFromList(list);
|
||||||
|
if (!list.isEmpty()) {
|
||||||
|
Address item = getAndRemoveRandomItem(list);
|
||||||
|
log.info("We try to build an authenticated connection to a random peer. " + item + " / list=" + list);
|
||||||
|
authenticateToReportedPeer(new HashSet<>(list), item);
|
||||||
|
} else {
|
||||||
|
log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes.");
|
||||||
|
authenticateToRemainingSeedNodes();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void authenticateToReportedPeer(Set<Address> remainingAddresses, Address peerAddress) {
|
||||||
|
checkArgument(!authenticatedPeers.containsKey(peerAddress),
|
||||||
|
"We have that peer already authenticated. That must never happen.");
|
||||||
|
|
||||||
|
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
|
||||||
|
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
if (connection != null) {
|
||||||
|
setAuthenticated(connection, peerAddress);
|
||||||
|
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||||
|
log.info("We still don't have enough connections. Lets try the remaining seed nodes.");
|
||||||
|
authenticateToRemainingSeedNodes();
|
||||||
|
} else {
|
||||||
|
log.info("We have already enough connections.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
|
||||||
|
removePeer(peerAddress);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void authenticateToRemainingSeedNodes() {
|
||||||
|
// after we have at least one seed node we try to get reported peers connected
|
||||||
|
List<Address> list = new ArrayList<>(seedNodeAddresses);
|
||||||
|
removeAuthenticatedPeersFromList(list);
|
||||||
|
if (!list.isEmpty()) {
|
||||||
|
Address item = getAndRemoveRandomItem(list);
|
||||||
|
log.info("We try to build an authenticated connection to a random seed node. " + item + " / list=" + list);
|
||||||
|
authenticateToSeedNode(new HashSet<>(list), item, false);
|
||||||
|
} else {
|
||||||
|
log.info("We don't have any more seed nodes for connecting. " +
|
||||||
|
"We stop bootstrapping now, but will repeat after an while.");
|
||||||
|
UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*private void authenticateToAnyNode1(Set<Address> addresses, Address peerAddress, boolean prioritizeSeedNodes) {
|
||||||
|
checkArgument(!authenticatedPeers.containsKey(peerAddress),
|
||||||
|
"We have that peer already authenticated. That must never happen.");
|
||||||
|
|
||||||
|
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
|
||||||
|
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(addresses, peerAddress);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
setAuthenticated(connection, peerAddress);
|
||||||
|
authenticateToNextRandomPeer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
|
||||||
|
removePeer(peerAddress);
|
||||||
|
authenticateToNextRandomPeer();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void authenticateToNextRandomPeer() {
|
||||||
|
UserThread.runAfterRandomDelay(() -> {
|
||||||
|
log.info("authenticateToNextRandomPeer");
|
||||||
|
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||||
|
Optional<Address> candidate = getRandomReportedPeerAddress();
|
||||||
|
if (candidate.isPresent()) {
|
||||||
|
log.info("We try to build an authenticated connection to a random peer. " + candidate.get());
|
||||||
|
authenticateToReportedPeer(candidate.get(), );
|
||||||
|
} else {
|
||||||
|
log.info("No more reportedPeerAddresses available for connecting. We try the remaining seed nodes");
|
||||||
|
candidate = getRandomSeedNodeAddress();
|
||||||
|
if (candidate.isPresent()) {
|
||||||
|
log.info("We try to build an authenticated connection to a random seed node. " + candidate.get());
|
||||||
|
authenticateToReportedPeer(candidate.get(), get);
|
||||||
|
} else {
|
||||||
|
log.info("No more seed nodes available for connecting.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("We have already enough connections.");
|
||||||
|
}
|
||||||
|
}, 200, 400, TimeUnit.MILLISECONDS);
|
||||||
|
}*/
|
||||||
|
|
||||||
|
private Optional<Address> getRandomSeedNodeAddress() {
|
||||||
|
List<Address> list = new ArrayList<>(seedNodeAddresses);
|
||||||
|
log.debug("### getRandomSeedNodeAddress list " + list);
|
||||||
|
removeAuthenticatedPeersFromList(list);
|
||||||
|
log.debug("### list post removeAuthenticatedPeersFromList " + list);
|
||||||
|
return getRandomEntry(list);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Address> getRandomReportedPeerAddress() {
|
||||||
|
List<Address> list = new ArrayList<>(reportedPeerAddresses);
|
||||||
|
log.debug("### list reportedPeerAddresses " + reportedPeerAddresses);
|
||||||
|
log.debug("### list authenticatedPeers " + authenticatedPeers);
|
||||||
|
log.debug("### list pre " + list);
|
||||||
|
removeAuthenticatedPeersFromList(list);
|
||||||
|
log.debug("### list post " + list);
|
||||||
|
return getRandomEntry(list);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeAuthenticatedPeersFromList(List<Address> list) {
|
||||||
|
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<Address> getRandomEntry(List<Address> list) {
|
||||||
|
if (list.size() > 0) {
|
||||||
|
Collections.shuffle(list);
|
||||||
|
return Optional.of(list.get(0));
|
||||||
|
} else {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Authentication to non-seed node peer
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void authenticateToPeer(Address peerAddress, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
|
||||||
|
checkArgument(!authenticatedPeers.containsKey(peerAddress),
|
||||||
|
"We have that seed node already authenticated. That must never happen.");
|
||||||
|
|
||||||
|
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
|
||||||
|
SettableFuture<Connection> future = authenticationHandshake.requestAuthenticationToPeer(peerAddress);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
|
if (connection != null) {
|
||||||
|
setAuthenticated(connection, peerAddress);
|
||||||
|
if (authenticationCompleteHandler != null)
|
||||||
|
authenticationCompleteHandler.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
|
||||||
|
removePeer(peerAddress);
|
||||||
|
if (faultHandler != null)
|
||||||
|
faultHandler.run();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void setAuthenticated(Connection connection, Address peerAddress) {
|
||||||
|
log.info("\n\n############################################################\n" +
|
||||||
|
"We are authenticated to:" +
|
||||||
|
"\nconnection=" + connection
|
||||||
|
+ "\nmyAddress=" + getMyAddress()
|
||||||
|
+ "\npeerAddress= " + peerAddress
|
||||||
|
+ "\n############################################################\n");
|
||||||
|
|
||||||
|
connection.setAuthenticated(peerAddress, connection);
|
||||||
|
|
||||||
|
Peer peer = new Peer(connection);
|
||||||
|
addAuthenticatedPeer(peerAddress, peer);
|
||||||
|
|
||||||
|
peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addAuthenticatedPeer(Address address, Peer peer) {
|
||||||
|
boolean firstPeerAdded;
|
||||||
|
authenticatedPeers.put(address, peer);
|
||||||
|
firstPeerAdded = authenticatedPeers.size() == 1;
|
||||||
|
|
||||||
|
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer)));
|
||||||
|
|
||||||
|
if (firstPeerAdded)
|
||||||
|
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer)));
|
||||||
|
|
||||||
|
if (authenticatedPeers.size() > MAX_CONNECTIONS)
|
||||||
|
disconnectOldConnections();
|
||||||
|
|
||||||
|
printConnectedPeersMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Listeners
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public void addMessageListener(MessageListener messageListener) {
|
public void addMessageListener(MessageListener messageListener) {
|
||||||
networkNode.addMessageListener(messageListener);
|
networkNode.addMessageListener(messageListener);
|
||||||
}
|
}
|
||||||
|
@ -225,6 +496,11 @@ public class PeerGroup {
|
||||||
peerListeners.remove(peerListener);
|
peerListeners.remove(peerListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Getters
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public Map<Address, Peer> getAuthenticatedPeers() {
|
public Map<Address, Peer> getAuthenticatedPeers() {
|
||||||
return authenticatedPeers;
|
return authenticatedPeers;
|
||||||
}
|
}
|
||||||
|
@ -233,202 +509,19 @@ public class PeerGroup {
|
||||||
CopyOnWriteArraySet<Address> allPeerAddresses = new CopyOnWriteArraySet<>(reportedPeerAddresses);
|
CopyOnWriteArraySet<Address> allPeerAddresses = new CopyOnWriteArraySet<>(reportedPeerAddresses);
|
||||||
allPeerAddresses.addAll(authenticatedPeers.values().stream()
|
allPeerAddresses.addAll(authenticatedPeers.values().stream()
|
||||||
.map(e -> e.address).collect(Collectors.toList()));
|
.map(e -> e.address).collect(Collectors.toList()));
|
||||||
// remove own address and seed nodes
|
|
||||||
allPeerAddresses.remove(getAddress());
|
|
||||||
return allPeerAddresses;
|
return allPeerAddresses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<Address> getSeedNodeAddresses() {
|
||||||
|
return seedNodeAddresses;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Authentication
|
// Reported peers
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
// authentication example:
|
void addToReportedPeers(HashSet<Address> peerAddresses, Connection connection) {
|
||||||
// node2 -> node1 RequestAuthenticationMessage
|
|
||||||
// node1: close connection
|
|
||||||
// node1 -> node2 ChallengeMessage on new connection
|
|
||||||
// node2: authentication to node1 done if nonce ok
|
|
||||||
// node2 -> node1 GetPeersMessage
|
|
||||||
// node1: authentication to node2 done if nonce ok
|
|
||||||
// node1 -> node2 PeersMessage
|
|
||||||
|
|
||||||
public void startAuthentication(Set<Address> connectedSeedNodes) {
|
|
||||||
connectedSeedNodes.forEach(connectedSeedNode -> {
|
|
||||||
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendRequestAuthenticationMessage(Set<Address> remainingSeedNodes, final Address address) {
|
|
||||||
log.info("We try to authenticate to a random seed node. " + address);
|
|
||||||
startAuthTs = System.currentTimeMillis();
|
|
||||||
final boolean[] alreadyConnected = {false};
|
|
||||||
authenticatedPeers.values().stream().forEach(e -> {
|
|
||||||
remainingSeedNodes.remove(e.address);
|
|
||||||
if (address.equals(e.address))
|
|
||||||
alreadyConnected[0] = true;
|
|
||||||
});
|
|
||||||
if (!alreadyConnected[0]) {
|
|
||||||
long nonce = addToMapAndGetNonce(address);
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
|
||||||
log.info("send RequestAuthenticationMessage to " + address + " succeeded.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
log.info("Send RequestAuthenticationMessage to " + address + " failed. Exception:" + throwable.getMessage());
|
|
||||||
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingSeedNodes);
|
|
||||||
getNextSeedNode(remainingSeedNodes);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
getNextSeedNode(remainingSeedNodes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void getNextSeedNode(Set<Address> remainingSeedNodes) {
|
|
||||||
List<Address> remainingSeedNodeAddresses = new ArrayList<>(remainingSeedNodes);
|
|
||||||
|
|
||||||
Address myAddress = getAddress();
|
|
||||||
if (myAddress != null)
|
|
||||||
remainingSeedNodeAddresses.remove(myAddress);
|
|
||||||
|
|
||||||
if (!remainingSeedNodeAddresses.isEmpty()) {
|
|
||||||
Collections.shuffle(remainingSeedNodeAddresses);
|
|
||||||
Address address = remainingSeedNodeAddresses.remove(0);
|
|
||||||
sendRequestAuthenticationMessage(Sets.newHashSet(remainingSeedNodeAddresses), address);
|
|
||||||
} else {
|
|
||||||
log.info("No other seed node found. That is expected for the first seed node.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void processAuthenticationMessage(AuthenticationMessage message, Connection connection) {
|
|
||||||
log.trace("processAuthenticationMessage " + message + " from " + connection.getPeerAddress() + " at " + getAddress());
|
|
||||||
if (message instanceof RequestAuthenticationMessage) {
|
|
||||||
RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message;
|
|
||||||
Address peerAddress = requestAuthenticationMessage.address;
|
|
||||||
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
|
||||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
|
||||||
Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000));
|
|
||||||
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
|
|
||||||
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
|
|
||||||
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
|
||||||
long nonce = addToMapAndGetNonce(peerAddress);
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.debug("onSuccess sending ChallengeMessage");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable throwable) {
|
|
||||||
log.warn("onFailure sending ChallengeMessage. We try again.");
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.debug("onSuccess sending 2. ChallengeMessage");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable throwable) {
|
|
||||||
log.warn("onFailure sending ChallengeMessage. We give up.");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
},
|
|
||||||
100 + simulateAuthTorNode,
|
|
||||||
TimeUnit.MILLISECONDS));
|
|
||||||
} else if (message instanceof ChallengeMessage) {
|
|
||||||
ChallengeMessage challengeMessage = (ChallengeMessage) message;
|
|
||||||
Address peerAddress = challengeMessage.address;
|
|
||||||
log.trace("ChallengeMessage from " + peerAddress + " at " + getAddress());
|
|
||||||
log.trace("nonceMap" + nonceMap);
|
|
||||||
log.trace("challengeMessage" + challengeMessage);
|
|
||||||
HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
|
|
||||||
boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress);
|
|
||||||
if (verified) {
|
|
||||||
connection.setPeerAddress(peerAddress);
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
|
||||||
new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, new ArrayList<Address>(getAllPeerAddresses())));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
log.info("GetPeersMessage sending failed " + throwable.getMessage());
|
|
||||||
removePeer(peerAddress);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap);
|
|
||||||
}
|
|
||||||
} else if (message instanceof GetPeersMessage) {
|
|
||||||
GetPeersMessage getPeersMessage = (GetPeersMessage) message;
|
|
||||||
Address peerAddress = getPeersMessage.address;
|
|
||||||
log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress());
|
|
||||||
boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress);
|
|
||||||
if (verified) {
|
|
||||||
setAuthenticated(connection, peerAddress);
|
|
||||||
purgeReportedPeers();
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
|
||||||
new PeersMessage(getAddress(), new ArrayList(getAllPeerAddresses())));
|
|
||||||
log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress()
|
|
||||||
+ " with allPeers=" + getAllPeerAddresses());
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
log.info("PeersMessage sending failed " + throwable.getMessage());
|
|
||||||
removePeer(peerAddress);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// now we add the reported peers to our own set
|
|
||||||
ArrayList<Address> peerAddresses = ((GetPeersMessage) message).peerAddresses;
|
|
||||||
log.trace("Received peers: " + peerAddresses);
|
|
||||||
// remove ourselves
|
|
||||||
addToReportedPeers(peerAddresses, connection);
|
|
||||||
}
|
|
||||||
} else if (message instanceof PeersMessage) {
|
|
||||||
PeersMessage peersMessage = (PeersMessage) message;
|
|
||||||
Address peerAddress = peersMessage.address;
|
|
||||||
log.trace("PeersMessage from " + peerAddress + " at " + getAddress());
|
|
||||||
ArrayList<Address> peerAddresses = peersMessage.peerAddresses;
|
|
||||||
log.trace("Received peers: " + peerAddresses);
|
|
||||||
// remove ourselves
|
|
||||||
addToReportedPeers(peerAddresses, connection);
|
|
||||||
|
|
||||||
// we wait until the handshake is completed before setting the authenticate flag
|
|
||||||
// authentication at both sides of the connection
|
|
||||||
|
|
||||||
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
|
|
||||||
+ " authenticated (" + connection.getObjectId() + "). Took "
|
|
||||||
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
|
|
||||||
|
|
||||||
setAuthenticated(connection, peerAddress);
|
|
||||||
|
|
||||||
Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress());
|
|
||||||
if (authenticationCompleteHandler != null)
|
|
||||||
authenticationCompleteHandler.run();
|
|
||||||
|
|
||||||
authenticateToNextRandomPeer();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addToReportedPeers(ArrayList<Address> peerAddresses, Connection connection) {
|
|
||||||
log.trace("addToReportedPeers");
|
log.trace("addToReportedPeers");
|
||||||
// we disconnect misbehaving nodes trying to send too many peers
|
// we disconnect misbehaving nodes trying to send too many peers
|
||||||
// reported peers include the peers connected peers which is normally max. 8 but we give some headroom
|
// reported peers include the peers connected peers which is normally max. 8 but we give some headroom
|
||||||
|
@ -436,113 +529,29 @@ public class PeerGroup {
|
||||||
if (peerAddresses.size() > 1100) {
|
if (peerAddresses.size() > 1100) {
|
||||||
connection.shutDown();
|
connection.shutDown();
|
||||||
} else {
|
} else {
|
||||||
peerAddresses.remove(getAddress());
|
peerAddresses.remove(getMyAddress());
|
||||||
reportedPeerAddresses.addAll(peerAddresses);
|
reportedPeerAddresses.addAll(peerAddresses);
|
||||||
purgeReportedPeers();
|
purgeReportedPeers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void purgeReportedPeers() {
|
void purgeReportedPeers() {
|
||||||
log.trace("purgeReportedPeers");
|
log.trace("purgeReportedPeers");
|
||||||
int all = getAllPeerAddresses().size();
|
int all = getAllPeerAddresses().size();
|
||||||
if (all > 1000) {
|
if (all > 1000) {
|
||||||
int diff = all - 100;
|
int diff = all - 100;
|
||||||
List<Address> list = new ArrayList<>(getNotConnectedPeerAddresses());
|
List<Address> list = new LinkedList<>(getReportedNotConnectedPeerAddresses());
|
||||||
for (int i = 0; i < diff; i++) {
|
for (int i = 0; i < diff; i++) {
|
||||||
Address toRemove = list.remove(new Random().nextInt(list.size()));
|
Address toRemove = getAndRemoveRandomItem(list);
|
||||||
reportedPeerAddresses.remove(toRemove);
|
reportedPeerAddresses.remove(toRemove);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized CopyOnWriteArraySet<Address> getNotConnectedPeerAddresses() {
|
private Set<Address> getReportedNotConnectedPeerAddresses() {
|
||||||
CopyOnWriteArraySet<Address> list = new CopyOnWriteArraySet<>(getAllPeerAddresses());
|
Set<Address> set = new HashSet<>(reportedPeerAddresses);
|
||||||
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
|
authenticatedPeers.values().stream().forEach(e -> set.remove(e.address));
|
||||||
return list;
|
return set;
|
||||||
}
|
|
||||||
|
|
||||||
private void authenticateToNextRandomPeer() {
|
|
||||||
Utilities.runTimerTaskWithRandomDelay(() -> {
|
|
||||||
Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000));
|
|
||||||
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
|
||||||
Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress();
|
|
||||||
if (randomNotConnectedPeerAddress != null) {
|
|
||||||
log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress);
|
|
||||||
authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer());
|
|
||||||
} else {
|
|
||||||
log.info("No more peers available for connecting.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.info("We have already enough connections.");
|
|
||||||
}
|
|
||||||
}, 200, 400, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
|
|
||||||
startAuthTs = System.currentTimeMillis();
|
|
||||||
|
|
||||||
if (authenticationCompleteHandler != null)
|
|
||||||
authenticationCompleteHandlers.put(address, authenticationCompleteHandler);
|
|
||||||
|
|
||||||
long nonce = addToMapAndGetNonce(address);
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
|
||||||
log.debug("send RequestAuthenticationMessage succeeded");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
log.info("send IdMessage failed. " + throwable.getMessage());
|
|
||||||
removePeer(address);
|
|
||||||
if (faultHandler != null) faultHandler.run();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private long addToMapAndGetNonce(Address peerAddress) {
|
|
||||||
long nonce = new Random().nextLong();
|
|
||||||
while (nonce == 0) {
|
|
||||||
nonce = new Random().nextLong();
|
|
||||||
}
|
|
||||||
log.trace("addToMapAndGetNonce nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
|
||||||
nonceMap.put(peerAddress, nonce);
|
|
||||||
return nonce;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) {
|
|
||||||
log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
|
||||||
Long nonce = nonceMap.remove(peerAddress);
|
|
||||||
return nonce != null && nonce == peersNonce;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setAuthenticated(Connection connection, Address peerAddress) {
|
|
||||||
log.info("\n\n############################################################\n" +
|
|
||||||
"We are authenticated to:" +
|
|
||||||
"\nconnection=" + connection
|
|
||||||
+ "\nmyAddress=" + getAddress()
|
|
||||||
+ "\npeerAddress= " + peerAddress
|
|
||||||
+ "\n############################################################\n");
|
|
||||||
|
|
||||||
connection.setAuthenticated(peerAddress, connection);
|
|
||||||
|
|
||||||
Peer peer = new Peer(connection);
|
|
||||||
addAuthenticatedPeer(peerAddress, peer);
|
|
||||||
|
|
||||||
peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
|
|
||||||
|
|
||||||
log.debug("\n### setAuthenticated post connection " + connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Address getRandomNotConnectedPeerAddress() {
|
|
||||||
List<Address> list = new ArrayList<>(getNotConnectedPeerAddresses());
|
|
||||||
if (list.size() > 0) {
|
|
||||||
Collections.shuffle(list);
|
|
||||||
return list.get(0);
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -551,7 +560,7 @@ public class PeerGroup {
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
|
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
|
||||||
log.debug("Received message " + message + " at " + getAddress() + " from " + connection.getPeerAddress());
|
log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress());
|
||||||
if (message instanceof PingMessage) {
|
if (message instanceof PingMessage) {
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
|
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@ -571,7 +580,7 @@ public class PeerGroup {
|
||||||
if (peer != null) {
|
if (peer != null) {
|
||||||
if (((PongMessage) message).nonce != peer.getPingNonce()) {
|
if (((PongMessage) message).nonce != peer.getPingNonce()) {
|
||||||
removePeer(peer.address);
|
removePeer(peer.address);
|
||||||
log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress());
|
log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -582,40 +591,19 @@ public class PeerGroup {
|
||||||
// Peers
|
// Peers
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private void removePeer(@Nullable Address peerAddress) {
|
void removePeer(@Nullable Address peerAddress) {
|
||||||
reportedPeerAddresses.remove(peerAddress);
|
reportedPeerAddresses.remove(peerAddress);
|
||||||
|
|
||||||
Peer disconnectedPeer;
|
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
|
||||||
disconnectedPeer = authenticatedPeers.remove(peerAddress);
|
|
||||||
|
|
||||||
if (disconnectedPeer != null)
|
if (disconnectedPeer != null)
|
||||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
|
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
|
||||||
|
|
||||||
printConnectedPeersMap();
|
printConnectedPeersMap();
|
||||||
printReportedPeersMap();
|
printReportedPeersMap();
|
||||||
|
|
||||||
log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
|
||||||
nonceMap.remove(peerAddress);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addAuthenticatedPeer(Address address, Peer peer) {
|
private Address getMyAddress() {
|
||||||
boolean firstPeerAdded;
|
|
||||||
authenticatedPeers.put(address, peer);
|
|
||||||
firstPeerAdded = authenticatedPeers.size() == 1;
|
|
||||||
|
|
||||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer)));
|
|
||||||
|
|
||||||
if (firstPeerAdded)
|
|
||||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstPeerAdded(peer)));
|
|
||||||
|
|
||||||
if (authenticatedPeers.size() > MAX_CONNECTIONS)
|
|
||||||
disconnectOldConnections();
|
|
||||||
|
|
||||||
log.trace("addConnectedPeer [post]");
|
|
||||||
printConnectedPeersMap();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Address getAddress() {
|
|
||||||
return networkNode.getAddress();
|
return networkNode.getAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -624,8 +612,12 @@ public class PeerGroup {
|
||||||
// Utils
|
// Utils
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private Address getAndRemoveRandomItem(List<Address> list) {
|
||||||
|
return list.remove(new Random().nextInt(list.size()));
|
||||||
|
}
|
||||||
|
|
||||||
public void printConnectedPeersMap() {
|
public void printConnectedPeersMap() {
|
||||||
StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":");
|
StringBuilder result = new StringBuilder("\nConnected peers for node " + getMyAddress() + ":");
|
||||||
authenticatedPeers.values().stream().forEach(e -> {
|
authenticatedPeers.values().stream().forEach(e -> {
|
||||||
result.append("\n\t" + e.address);
|
result.append("\n\t" + e.address);
|
||||||
});
|
});
|
||||||
|
@ -634,16 +626,11 @@ public class PeerGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void printReportedPeersMap() {
|
public void printReportedPeersMap() {
|
||||||
StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":");
|
StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getMyAddress() + ":");
|
||||||
reportedPeerAddresses.stream().forEach(e -> {
|
reportedPeerAddresses.stream().forEach(e -> {
|
||||||
result.append("\n\t" + e);
|
result.append("\n\t" + e);
|
||||||
});
|
});
|
||||||
result.append("\n");
|
result.append("\n");
|
||||||
log.info(result.toString());
|
log.info(result.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getObjectId() {
|
|
||||||
return super.toString().split("@")[1].toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ import io.bitsquare.p2p.Address;
|
||||||
import io.bitsquare.p2p.network.Connection;
|
import io.bitsquare.p2p.network.Connection;
|
||||||
|
|
||||||
public interface PeerListener {
|
public interface PeerListener {
|
||||||
void onFirstPeerAdded(Peer peer);
|
void onFirstAuthenticatePeer(Peer peer);
|
||||||
|
|
||||||
void onPeerAdded(Peer peer);
|
void onPeerAdded(Peer peer);
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ package io.bitsquare.p2p.peer.messages;
|
||||||
import io.bitsquare.app.Version;
|
import io.bitsquare.app.Version;
|
||||||
import io.bitsquare.p2p.Address;
|
import io.bitsquare.p2p.Address;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.HashSet;
|
||||||
|
|
||||||
public final class GetPeersMessage implements AuthenticationMessage {
|
public final class GetPeersMessage implements AuthenticationMessage {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
|
@ -11,9 +11,9 @@ public final class GetPeersMessage implements AuthenticationMessage {
|
||||||
|
|
||||||
public final Address address;
|
public final Address address;
|
||||||
public final long challengerNonce;
|
public final long challengerNonce;
|
||||||
public final ArrayList<Address> peerAddresses;
|
public final HashSet<Address> peerAddresses;
|
||||||
|
|
||||||
public GetPeersMessage(Address address, long challengerNonce, ArrayList<Address> peerAddresses) {
|
public GetPeersMessage(Address address, long challengerNonce, HashSet<Address> peerAddresses) {
|
||||||
this.address = address;
|
this.address = address;
|
||||||
this.challengerNonce = challengerNonce;
|
this.challengerNonce = challengerNonce;
|
||||||
this.peerAddresses = peerAddresses;
|
this.peerAddresses = peerAddresses;
|
||||||
|
|
|
@ -3,16 +3,16 @@ package io.bitsquare.p2p.peer.messages;
|
||||||
import io.bitsquare.app.Version;
|
import io.bitsquare.app.Version;
|
||||||
import io.bitsquare.p2p.Address;
|
import io.bitsquare.p2p.Address;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.HashSet;
|
||||||
|
|
||||||
public final class PeersMessage implements AuthenticationMessage {
|
public final class PeersMessage implements AuthenticationMessage {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
public final Address address;
|
public final Address address;
|
||||||
public final ArrayList<Address> peerAddresses;
|
public final HashSet<Address> peerAddresses;
|
||||||
|
|
||||||
public PeersMessage(Address address, ArrayList<Address> peerAddresses) {
|
public PeersMessage(Address address, HashSet<Address> peerAddresses) {
|
||||||
this.address = address;
|
this.address = address;
|
||||||
this.peerAddresses = peerAddresses;
|
this.peerAddresses = peerAddresses;
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,13 +11,17 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
|
||||||
public class SeedNode {
|
public class SeedNode {
|
||||||
private static final Logger log = LoggerFactory.getLogger(SeedNode.class);
|
private static final Logger log = LoggerFactory.getLogger(SeedNode.class);
|
||||||
|
|
||||||
private int port = 8001;
|
private Address mySeedNodeAddress = new Address("localhost:8001");
|
||||||
private boolean useLocalhost = false;
|
private boolean useLocalhost = false;
|
||||||
private Set<Address> seedNodes;
|
private Set<Address> seedNodes;
|
||||||
private P2PService p2PService;
|
private P2PService p2PService;
|
||||||
|
@ -31,30 +35,45 @@ public class SeedNode {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
// args: port useLocalhost seedNodes
|
// args: myAddress (incl. port) useLocalhost seedNodes (separated with |)
|
||||||
// eg. 4444 true localhost:7777 localhost:8888
|
// 2. and 3. args are optional
|
||||||
|
// eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
|
||||||
|
// or when using localhost: localhost:8001 true localhost:8002|localhost:8003
|
||||||
public void processArgs(String[] args) {
|
public void processArgs(String[] args) {
|
||||||
if (args.length > 0) {
|
if (args.length > 0) {
|
||||||
port = Integer.parseInt(args[0]);
|
|
||||||
|
String arg0 = args[0];
|
||||||
|
checkArgument(arg0.contains(":") && arg0.split(":").length == 2 && arg0.split(":")[1].length() == 4, "Wrong program argument");
|
||||||
|
mySeedNodeAddress = new Address(arg0);
|
||||||
|
|
||||||
if (args.length > 1) {
|
if (args.length > 1) {
|
||||||
useLocalhost = ("true").equals(args[1]);
|
String arg1 = args[1];
|
||||||
|
checkArgument(arg1.equals("true") || arg1.equals("false"));
|
||||||
|
useLocalhost = ("true").equals(arg1);
|
||||||
|
|
||||||
if (args.length > 2) {
|
if (args.length == 3) {
|
||||||
|
String arg2 = args[2];
|
||||||
|
checkArgument(arg2.contains(":") && arg2.split(":").length > 1 && arg2.split(":")[1].length() > 3, "Wrong program argument");
|
||||||
|
List<String> list = Arrays.asList(arg2.split("|"));
|
||||||
seedNodes = new HashSet<>();
|
seedNodes = new HashSet<>();
|
||||||
for (int i = 2; i < args.length; i++) {
|
list.forEach(e -> {
|
||||||
seedNodes.add(new Address(args[i]));
|
checkArgument(e.contains(":") && e.split(":").length == 2 && e.split(":")[1].length() == 4, "Wrong program argument");
|
||||||
}
|
seedNodes.add(new Address(e));
|
||||||
|
});
|
||||||
|
seedNodes.remove(mySeedNodeAddress);
|
||||||
|
} else {
|
||||||
|
log.error("Wrong number of program arguments." +
|
||||||
|
"\nProgram arguments: myAddress useLocalhost seedNodes");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createAndStartP2PService() {
|
public void createAndStartP2PService() {
|
||||||
createAndStartP2PService(null, null, port, useLocalhost, seedNodes, null);
|
createAndStartP2PService(null, null, mySeedNodeAddress, useLocalhost, seedNodes, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, int port, boolean useLocalhost, @Nullable Set<Address> seedNodes, @Nullable P2PServiceListener listener) {
|
public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, Address mySeedNodeAddress, boolean useLocalhost, @Nullable Set<Address> seedNodes, @Nullable P2PServiceListener listener) {
|
||||||
SeedNodesRepository seedNodesRepository = new SeedNodesRepository();
|
SeedNodesRepository seedNodesRepository = new SeedNodesRepository();
|
||||||
if (seedNodes != null && !seedNodes.isEmpty()) {
|
if (seedNodes != null && !seedNodes.isEmpty()) {
|
||||||
if (useLocalhost)
|
if (useLocalhost)
|
||||||
|
@ -63,7 +82,8 @@ public class SeedNode {
|
||||||
seedNodesRepository.setTorSeedNodeAddresses(seedNodes);
|
seedNodesRepository.setTorSeedNodeAddresses(seedNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
p2PService = new P2PService(seedNodesRepository, port, new File("bitsquare_seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy"));
|
p2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, new File("bitsquare_seed_node_" + mySeedNodeAddress.port), useLocalhost, encryptionService, keyRing, new File("dummy"));
|
||||||
|
p2PService.removeMySeedNodeAddressFromList(mySeedNodeAddress);
|
||||||
p2PService.start(listener);
|
p2PService.start(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,22 +6,22 @@ import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
||||||
public final class DataSetMessage implements Message {
|
public final class AllDataMessage implements Message {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
public final HashSet<ProtectedData> set;
|
public final HashSet<ProtectedData> set;
|
||||||
|
|
||||||
public DataSetMessage(HashSet<ProtectedData> set) {
|
public AllDataMessage(HashSet<ProtectedData> set) {
|
||||||
this.set = set;
|
this.set = set;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
if (!(o instanceof DataSetMessage)) return false;
|
if (!(o instanceof AllDataMessage)) return false;
|
||||||
|
|
||||||
DataSetMessage that = (DataSetMessage) o;
|
AllDataMessage that = (AllDataMessage) o;
|
||||||
|
|
||||||
return !(set != null ? !set.equals(that.set) : that.set != null);
|
return !(set != null ? !set.equals(that.set) : that.set != null);
|
||||||
|
|
|
@ -3,13 +3,10 @@ package io.bitsquare.p2p.storage.messages;
|
||||||
import io.bitsquare.app.Version;
|
import io.bitsquare.app.Version;
|
||||||
import io.bitsquare.p2p.Message;
|
import io.bitsquare.p2p.Message;
|
||||||
|
|
||||||
public final class GetDataSetMessage implements Message {
|
public final class GetAllDataMessage implements Message {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
public final long nonce;
|
public GetAllDataMessage() {
|
||||||
|
|
||||||
public GetDataSetMessage(long nonce) {
|
|
||||||
this.nonce = nonce;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -80,7 +80,7 @@ public class TestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
seedNode.createAndStartP2PService(encryptionService, keyRing, port, useLocalhost, seedNodes, new P2PServiceListener() {
|
seedNode.createAndStartP2PService(encryptionService, keyRing, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onAllDataReceived() {
|
public void onAllDataReceived() {
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ public class TestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ public class TestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class LocalhostNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onHiddenServiceReady");
|
log.debug("onHiddenServiceReady");
|
||||||
startupLatch.countDown();
|
startupLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ public class LocalhostNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onHiddenServiceReady 2");
|
log.debug("onHiddenServiceReady 2");
|
||||||
startupLatch.countDown();
|
startupLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class TorNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onReadyForReceivingMessages");
|
log.debug("onReadyForReceivingMessages");
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ public class TorNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onReadyForReceivingMessages");
|
log.debug("onReadyForReceivingMessages");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,7 @@ public class TorNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onReadyForReceivingMessages");
|
log.debug("onReadyForReceivingMessages");
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ public class TorNetworkNodeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
log.debug("onReadyForReceivingMessages");
|
log.debug("onReadyForReceivingMessages");
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,10 +79,11 @@ public class PeerGroupTest {
|
||||||
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
|
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
|
||||||
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
|
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
|
||||||
seedNodes = new HashSet<>();
|
seedNodes = new HashSet<>();
|
||||||
seedNodes.add(new Address("localhost:8001"));
|
Address address = new Address("localhost:8001");
|
||||||
|
seedNodes.add(address);
|
||||||
seedNode1 = new SeedNode();
|
seedNode1 = new SeedNode();
|
||||||
latch = new CountDownLatch(2);
|
latch = new CountDownLatch(2);
|
||||||
seedNode1.createAndStartP2PService(null, null, 8001, useLocalhost, seedNodes, new P2PServiceListener() {
|
seedNode1.createAndStartP2PService(null, null, address, useLocalhost, seedNodes, new P2PServiceListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onAllDataReceived() {
|
public void onAllDataReceived() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -98,7 +99,7 @@ public class PeerGroupTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,13 +119,15 @@ public class PeerGroupTest {
|
||||||
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
|
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
|
||||||
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
|
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
|
||||||
seedNodes = new HashSet<>();
|
seedNodes = new HashSet<>();
|
||||||
seedNodes.add(new Address("localhost:8001"));
|
Address address1 = new Address("localhost:8001");
|
||||||
seedNodes.add(new Address("localhost:8002"));
|
seedNodes.add(address1);
|
||||||
|
Address address2 = new Address("localhost:8002");
|
||||||
|
seedNodes.add(address2);
|
||||||
|
|
||||||
latch = new CountDownLatch(6);
|
latch = new CountDownLatch(6);
|
||||||
|
|
||||||
seedNode1 = new SeedNode();
|
seedNode1 = new SeedNode();
|
||||||
seedNode1.createAndStartP2PService(null, null, 8001, useLocalhost, seedNodes, new P2PServiceListener() {
|
seedNode1.createAndStartP2PService(null, null, address1, useLocalhost, seedNodes, new P2PServiceListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onAllDataReceived() {
|
public void onAllDataReceived() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -141,7 +144,7 @@ public class PeerGroupTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +158,7 @@ public class PeerGroupTest {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
|
|
||||||
seedNode2 = new SeedNode();
|
seedNode2 = new SeedNode();
|
||||||
seedNode2.createAndStartP2PService(null, null, 8002, useLocalhost, seedNodes, new P2PServiceListener() {
|
seedNode2.createAndStartP2PService(null, null, address2, useLocalhost, seedNodes, new P2PServiceListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onAllDataReceived() {
|
public void onAllDataReceived() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -172,7 +175,7 @@ public class PeerGroupTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,7 +385,7 @@ public class PeerGroupTest {
|
||||||
SeedNode seedNode = new SeedNode();
|
SeedNode seedNode = new SeedNode();
|
||||||
|
|
||||||
latch = new CountDownLatch(1);
|
latch = new CountDownLatch(1);
|
||||||
seedNode.createAndStartP2PService(null, null, port, useLocalhost, seedNodes, new P2PServiceListener() {
|
seedNode.createAndStartP2PService(null, null, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onAllDataReceived() {
|
public void onAllDataReceived() {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -398,7 +401,7 @@ public class PeerGroupTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onHiddenServiceReady() {
|
public void onHiddenServicePublished() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package io.bitsquare.p2p.seed;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import io.bitsquare.app.Logging;
|
import io.bitsquare.app.Logging;
|
||||||
import io.bitsquare.common.UserThread;
|
import io.bitsquare.common.UserThread;
|
||||||
import io.bitsquare.common.util.Utilities;
|
|
||||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -12,7 +11,6 @@ import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.security.Security;
|
import java.security.Security;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Scanner;
|
import java.util.Scanner;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -24,8 +22,8 @@ public class SeedNodeMain {
|
||||||
|
|
||||||
private boolean stopped;
|
private boolean stopped;
|
||||||
|
|
||||||
// args: port useLocalhost seedNodes
|
// args: myAddress (incl. port) useLocalhost seedNodes (separated with |)
|
||||||
// eg. 4444 true localhost:7777 localhost:8888
|
// eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
|
||||||
// To stop enter: q
|
// To stop enter: q
|
||||||
public static void main(String[] args) throws NoSuchAlgorithmException {
|
public static void main(String[] args) throws NoSuchAlgorithmException {
|
||||||
Path path = Paths.get("seed_node_log");
|
Path path = Paths.get("seed_node_log");
|
||||||
|
@ -62,8 +60,7 @@ public class SeedNodeMain {
|
||||||
if (line.equals("q")) {
|
if (line.equals("q")) {
|
||||||
if (!stopped) {
|
if (!stopped) {
|
||||||
stopped = true;
|
stopped = true;
|
||||||
Timer timeout = Utilities.runTimerTask(() -> {
|
Timer timeout = UserThread.runAfter(() -> {
|
||||||
Thread.currentThread().setName("ShutdownTimeout-" + new Random().nextInt(1000));
|
|
||||||
log.error("Timeout occurred at shutDown request");
|
log.error("Timeout occurred at shutDown request");
|
||||||
System.exit(1);
|
System.exit(1);
|
||||||
}, 10);
|
}, 10);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue