Cleanup listeners

This commit is contained in:
Manfred Karrer 2015-11-09 16:16:43 +01:00
parent 0f87eb99cc
commit b81e263c24
13 changed files with 467 additions and 439 deletions

View File

@ -99,8 +99,8 @@ public class PubKeyRing implements Serializable {
@Override
public String toString() {
return "PubKeyRing{" +
"\n\nsignaturePubKey=\n" + Util.pubKeyToString(getSignaturePubKey()) +
"\n\nencryptionPubKey=\n" + Util.pubKeyToString(getEncryptionPubKey()) +
"\n\nsignaturePubKey.hashCode()=\n" + signaturePubKey.hashCode() +
"\n\nencryptionPubKey.hashCode()=\n" + encryptionPubKey.hashCode() +
'}';
}

View File

@ -818,6 +818,7 @@ public class TradeWalletService {
// We need to recreate the transaction otherwise we get a null pointer...
Transaction result = new Transaction(params, transaction.bitcoinSerialize());
result.getConfidence(Context.get()).setSource(TransactionConfidence.Source.SELF);
log.trace("transaction " + result.toString());
if (wallet != null)
@ -835,6 +836,7 @@ public class TradeWalletService {
// We need to recreate the tx otherwise we get a null pointer...
Transaction transaction = new Transaction(params, serializedTransaction);
transaction.getConfidence(Context.get()).setSource(TransactionConfidence.Source.NETWORK);
log.trace("transaction " + transaction.toString());
if (wallet != null)
@ -941,9 +943,9 @@ public class TradeWalletService {
transaction.addInput(p2SHMultiSigOutput);
transaction.addOutput(buyerPayoutAmount, new Address(params, buyerAddressString));
transaction.addOutput(sellerPayoutAmount, new Address(params, sellerAddressString));
transaction.setLockTime(lockTime);
// When using lockTime we need to set sequenceNumber to 0
transaction.getInputs().stream().forEach(i -> i.setSequenceNumber(0));
transaction.setLockTime(lockTime);
return transaction;
}

View File

@ -44,10 +44,10 @@ public class CreateAndSignDepositTxAsBuyer extends TradeTask {
Coin buyerInputAmount = FeePolicy.SECURITY_DEPOSIT.add(FeePolicy.TX_FEE);
Coin msOutputAmount = buyerInputAmount.add(FeePolicy.SECURITY_DEPOSIT).add(trade.getTradeAmount());
log.debug("getContractAsJson");
log.debug("----------");
log.debug(trade.getContractAsJson());
log.debug("----------");
log.info("\n\n------------------------------------------------------------\n"
+ "Contract as json\n"
+ trade.getContractAsJson()
+ "\n------------------------------------------------------------\n");
byte[] contractHash = Hash.getHash(trade.getContractAsJson());
trade.setContractHash(contractHash);

View File

@ -49,7 +49,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* Represents our node in the P2P network
*/
public class P2PService implements SetupListener {
public class P2PService implements SetupListener, MessageListener, ConnectionListener, PeerListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
private final SeedNodesRepository seedNodesRepository;
@ -126,94 +126,9 @@ public class P2PService implements SetupListener {
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
authenticated.set(true);
dataStorage.setAuthenticated();
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
if (connection.isAuthenticated())
authenticatedPeerAddresses.remove(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
log.error("onError self/ConnectionException " + networkNode.getAddress() + "/" + throwable);
}
});
networkNode.addMessageListener((message, connection) -> {
Log.traceCall();
if (message instanceof GetDataRequest) {
log.trace("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
} else if (message instanceof GetDataResponse) {
GetDataResponse getDataResponse = (GetDataResponse) message;
HashSet<ProtectedData> set = getDataResponse.set;
if (!set.isEmpty()) {
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
} else {
log.trace("Received DataSetMessage: Empty data set");
}
setRequestingDataCompleted();
} else if (message instanceof SealedAndSignedMessage) {
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()));
} catch (CryptoException e) {
log.info("Decryption of SealedAndSignedMessage failed. " +
"That is expected if the message is not intended for us.");
}
}
}
});
peerGroup.addPeerListener(new PeerListener() {
@Override
public void onFirstAuthenticatePeer(Peer peer) {
Log.traceCall();
log.trace("onFirstAuthenticatePeer " + peer);
sendGetAllDataMessageAfterAuthentication(peer);
}
@Override
public void onPeerAdded(Peer peer) {
Log.traceCall();
}
@Override
public void onPeerRemoved(Address address) {
Log.traceCall();
}
@Override
public void onConnectionAuthenticated(Connection connection) {
Log.traceCall();
}
});
networkNode.addConnectionListener(this);
networkNode.addMessageListener(this);
peerGroup.addPeerListener(this);
dataStorage.addHashMapChangedListener(new HashMapChangedListener() {
@Override
@ -244,6 +159,104 @@ public class P2PService implements SetupListener {
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
if (message instanceof GetDataRequest) {
log.info("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
} else if (message instanceof GetDataResponse) {
GetDataResponse getDataResponse = (GetDataResponse) message;
log.info("Received GetDataResponse: " + message);
HashSet<ProtectedData> set = getDataResponse.set;
if (!set.isEmpty()) {
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
} else {
log.trace("Received DataSetMessage: Empty data set");
}
setRequestingDataCompleted();
} else if (message instanceof SealedAndSignedMessage) {
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()));
} catch (CryptoException e) {
log.info("Decryption of SealedAndSignedMessage failed. " +
"That is expected if the message is not intended for us.");
}
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
authenticated.set(true);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
if (connection.isAuthenticated())
authenticatedPeerAddresses.remove(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onFirstAuthenticatePeer(Peer peer) {
Log.traceCall();
log.trace("onFirstAuthenticatePeer " + peer);
sendGetAllDataMessageAfterAuthentication(peer);
}
@Override
public void onPeerAdded(Peer peer) {
Log.traceCall();
}
@Override
public void onPeerRemoved(Address address) {
Log.traceCall();
}
@Override
public void onConnectionAuthenticated(Connection connection) {
Log.traceCall();
}
///////////////////////////////////////////////////////////////////////////////////////////
// SetupListener implementation
@ -579,16 +592,6 @@ public class P2PService implements SetupListener {
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.addMessageListener(messageListener);
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.removeMessageListener(messageListener);
}
public void addDecryptedMailListener(DecryptedMailListener listener) {
Log.traceCall();
decryptedMailListeners.add(listener);

View File

@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
* All handlers are called on User thread.
* Shared data between InputHandler thread and that
*/
public class Connection {
public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
private static final int MAX_ILLEGAL_REQUESTS = 5;
@ -40,11 +40,16 @@ public class Connection {
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
private InputHandler inputHandler;
private volatile boolean isAuthenticated;
private String connectionId;
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;
}
private final Socket socket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final String portInfo;
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
@ -72,19 +77,23 @@ public class Connection {
///////////////////////////////////////////////////////////////////////////////////////////
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
this.socket = socket;
this.messageListener = messageListener;
this.connectionListener = connectionListener;
Log.traceCall();
uid = UUID.randomUUID().toString();
if (socket.getLocalPort() == 0)
portInfo = "port=" + socket.getPort();
else
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
init(socket, messageListener, connectionListener);
init();
}
private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
private void init() {
Log.traceCall();
sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression);
sharedSpace = new SharedSpace(this, socket);
try {
socket.setSoTimeout(SOCKET_TIMEOUT);
// Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block
@ -97,7 +106,7 @@ public class Connection {
// We create a thread for handling inputStream data
inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo);
inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo, this, useCompression);
singleThreadExecutor.submit(inputHandler);
} catch (IOException e) {
sharedSpace.handleConnectionException(e);
@ -117,11 +126,10 @@ public class Connection {
// Called form UserThread
public void setAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
synchronized (peerAddress) {
this.peerAddress = peerAddress;
}
this.peerAddress = peerAddress;
isAuthenticated = true;
sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection);
if (!stopped)
connectionListener.onPeerAddressAuthenticated(peerAddress, connection);
}
// Called form various threads
@ -178,6 +186,18 @@ public class Connection {
this.peerAddress = peerAddress;
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
// Only get non - CloseConnectionMessage messages
@Override
public void onMessage(Message message, Connection connection) {
// connection is null as we get called from InputHandler, which does not hold a reference to Connection
UserThread.execute(() -> messageListener.onMessage(message, this));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
@ -231,14 +251,14 @@ public class Connection {
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(this.toString());
if (!stopped) {
log.info("\n\n############################################################\n" +
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeerAddress=" + peerAddress
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nobjectId=" + objectId + " / uid=" + uid
+ "\nisAuthenticated=" + isAuthenticated
+ "\n############################################################\n");
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
log.trace("ShutDown connection requested. Connection=" + this.toString());
@ -275,7 +295,7 @@ public class Connection {
shutDownReason = ConnectionListener.Reason.SHUT_DOWN;
final ConnectionListener.Reason finalShutDownReason = shutDownReason;
// keep UserThread.execute as its not clear if that is called from a non-UserThread
UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(finalShutDownReason, this));
UserThread.execute(() -> connectionListener.onDisconnect(finalShutDownReason, this));
try {
sharedSpace.getSocket().close();
@ -289,6 +309,7 @@ public class Connection {
log.debug("Connection shutdown complete " + this.toString());
// keep UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
@ -330,6 +351,11 @@ public class Connection {
'}';
}
public String getConnectionId() {
return connectionId;
}
///////////////////////////////////////////////////////////////////////////////////////////
// SharedSpace
///////////////////////////////////////////////////////////////////////////////////////////
@ -343,9 +369,6 @@ public class Connection {
private final Connection connection;
private final Socket socket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final boolean useCompression;
private final ConcurrentHashMap<IllegalRequest, Integer> illegalRequests = new ConcurrentHashMap<>();
// mutable
@ -353,14 +376,10 @@ public class Connection {
private volatile boolean stopped;
private ConnectionListener.Reason shutDownReason;
public SharedSpace(Connection connection, Socket socket, MessageListener messageListener,
ConnectionListener connectionListener, boolean useCompression) {
public SharedSpace(Connection connection, Socket socket) {
Log.traceCall();
this.connection = connection;
this.socket = socket;
this.messageListener = messageListener;
this.connectionListener = connectionListener;
this.useCompression = useCompression;
}
public synchronized void updateLastActivityDate() {
@ -387,7 +406,7 @@ public class Connection {
public void handleConnectionException(Exception e) {
Log.traceCall(e.toString());
log.warn("Exception might be expected: " + e.toString());
log.debug("Exception might be expected: " + e.toString());
if (e instanceof SocketException) {
if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@ -409,34 +428,18 @@ public class Connection {
}
}
public void onMessage(Message message) {
//Log.traceCall();
UserThread.execute(() -> messageListener.onMessage(message, connection));
}
public boolean useCompression() {
//Log.traceCall();
return useCompression;
}
public void shutDown(boolean sendCloseConnectionMessage) {
Log.traceCall();
connection.shutDown(sendCloseConnectionMessage);
}
public synchronized ConnectionListener getConnectionListener() {
// Log.traceCall();
return connectionListener;
}
public synchronized Socket getSocket() {
//Log.traceCall();
return socket;
}
public String getConnectionId() {
//Log.traceCall();
return connection.objectId;
return connection.getConnectionId();
}
public void stop() {
@ -445,7 +448,6 @@ public class Connection {
}
public synchronized ConnectionListener.Reason getShutDownReason() {
//Log.traceCall();
return shutDownReason;
}
@ -453,12 +455,10 @@ public class Connection {
public String toString() {
return "SharedSpace{" +
", socket=" + socket +
", useCompression=" + useCompression +
", illegalRequests=" + illegalRequests +
", lastActivityDate=" + lastActivityDate +
'}';
}
}
@ -473,13 +473,18 @@ public class Connection {
private final SharedSpace sharedSpace;
private final ObjectInputStream objectInputStream;
private final String portInfo;
private final MessageListener messageListener;
private final boolean useCompression;
private volatile boolean stopped;
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) {
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) {
this.useCompression = useCompression;
Log.traceCall();
this.sharedSpace = sharedSpace;
this.objectInputStream = objectInputStream;
this.portInfo = portInfo;
this.messageListener = messageListener;
}
public void stop() {
@ -502,7 +507,7 @@ public class Connection {
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size <= getMaxMsgSize()) {
Serializable serializable = null;
if (sharedSpace.useCompression()) {
if (useCompression) {
if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
size = compressedObjectAsBytes.length;
@ -529,7 +534,7 @@ public class Connection {
stopped = true;
sharedSpace.shutDown(false);
} else if (!stopped) {
sharedSpace.onMessage(message);
messageListener.onMessage(message, null);
}
} else {
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);

View File

@ -34,8 +34,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
protected final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
protected ListeningExecutorService executorService;
private Server server;
private volatile boolean shutDownInProgress;
private ConnectionListener startServerConnectionListener;
private volatile boolean shutDownInProgress;
// accessed from different threads
private final CopyOnWriteArraySet<Connection> outBoundConnections = new CopyOnWriteArraySet<>();
@ -97,12 +98,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
newConnection.setPeerAddress(peerAddress);
outBoundConnections.add(newConnection);
log.info("\n\n############################################################\n" +
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress
+ "\nconnection.uid=" + newConnection.getUid()
+ "\nmessage=" + message
+ "\n############################################################\n");
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
newConnection.sendMessage(message);
return newConnection; // can take a while when using tor
@ -196,19 +197,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
public void addConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.add(connectionListener);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.remove(connectionListener);
}
@Override
public void onConnection(Connection connection) {
Log.traceCall();
@ -241,9 +232,30 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
messageListeners.stream().forEach(e -> e.onMessage(message, connection));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.add(connectionListener);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.remove(connectionListener);
}
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
messageListeners.add(messageListener);
@ -254,12 +266,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
messageListeners.remove(messageListener);
}
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
messageListeners.stream().forEach(e -> e.onMessage(message, connection));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Protected
@ -272,39 +278,40 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
protected void startServer(ServerSocket serverSocket) {
Log.traceCall();
startServerConnectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// we still have not authenticated so put it to the temp list
inBoundConnections.add(connection);
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect at incoming connection to peerAddress (or connection) "
+ ((peerAddress == null) ? connection : peerAddress));
inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
NetworkNode.this.onError(throwable);
}
};
server = new Server(serverSocket,
(message, connection) -> NetworkNode.this.onMessage(message, connection),
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// we still have not authenticated so put it to the temp list
inBoundConnections.add(connection);
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect at incoming connection to peerAddress (or connection) "
+ ((peerAddress == null) ? connection : peerAddress));
inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
NetworkNode.this.onError(throwable);
}
});
NetworkNode.this,
startServerConnectionListener);
executorService.submit(server);
}

View File

@ -45,12 +45,12 @@ class Server implements Runnable {
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
log.info("\n\n############################################################\n" +
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"Server created new inbound connection:"
+ "\nlocalPort/port=" + serverSocket.getLocalPort()
+ "/" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid()
+ "\n############################################################\n");
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
if (!stopped)
connections.add(connection);

View File

@ -1,6 +1,5 @@
package io.bitsquare.p2p.network;
public interface SetupListener {
void onTorNodeReady();

View File

@ -7,6 +7,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
@ -28,7 +29,7 @@ import java.util.*;
// node1: authentication to node2 done if nonce ok
// node1 -> node2 PeersMessage
public class AuthenticationHandshake {
public class AuthenticationHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class);
private final NetworkNode networkNode;
@ -39,7 +40,11 @@ public class AuthenticationHandshake {
private long startAuthTs;
private long nonce = 0;
private boolean stopped;
private MessageListener messageListener;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) {
Log.traceCall();
@ -47,26 +52,105 @@ public class AuthenticationHandshake {
this.peerGroup = peerGroup;
this.myAddress = myAddress;
setupMessageListener();
networkNode.addMessageListener(this);
}
private void onFault(@NotNull Throwable throwable) {
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
cleanup();
resultFuture.setException(throwable);
if (message instanceof AuthenticationMessage) {
if (message instanceof AuthenticationResponse) {
// Requesting peer
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
connection.setPeerAddress(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersMessage sending failed " + throwable.getMessage());
onFault(throwable);
}
});
} else {
log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce));
}
} else if (message instanceof GetPeersAuthRequest) {
// Responding peer
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
Address peerAddress = getPeersAuthRequest.address;
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce;
if (verified) {
// we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PeersMessage sending failed " + throwable.getMessage());
onFault(throwable);
}
});
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
onSuccess(connection);
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
}
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
Address peerAddress = getPeersAuthResponse.address;
log.trace("PeersMessage from " + peerAddress + " at " + myAddress);
HashSet<Address> peerAddresses = getPeersAuthResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
onSuccess(connection);
}
}
}
private void onSuccess(Connection connection) {
Log.traceCall();
cleanup();
resultFuture.set(connection);
}
private void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(messageListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public SettableFuture<Connection> requestAuthenticationToPeer(Address peerAddress) {
Log.traceCall();
@ -178,97 +262,10 @@ public class AuthenticationHandshake {
return resultFuture;
}
private void setupMessageListener() {
Log.traceCall();
messageListener = (message, connection) -> {
Log.traceCall();
if (message instanceof AuthenticationMessage) {
if (message instanceof AuthenticationResponse) {
// Requesting peer
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
connection.setPeerAddress(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersMessage sending failed " + throwable.getMessage());
onFault(throwable);
}
});
} else {
log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce));
}
} else if (message instanceof GetPeersAuthRequest) {
// Responding peer
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
Address peerAddress = getPeersAuthRequest.address;
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce;
if (verified) {
// we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PeersMessage sending failed " + throwable.getMessage());
onFault(throwable);
}
});
log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
onSuccess(connection);
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
}
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
Address peerAddress = getPeersAuthResponse.address;
log.trace("PeersMessage from " + peerAddress + " at " + myAddress);
HashSet<Address> peerAddresses = getPeersAuthResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
onSuccess(connection);
}
}
};
networkNode.addMessageListener(messageListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void authenticateToNextRandomPeer(Set<Address> remainingAddresses) {
Log.traceCall();
@ -303,4 +300,21 @@ public class AuthenticationHandshake {
return nonce;
}
private void onFault(@NotNull Throwable throwable) {
Log.traceCall();
cleanup();
resultFuture.setException(throwable);
}
private void onSuccess(Connection connection) {
Log.traceCall();
cleanup();
resultFuture.set(connection);
}
private void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
}
}

View File

@ -3,6 +3,7 @@ package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
//TODO used only in unittests yet
public abstract class AuthenticationListener implements PeerListener {
public void onFirstAuthenticatePeer(Peer peer) {
}

View File

@ -8,6 +8,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.MessageListener;
@ -27,7 +28,7 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
// Run in UserThread
public class PeerGroup {
public class PeerGroup implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
static int simulateAuthTorNode = 0;
@ -70,49 +71,53 @@ public class PeerGroup {
this.networkNode = networkNode;
this.seedNodeAddresses = seeds;
init(networkNode);
}
private void init(NetworkNode networkNode) {
Log.traceCall();
networkNode.addMessageListener((message, connection) -> {
if (message instanceof MaintenanceMessage)
processMaintenanceMessage((MaintenanceMessage) message, connection);
else if (message instanceof AuthenticationRequest) {
processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection);
}
});
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
// only removes authenticated nodes
if (connection.isAuthenticated())
removePeer(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
}
});
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
setupMaintenanceTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall();
if (message instanceof MaintenanceMessage)
processMaintenanceMessage((MaintenanceMessage) message, connection);
else if (message instanceof AuthenticationRequest) {
processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
// only removes authenticated nodes
if (connection.isAuthenticated())
removePeer(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@ -122,6 +127,35 @@ public class PeerGroup {
seedNodeAddresses.remove(mySeedNodeAddress);
}
public void broadcast(BroadcastMessage message, @Nullable Address sender) {
Log.traceCall("Sender " + sender + ". Message " + message.toString());
if (authenticatedPeers.values().size() > 0) {
log.info("Broadcast message to {} peers. Message:", authenticatedPeers.values().size(), message);
// TODO add randomized timing?
authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender))
.forEach(peer -> {
log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + ".");
SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage());
removePeer(peer.address);
}
});
});
} else {
log.trace("Message {} not broadcasted because we are not authenticated yet. " +
"That is expected at startup.", message);
}
}
public void shutDown() {
Log.traceCall();
if (!shutDownInProgress) {
@ -131,32 +165,6 @@ public class PeerGroup {
}
}
public void broadcast(BroadcastMessage message, @Nullable Address sender) {
Log.traceCall();
log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers.");
log.trace("message = " + message);
// TODO add randomized timing?
authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender))
.forEach(peer -> {
log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + ".");
SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage());
removePeer(peer.address);
}
});
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to seed node
@ -536,16 +544,6 @@ public class PeerGroup {
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.addMessageListener(messageListener);
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.removeMessageListener(messageListener);
}
public void addPeerListener(PeerListener peerListener) {
Log.traceCall();
peerListeners.add(peerListener);
@ -579,6 +577,10 @@ public class PeerGroup {
return seedNodeAddresses;
}
public NetworkNode getNetworkNode() {
return networkNode;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Reported peers
@ -675,19 +677,20 @@ public class PeerGroup {
public void printAuthenticatedPeers() {
Log.traceCall();
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Authenticated peers for node " + getMyAddress() + ":");
authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address));
result.append("\n############################################################\n");
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
public void printReportedPeers() {
Log.traceCall();
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Reported peers for node " + getMyAddress() + ":");
reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e));
result.append("\n############################################################\n");
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
}

