Use extra classes for seednode

This commit is contained in:
Manfred Karrer 2016-01-01 20:26:16 +01:00
parent e456883f9b
commit 97629cb0cd
15 changed files with 160 additions and 106 deletions

View file

@ -113,7 +113,8 @@ public class ArbitratorManager {
}
public void shutDown() {
MoreExecutors.shutdownAndAwaitTermination(republishArbitratorExecutor, 500, TimeUnit.MILLISECONDS);
if (republishArbitratorExecutor != null)
MoreExecutors.shutdownAndAwaitTermination(republishArbitratorExecutor, 500, TimeUnit.MILLISECONDS);
}
public void onAllServicesInitialized() {
@ -189,7 +190,7 @@ public class ArbitratorManager {
resultHandler.handleResult();
if (arbitratorsObservableMap.size() > 0)
UserThread.runAfter(() -> applyArbitrators(), 1);
UserThread.runAfter(() -> applyArbitrators(), 100, TimeUnit.MILLISECONDS);
},
errorMessageHandler::handleErrorMessage);
}

View file

@ -153,7 +153,7 @@ public class TradeManager {
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
@Override
public void onFirstPeerAuthenticated() {
// give a bit delay to be sure other listeners has dont its jobs
// give a bit delay to be sure other listeners have executed its work
UserThread.runAfter(() -> initPendingTrades(), 100, TimeUnit.MILLISECONDS);
}
};

View file

@ -53,36 +53,37 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final Optional<KeyRing> optionalKeyRing;
// set in init
private NetworkNode networkNode;
private PeerManager peerManager;
private P2PDataStorage dataStorage;
protected NetworkNode networkNode;
protected PeerManager peerManager;
protected P2PDataStorage dataStorage;
private final CopyOnWriteArraySet<DecryptedMailListener> decryptedMailListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
protected final CopyOnWriteArraySet<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new HashMap<>();
private final Set<Address> authenticatedPeerAddresses = new HashSet<>();
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
protected final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
private final BooleanProperty firstPeerAuthenticated = new SimpleBooleanProperty();
private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
private Address connectedSeedNode;
protected Address connectedSeedNode;
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
@SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> readyForAuthentication;
private final Storage<Address> dbStorage;
private Address myOnionAddress;
private RequestDataManager requestDataManager;
private Set<Address> seedNodeAddresses;
protected RequestDataManager requestDataManager;
protected Set<Address> seedNodeAddresses;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
// Called also from SeedNodeP2PService
@Inject
public P2PService(SeedNodesRepository seedNodesRepository,
@Named(ProgramArguments.PORT_KEY) int port,
@ -105,16 +106,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
init(networkId, storageDir);
}
// Used for seed node
public P2PService(SeedNodesRepository seedNodesRepository,
int port,
File torDir,
boolean useLocalhost,
int networkId,
File storageDir) {
this(seedNodesRepository, port, torDir, useLocalhost, networkId, storageDir, null, null);
}
private void init(int networkId, File storageDir) {
Log.traceCall();
@ -131,7 +122,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.addMessageListener(this);
// peer group
peerManager = new PeerManager(networkNode);
peerManager = createPeerManager();
peerManager.setSeedNodeAddresses(seedNodeAddresses);
peerManager.addAuthenticationListener(this);
@ -140,19 +131,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
dataStorage.addHashMapChangedListener(this);
// Request initial data manager
requestDataManager = new RequestDataManager(networkNode, dataStorage, peerManager, new RequestDataManager.Listener() {
@Override
public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
}
@Override
public void onDataReceived(Address seedNode) {
connectedSeedNode = seedNode;
requestingDataCompleted.set(true);
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
}
});
requestDataManager = createRequestDataManager();
peerManager.addAuthenticationListener(requestDataManager);
// Test multiple states to check when we are ready for authenticateSeedNode
@ -167,21 +146,35 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
});
}
protected PeerManager createPeerManager() {
return new PeerManager(networkNode);
}
protected RequestDataManager createRequestDataManager() {
return new RequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager());
}
protected RequestDataManager.Listener getRequestDataManager() {
return new RequestDataManager.Listener() {
@Override
public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
}
@Override
public void onDataReceived(Address seedNode) {
connectedSeedNode = seedNode;
requestingDataCompleted.set(true);
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
}
};
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void startAsSeedNode(Address mySeedNodeAddress, @Nullable P2PServiceListener listener) {
Log.traceCall();
// we remove ourselves from the list of seed nodes
seedNodeAddresses.remove(mySeedNodeAddress);
peerManager.setIsSeedNode(true);
requestDataManager.setIsSeedNode(true);
start(listener);
}
public void start(@Nullable P2PServiceListener listener) {
Log.traceCall();

View file

@ -0,0 +1,37 @@
package io.bitsquare.p2p;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.RequestDataManager;
import io.bitsquare.p2p.peers.SeedNodePeerManager;
import io.bitsquare.p2p.peers.SeedNodeRequestDataManager;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
public class SeedNodeP2PService extends P2PService {
private static final Logger log = LoggerFactory.getLogger(SeedNodeP2PService.class);
public SeedNodeP2PService(SeedNodesRepository seedNodesRepository,
Address mySeedNodeAddress,
File torDir,
boolean useLocalhost,
int networkId,
File storageDir) {
super(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, null, null);
// we remove ourselves from the list of seed nodes
seedNodeAddresses.remove(mySeedNodeAddress);
}
@Override
protected PeerManager createPeerManager() {
return new SeedNodePeerManager(networkNode);
}
@Override
protected RequestDataManager createRequestDataManager() {
return new SeedNodeRequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager());
}
}

