mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-04-19 07:15:54 -04:00
Renaming
This commit is contained in:
parent
def492a22a
commit
a1993a5d9a
@ -3,6 +3,7 @@ package io.bitsquare.p2p.seed;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import org.bitcoinj.crypto.DRMWorkaround;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -26,6 +27,8 @@ public class SeedNodeMain {
|
||||
// eg. 4444 true localhost:7777 localhost:8888
|
||||
// To stop enter: q
|
||||
public static void main(String[] args) throws NoSuchAlgorithmException {
|
||||
|
||||
DRMWorkaround.maybeDisableExportControls();
|
||||
seedNodeMain = new SeedNodeMain(args);
|
||||
}
|
||||
|
||||
|
@ -54,6 +54,7 @@ import javafx.scene.layout.StackPane;
|
||||
import javafx.stage.Modality;
|
||||
import javafx.stage.Stage;
|
||||
import javafx.stage.StageStyle;
|
||||
import org.bitcoinj.crypto.DRMWorkaround;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.controlsfx.dialog.Dialogs;
|
||||
import org.reactfx.EventStreams;
|
||||
@ -115,6 +116,8 @@ public class BitsquareApp extends Application {
|
||||
Thread.setDefaultUncaughtExceptionHandler(handler);
|
||||
Thread.currentThread().setUncaughtExceptionHandler(handler);
|
||||
|
||||
DRMWorkaround.maybeDisableExportControls();
|
||||
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
|
||||
try {
|
||||
|
@ -16,9 +16,9 @@ import io.bitsquare.crypto.EncryptionService;
|
||||
import io.bitsquare.crypto.SealedAndSignedMessage;
|
||||
import io.bitsquare.p2p.messaging.*;
|
||||
import io.bitsquare.p2p.network.*;
|
||||
import io.bitsquare.p2p.routing.Peer;
|
||||
import io.bitsquare.p2p.routing.Routing;
|
||||
import io.bitsquare.p2p.routing.RoutingListener;
|
||||
import io.bitsquare.p2p.peer.Peer;
|
||||
import io.bitsquare.p2p.peer.PeerGroup;
|
||||
import io.bitsquare.p2p.peer.PeerListener;
|
||||
import io.bitsquare.p2p.seed.SeedNodesRepository;
|
||||
import io.bitsquare.p2p.storage.HashMapChangedListener;
|
||||
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
|
||||
@ -60,7 +60,7 @@ public class P2PService {
|
||||
private final NetworkStatistics networkStatistics;
|
||||
|
||||
private NetworkNode networkNode;
|
||||
private Routing routing;
|
||||
private PeerGroup peerGroup;
|
||||
private ProtectedExpirableDataStorage dataStorage;
|
||||
private final List<DecryptedMailListener> decryptedMailListeners = new CopyOnWriteArrayList<>();
|
||||
private final List<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArrayList<>();
|
||||
@ -106,7 +106,7 @@ public class P2PService {
|
||||
}
|
||||
|
||||
private void init() {
|
||||
// network layer
|
||||
// network
|
||||
if (useLocalhost) {
|
||||
networkNode = new LocalhostNetworkNode(port);
|
||||
seedNodeAddresses = seedNodesRepository.getLocalhostSeedNodeAddresses();
|
||||
@ -115,12 +115,12 @@ public class P2PService {
|
||||
seedNodeAddresses = seedNodesRepository.getTorSeedNodeAddresses();
|
||||
}
|
||||
|
||||
// routing layer
|
||||
routing = new Routing(networkNode, seedNodeAddresses);
|
||||
if (useLocalhost) Routing.setSimulateAuthTorNode(2 * 1000);
|
||||
// peer group
|
||||
peerGroup = new PeerGroup(networkNode, seedNodeAddresses);
|
||||
if (useLocalhost) PeerGroup.setSimulateAuthTorNode(2 * 1000);
|
||||
|
||||
// storage layer
|
||||
dataStorage = new ProtectedExpirableDataStorage(routing, storageDir);
|
||||
// storage
|
||||
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
|
||||
|
||||
|
||||
// Listeners
|
||||
@ -236,7 +236,7 @@ public class P2PService {
|
||||
}
|
||||
});
|
||||
|
||||
routing.addRoutingListener(new RoutingListener() {
|
||||
peerGroup.addPeerListener(new PeerListener() {
|
||||
@Override
|
||||
public void onFirstPeerAdded(Peer peer) {
|
||||
log.trace("onFirstPeer " + peer.toString());
|
||||
@ -306,8 +306,8 @@ public class P2PService {
|
||||
if (dataStorage != null)
|
||||
dataStorage.shutDown();
|
||||
|
||||
if (routing != null)
|
||||
routing.shutDown();
|
||||
if (peerGroup != null)
|
||||
peerGroup.shutDown();
|
||||
|
||||
if (networkNode != null)
|
||||
networkNode.shutDown(() -> {
|
||||
@ -351,7 +351,7 @@ public class P2PService {
|
||||
throw new AuthenticationException("You must be authenticated before sending direct messages.");
|
||||
|
||||
if (!authenticatedPeerAddresses.contains(peerAddress))
|
||||
routing.authenticateToPeer(peerAddress,
|
||||
peerGroup.authenticateToPeer(peerAddress,
|
||||
() -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
|
||||
() -> UserThread.execute(() -> sendMailMessageListener.onFault()));
|
||||
else
|
||||
@ -395,7 +395,7 @@ public class P2PService {
|
||||
if (authenticatedPeerAddresses.contains(peerAddress)) {
|
||||
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
|
||||
} else {
|
||||
routing.authenticateToPeer(peerAddress,
|
||||
peerGroup.authenticateToPeer(peerAddress,
|
||||
() -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
|
||||
() -> {
|
||||
log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
|
||||
@ -548,8 +548,8 @@ public class P2PService {
|
||||
return networkNode;
|
||||
}
|
||||
|
||||
public Routing getRouting() {
|
||||
return routing;
|
||||
public PeerGroup getPeerGroup() {
|
||||
return peerGroup;
|
||||
}
|
||||
|
||||
public Address getAddress() {
|
||||
@ -644,7 +644,7 @@ public class P2PService {
|
||||
checkArgument(networkNode.getAddress() != null, "Address must be set when we are authenticated");
|
||||
connectedSeedNodes.remove(networkNode.getAddress());
|
||||
|
||||
routing.startAuthentication(connectedSeedNodes);
|
||||
peerGroup.startAuthentication(connectedSeedNodes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ public class Connection {
|
||||
private static final Logger log = LoggerFactory.getLogger(Connection.class);
|
||||
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
|
||||
private static final int MAX_ILLEGAL_REQUESTS = 5;
|
||||
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
|
||||
private static final int SOCKET_TIMEOUT = 60 * 1000; // 1 min.
|
||||
private InputHandler inputHandler;
|
||||
private boolean isAuthenticated;
|
||||
|
||||
|
@ -1,9 +1,9 @@
|
||||
package io.bitsquare.p2p.routing;
|
||||
package io.bitsquare.p2p.peer;
|
||||
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
|
||||
public abstract class AuthenticationListener implements RoutingListener {
|
||||
public abstract class AuthenticationListener implements PeerListener {
|
||||
public void onFirstPeerAdded(Peer peer) {
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing;
|
||||
package io.bitsquare.p2p.peer;
|
||||
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.network.Connection;
|
650
network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java
Normal file
650
network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java
Normal file
@ -0,0 +1,650 @@
|
||||
package io.bitsquare.p2p.peer;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.network.*;
|
||||
import io.bitsquare.p2p.peer.messages.*;
|
||||
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PeerGroup {
|
||||
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
|
||||
|
||||
private static int simulateAuthTorNode = 0;
|
||||
|
||||
public static void setSimulateAuthTorNode(int simulateAuthTorNode) {
|
||||
PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
|
||||
}
|
||||
|
||||
private static int MAX_CONNECTIONS = 8;
|
||||
private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
|
||||
private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
|
||||
private long startAuthTs;
|
||||
|
||||
public static void setMaxConnections(int maxConnections) {
|
||||
MAX_CONNECTIONS = maxConnections;
|
||||
}
|
||||
|
||||
private final NetworkNode networkNode;
|
||||
private final List<Address> seedNodes;
|
||||
private final Map<Address, Long> nonceMap = new ConcurrentHashMap<>();
|
||||
private final List<PeerListener> peerListeners = new CopyOnWriteArrayList<>();
|
||||
private final Map<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
|
||||
private final Set<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
|
||||
private final Map<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>();
|
||||
private final Timer maintenanceTimer = new Timer();
|
||||
private volatile boolean shutDownInProgress;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public PeerGroup(final NetworkNode networkNode, List<Address> seeds) {
|
||||
this.networkNode = networkNode;
|
||||
|
||||
// We copy it as we remove ourselves later from the list if we are a seed node
|
||||
this.seedNodes = new CopyOnWriteArrayList<>(seeds);
|
||||
|
||||
init(networkNode);
|
||||
}
|
||||
|
||||
private void init(NetworkNode networkNode) {
|
||||
networkNode.addMessageListener((message, connection) -> {
|
||||
if (message instanceof AuthenticationMessage)
|
||||
processAuthenticationMessage((AuthenticationMessage) message, connection);
|
||||
else if (message instanceof MaintenanceMessage)
|
||||
processMaintenanceMessage((MaintenanceMessage) message, connection);
|
||||
});
|
||||
|
||||
networkNode.addConnectionListener(new ConnectionListener() {
|
||||
@Override
|
||||
public void onConnection(Connection connection) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Reason reason, Connection connection) {
|
||||
// only removes authenticated nodes
|
||||
if (connection.isAuthenticated())
|
||||
removePeer(connection.getPeerAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
});
|
||||
|
||||
networkNode.addSetupListener(new SetupListener() {
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHiddenServiceReady() {
|
||||
// remove ourselves in case we are a seed node
|
||||
Address myAddress = getAddress();
|
||||
if (myAddress != null)
|
||||
seedNodes.remove(myAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSetupFailed(Throwable throwable) {
|
||||
}
|
||||
});
|
||||
|
||||
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000));
|
||||
try {
|
||||
UserThread.execute(() -> {
|
||||
disconnectOldConnections();
|
||||
pingPeers();
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
log.error("Executing task failed. " + t.getMessage());
|
||||
}
|
||||
}
|
||||
}, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL);
|
||||
}
|
||||
|
||||
private void disconnectOldConnections() {
|
||||
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
|
||||
.filter(e -> e.isAuthenticated())
|
||||
.collect(Collectors.toList());
|
||||
if (authenticatedConnections.size() > MAX_CONNECTIONS) {
|
||||
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
|
||||
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
|
||||
Connection connection = authenticatedConnections.remove(0);
|
||||
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
|
||||
|
||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000));
|
||||
disconnectOldConnections();
|
||||
}, 1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
private void pingPeers() {
|
||||
log.trace("pingPeers");
|
||||
List<Peer> connectedPeersList = new ArrayList<>(authenticatedPeers.values());
|
||||
connectedPeersList.stream()
|
||||
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
|
||||
.forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> {
|
||||
Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000));
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("PingMessage sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("PingMessage sending failed " + throwable.getMessage());
|
||||
removePeer(e.address);
|
||||
}
|
||||
});
|
||||
}, 5, 10));
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void shutDown() {
|
||||
if (!shutDownInProgress) {
|
||||
shutDownInProgress = true;
|
||||
if (maintenanceTimer != null)
|
||||
maintenanceTimer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
public void broadcast(BroadcastMessage message, @Nullable Address sender) {
|
||||
log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers.");
|
||||
log.trace("message = " + message);
|
||||
printConnectedPeersMap();
|
||||
|
||||
authenticatedPeers.values().stream()
|
||||
.filter(e -> !e.address.equals(sender))
|
||||
.forEach(peer -> {
|
||||
log.trace("Broadcast message from " + getAddress() + " to " + peer.address + ".");
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("Broadcast failed. " + throwable.getMessage());
|
||||
removePeer(peer.address);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public void addMessageListener(MessageListener messageListener) {
|
||||
networkNode.addMessageListener(messageListener);
|
||||
}
|
||||
|
||||
public void removeMessageListener(MessageListener messageListener) {
|
||||
networkNode.removeMessageListener(messageListener);
|
||||
}
|
||||
|
||||
public void addPeerListener(PeerListener peerListener) {
|
||||
peerListeners.add(peerListener);
|
||||
}
|
||||
|
||||
public void removePeerListener(PeerListener peerListener) {
|
||||
peerListeners.remove(peerListener);
|
||||
}
|
||||
|
||||
public Map<Address, Peer> getAuthenticatedPeers() {
|
||||
return authenticatedPeers;
|
||||
}
|
||||
|
||||
// Use ArrayList not List as we need it serializable
|
||||
public ArrayList<Address> getAllPeerAddresses() {
|
||||
ArrayList<Address> allPeerAddresses = new ArrayList<>(reportedPeerAddresses);
|
||||
allPeerAddresses.addAll(authenticatedPeers.values().stream()
|
||||
.map(e -> e.address).collect(Collectors.toList()));
|
||||
// remove own address and seed nodes
|
||||
allPeerAddresses.remove(getAddress());
|
||||
return allPeerAddresses;
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Authentication
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// authentication example:
|
||||
// node2 -> node1 RequestAuthenticationMessage
|
||||
// node1: close connection
|
||||
// node1 -> node2 ChallengeMessage on new connection
|
||||
// node2: authentication to node1 done if nonce ok
|
||||
// node2 -> node1 GetPeersMessage
|
||||
// node1: authentication to node2 done if nonce ok
|
||||
// node1 -> node2 PeersMessage
|
||||
|
||||
public void startAuthentication(Set<Address> connectedSeedNodes) {
|
||||
connectedSeedNodes.forEach(connectedSeedNode -> {
|
||||
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
|
||||
});
|
||||
}
|
||||
|
||||
private void sendRequestAuthenticationMessage(final List<Address> remainingSeedNodes, final Address address) {
|
||||
log.info("We try to authenticate to a random seed node. " + address);
|
||||
startAuthTs = System.currentTimeMillis();
|
||||
final boolean[] alreadyConnected = {false};
|
||||
authenticatedPeers.values().stream().forEach(e -> {
|
||||
remainingSeedNodes.remove(e.address);
|
||||
if (address.equals(e.address))
|
||||
alreadyConnected[0] = true;
|
||||
});
|
||||
if (!alreadyConnected[0]) {
|
||||
long nonce = addToMapAndGetNonce(address);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.info("send RequestAuthenticationMessage to " + address + " succeeded.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("Send RequestAuthenticationMessage to " + address + " failed. Exception:" + throwable.getMessage());
|
||||
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingSeedNodes);
|
||||
getNextSeedNode(remainingSeedNodes);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
getNextSeedNode(remainingSeedNodes);
|
||||
}
|
||||
}
|
||||
|
||||
private void getNextSeedNode(List<Address> remainingSeedNodes) {
|
||||
List<Address> remainingSeedNodeAddresses = new CopyOnWriteArrayList<>(remainingSeedNodes);
|
||||
|
||||
Address myAddress = getAddress();
|
||||
if (myAddress != null)
|
||||
remainingSeedNodeAddresses.remove(myAddress);
|
||||
|
||||
if (!remainingSeedNodeAddresses.isEmpty()) {
|
||||
Collections.shuffle(remainingSeedNodeAddresses);
|
||||
Address address = remainingSeedNodeAddresses.remove(0);
|
||||
sendRequestAuthenticationMessage(remainingSeedNodeAddresses, address);
|
||||
} else {
|
||||
log.info("No other seed node found. That is expected for the first seed node.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void processAuthenticationMessage(AuthenticationMessage message, Connection connection) {
|
||||
log.trace("processAuthenticationMessage " + message + " from " + connection.getPeerAddress() + " at " + getAddress());
|
||||
if (message instanceof RequestAuthenticationMessage) {
|
||||
RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message;
|
||||
Address peerAddress = requestAuthenticationMessage.address;
|
||||
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
||||
|
||||
connection.shutDown(() -> Utilities.runTimerTask(() -> {
|
||||
Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000));
|
||||
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
|
||||
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
|
||||
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
|
||||
long nonce = addToMapAndGetNonce(peerAddress);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.debug("onSuccess sending ChallengeMessage");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.warn("onFailure sending ChallengeMessage. We try again.");
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.debug("onSuccess sending 2. ChallengeMessage");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
log.warn("onFailure sending ChallengeMessage. We give up.");
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
100 + simulateAuthTorNode,
|
||||
TimeUnit.MILLISECONDS));
|
||||
} else if (message instanceof ChallengeMessage) {
|
||||
ChallengeMessage challengeMessage = (ChallengeMessage) message;
|
||||
Address peerAddress = challengeMessage.address;
|
||||
log.trace("ChallengeMessage from " + peerAddress + " at " + getAddress());
|
||||
HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
|
||||
boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress);
|
||||
if (verified) {
|
||||
connection.setPeerAddress(peerAddress);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
||||
new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses()));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("GetPeersMessage sending failed " + throwable.getMessage());
|
||||
removePeer(peerAddress);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap);
|
||||
}
|
||||
} else if (message instanceof GetPeersMessage) {
|
||||
GetPeersMessage getPeersMessage = (GetPeersMessage) message;
|
||||
Address peerAddress = getPeersMessage.address;
|
||||
log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress());
|
||||
boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress);
|
||||
if (verified) {
|
||||
setAuthenticated(connection, peerAddress);
|
||||
purgeReportedPeers();
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
|
||||
new PeersMessage(getAddress(), getAllPeerAddresses()));
|
||||
log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress()
|
||||
+ " with allPeers=" + getAllPeerAddresses());
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("PeersMessage sending failed " + throwable.getMessage());
|
||||
removePeer(peerAddress);
|
||||
}
|
||||
});
|
||||
|
||||
// now we add the reported peers to our own set
|
||||
ArrayList<Address> peerAddresses = ((GetPeersMessage) message).peerAddresses;
|
||||
log.trace("Received peers: " + peerAddresses);
|
||||
// remove ourselves
|
||||
addToReportedPeers(peerAddresses, connection);
|
||||
}
|
||||
} else if (message instanceof PeersMessage) {
|
||||
PeersMessage peersMessage = (PeersMessage) message;
|
||||
Address peerAddress = peersMessage.address;
|
||||
log.trace("PeersMessage from " + peerAddress + " at " + getAddress());
|
||||
ArrayList<Address> peerAddresses = peersMessage.peerAddresses;
|
||||
log.trace("Received peers: " + peerAddresses);
|
||||
// remove ourselves
|
||||
addToReportedPeers(peerAddresses, connection);
|
||||
|
||||
// we wait until the handshake is completed before setting the authenticate flag
|
||||
// authentication at both sides of the connection
|
||||
|
||||
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
|
||||
+ " authenticated (" + connection.getObjectId() + "). Took "
|
||||
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
|
||||
|
||||
setAuthenticated(connection, peerAddress);
|
||||
|
||||
Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress());
|
||||
if (authenticationCompleteHandler != null)
|
||||
authenticationCompleteHandler.run();
|
||||
|
||||
authenticateToNextRandomPeer();
|
||||
}
|
||||
}
|
||||
|
||||
private void addToReportedPeers(ArrayList<Address> peerAddresses, Connection connection) {
|
||||
log.trace("addToReportedPeers");
|
||||
// we disconnect misbehaving nodes trying to send too many peers
|
||||
// reported peers include the peers connected peers which is normally max. 8 but we give some headroom
|
||||
// for safety
|
||||
if (peerAddresses.size() > 1100) {
|
||||
connection.shutDown();
|
||||
} else {
|
||||
peerAddresses.remove(getAddress());
|
||||
reportedPeerAddresses.addAll(peerAddresses);
|
||||
purgeReportedPeers();
|
||||
}
|
||||
}
|
||||
|
||||
private void purgeReportedPeers() {
|
||||
log.trace("purgeReportedPeers");
|
||||
int all = getAllPeerAddresses().size();
|
||||
if (all > 1000) {
|
||||
int diff = all - 100;
|
||||
List<Address> list = getNotConnectedPeerAddresses();
|
||||
for (int i = 0; i < diff; i++) {
|
||||
Address toRemove = list.remove(new Random().nextInt(list.size()));
|
||||
reportedPeerAddresses.remove(toRemove);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Address> getNotConnectedPeerAddresses() {
|
||||
ArrayList<Address> list = new ArrayList<>(getAllPeerAddresses());
|
||||
log.debug("## getNotConnectedPeerAddresses ");
|
||||
log.debug("## reportedPeersList=" + list);
|
||||
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
|
||||
log.debug("## connectedPeers=" + authenticatedPeers);
|
||||
log.debug("## reportedPeersList=" + list);
|
||||
return list;
|
||||
}
|
||||
|
||||
private void authenticateToNextRandomPeer() {
|
||||
Utilities.runTimerTaskWithRandomDelay(() -> {
|
||||
Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000));
|
||||
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
|
||||
Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress();
|
||||
if (randomNotConnectedPeerAddress != null) {
|
||||
log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress);
|
||||
authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer());
|
||||
} else {
|
||||
log.info("No more peers available for connecting.");
|
||||
}
|
||||
} else {
|
||||
log.info("We have already enough connections.");
|
||||
}
|
||||
}, 200, 400, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
|
||||
startAuthTs = System.currentTimeMillis();
|
||||
|
||||
if (authenticationCompleteHandler != null)
|
||||
authenticationCompleteHandlers.put(address, authenticationCompleteHandler);
|
||||
|
||||
long nonce = addToMapAndGetNonce(address);
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.debug("send RequestAuthenticationMessage succeeded");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("send IdMessage failed. " + throwable.getMessage());
|
||||
removePeer(address);
|
||||
if (faultHandler != null) faultHandler.run();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private long addToMapAndGetNonce(Address peerAddress) {
|
||||
long nonce = new Random().nextLong();
|
||||
while (nonce == 0) {
|
||||
nonce = new Random().nextLong();
|
||||
}
|
||||
log.trace("addToMapAndGetNonce nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
||||
nonceMap.put(peerAddress, nonce);
|
||||
return nonce;
|
||||
}
|
||||
|
||||
private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) {
|
||||
log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
||||
Long nonce = nonceMap.remove(peerAddress);
|
||||
return nonce != null && nonce == peersNonce;
|
||||
}
|
||||
|
||||
private void setAuthenticated(Connection connection, Address peerAddress) {
|
||||
log.info("\n\n############################################################\n" +
|
||||
"We are authenticated to:" +
|
||||
"\nconnection=" + connection
|
||||
+ "\nmyAddress=" + getAddress()
|
||||
+ "\npeerAddress= " + peerAddress
|
||||
+ "\n############################################################\n");
|
||||
|
||||
connection.setAuthenticated(peerAddress, connection);
|
||||
|
||||
Peer peer = new Peer(connection);
|
||||
addAuthenticatedPeer(peerAddress, peer);
|
||||
|
||||
peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
|
||||
|
||||
log.debug("\n### setAuthenticated post connection " + connection);
|
||||
}
|
||||
|
||||
private Address getRandomNotConnectedPeerAddress() {
|
||||
List<Address> list = getNotConnectedPeerAddresses();
|
||||
if (list.size() > 0) {
|
||||
Collections.shuffle(list);
|
||||
return list.get(0);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Maintenance
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
|
||||
log.debug("Received message " + message + " at " + getAddress() + " from " + connection.getPeerAddress());
|
||||
if (message instanceof PingMessage) {
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(Connection connection) {
|
||||
log.trace("PongMessage sent successfully");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("PongMessage sending failed " + throwable.getMessage());
|
||||
removePeer(connection.getPeerAddress());
|
||||
}
|
||||
});
|
||||
} else if (message instanceof PongMessage) {
|
||||
Peer peer = authenticatedPeers.get(connection.getPeerAddress());
|
||||
if (peer != null) {
|
||||
if (((PongMessage) message).nonce != peer.getPingNonce()) {
|
||||
removePeer(peer.address);
|
||||
log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Peers
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void removePeer(@Nullable Address peerAddress) {
|
||||
reportedPeerAddresses.remove(peerAddress);
|
||||
|
||||
Peer disconnectedPeer;
|
||||
disconnectedPeer = authenticatedPeers.remove(peerAddress);
|
||||
|
||||
if (disconnectedPeer != null)
|
||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
|
||||
|
||||
log.trace("removePeer [post]");
|
||||
printConnectedPeersMap();
|
||||
printReportedPeersMap();
|
||||
|
||||
log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
|
||||
nonceMap.remove(peerAddress);
|
||||
}
|
||||
|
||||
private void addAuthenticatedPeer(Address address, Peer peer) {
|
||||
boolean firstPeerAdded;
|
||||
authenticatedPeers.put(address, peer);
|
||||
firstPeerAdded = authenticatedPeers.size() == 1;
|
||||
|
||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer)));
|
||||
|
||||
if (firstPeerAdded)
|
||||
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstPeerAdded(peer)));
|
||||
|
||||
if (authenticatedPeers.size() > MAX_CONNECTIONS)
|
||||
disconnectOldConnections();
|
||||
|
||||
log.trace("addConnectedPeer [post]");
|
||||
printConnectedPeersMap();
|
||||
}
|
||||
|
||||
private Address getAddress() {
|
||||
return networkNode.getAddress();
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Utils
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void printConnectedPeersMap() {
|
||||
StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":");
|
||||
authenticatedPeers.values().stream().forEach(e -> {
|
||||
result.append("\n\t" + e.address);
|
||||
});
|
||||
result.append("\n");
|
||||
log.info(result.toString());
|
||||
}
|
||||
|
||||
public void printReportedPeersMap() {
|
||||
StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":");
|
||||
reportedPeerAddresses.stream().forEach(e -> {
|
||||
result.append("\n\t" + e);
|
||||
});
|
||||
result.append("\n");
|
||||
log.info(result.toString());
|
||||
}
|
||||
|
||||
private String getObjectId() {
|
||||
return super.toString().split("@")[1].toString();
|
||||
}
|
||||
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
package io.bitsquare.p2p.routing;
|
||||
package io.bitsquare.p2p.peer;
|
||||
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
|
||||
public interface RoutingListener {
|
||||
public interface PeerListener {
|
||||
void onFirstPeerAdded(Peer peer);
|
||||
|
||||
void onPeerAdded(Peer peer);
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.p2p.Message;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.p2p.Message;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package io.bitsquare.p2p.routing.messages;
|
||||
package io.bitsquare.p2p.peer.messages;
|
||||
|
||||
import io.bitsquare.app.Version;
|
||||
import io.bitsquare.p2p.Address;
|
@ -8,7 +8,7 @@ import io.bitsquare.common.crypto.Sig;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.network.IllegalRequest;
|
||||
import io.bitsquare.p2p.network.MessageListener;
|
||||
import io.bitsquare.p2p.routing.Routing;
|
||||
import io.bitsquare.p2p.peer.PeerGroup;
|
||||
import io.bitsquare.p2p.storage.data.*;
|
||||
import io.bitsquare.p2p.storage.messages.*;
|
||||
import io.bitsquare.storage.Storage;
|
||||
@ -30,7 +30,7 @@ public class ProtectedExpirableDataStorage {
|
||||
@VisibleForTesting
|
||||
public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000;
|
||||
|
||||
private final Routing routing;
|
||||
private final PeerGroup peerGroup;
|
||||
private final Map<BigInteger, ProtectedData> map = new ConcurrentHashMap<>();
|
||||
private final List<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArrayList<>();
|
||||
private ConcurrentHashMap<BigInteger, Integer> sequenceNumberMap = new ConcurrentHashMap<>();
|
||||
@ -44,8 +44,8 @@ public class ProtectedExpirableDataStorage {
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public ProtectedExpirableDataStorage(Routing routing, File storageDir) {
|
||||
this.routing = routing;
|
||||
public ProtectedExpirableDataStorage(PeerGroup peerGroup, File storageDir) {
|
||||
this.peerGroup = peerGroup;
|
||||
|
||||
storage = new Storage<>(storageDir);
|
||||
|
||||
@ -103,7 +103,7 @@ public class ProtectedExpirableDataStorage {
|
||||
if (!shutDownInProgress) {
|
||||
shutDownInProgress = true;
|
||||
timer.cancel();
|
||||
routing.shutDown();
|
||||
peerGroup.shutDown();
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,7 +231,7 @@ public class ProtectedExpirableDataStorage {
|
||||
}
|
||||
|
||||
public void addMessageListener(MessageListener messageListener) {
|
||||
routing.addMessageListener(messageListener);
|
||||
peerGroup.addMessageListener(messageListener);
|
||||
}
|
||||
|
||||
|
||||
@ -324,7 +324,7 @@ public class ProtectedExpirableDataStorage {
|
||||
|
||||
private void broadcast(BroadcastMessage message, @Nullable Address sender) {
|
||||
if (authenticated) {
|
||||
routing.broadcast(message, sender);
|
||||
peerGroup.broadcast(message, sender);
|
||||
log.trace("Broadcast message " + message);
|
||||
} else {
|
||||
log.trace("Broadcast not allowed because we are not authenticated yet. That is normal after received AllDataMessage at startup.");
|
||||
|
@ -8,7 +8,7 @@ import io.bitsquare.p2p.messaging.MailboxMessage;
|
||||
import io.bitsquare.p2p.messaging.SendMailboxMessageListener;
|
||||
import io.bitsquare.p2p.mocks.MockMailboxMessage;
|
||||
import io.bitsquare.p2p.network.LocalhostNetworkNode;
|
||||
import io.bitsquare.p2p.routing.Routing;
|
||||
import io.bitsquare.p2p.peer.PeerGroup;
|
||||
import io.bitsquare.p2p.seed.SeedNode;
|
||||
import io.bitsquare.p2p.storage.data.DataAndSeqNr;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||
@ -57,7 +57,7 @@ public class P2PServiceTest {
|
||||
|
||||
LocalhostNetworkNode.setSimulateTorDelayTorNode(10);
|
||||
LocalhostNetworkNode.setSimulateTorDelayHiddenService(100);
|
||||
Routing.setMaxConnections(8);
|
||||
PeerGroup.setMaxConnections(8);
|
||||
|
||||
keyRing1 = new KeyRing(new KeyStorage(dir1));
|
||||
keyRing2 = new KeyRing(new KeyStorage(dir2));
|
||||
|
@ -1,7 +1,7 @@
|
||||
package io.bitsquare.p2p.network;
|
||||
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.routing.messages.RequestAuthenticationMessage;
|
||||
import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
|
@ -6,6 +6,8 @@ import io.bitsquare.p2p.P2PService;
|
||||
import io.bitsquare.p2p.P2PServiceListener;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
import io.bitsquare.p2p.network.LocalhostNetworkNode;
|
||||
import io.bitsquare.p2p.peer.AuthenticationListener;
|
||||
import io.bitsquare.p2p.peer.PeerGroup;
|
||||
import io.bitsquare.p2p.seed.SeedNode;
|
||||
import org.junit.*;
|
||||
import org.slf4j.Logger;
|
||||
@ -20,8 +22,8 @@ import java.util.concurrent.CountDownLatch;
|
||||
|
||||
// need to define seed node addresses first before using tor version
|
||||
@Ignore
|
||||
public class RoutingTest {
|
||||
private static final Logger log = LoggerFactory.getLogger(RoutingTest.class);
|
||||
public class PeerGroupTest {
|
||||
private static final Logger log = LoggerFactory.getLogger(PeerGroupTest.class);
|
||||
|
||||
boolean useLocalhost = true;
|
||||
private CountDownLatch latch;
|
||||
@ -33,7 +35,7 @@ public class RoutingTest {
|
||||
public void setup() throws InterruptedException {
|
||||
LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
|
||||
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
|
||||
Routing.setMaxConnections(100);
|
||||
PeerGroup.setMaxConnections(100);
|
||||
|
||||
seedNodes = new ArrayList<>();
|
||||
if (useLocalhost) {
|
||||
@ -107,7 +109,7 @@ public class RoutingTest {
|
||||
P2PService p2PService1 = seedNode1.getP2PService();
|
||||
latch.await();
|
||||
Thread.sleep(500);
|
||||
Assert.assertEquals(0, p2PService1.getRouting().getAllPeerAddresses().size());
|
||||
Assert.assertEquals(0, p2PService1.getPeerGroup().getAllPeerAddresses().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -180,8 +182,8 @@ public class RoutingTest {
|
||||
});
|
||||
P2PService p2PService2 = seedNode2.getP2PService();
|
||||
latch.await();
|
||||
Assert.assertEquals(1, p2PService1.getRouting().getAllPeerAddresses().size());
|
||||
Assert.assertEquals(1, p2PService2.getRouting().getAllPeerAddresses().size());
|
||||
Assert.assertEquals(1, p2PService1.getPeerGroup().getAllPeerAddresses().size());
|
||||
Assert.assertEquals(1, p2PService2.getPeerGroup().getAllPeerAddresses().size());
|
||||
}
|
||||
|
||||
// @Test
|
||||
@ -214,7 +216,7 @@ public class RoutingTest {
|
||||
latch1.countDown();
|
||||
}
|
||||
};
|
||||
seedNode1.getP2PService().getRouting().addRoutingListener(routingListener1);
|
||||
seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1);
|
||||
|
||||
AuthenticationListener routingListener2 = new AuthenticationListener() {
|
||||
@Override
|
||||
@ -223,10 +225,10 @@ public class RoutingTest {
|
||||
latch1.countDown();
|
||||
}
|
||||
};
|
||||
seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2);
|
||||
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
|
||||
latch1.await();
|
||||
seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1);
|
||||
seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2);
|
||||
seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1);
|
||||
seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2);
|
||||
|
||||
// wait until Peers msg finished
|
||||
Thread.sleep(sleepTime);
|
||||
@ -236,21 +238,21 @@ public class RoutingTest {
|
||||
// authentication from seedNode3 to seedNode2, then from seedNode2 to seedNode3
|
||||
SeedNode seedNode3 = getAndStartSeedNode(8003);
|
||||
CountDownLatch latch2 = new CountDownLatch(3);
|
||||
seedNode1.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() {
|
||||
seedNode1.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
|
||||
@Override
|
||||
public void onConnectionAuthenticated(Connection connection) {
|
||||
log.debug("onConnectionAuthenticated " + connection);
|
||||
latch2.countDown();
|
||||
}
|
||||
});
|
||||
seedNode2.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() {
|
||||
seedNode2.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
|
||||
@Override
|
||||
public void onConnectionAuthenticated(Connection connection) {
|
||||
log.debug("onConnectionAuthenticated " + connection);
|
||||
latch2.countDown();
|
||||
}
|
||||
});
|
||||
seedNode3.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() {
|
||||
seedNode3.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
|
||||
@Override
|
||||
public void onConnectionAuthenticated(Connection connection) {
|
||||
log.debug("onConnectionAuthenticated " + connection);
|
||||
@ -295,7 +297,7 @@ public class RoutingTest {
|
||||
latch1.countDown();
|
||||
}
|
||||
};
|
||||
seedNode1.getP2PService().getRouting().addRoutingListener(routingListener1);
|
||||
seedNode1.getP2PService().getPeerGroup().addPeerListener(routingListener1);
|
||||
|
||||
AuthenticationListener routingListener2 = new AuthenticationListener() {
|
||||
@Override
|
||||
@ -304,13 +306,13 @@ public class RoutingTest {
|
||||
latch1.countDown();
|
||||
}
|
||||
};
|
||||
seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2);
|
||||
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
|
||||
latch1.await();
|
||||
|
||||
// shut down node 2
|
||||
Thread.sleep(sleepTime);
|
||||
seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1);
|
||||
seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2);
|
||||
seedNode1.getP2PService().getPeerGroup().removePeerListener(routingListener1);
|
||||
seedNode2.getP2PService().getPeerGroup().removePeerListener(routingListener2);
|
||||
CountDownLatch shutDownLatch1 = new CountDownLatch(1);
|
||||
seedNode2.shutDown(() -> shutDownLatch1.countDown());
|
||||
shutDownLatch1.await();
|
||||
@ -325,7 +327,7 @@ public class RoutingTest {
|
||||
latch3.countDown();
|
||||
}
|
||||
};
|
||||
seedNode2.getP2PService().getRouting().addRoutingListener(routingListener2);
|
||||
seedNode2.getP2PService().getPeerGroup().addPeerListener(routingListener2);
|
||||
latch3.await();
|
||||
|
||||
Thread.sleep(sleepTime);
|
||||
@ -347,7 +349,7 @@ public class RoutingTest {
|
||||
|
||||
latch = new CountDownLatch(i * 2);
|
||||
authentications += (i * 2);
|
||||
node.getP2PService().getRouting().addRoutingListener(new AuthenticationListener() {
|
||||
node.getP2PService().getPeerGroup().addPeerListener(new AuthenticationListener() {
|
||||
@Override
|
||||
public void onConnectionAuthenticated(Connection connection) {
|
||||
log.debug("onConnectionAuthenticated " + connection);
|
||||
@ -364,8 +366,8 @@ public class RoutingTest {
|
||||
// total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20
|
||||
// total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46
|
||||
for (int i = 0; i < length; i++) {
|
||||
nodes[i].getP2PService().getRouting().printConnectedPeersMap();
|
||||
nodes[i].getP2PService().getRouting().printReportedPeersMap();
|
||||
nodes[i].getP2PService().getPeerGroup().printConnectedPeersMap();
|
||||
nodes[i].getP2PService().getPeerGroup().printReportedPeersMap();
|
||||
}
|
||||
|
||||
CountDownLatch shutDownLatch = new CountDownLatch(length);
|
@ -9,7 +9,7 @@ import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.TestUtils;
|
||||
import io.bitsquare.p2p.mocks.MockMessage;
|
||||
import io.bitsquare.p2p.network.NetworkNode;
|
||||
import io.bitsquare.p2p.routing.Routing;
|
||||
import io.bitsquare.p2p.peer.PeerGroup;
|
||||
import io.bitsquare.p2p.storage.data.DataAndSeqNr;
|
||||
import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||
@ -36,7 +36,7 @@ public class ProtectedDataStorageTest {
|
||||
boolean useClearNet = true;
|
||||
private ArrayList<Address> seedNodes = new ArrayList<>();
|
||||
private NetworkNode networkNode1;
|
||||
private Routing routing1;
|
||||
private PeerGroup peerGroup1;
|
||||
private EncryptionService encryptionService1, encryptionService2;
|
||||
private ProtectedExpirableDataStorage dataStorage1;
|
||||
private KeyPair storageSignatureKeyPair1, storageSignatureKeyPair2;
|
||||
@ -64,8 +64,8 @@ public class ProtectedDataStorageTest {
|
||||
storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair();
|
||||
encryptionService1 = new EncryptionService(keyRing1);
|
||||
networkNode1 = TestUtils.getAndStartSeedNode(8001, encryptionService1, keyRing1, useClearNet, seedNodes).getP2PService().getNetworkNode();
|
||||
routing1 = new Routing(networkNode1, seedNodes);
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy"));
|
||||
peerGroup1 = new PeerGroup(networkNode1, seedNodes);
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy"));
|
||||
|
||||
// for mailbox
|
||||
keyRing2 = new KeyRing(new KeyStorage(dir2));
|
||||
@ -80,7 +80,7 @@ public class ProtectedDataStorageTest {
|
||||
public void tearDown() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException {
|
||||
Thread.sleep(sleepTime);
|
||||
if (dataStorage1 != null) dataStorage1.shutDown();
|
||||
if (routing1 != null) routing1.shutDown();
|
||||
if (peerGroup1 != null) peerGroup1.shutDown();
|
||||
|
||||
if (networkNode1 != null) {
|
||||
CountDownLatch shutDownLatch = new CountDownLatch(1);
|
||||
@ -107,7 +107,7 @@ public class ProtectedDataStorageTest {
|
||||
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
|
||||
ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10;
|
||||
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy"));
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy"));
|
||||
mockData.ttl = 50;
|
||||
|
||||
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user