View File

@ -8,8 +8,11 @@ import io.bitsquare.common.crypto.Hash;
import io.bitsquare.common.crypto.Sig;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.IllegalRequest;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.storage.data.*;
import io.bitsquare.p2p.storage.messages.*;
@ -29,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
// Run in UserThread
public class ProtectedExpirableDataStorage {
public class ProtectedExpirableDataStorage implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class);
@VisibleForTesting
@ -40,7 +43,6 @@ public class ProtectedExpirableDataStorage {
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private ConcurrentHashMap<BigInteger, Integer> sequenceNumberMap = new ConcurrentHashMap<>();
private final Storage<ConcurrentHashMap> storage;
private boolean authenticated;
private final Timer timer = new Timer();
private volatile boolean shutDownInProgress;
@ -65,25 +67,8 @@ public class ProtectedExpirableDataStorage {
sequenceNumberMap = persisted;
}
addMessageListener((message, connection) -> {
Log.traceCall("onMessage: Message=" + message);
if (message instanceof DataMessage) {
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).data, connection.getPeerAddress());
} else if (message instanceof RemoveDataMessage) {
remove(((RemoveDataMessage) message).data, connection.getPeerAddress());
} else if (message instanceof RemoveMailboxDataMessage) {
removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress());
}
} else {
log.warn("Connection is not authenticated yet. We don't accept storage operations form non-authenticated nodes.");
log.warn("Connection = " + connection);
connection.reportIllegalRequest(IllegalRequest.NotAuthenticated);
}
}
});
NetworkNode networkNode = peerGroup.getNetworkNode();
networkNode.addMessageListener(this);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
@ -107,6 +92,31 @@ public class ProtectedExpirableDataStorage {
.forEach(entry -> map.remove(entry.getKey()));
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
Log.traceCall("Message=" + message);
if (message instanceof DataMessage) {
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).data, connection.getPeerAddress());
} else if (message instanceof RemoveDataMessage) {
remove(((RemoveDataMessage) message).data, connection.getPeerAddress());
} else if (message instanceof RemoveMailboxDataMessage) {
removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress());
}
} else {
log.warn("Connection is not authenticated yet. We don't accept storage operations form non-authenticated nodes.");
log.warn("Connection = " + connection);
connection.reportIllegalRequest(IllegalRequest.NotAuthenticated);
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
@ -121,11 +131,6 @@ public class ProtectedExpirableDataStorage {
}
}
public void setAuthenticated() {
Log.traceCall();
this.authenticated = true;
}
public boolean add(ProtectedData protectedData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
@ -143,10 +148,10 @@ public class ProtectedExpirableDataStorage {
log.trace("Data added to our map and it will be broadcasted to our peers.");
hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData));
StringBuilder sb = new StringBuilder("\n\n############################################################\n");
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after addProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n"));
sb.append("\n############################################################\n");
sb.append("\n------------------------------------------------------------\n");
log.info(sb.toString());
if (!containsKey)
@ -211,7 +216,6 @@ public class ProtectedExpirableDataStorage {
}
public Map<BigInteger, ProtectedData> getMap() {
//Log.traceCall();
return map;
}
@ -252,11 +256,6 @@ public class ProtectedExpirableDataStorage {
hashMapChangedListeners.add(hashMapChangedListener);
}
private void addMessageListener(MessageListener messageListener) {
Log.traceCall();
peerGroup.addMessageListener(messageListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
@ -268,11 +267,11 @@ public class ProtectedExpirableDataStorage {
log.trace("Data removed from our map. We broadcast the message to our peers.");
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData));
StringBuilder sb = new StringBuilder("\n\n############################################################\n" +
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Data set after removeProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()));
sb.append("\n############################################################\n");
log.trace(sb.toString());
sb.append("\n------------------------------------------------------------\n");
log.info(sb.toString());
}
private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) {
@ -352,17 +351,12 @@ public class ProtectedExpirableDataStorage {
}
private void broadcast(BroadcastMessage message, @Nullable Address sender) {
Log.traceCall();
if (authenticated) {
peerGroup.broadcast(message, sender);
log.trace("Broadcast message " + message);
} else {
log.trace("Broadcast not allowed because we are not authenticated yet. That is normal after received AllDataMessage at startup.");
}
Log.traceCall(message.toString());
peerGroup.broadcast(message, sender);
}
private BigInteger getHashAsBigInteger(ExpirablePayload payload) {
//Log.traceCall();
return new BigInteger(Hash.getHash(payload));
}
}

View File

@ -282,7 +282,7 @@ public class P2PServiceTest {
// send to online peer
CountDownLatch latch2 = new CountDownLatch(2);
MockMailboxMessage mockMessage = new MockMailboxMessage("MockMailboxMessage", p2PService2.getAddress());
p2PService2.addMessageListener((message, connection) -> {
p2PService2.getNetworkNode().addMessageListener((message, connection) -> {
log.trace("message " + message);
if (message instanceof SealedAndSignedMessage) {
try {