View file

@ -205,12 +205,13 @@ public class AuthenticationHandshake implements MessageListener {
}
});
timeoutTimer = UserThread.runAfter(() -> {
failed(new AuthenticationException("Authentication to peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationChallenge message responded after 30 sec."));
}, 30, TimeUnit.SECONDS);
if (timeoutTimer != null)
timeoutTimer.cancel();
timeoutTimer = UserThread.runAfter(() -> failed(new AuthenticationException("Authentication to peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationChallenge message responded after 30 sec.")), 30);
return resultFutureOptional.get();
}
@ -267,13 +268,11 @@ public class AuthenticationHandshake implements MessageListener {
}
});
timeoutTimer = UserThread.runAfter(() -> {
failed(new AuthenticationException("Authentication of peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationFinalResponse message responded after 30 sec.\n" +
""));
}, 30, TimeUnit.SECONDS);
timeoutTimer = UserThread.runAfter(() -> failed(new AuthenticationException("Authentication of peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationFinalResponse message responded after 30 sec.\n" +
"")), 30, TimeUnit.SECONDS);
} else {
log.info("AuthenticationHandshake (peerAddress={}) already shut down before we could sent " +

View file

@ -55,7 +55,6 @@ public class MaintenanceManager implements MessageListener {
executor = Utilities.getScheduledThreadPoolExecutor("MaintenanceManager", 1, 10, 5);
executor.schedule(() -> {
UserThread.execute(() -> pingPeers());
return null;
}, 5, TimeUnit.MINUTES);
}

View file

@ -2,9 +2,11 @@ package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
@ -21,6 +23,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -34,6 +37,7 @@ public class PeerExchangeManager implements MessageListener {
private final Supplier<Map<Address, Peer>> authenticatedPeersSupplier;
private final Consumer<Address> removePeerConsumer;
private final BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer;
private final ScheduledThreadPoolExecutor executor;
private Timer getPeersTimer;
@ -54,7 +58,9 @@ public class PeerExchangeManager implements MessageListener {
this.addReportedPeersConsumer = addReportedPeersConsumer;
networkNode.addMessageListener(this);
startGetPeersTimer();
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5);
executor.schedule(() -> UserThread.execute(() -> trySendGetPeersRequest()), 4, TimeUnit.MINUTES);
}
public void shutDown() {
@ -63,6 +69,7 @@ public class PeerExchangeManager implements MessageListener {
getPeersTimer.cancel();
networkNode.removeMessageListener(this);
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
}
@ -103,16 +110,6 @@ public class PeerExchangeManager implements MessageListener {
}
}
private void startGetPeersTimer() {
Log.traceCall();
if (getPeersTimer != null)
getPeersTimer.cancel();
getPeersTimer = UserThread.runAfterRandomDelay(() -> {
trySendGetPeersRequest();
startGetPeersTimer();
}, 2, 4, TimeUnit.MINUTES);
}
private void trySendGetPeersRequest() {
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeersSupplier.get().values());

View file

@ -63,7 +63,6 @@ public class PeerManager implements MessageListener, ConnectionListener {
private final List<Address> remainingSeedNodes = new ArrayList<>();
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
private Timer connectToSeedNodeTimer;
private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@ -90,10 +89,6 @@ public class PeerManager implements MessageListener, ConnectionListener {
startConnectToSeedNodeTimer();
}
public void setIsSeedNode(boolean isSeedNode) {
this.isSeedNode = isSeedNode;
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
@ -364,7 +359,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
}
);
} else if (reportedPeersAvailable() && !isSeedNode) {
} else if (reportedPeersAvailable() && !(this instanceof SeedNodePeerManager)) {
authenticateToRemainingReportedPeer();
} else {
log.info("We don't have seed nodes or reported peers available. " +
@ -372,11 +367,11 @@ public class PeerManager implements MessageListener, ConnectionListener {
"none available with the reported peers.");
if (seedNodeAddressesOptional.isPresent()) {
resetRemainingSeedNodes();
if (remainingSeedNodes.isEmpty() && !isSeedNode) {
if (remainingSeedNodes.isEmpty() && !(this instanceof SeedNodePeerManager)) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
} else if (!isSeedNode) {
} else if (!(this instanceof SeedNodePeerManager)) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} else {
@ -660,7 +655,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
List<Connection> authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@ -670,7 +665,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@ -680,7 +675,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
}
}

View file

@ -41,13 +41,12 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private final NetworkNode networkNode;
private final P2PDataStorage dataStorage;
protected final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final Listener listener;
private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty();
private Optional<Collection<Address>> optionalSeedNodeAddresses = Optional.empty();
private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@ -74,10 +73,6 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void setIsSeedNode(boolean isSeedNode) {
this.isSeedNode = isSeedNode;
}
public void requestData(Collection<Address> seedNodeAddresses) {
if (!optionalSeedNodeAddresses.isPresent())
optionalSeedNodeAddresses = Optional.of(seedNodeAddresses);
@ -159,15 +154,9 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
@Override
public void onPeerAuthenticated(Address peerAddress, Connection connection) {
if (isSeedNode && dataStorage.getMap().isEmpty()) {
// We are the seed node and entering the network we request the data from the peer
UserThread.runAfterRandomDelay(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS);
}
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
// We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE)
// We delay a bit to be sure that the authentication state is applied to all threads
// We delay a bit to be sure that the authentication state is applied to all listeners
if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) {
// We are the node (can be a seed node as well) which requested the authentication
UserThread.runAfter(()
@ -177,7 +166,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
}
// 5. Step after authentication to first seed node we request again the data
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
protected void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString());
// We have to request the data again as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(connection, new DataRequest());

View file

@ -0,0 +1,13 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.network.NetworkNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SeedNodePeerManager extends PeerManager {
private static final Logger log = LoggerFactory.getLogger(SeedNodePeerManager.class);
public SeedNodePeerManager(NetworkNode networkNode) {
super(networkNode);
}
}

View file

@ -0,0 +1,29 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.P2PDataStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class SeedNodeRequestDataManager extends RequestDataManager {
private static final Logger log = LoggerFactory.getLogger(SeedNodeRequestDataManager.class);
public SeedNodeRequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Listener listener) {
super(networkNode, dataStorage, peerManager, listener);
}
@Override
public void onPeerAuthenticated(Address peerAddress, Connection connection) {
//TODO not clear which use case is handles here...
if (dataStorage.getMap().isEmpty()) {
UserThread.runAfterRandomDelay(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS);
}
super.onPeerAuthenticated(peerAddress, connection);
}
}

View file

@ -7,6 +7,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.SeedNodeP2PService;
import io.bitsquare.p2p.peers.PeerManager;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@ -28,7 +29,7 @@ public class SeedNode {
private Address mySeedNodeAddress = new Address("localhost:8001");
private boolean useLocalhost = false;
private Set<Address> progArgSeedNodes;
private P2PService p2PService;
private SeedNodeP2PService seedNodeP2PService;
private boolean stopped;
private final String defaultUserDataDir;
@ -134,13 +135,14 @@ public class SeedNode {
if (torDir.mkdirs())
log.info("Created torDir at " + torDir.getAbsolutePath());
p2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir);
p2PService.startAsSeedNode(mySeedNodeAddress, listener);
seedNodeP2PService = new SeedNodeP2PService(seedNodesRepository, mySeedNodeAddress, torDir, useLocalhost, networkId, storageDir);
seedNodeP2PService.start(listener);
}
public P2PService getP2PService() {
@VisibleForTesting
public P2PService getSeedNodeP2PService() {
Log.traceCall();
return p2PService;
return seedNodeP2PService;
}
public void shutDown() {
@ -154,7 +156,7 @@ public class SeedNode {
if (!stopped) {
stopped = true;
p2PService.shutDown(() -> {
seedNodeP2PService.shutDown(() -> {
if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler);
});
}

View file

@ -83,7 +83,7 @@ public class P2PServiceTest {
}
seedNode1 = TestUtils.getAndStartSeedNode(8001, useLocalhost, seedNodes);
p2PService1 = seedNode1.getP2PService();
p2PService1 = seedNode1.getSeedNodeP2PService();
p2PService2 = TestUtils.getAndAuthenticateP2PService(8002, encryptionService2, keyRing2, useLocalhost, seedNodes);
}

View file

@ -109,7 +109,7 @@ public class PeerManagerTest {
}
});
P2PService p2PService1 = seedNode1.getP2PService();
P2PService p2PService1 = seedNode1.getSeedNodeP2PService();
latch.await();
Thread.sleep(500);
Assert.assertEquals(0, p2PService1.getPeerManager().getAuthenticatedAndReportedPeers().size());
@ -158,7 +158,7 @@ public class PeerManagerTest {
}
});
P2PService p2PService1 = seedNode1.getP2PService();
P2PService p2PService1 = seedNode1.getSeedNodeP2PService();
Thread.sleep(500);
@ -192,7 +192,7 @@ public class PeerManagerTest {
}
});
P2PService p2PService2 = seedNode2.getP2PService();
P2PService p2PService2 = seedNode2.getSeedNodeP2PService();
latch.await();
Assert.assertEquals(1, p2PService1.getPeerManager().getAuthenticatedAndReportedPeers().size());
Assert.assertEquals(1, p2PService2.getPeerManager().getAuthenticatedAndReportedPeers().size());

View file

@ -64,7 +64,7 @@ public class ProtectedDataStorageTest {
storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair();
encryptionService1 = new EncryptionService(keyRing1);
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getP2PService().getNetworkNode();
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService().getNetworkNode();
peerManager1 = new PeerManager(networkNode1);
dataStorage1 = new P2PDataStorage(peerManager1, networkNode1, new File("dummy"));