Renamings, cleanup

This commit is contained in:
Manfred Karrer 2015-11-07 01:06:39 +01:00
parent 79f3ac99cf
commit bce5460aa4
21 changed files with 159 additions and 153 deletions

View File

@ -53,7 +53,6 @@ import javafx.scene.layout.StackPane;
import javafx.stage.Modality;
import javafx.stage.Stage;
import javafx.stage.StageStyle;
import org.bitcoinj.crypto.DRMWorkaround;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.controlsfx.dialog.Dialogs;
import org.reactfx.EventStreams;
@ -115,10 +114,8 @@ public class BitsquareApp extends Application {
Thread.setDefaultUncaughtExceptionHandler(handler);
Thread.currentThread().setUncaughtExceptionHandler(handler);
DRMWorkaround.maybeDisableExportControls();
Security.addProvider(new BouncyCastleProvider());
try {
// Use CrashFX for report crash logs
/*CrashFX.setup("Bitsquare/" + Version.VERSION,
@ -220,7 +217,11 @@ public class BitsquareApp extends Application {
try {
throwable.printStackTrace();
try {
new Popup().error(throwable.getMessage()).show();
String message = throwable.getMessage();
if (message != null)
new Popup().error(message).show();
else
new Popup().error(throwable.toString()).show();
} catch (Throwable throwable3) {
log.error("Error at displaying Throwable.");
throwable3.printStackTrace();

View File

@ -337,7 +337,7 @@ public class Popup {
}
protected void setTruncatedMessage() {
if (message.length() > 500)
if (message != null && message.length() > 500)
truncatedMessage = message.substring(0, 500) + "...";
else
truncatedMessage = message;

View File

@ -10,11 +10,9 @@ public final class SealedAndSignedMessage implements MailboxMessage {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final SealedAndSigned sealedAndSigned;
public final Address peerAddress;
public SealedAndSignedMessage(SealedAndSigned sealedAndSigned, Address peerAddress) {
public SealedAndSignedMessage(SealedAndSigned sealedAndSigned) {
this.sealedAndSigned = sealedAndSigned;
this.peerAddress = peerAddress;
}
@Override

View File

@ -58,7 +58,7 @@ public class P2PService implements SetupListener {
private final boolean useLocalhost;
@Nullable
private final EncryptionService encryptionService;
private KeyRing keyRing;
private final KeyRing keyRing;
private final File storageDir;
private final NetworkStatistics networkStatistics;
@ -71,12 +71,12 @@ public class P2PService implements SetupListener {
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new ConcurrentHashMap<>();
private volatile boolean shutDownInProgress;
private Address connectedSeedNode;
private Set<Address> authenticatedPeerAddresses = new HashSet<>();
private final Set<Address> authenticatedPeerAddresses = new HashSet<>();
private boolean shutDownComplete;
private CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private BooleanProperty allDataLoaded = new SimpleBooleanProperty();
private BooleanProperty authenticated = new SimpleBooleanProperty();
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty allDataLoaded = new SimpleBooleanProperty();
private final BooleanProperty authenticated = new SimpleBooleanProperty();
private MonadicBinding<Boolean> readyForAuthentication;
@ -136,7 +136,7 @@ public class P2PService implements SetupListener {
authenticatedPeerAddresses.add(peerAddress);
authenticated.set(true);
dataStorage.setAuthenticated(true);
dataStorage.setAuthenticated();
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated()));
}
@ -277,7 +277,7 @@ public class P2PService implements SetupListener {
}
@Override
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
"That is expected if other seed nodes are offline." +
"\nException:" + throwable.getMessage());
@ -396,7 +396,7 @@ public class P2PService implements SetupListener {
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress);
encryptionService.encryptAndSign(pubKeyRing, message));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -405,7 +405,7 @@ public class P2PService implements SetupListener {
}
@Override
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
}
@ -440,7 +440,7 @@ public class P2PService implements SetupListener {
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress);
encryptionService.encryptAndSign(peersPubKeyRing, message));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -450,7 +450,7 @@ public class P2PService implements SetupListener {
}
@Override
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
@ -488,15 +488,14 @@ public class P2PService implements SetupListener {
}
}
public boolean addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
checkAuthentication();
try {
return dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
}
}
@ -525,15 +524,14 @@ public class P2PService implements SetupListener {
}
}
public boolean removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
private void removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
checkAuthentication();
try {
return dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
}
}

View File

@ -23,16 +23,15 @@ public class Utils {
server.close();
return port;
} catch (IOException ignored) {
} finally {
return new Random().nextInt(10000) + 50000;
}
}
}
public static byte[] compress(Serializable input) {
return compress(ByteArrayUtils.objectToByteArray(input));
}
public static byte[] compress(byte[] input) {
private static byte[] compress(byte[] input) {
Deflater compressor = new Deflater();
compressor.setLevel(Deflater.BEST_SPEED);
compressor.setInput(input);
@ -45,19 +44,19 @@ public class Utils {
}
try {
bos.close();
} catch (IOException e) {
} catch (IOException ignored) {
}
return bos.toByteArray();
}
public static byte[] decompress(byte[] compressedData, int offset, int length) {
Inflater decompressor = new Inflater();
decompressor.setInput(compressedData, offset, length);
private static byte[] decompress(byte[] compressedData, int offset, int length) {
Inflater inflater = new Inflater();
inflater.setInput(compressedData, offset, length);
ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
byte[] buf = new byte[8192];
while (!decompressor.finished()) {
while (!inflater.finished()) {
try {
int count = decompressor.inflate(buf);
int count = inflater.inflate(buf);
bos.write(buf, 0, count);
} catch (DataFormatException e) {
e.printStackTrace();

View File

@ -8,7 +8,6 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.network.messages.CloseConnectionMessage;
import javafx.concurrent.Task;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,7 +58,7 @@ public class Connection {
//TODO got java.util.zip.DataFormatException: invalid distance too far back
// java.util.zip.DataFormatException: invalid literal/lengths set
// use GZIPInputStream but problems with blocking
boolean useCompression = false;
private final boolean useCompression = false;
///////////////////////////////////////////////////////////////////////////////////////////
@ -182,23 +181,21 @@ public class Connection {
shutDown(true, null);
}
void shutDown(boolean sendCloseConnectionMessage) {
private void shutDown(boolean sendCloseConnectionMessage) {
shutDown(sendCloseConnectionMessage, null);
}
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
if (!stopped) {
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
log.info("\n\n############################################################\n" +
"ShutDown connection:"
+ "\npeerAddress=" + peerAddress
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nobjectId=" + getObjectId() + " / uid=" + getUid()
+ "\nisAuthenticated=" + isAuthenticated());
result.append("\n############################################################\n");
log.info(result.toString());
+ "\nisAuthenticated=" + isAuthenticated()
+ "\n############################################################\n");
log.trace("ShutDown " + this.getObjectId());
log.trace("ShutDown connection requested. Connection=" + this.toString());
stopped = true;
@ -258,7 +255,8 @@ public class Connection {
Connection that = (Connection) o;
if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false;
return !(uid != null ? !uid.equals(that.uid) : that.uid != null);
if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false;
return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null);
}
@ -266,6 +264,7 @@ public class Connection {
public int hashCode() {
int result = portInfo != null ? portInfo.hashCode() : 0;
result = 31 * result + (uid != null ? uid.hashCode() : 0);
result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0);
return result;
}
@ -285,7 +284,7 @@ public class Connection {
}
public String getObjectId() {
return super.toString().split("@")[1].toString();
return super.toString().split("@")[1];
}
public void setPeerAddress(@Nullable Address peerAddress) {
@ -398,7 +397,7 @@ public class Connection {
}
public void stop() {
this.stopped = stopped;
this.stopped = true;
}
public ConnectionListener.Reason getShutDownReason() {
@ -470,12 +469,6 @@ public class Connection {
stopped = true;
sharedSpace.shutDown(false);
} else {
Task task = new Task() {
@Override
protected Object call() throws Exception {
return null;
}
};
sharedSpace.onMessage(message);
}
} else {

View File

@ -10,12 +10,12 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.TorNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
@ -26,7 +26,7 @@ public class LocalhostNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
private static int simulateTorDelayTorNode = 1 * 100;
private static int simulateTorDelayHiddenService = 2 * 100;
private static int simulateTorDelayHiddenService = 1 * 100;
private Address address;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
@ -60,8 +60,6 @@ public class LocalhostNetworkNode extends NetworkNode {
createHiddenService(hiddenServiceDescriptor -> {
try {
startServer(new ServerSocket(port));
} catch (BindException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
@ -112,7 +110,7 @@ public class LocalhostNetworkNode extends NetworkNode {
UserThread.execute(() -> resultHandler.accept(torNode));
}
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.error("[simulation] TorNode creation failed");
}
});
@ -140,7 +138,7 @@ public class LocalhostNetworkNode extends NetworkNode {
UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor));
}
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.error("[simulation] Hidden service creation failed");
}
});

View File

@ -90,12 +90,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
newConnection.setPeerAddress(peerAddress);
outBoundConnections.add(newConnection);
log.info("\n\nNetworkNode created new outbound connection:"
log.info("\n\n############################################################\n" +
"NetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress.port
+ "\nconnection.uid=" + newConnection.getUid()
+ "\nmessage=" + message
+ "\n\n");
+ "\n############################################################\n");
newConnection.sendMessage(message);
return newConnection;

View File

@ -10,7 +10,7 @@ import java.net.SocketException;
import java.util.HashSet;
import java.util.Set;
public class Server implements Runnable {
class Server implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Server.class);
private final ServerSocket serverSocket;
@ -39,13 +39,12 @@ public class Server implements Runnable {
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
log.info("\n\n############################################################\n" +
"Server created new inbound connection:"
+ "\nlocalPort/port=" + serverSocket.getLocalPort()
+ "/" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid());
result.append("\n############################################################\n");
log.info(result.toString());
+ "\nconnection.uid=" + connection.getUid()
+ "\n############################################################\n");
if (!stopped)
connections.add(connection);

View File

@ -10,6 +10,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.TorNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,7 +101,7 @@ public class TorNetworkNode extends NetworkNode {
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
shutDownExecutorService();
}, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS);
}, SHUT_DOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (executorService != null) {
executorService.submit(() -> super.shutDown(() -> {
@ -135,7 +136,7 @@ public class TorNetworkNode extends NetworkNode {
}
@Override
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
log.error("Shutdown torNode failed with exception: " + throwable.getMessage());
shutDownExecutorService();
@ -211,7 +212,7 @@ public class TorNetworkNode extends NetworkNode {
resultHandler.accept(torNode);
}
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.error("TorNode creation failed with exception: " + throwable.getMessage());
restartTor();
}
@ -242,7 +243,7 @@ public class TorNetworkNode extends NetworkNode {
resultHandler.accept(hiddenServiceDescriptor);
}
public void onFailure(Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) {
log.error("Hidden service creation failed");
restartTor();
}

View File

@ -8,6 +8,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.auth.*;
import org.jetbrains.annotations.NotNull;
@ -38,6 +39,8 @@ public class AuthenticationHandshake {
private SettableFuture<Connection> resultFuture;
private long startAuthTs;
private long nonce = 0;
private boolean stopped;
private MessageListener messageListener;
public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) {
this.networkNode = networkNode;
@ -47,6 +50,21 @@ public class AuthenticationHandshake {
setupMessageListener();
}
private void onFault(@NotNull Throwable throwable) {
cleanup();
UserThread.execute(() -> resultFuture.setException(throwable));
}
private void onSuccess(Connection connection) {
cleanup();
UserThread.execute(() -> resultFuture.set(connection));
}
private void cleanup() {
stopped = true;
networkNode.removeMessageListener(messageListener);
}
public SettableFuture<Connection> requestAuthenticationToPeer(Address peerAddress) {
// Requesting peer
resultFuture = SettableFuture.create();
@ -62,7 +80,7 @@ public class AuthenticationHandshake {
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
"\nException:" + throwable.getMessage());
UserThread.execute(() -> resultFuture.setException(throwable));
onFault(throwable);
}
});
@ -94,7 +112,6 @@ public class AuthenticationHandshake {
return resultFuture;
}
public SettableFuture<Connection> processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) {
// Responding peer
resultFuture = SettableFuture.create();
@ -105,23 +122,25 @@ public class AuthenticationHandshake {
log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress);
connection.shutDown(() -> UserThread.runAfter(() -> {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("onSuccess sending ChallengeMessage");
}
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("onSuccess sending ChallengeMessage");
}
@Override
public void onFailure(Throwable throwable) {
log.warn("onFailure sending ChallengeMessage.");
UserThread.execute(() -> resultFuture.setException(throwable));
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending ChallengeMessage.");
onFault(throwable);
}
});
}
},
100 + PeerGroup.simulateAuthTorNode,
TimeUnit.MILLISECONDS));
@ -130,15 +149,13 @@ public class AuthenticationHandshake {
}
private void setupMessageListener() {
networkNode.addMessageListener((message, connection) -> {
messageListener = (message, connection) -> {
if (message instanceof AuthenticationMessage) {
if (message instanceof AuthenticationResponse) {
// Requesting peer
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
log.trace("challengeMessage" + authenticationResponse);
// HashMap<Address, Long> tempNonceMap = new HashMap<>(nonceMap);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
connection.setPeerAddress(peerAddress);
@ -153,12 +170,12 @@ public class AuthenticationHandshake {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersMessage sending failed " + throwable.getMessage());
UserThread.execute(() -> resultFuture.setException(throwable));
onFault(throwable);
}
});
} else {
log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce)));
onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce));
}
} else if (message instanceof GetPeersAuthRequest) {
// Responding peer
@ -185,7 +202,7 @@ public class AuthenticationHandshake {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PeersMessage sending failed " + throwable.getMessage());
UserThread.execute(() -> resultFuture.setException(throwable));
onFault(throwable);
}
});
@ -193,10 +210,10 @@ public class AuthenticationHandshake {
+ " authenticated (" + connection.getObjectId() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
UserThread.execute(() -> resultFuture.set(connection));
onSuccess(connection);
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce);
UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce)));
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
}
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
@ -213,12 +230,13 @@ public class AuthenticationHandshake {
+ " authenticated (" + connection.getObjectId() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
UserThread.execute(() -> resultFuture.set(connection));
onSuccess(connection);
}
}
});
}
};
networkNode.addMessageListener(messageListener);
}
private void authenticateToNextRandomPeer(Set<Address> remainingAddresses) {
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomAddressAndRemainingSet(remainingAddresses);
@ -227,7 +245,7 @@ public class AuthenticationHandshake {
requestAuthentication(tuple.second, tuple.first);
} else {
log.info("No other seed node found. That is expected for the first seed node.");
UserThread.execute(() -> resultFuture.set(null));
onSuccess(null);
}
}

View File

@ -37,14 +37,16 @@ public class PeerGroup {
}
private static int MAX_CONNECTIONS = 8;
private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static int GET_PEERS_INTERVAL = 30000;//new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
}
private static final int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
private final NetworkNode networkNode;
private final Set<Address> seedNodeAddresses;
@ -62,7 +64,7 @@ public class PeerGroup {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public PeerGroup(final NetworkNode networkNode, Set<Address> seeds) {
public PeerGroup(NetworkNode networkNode, Set<Address> seeds) {
this.networkNode = networkNode;
this.seedNodeAddresses = seeds;
@ -182,7 +184,7 @@ public class PeerGroup {
// First we try to connect to 1 seed node. If we fail we try to connect to any reported peer.
// After connection is authenticated, we try to connect to any reported peer as long we have not
// reached our max connection size.
public void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
@ -324,7 +326,7 @@ public class PeerGroup {
});
}
void setAuthenticated(Connection connection, Address peerAddress) {
private void setAuthenticated(Connection connection, Address peerAddress) {
log.info("\n\n############################################################\n" +
"We are authenticated to:" +
"\nconnection=" + connection
@ -381,7 +383,7 @@ public class PeerGroup {
public void run() {
Thread.currentThread().setName("GetPeersTimer-" + new Random().nextInt(1000));
try {
UserThread.execute(() -> sendAnnounceAndGetPeersMessage());
UserThread.execute(() -> sendGetPeersRequest());
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
@ -426,8 +428,8 @@ public class PeerGroup {
}, 5, 10));
}
private void sendAnnounceAndGetPeersMessage() {
log.trace("sendAnnounceAndGetPeersMessage");
private void sendGetPeersRequest() {
log.trace("sendGetPeersRequest");
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
@ -436,12 +438,12 @@ public class PeerGroup {
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("AnnounceAndGetPeersMessage sent successfully");
log.trace("sendGetPeersRequest sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("AnnounceAndGetPeersMessage sending failed " + throwable.getMessage());
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
@ -481,7 +483,7 @@ public class PeerGroup {
addToReportedPeers(peerAddresses, connection);
SettableFuture<Connection> future = networkNode.sendMessage(connection,
new GetPeersResponse(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
new GetPeersResponse(new HashSet<>(getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
@ -528,7 +530,7 @@ public class PeerGroup {
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public Map<Address, Peer> getAuthenticatedPeers() {
private Map<Address, Peer> getAuthenticatedPeers() {
return authenticatedPeers;
}
@ -562,7 +564,7 @@ public class PeerGroup {
}
}
void purgeReportedPeers() {
private void purgeReportedPeers() {
log.trace("purgeReportedPeers");
int all = getAllPeerAddresses().size();
if (all > 1000) {
@ -586,14 +588,14 @@ public class PeerGroup {
// Peers
///////////////////////////////////////////////////////////////////////////////////////////
void removePeer(@Nullable Address peerAddress) {
private void removePeer(@Nullable Address peerAddress) {
reportedPeerAddresses.remove(peerAddress);
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedPeer != null)
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
if (peerAddress != null) {
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedPeer != null)
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
}
printAuthenticatedPeers();
printReportedPeers();
}
@ -630,9 +632,7 @@ public class PeerGroup {
public void printAuthenticatedPeers() {
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Authenticated peers for node " + getMyAddress() + ":");
authenticatedPeers.values().stream().forEach(e -> {
result.append("\n" + e.address);
});
authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address));
result.append("\n############################################################\n");
log.info(result.toString());
}
@ -640,9 +640,7 @@ public class PeerGroup {
public void printReportedPeers() {
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Reported peers for node " + getMyAddress() + ":");
reportedPeerAddresses.stream().forEach(e -> {
result.append("\n" + e);
});
reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e));
result.append("\n############################################################\n");
log.info(result.toString());
}

View File

@ -6,10 +6,11 @@ import io.bitsquare.p2p.network.Connection;
public interface PeerListener {
void onFirstAuthenticatePeer(Peer peer);
// TODO never used
void onPeerAdded(Peer peer);
// TODO never used
void onPeerRemoved(Address address);
// TODO remove
void onConnectionAuthenticated(Connection connection);
}

View File

@ -9,18 +9,15 @@ public final class GetPeersResponse implements MaintenanceMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address;
public final HashSet<Address> peerAddresses;
public GetPeersResponse(Address address, HashSet<Address> peerAddresses) {
this.address = address;
public GetPeersResponse(HashSet<Address> peerAddresses) {
this.peerAddresses = peerAddresses;
}
@Override
public String toString() {
return "GetPeersMessage{" +
"address=" + address +
", peerAddresses=" + peerAddresses +
'}';
}

View File

@ -6,6 +6,7 @@ import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.peers.PeerGroup;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,7 +26,7 @@ public class SeedNode {
private boolean useLocalhost = false;
private Set<Address> seedNodes;
private P2PService p2PService;
protected boolean stopped;
private boolean stopped;
public SeedNode() {
}
@ -35,33 +36,36 @@ public class SeedNode {
// API
///////////////////////////////////////////////////////////////////////////////////////////
// args: myAddress (incl. port) useLocalhost seedNodes (separated with |)
// args: myAddress (incl. port) maxConnections useLocalhost seedNodes (separated with |)
// 2. and 3. args are optional
// eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
// or when using localhost: localhost:8001 true localhost:8002|localhost:8003
// eg. lmvdenjkyvx2ovga.onion:8001 20 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
// or when using localhost: localhost:8001 20 true localhost:8002|localhost:8003
public void processArgs(String[] args) {
if (args.length > 0) {
String arg0 = args[0];
checkArgument(arg0.contains(":") && arg0.split(":").length == 2 && arg0.split(":")[1].length() == 4, "Wrong program argument");
mySeedNodeAddress = new Address(arg0);
if (args.length > 1) {
String arg1 = args[1];
checkArgument(arg1.equals("true") || arg1.equals("false"));
useLocalhost = ("true").equals(arg1);
int maxConnections = Integer.parseInt(arg1);
checkArgument(maxConnections < 1000, "maxConnections seems to be a bit too high...");
PeerGroup.setMaxConnections(maxConnections);
if (args.length > 2) {
String arg2 = args[2];
checkArgument(arg2.contains(":") && arg2.split(":").length > 1 && arg2.split(":")[1].length() > 3, "Wrong program argument");
List<String> list = Arrays.asList(arg2.split("|"));
checkArgument(arg2.equals("true") || arg2.equals("false"));
useLocalhost = ("true").equals(arg2);
}
if (args.length > 3) {
String arg3 = args[3];
checkArgument(arg3.contains(":") && arg3.split(":").length > 1 && arg3.split(":")[1].length() > 3, "Wrong program argument");
List<String> list = Arrays.asList(arg3.split("|"));
seedNodes = new HashSet<>();
list.forEach(e -> {
checkArgument(e.contains(":") && e.split(":").length == 2 && e.split(":")[1].length() == 4, "Wrong program argument");
seedNodes.add(new Address(e));
});
seedNodes.remove(mySeedNodeAddress);
} else if (args.length > 3) {
} else if (args.length > 4) {
log.error("Too many program arguments." +
"\nProgram arguments: myAddress useLocalhost seedNodes");
}

View File

@ -8,14 +8,14 @@ import java.util.Set;
public class SeedNodesRepository {
protected Set<Address> torSeedNodeAddresses = Sets.newHashSet(
private Set<Address> torSeedNodeAddresses = Sets.newHashSet(
new Address("lmvdenjkyvx2ovga.onion:8001"),
new Address("eo5ay2lyzrfvx2nr.onion:8002"),
new Address("si3uu56adkyqkldl.onion:8003")
);
protected Set<Address> localhostSeedNodeAddresses = Sets.newHashSet(
private Set<Address> localhostSeedNodeAddresses = Sets.newHashSet(
new Address("localhost:8001"),
new Address("localhost:8002"),
new Address("localhost:8003")

View File

@ -110,8 +110,8 @@ public class ProtectedExpirableDataStorage {
}
}
public void setAuthenticated(boolean authenticated) {
this.authenticated = authenticated;
public void setAuthenticated() {
this.authenticated = true;
}
public boolean add(ProtectedData protectedData, @Nullable Address sender) {
@ -233,7 +233,7 @@ public class ProtectedExpirableDataStorage {
hashMapChangedListeners.add(hashMapChangedListener);
}
public void addMessageListener(MessageListener messageListener) {
private void addMessageListener(MessageListener messageListener) {
peerGroup.addMessageListener(messageListener);
}

View File

@ -9,7 +9,7 @@ public final class ExpirableMailboxPayload implements ExpirablePayload {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public static final long TTL = 10 * 24 * 60 * 60 * 1000; // 10 days
private static final long TTL = 10 * 24 * 60 * 60 * 1000; // 10 days
public final SealedAndSignedMessage sealedAndSignedMessage;
public final PublicKey senderStoragePublicKey;

View File

@ -74,7 +74,7 @@ public class EncryptionServiceTests {
public void testDecryptAndVerifyMessage() throws CryptoException {
EncryptionService encryptionService = new EncryptionService(keyRing);
TestMessage data = new TestMessage("test");
SealedAndSignedMessage encrypted = new SealedAndSignedMessage(encryptionService.encryptAndSign(pubKeyRing, data), null);
SealedAndSignedMessage encrypted = new SealedAndSignedMessage(encryptionService.encryptAndSign(pubKeyRing, data));
DecryptedMsgWithPubKey decrypted = encryptionService.decryptAndVerify(encrypted.sealedAndSigned);
assertEquals(data.data, ((TestMessage) decrypted.message).data);
}

View File

@ -214,7 +214,7 @@ public class ProtectedDataStorageTest {
KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
// sender
MockMessage mockMessage = new MockMessage("MockMessage");
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage), null);
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage));
ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(sealedAndSignedMessage,
keyRing1.getSignatureKeyPair().getPublic(),
keyRing2.getSignatureKeyPair().getPublic());

View File

@ -63,7 +63,7 @@ public class SeedNodeMain {
Timer timeout = UserThread.runAfter(() -> {
log.error("Timeout occurred at shutDown request");
System.exit(1);
}, 10);
}, 5);
if (seedNode != null) {
seedNode.shutDown(() -> {