Handle offer removal on disconnects

This commit is contained in:
Manfred Karrer 2016-02-18 00:42:22 +01:00
parent cb236f65fb
commit 58bb1868a3
29 changed files with 532 additions and 385 deletions

View file

@ -10,7 +10,7 @@ public class ByteArray implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final byte[] bytes;
public final byte[] bytes;
public ByteArray(byte[] bytes) {
this.bytes = bytes;

View file

@ -20,7 +20,7 @@ package io.bitsquare.alert;
import com.google.inject.Inject;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.user.User;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;

View file

@ -30,7 +30,7 @@ import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.user.User;
import javafx.collections.FXCollections;
import javafx.collections.ObservableMap;

View file

@ -25,7 +25,7 @@ import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.JsonExclude;
import io.bitsquare.locale.Country;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.messages.RequiresLiveOwnerData;
import io.bitsquare.p2p.storage.messages.RequiresOwnerIsOnlineMessage;
import io.bitsquare.p2p.storage.messages.StorageMessage;
import io.bitsquare.payment.PaymentMethod;
import io.bitsquare.trade.protocol.availability.OfferAvailabilityModel;
@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public final class Offer implements StorageMessage, RequiresLiveOwnerData {
public final class Offer implements StorageMessage, RequiresOwnerIsOnlineMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
@JsonExclude
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;

View file

@ -21,7 +21,7 @@ import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,21 +84,27 @@ public class OfferBookService {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void republishOffers(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, true);
}
public void addOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, false);
}
public void republishOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, true);
public void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean forceBroadcast) {
boolean result = p2PService.addData(offer, forceBroadcast);
if (result) {
log.trace("Add offer to network was successful. Offer ID = " + offer.getId());
resultHandler.handleResult();
} else {
errorMessageHandler.handleErrorMessage("Add offer failed");
}
}
private void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean rePublish) {
boolean result;
if (rePublish)
result = p2PService.republishData(offer);
else
result = p2PService.addData(offer);
public void refreshOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
boolean result = p2PService.refreshTTL(offer);
if (result) {
log.trace("Add offer to network was successful. Offer ID = " + offer.getId());
resultHandler.handleResult();

View file

@ -30,6 +30,10 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.SendDirectMessageListener;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.storage.Storage;
import io.bitsquare.trade.TradableList;
import io.bitsquare.trade.closed.ClosedTradableManager;
@ -70,6 +74,10 @@ public class OpenOfferManager {
private boolean shutDownRequested;
private BootstrapListener bootstrapListener;
private final Timer timer = new Timer();
private Timer republishOffersTime;
private boolean firstTimeConnection;
private boolean allowRefreshOffers;
private boolean lostAllConnections;
///////////////////////////////////////////////////////////////////////////////////////////
@ -112,6 +120,44 @@ public class OpenOfferManager {
if (message instanceof OfferAvailabilityRequest)
handleOfferAvailabilityRequest((OfferAvailabilityRequest) message, peersNodeAddress);
});
NetworkNode networkNode = p2PService.getNetworkNode();
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
log.error("ConnectionListener onConnection size " + networkNode.getAllConnections().size());
log.error("ConnectionListener onConnection lostAllConnections " + lostAllConnections);
log.error("ConnectionListener onConnection allowRefreshOffers " + allowRefreshOffers);
log.error("ConnectionListener onConnection republishOffersTime " + republishOffersTime);
if (lostAllConnections) {
lostAllConnections = false;
allowRefreshOffers = false;
// We repeat a rePublishOffers call after 10 seconds if we have more than 3 peers
if (republishOffersTime == null) {
republishOffersTime = UserThread.runAfter(() -> {
if (networkNode.getAllConnections().size() > 3)
republishOffers();
allowRefreshOffers = true;
republishOffersTime = null;
}, 5);
}
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.error("ConnectionListener onDisconnect size " + networkNode.getAllConnections().size());
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections)
allowRefreshOffers = false;
}
@Override
public void onError(Throwable throwable) {
}
});
}
@ -132,53 +178,75 @@ public class OpenOfferManager {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapComplete() {
startRePublishThread();
onBootstrapped();
}
};
p2PService.addP2PServiceListener(bootstrapListener);
} else {
startRePublishThread();
onBootstrapped();
}
}
private void startRePublishThread() {
private void onBootstrapped() {
if (bootstrapListener != null)
p2PService.removeP2PServiceListener(bootstrapListener);
// republish sufficiently before offer would expire
republishOffers();
startRefreshOffersThread();
//TODO should not be needed
// startRepublishOffersThread();
}
private void startRefreshOffersThread() {
allowRefreshOffers = true;
// refresh sufficiently before offer would expire
long period = (long) (Offer.TTL * 0.7);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
UserThread.execute(OpenOfferManager.this::rePublishOffers);
UserThread.execute(OpenOfferManager.this::refreshOffers);
}
};
timer.scheduleAtFixedRate(timerTask, 500, period);
p2PService.getNumConnectedPeers().addListener((observable, oldValue, newValue) -> {
if ((int) oldValue == 0 && (int) newValue > 0) {
rePublishOffers();
// We repeat a rePublishOffers call after 10 seconds if we have more than 3 peers
UserThread.runAfter(() -> {
if (p2PService.getNumConnectedPeers().get() > 3)
rePublishOffers();
}, 10);
}
});
timer.scheduleAtFixedRate(timerTask, period, period);
}
private void rePublishOffers() {
private void startRepublishOffersThread() {
long period = Offer.TTL * 10;
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
UserThread.execute(OpenOfferManager.this::republishOffers);
}
};
timer.scheduleAtFixedRate(timerTask, period, period);
}
private void republishOffers() {
log.error("republishOffers ");
Log.traceCall("Number of offer for republish: " + openOffers.size());
for (OpenOffer openOffer : openOffers) {
offerBookService.republishOffer(openOffer.getOffer(),
offerBookService.republishOffers(openOffer.getOffer(),
() -> log.debug("Successful added offer to P2P network"),
errorMessage -> log.error("Add offer to P2P network failed. " + errorMessage));
openOffer.setStorage(openOffersStorage);
}
}
private void refreshOffers() {
if (allowRefreshOffers) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
for (OpenOffer openOffer : openOffers) {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
openOffer.setStorage(openOffersStorage);
}
}
}
@SuppressWarnings("WeakerAccess")
public void shutDown() {
shutDown(null);

View file

@ -1,5 +1,6 @@
package io.bitsquare.p2p;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
@ -24,8 +25,9 @@ import io.bitsquare.p2p.peers.peerexchange.PeerExchangeManager;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedMailboxData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.data.RefreshTTLBundle;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
@ -630,25 +632,32 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
public boolean addData(ExpirableMessage expirableMessage) {
Log.traceCall();
return doAddData(expirableMessage, false);
return addData(expirableMessage, false);
}
public boolean republishData(ExpirableMessage expirableMessage) {
Log.traceCall();
return doAddData(expirableMessage, true);
}
private boolean doAddData(ExpirableMessage expirableMessage, boolean rePublish) {
public boolean addData(ExpirableMessage expirableMessage, boolean forceBroadcast) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
ProtectedData protectedData = p2PDataStorage.getDataWithSignedSeqNr(expirableMessage, optionalKeyRing.get().getSignatureKeyPair());
if (rePublish)
return p2PDataStorage.rePublish(protectedData, networkNode.getNodeAddress());
else
return p2PDataStorage.add(protectedData, networkNode.getNodeAddress());
ProtectedData protectedData = p2PDataStorage.getProtectedData(expirableMessage, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.add(protectedData, networkNode.getNodeAddress(), forceBroadcast);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
}
} else {
throw new NetworkNotReadyException();
}
}
public boolean refreshTTL(ExpirableMessage expirableMessage) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
RefreshTTLBundle refreshTTLBundle = p2PDataStorage.getRefreshTTLPackage(expirableMessage, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.refreshTTL(refreshTTLBundle, networkNode.getNodeAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -663,7 +672,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
ProtectedData protectedData = p2PDataStorage.getDataWithSignedSeqNr(expirableMessage, optionalKeyRing.get().getSignatureKeyPair());
ProtectedData protectedData = p2PDataStorage.getProtectedData(expirableMessage, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.remove(protectedData, networkNode.getNodeAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
@ -732,6 +741,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return p2PDataStorage.getMap();
}
@VisibleForTesting
public P2PDataStorage getP2PDataStorage() {
return p2PDataStorage;
}
@VisibleForTesting
public PeerManager getPeerManager() {
return peerManager;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -9,23 +9,29 @@ public enum CloseConnectionReason {
UNKNOWN_EXCEPTION(false),
// Planned
APP_SHUT_DOWN(true),
CLOSE_REQUESTED_BY_PEER(false),
APP_SHUT_DOWN(true, true),
CLOSE_REQUESTED_BY_PEER(false, true),
// send msg
SEND_MSG_FAILURE(false),
SEND_MSG_TIMEOUT(false),
// maintenance
TOO_MANY_CONNECTIONS_OPEN(true),
TOO_MANY_SEED_NODES_CONNECTED(true),
TOO_MANY_CONNECTIONS_OPEN(true, true),
TOO_MANY_SEED_NODES_CONNECTED(true, true),
// illegal requests
RULE_VIOLATION(true);
RULE_VIOLATION(true, true);
public final boolean sendCloseMessage;
public boolean isIntended;
CloseConnectionReason(boolean sendCloseMessage) {
this(sendCloseMessage, true);
}
CloseConnectionReason(boolean sendCloseMessage, boolean isIntended) {
this.sendCloseMessage = sendCloseMessage;
this.isIntended = isIntended;
}
}

View file

@ -161,19 +161,21 @@ public class Connection implements MessageListener {
if (!stopped) {
try {
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
int size = ByteArrayUtils.objectToByteArray(message).length;
if (message instanceof PrefixedSealedAndSignedMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={}" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, StringUtils.abbreviate(message.toString(), 100));
peersNodeAddress, uid, StringUtils.abbreviate(message.toString(), 100), size);
} else {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={}" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, StringUtils.abbreviate(message.toString(), 100));
peersNodeAddress, uid, StringUtils.abbreviate(message.toString(), 100), size);
}
Object objectToWrite;
@ -192,7 +194,8 @@ public class Connection implements MessageListener {
objectOutputStream.writeObject(objectToWrite);
objectOutputStream.flush();
statistic.addSentBytes(ByteArrayUtils.objectToByteArray(objectToWrite).length);
statistic.addSentBytes(size);
statistic.addSentMessage(message);
// We don't want to get the activity ts updated by ping/pong msg
@ -272,7 +275,7 @@ public class Connection implements MessageListener {
this.peerType = peerType;
}
private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
public void setPeersNodeAddress(NodeAddress peerNodeAddress) {
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);
@ -594,14 +597,16 @@ public class Connection implements MessageListener {
log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + sharedModel.connection);
Object rawInputObject = objectInputStream.readObject();
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={}"
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
sharedModel.connection,
StringUtils.abbreviate(rawInputObject.toString(), 100));
StringUtils.abbreviate(rawInputObject.toString(), 100),
size);
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size > getMaxMsgSize()) {
reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
return;

View file

@ -2,7 +2,7 @@ package io.bitsquare.p2p.peers.getdata.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet;

View file

@ -23,7 +23,7 @@ import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class KeepAliveManager implements MessageListener {
public class KeepAliveManager implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
@ -44,6 +44,7 @@ public class KeepAliveManager implements MessageListener {
this.peerManager = peerManager;
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
}
public void shutDown() {
@ -51,6 +52,7 @@ public class KeepAliveManager implements MessageListener {
shutDownInProgress = true;
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
maintenanceHandlerMap.values().stream().forEach(KeepAliveHandler::cleanup);
if (executor != null)
@ -103,6 +105,28 @@ public class KeepAliveManager implements MessageListener {
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
// clean up in case we could not clean up at disconnect
if (connection.getPeersNodeAddressOptional().isPresent())
maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent())
maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress());
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -14,7 +14,7 @@ public final class Ping extends KeepAliveMessage {
@Override
public String toString() {
return "PingRequest{" +
return "Ping{" +
", nonce=" + nonce +
"} " + super.toString();
}

View file

@ -14,7 +14,7 @@ public final class Pong extends KeepAliveMessage {
@Override
public String toString() {
return "PongResponse{" +
return "Pong{" +
"requestNonce=" + requestNonce +
"} " + super.toString();
}

View file

@ -89,6 +89,11 @@ class PeerExchangeHandler implements MessageListener {
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!connection.getPeersNodeAddressOptional().isPresent()) {
connection.setPeersNodeAddress(nodeAddress);
//TODO remove setPeersNodeAddress if never needed
log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()");
}
PeerExchangeHandler.this.connection = connection;
connection.addMessageListener(PeerExchangeHandler.this);
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");

View file

@ -1,5 +1,7 @@
package io.bitsquare.p2p.storage;
import io.bitsquare.p2p.storage.data.ProtectedData;
public interface HashMapChangedListener {
void onAdded(ProtectedData data);

View file

@ -14,6 +14,9 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.data.RefreshTTLBundle;
import io.bitsquare.p2p.storage.messages.*;
import io.bitsquare.storage.Storage;
import org.apache.commons.lang3.StringUtils;
@ -39,7 +42,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = 30;
public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedData> map = new ConcurrentHashMap<>();
@ -93,7 +96,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (sequenceNumberMap.size() > 1000)
sequenceNumberMap = getPurgedSequenceNumberMap(sequenceNumberMap);
}), CHECK_TTL_INTERVAL_SEC, CHECK_TTL_INTERVAL_SEC, TimeUnit.SECONDS);
}), CHECK_TTL_INTERVAL_MILLIS, CHECK_TTL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
@ -104,8 +107,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof DataBroadcastMessage) {
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
log.trace("DataBroadcastMessage received " + message + "\n\tconnection " + connection);
Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection);
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> {
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).data, peersNodeAddress);
@ -113,6 +115,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
remove(((RemoveDataMessage) message).data, peersNodeAddress);
} else if (message instanceof RemoveMailboxDataMessage) {
removeMailboxData(((RemoveMailboxDataMessage) message).data, peersNodeAddress);
} else if (message instanceof RefreshTTLMessage) {
refreshTTL(((RefreshTTLMessage) message).refreshTTLBundle, peersNodeAddress);
}
});
}
@ -129,13 +133,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
if (connection.getPeersNodeAddressOptional().isPresent() && !closeConnectionReason.isIntended) {
map.values().stream()
.forEach(protectedData -> {
ExpirableMessage expirableMessage = protectedData.expirableMessage;
if (expirableMessage instanceof RequiresLiveOwnerData) {
RequiresLiveOwnerData requiresLiveOwnerData = (RequiresLiveOwnerData) expirableMessage;
NodeAddress ownerNodeAddress = requiresLiveOwnerData.getOwnerNodeAddress();
if (expirableMessage instanceof RequiresOwnerIsOnlineMessage) {
RequiresOwnerIsOnlineMessage requiresOwnerIsOnlineMessage = (RequiresOwnerIsOnlineMessage) expirableMessage;
NodeAddress ownerNodeAddress = requiresOwnerIsOnlineMessage.getOwnerNodeAddress();
if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
@ -170,26 +174,20 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
public boolean add(ProtectedData protectedData, @Nullable NodeAddress sender) {
Log.traceCall();
return doAdd(protectedData, sender, false);
return add(protectedData, sender, false);
}
public boolean rePublish(ProtectedData protectedData, @Nullable NodeAddress sender) {
Log.traceCall();
return doAdd(protectedData, sender, true);
}
private boolean doAdd(ProtectedData protectedData, @Nullable NodeAddress sender, boolean rePublish) {
public boolean add(ProtectedData protectedData, @Nullable NodeAddress sender, boolean forceBroadcast) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirableMessage);
boolean result = checkPublicKeys(protectedData, true)
&& checkSignature(protectedData)
&& isSequenceNrValid(protectedData, hashOfPayload);
&& isSequenceNrValid(protectedData.sequenceNumber, hashOfPayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey)
result &= checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedData, hashOfPayload);
result &= checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedData.ownerPubKey, hashOfPayload);
if (result) {
map.put(hashOfPayload, protectedData);
@ -198,20 +196,21 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
// even we had the data with the old seq nr. already
if (sequenceNumberMap.containsKey(hashOfPayload) &&
protectedData.sequenceNumber > sequenceNumberMap.get(hashOfPayload).first)
rePublish = true;
sequenceNumberMap.put(hashOfPayload, new Tuple2<>(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after addProtectedExpirableData (truncated)");
sb.append("Data set after doAdd (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
log.info("Data set after doAdd: size=" + map.values().size());
if (rePublish || !containsKey)
if (!containsKey || forceBroadcast)
broadcast(new AddDataMessage(protectedData), sender);
else
log.trace("Not broadcasting data as we had it already in our map.");
hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData));
} else {
@ -220,6 +219,51 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return result;
}
public boolean refreshTTL(RefreshTTLBundle refreshTTLBundle, @Nullable NodeAddress sender) {
Log.traceCall();
PublicKey ownerPubKey = refreshTTLBundle.ownerPubKey;
byte[] hashOfDataAndSeqNr = refreshTTLBundle.hashOfDataAndSeqNr;
byte[] signature = refreshTTLBundle.signature;
int sequenceNumber = refreshTTLBundle.sequenceNumber;
ByteArray hashOfPayload = new ByteArray(refreshTTLBundle.hashOfPayload);
if (map.containsKey(hashOfPayload)) {
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).first == sequenceNumber) {
log.warn("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
boolean result = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature) &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload);
if (result) {
log.error("OK");
ProtectedData storedData = map.get(hashOfPayload);
storedData.refreshDate();
sequenceNumberMap.put(hashOfPayload, new Tuple2<>(sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after reNew (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
broadcast(new RefreshTTLMessage(refreshTTLBundle), sender);
} else {
log.warn("Checks for refresh failed");
}
return result;
}
} else {
log.warn("We don't have data for that refresh message in our map.");
return true;
}
}
public boolean remove(ProtectedData protectedData, @Nullable NodeAddress sender) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirableMessage);
@ -227,9 +271,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedData, false)
&& isSequenceNrValid(protectedData, hashOfPayload)
&& isSequenceNrValid(protectedData.sequenceNumber, hashOfPayload)
&& checkSignature(protectedData)
&& checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedData, hashOfPayload);
&& checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedData.ownerPubKey, hashOfPayload);
if (result) {
@ -252,10 +296,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedMailboxData, false)
&& isSequenceNrValid(protectedMailboxData, hashOfData)
&& isSequenceNrValid(protectedMailboxData.sequenceNumber, hashOfData)
&& protectedMailboxData.receiversPubKey.equals(protectedMailboxData.ownerPubKey) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxData)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData, hashOfData);
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData.receiversPubKey, hashOfData);
if (result) {
doRemoveProtectedExpirableData(protectedMailboxData, hashOfData);
@ -275,7 +319,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return map;
}
public ProtectedData getDataWithSignedSeqNr(ExpirableMessage payload, KeyPair ownerStoragePubKey)
public ProtectedData getProtectedData(ExpirableMessage payload, KeyPair ownerStoragePubKey)
throws CryptoException {
ByteArray hashOfData = getHashAsByteArray(payload);
int sequenceNumber;
@ -289,6 +333,21 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return new ProtectedData(payload, payload.getTTL(), ownerStoragePubKey.getPublic(), sequenceNumber, signature);
}
public RefreshTTLBundle getRefreshTTLPackage(ExpirableMessage payload, KeyPair ownerStoragePubKey)
throws CryptoException {
ByteArray hashOfPayload = getHashAsByteArray(payload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfPayload))
sequenceNumber = sequenceNumberMap.get(hashOfPayload).first + 1;
else
sequenceNumber = 0;
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(payload, sequenceNumber));
byte[] signature = Sig.sign(ownerStoragePubKey.getPrivate(), hashOfDataAndSeqNr);
PublicKey ownerPubKey = ownerStoragePubKey.getPublic();
return new RefreshTTLBundle(ownerPubKey, hashOfDataAndSeqNr, signature, hashOfPayload.bytes, sequenceNumber);
}
public ProtectedMailboxData getMailboxDataWithSignedSeqNr(MailboxMessage expirableMailboxPayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
@ -327,8 +386,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
}
private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {
int newSequenceNumber = data.sequenceNumber;
private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) {
if (sequenceNumberMap.containsKey(hashOfData)) {
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).first;
if (newSequenceNumber < storedSequenceNumber) {
@ -343,10 +401,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
private boolean checkSignature(ProtectedData data) {
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(data.expirableMessage, data.sequenceNumber));
private boolean checkSignature(PublicKey ownerPubKey, byte[] hashOfDataAndSeqNr, byte[] signature) {
try {
boolean result = Sig.verify(data.ownerPubKey, hashOfDataAndSeqNr, data.signature);
boolean result = Sig.verify(ownerPubKey, hashOfDataAndSeqNr, signature);
if (!result)
log.error("Signature verification failed at checkSignature. " +
"That should not happen. Consider it might be an attempt of fraud.");
@ -358,6 +415,11 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
private boolean checkSignature(ProtectedData data) {
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(data.expirableMessage, data.sequenceNumber));
return checkSignature(data.ownerPubKey, hashOfDataAndSeqNr, data.signature);
}
private boolean checkPublicKeys(ProtectedData data, boolean isAddOperation) {
boolean result = false;
if (data.expirableMessage instanceof MailboxMessage) {
@ -375,21 +437,25 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return result;
}
private boolean checkIfStoredDataPubKeyMatchesNewDataPubKey(ProtectedData data, ByteArray hashOfData) {
private boolean checkIfStoredDataPubKeyMatchesNewDataPubKey(PublicKey ownerPubKey, ByteArray hashOfData) {
if (map.containsKey(hashOfData)) {
ProtectedData storedData = map.get(hashOfData);
boolean result = storedData.ownerPubKey.equals(data.ownerPubKey);
boolean result = storedData.ownerPubKey.equals(ownerPubKey);
if (!result)
log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud");
return result;
} else {
return false;
}
}
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, ByteArray hashOfData) {
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(PublicKey receiversPubKey, ByteArray hashOfData) {
ProtectedData storedData = map.get(hashOfData);
if (storedData instanceof ProtectedMailboxData) {
ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData;
// publicKey is not the same (stored: sender, new: receiver)
boolean result = storedMailboxData.receiversPubKey.equals(data.receiversPubKey)
boolean result = storedMailboxData.receiversPubKey.equals(receiversPubKey)
&& getHashAsByteArray(storedMailboxData.expirableMessage).equals(hashOfData);
if (!result)
log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud");

View file

@ -1,6 +1,7 @@
package io.bitsquare.p2p.storage;
package io.bitsquare.p2p.storage.data;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,6 +47,10 @@ public class ProtectedData implements Serializable {
}
}
public void refreshDate() {
date = new Date();
}
public boolean isExpired() {
return (new Date().getTime() - date.getTime()) > ttl;
}

View file

@ -1,5 +1,6 @@
package io.bitsquare.p2p.storage;
package io.bitsquare.p2p.storage.data;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View file

@ -0,0 +1,68 @@
package io.bitsquare.p2p.storage.data;
import io.bitsquare.app.Version;
import io.bitsquare.common.crypto.Sig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PublicKey;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.X509EncodedKeySpec;
import java.util.Arrays;
public class RefreshTTLBundle implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private static final Logger log = LoggerFactory.getLogger(RefreshTTLBundle.class);
transient public PublicKey ownerPubKey;
public final byte[] hashOfDataAndSeqNr;
public final byte[] signature;
public final byte[] hashOfPayload;
public final int sequenceNumber;
private byte[] ownerPubKeyBytes;
public RefreshTTLBundle(PublicKey ownerPubKey,
byte[] hashOfDataAndSeqNr,
byte[] signature,
byte[] hashOfPayload,
int sequenceNumber) {
this.ownerPubKey = ownerPubKey;
this.hashOfDataAndSeqNr = hashOfDataAndSeqNr;
this.signature = signature;
this.hashOfPayload = hashOfPayload;
this.sequenceNumber = sequenceNumber;
ownerPubKeyBytes = new X509EncodedKeySpec(ownerPubKey.getEncoded()).getEncoded();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
try {
in.defaultReadObject();
ownerPubKey = KeyFactory.getInstance(Sig.KEY_ALGO, "BC").generatePublic(new X509EncodedKeySpec(ownerPubKeyBytes));
} catch (InvalidKeySpecException | NoSuchAlgorithmException | NoSuchProviderException e) {
e.printStackTrace();
log.error(e.getMessage());
} catch (Throwable t) {
log.trace("Cannot be deserialized." + t.getMessage());
}
}
@Override
public String toString() {
return "RefreshTTLPackage{" +
"ownerPubKey.hashCode()=" + ownerPubKey.hashCode() +
", hashOfDataAndSeqNr.hashCode()=" + Arrays.hashCode(hashOfDataAndSeqNr) +
", hashOfPayload.hashCode()=" + Arrays.hashCode(hashOfPayload) +
", sequenceNumber=" + sequenceNumber +
", signature.hashCode()=" + Arrays.hashCode(signature) +
'}';
}
}

View file

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
public final class AddDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View file

@ -3,7 +3,7 @@ package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.crypto.PrefixedSealedAndSignedMessage;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import java.security.PublicKey;
import java.util.concurrent.TimeUnit;

View file

@ -0,0 +1,38 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.data.RefreshTTLBundle;
public final class RefreshTTLMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
public final RefreshTTLBundle refreshTTLBundle;
public RefreshTTLMessage(RefreshTTLBundle refreshTTLBundle) {
this.refreshTTLBundle = refreshTTLBundle;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof RefreshTTLMessage)) return false;
RefreshTTLMessage that = (RefreshTTLMessage) o;
return !(refreshTTLBundle != null ? !refreshTTLBundle.equals(that.refreshTTLBundle) : that.refreshTTLBundle != null);
}
@Override
public int hashCode() {
return refreshTTLBundle != null ? refreshTTLBundle.hashCode() : 0;
}
@Override
public String toString() {
return "RefreshTTLMessage{" +
"refreshTTLPackage=" + refreshTTLBundle +
"} " + super.toString();
}
}

View file

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
public final class RemoveDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View file

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.ProtectedMailboxData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
public final class RemoveMailboxDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View file

@ -10,7 +10,7 @@ import java.io.Serializable;
* This is used for the offers to avoid dead offers in case the offerer is in sleep/hibernate mode or the app has
* terminated without sending the remove message (e.g. in case of a crash).
*/
public interface RequiresLiveOwnerData extends Serializable {
public interface RequiresOwnerIsOnlineMessage extends Serializable {
/**
* @return NodeAddress of the data owner
*/

View file

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import java.security.PublicKey;

View file

@ -11,7 +11,7 @@ import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.seed.SeedNode;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.mocks.MockData;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.*;

View file

@ -16,6 +16,7 @@ public class TestUtils {
private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
public static int sleepTime;
public static String test_dummy_dir = "test_dummy_dir";
public static KeyPair generateKeyPair() throws NoSuchAlgorithmException {
long ts = System.currentTimeMillis();
@ -70,13 +71,13 @@ public class TestUtils {
seedNodes.add(new NodeAddress("localhost:8002"));
seedNodes.add(new NodeAddress("localhost:8003"));
sleepTime = 100;
seedNode = new SeedNode("test_dummy_dir");
seedNode = new SeedNode(test_dummy_dir);
} else {
seedNodes.add(new NodeAddress("3omjuxn7z73pxoee.onion:8001"));
seedNodes.add(new NodeAddress("j24fxqyghjetgpdx.onion:8002"));
seedNodes.add(new NodeAddress("45367tl6unwec6kw.onion:8003"));
sleepTime = 10000;
seedNode = new SeedNode("test_dummy_dir");
seedNode = new SeedNode(test_dummy_dir);
}
CountDownLatch latch = new CountDownLatch(1);

View file

@ -4,33 +4,33 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.*;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.PrefixedSealedAndSignedMessage;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.TestUtils;
import io.bitsquare.p2p.mocks.MockMessage;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.RefreshTTLBundle;
import io.bitsquare.p2p.storage.mocks.MockData;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
//TODO P2P network tests are outdated
@Ignore
public class ProtectedDataStorageTest {
private static final Logger log = LoggerFactory.getLogger(ProtectedDataStorageTest.class);
@ -58,18 +58,16 @@ public class ProtectedDataStorageTest {
dir2.mkdir();
UserThread.setExecutor(Executors.newSingleThreadExecutor());
P2PDataStorage.CHECK_TTL_INTERVAL_SEC = (int) TimeUnit.MINUTES.toMillis(10);
P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS = 300;
keyRing1 = new KeyRing(new KeyStorage(dir1));
storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair();
encryptionService1 = new EncryptionService(keyRing1);
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService().getNetworkNode();
peerManager1 = new PeerManager(networkNode1, null, new File("dummy"));
//TODO
Broadcaster broadcaster = new Broadcaster(networkNode1);
dataStorage1 = new P2PDataStorage(broadcaster, networkNode1, new File("dummy"));
P2PService p2PService = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService();
networkNode1 = p2PService.getNetworkNode();
peerManager1 = p2PService.getPeerManager();
dataStorage1 = p2PService.getP2PDataStorage();
// for mailbox
keyRing2 = new KeyRing(new KeyStorage(dir2));
@ -91,11 +89,18 @@ public class ProtectedDataStorageTest {
networkNode1.shutDown(() -> shutDownLatch.countDown());
shutDownLatch.await();
}
Path path = Paths.get(TestUtils.test_dummy_dir);
File dir = path.toFile();
Utilities.deleteDirectory(dir);
Utilities.deleteDirectory(dir1);
Utilities.deleteDirectory(dir2);
}
@Test
//@Test
public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertEquals(1, dataStorage1.getMap().size());
@ -107,256 +112,84 @@ public class ProtectedDataStorageTest {
Assert.assertEquals(0, dataStorage1.getMap().size());
}
@Test
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
P2PDataStorage.CHECK_TTL_INTERVAL_SEC = 10;
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
//TODO
Broadcaster broadcaster = new Broadcaster(networkNode1);
dataStorage1 = new P2PDataStorage(broadcaster, networkNode1, new File("dummy"));
mockData.ttl = 50;
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
// @Test
public void testTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5);
ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
log.debug("data.date " + data.date);
log.debug("data.date " + data.date.getTime());
Assert.assertTrue(dataStorage1.add(data, null));
Thread.sleep(5);
Assert.assertEquals(1, dataStorage1.getMap().size());
// still there
Thread.sleep(20);
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(40);
// now should be removed
Assert.assertEquals(0, dataStorage1.getMap().size());
// add with date in future
data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
int newSequenceNumber = data.sequenceNumber + 1;
byte[] hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
byte[] signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
ProtectedData dataWithFutureDate = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
dataWithFutureDate.date = new Date(new Date().getTime() + 60 * 60 * sleepTime);
// force serialisation (date check is done in readObject)
ProtectedData newData = Utilities.deserialize(Utilities.serialize(dataWithFutureDate));
Assert.assertTrue(dataStorage1.add(newData, null));
Thread.sleep(5);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(50);
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2);
log.debug("test 3 removed");
Assert.assertEquals(0, dataStorage1.getMap().size());
}
@Test
public void testMultiAddRemoveProtectedData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
MockData mockData = new MockData("msg1", keyRing1.getSignatureKeyPair().getPublic());
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
// remove with not updated seq nr -> failure
int newSequenceNumber = 0;
byte[] hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
byte[] signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
ProtectedData dataToRemove = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertFalse(dataStorage1.remove(dataToRemove, null));
// remove with too high updated seq nr -> ok
newSequenceNumber = 2;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.remove(dataToRemove, null));
// add to empty map, any seq nr. -> ok
newSequenceNumber = 2;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
ProtectedData dataToAdd = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.add(dataToAdd, null));
// add with updated seq nr below previous -> failure
newSequenceNumber = 1;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToAdd = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertFalse(dataStorage1.add(dataToAdd, null));
// add with updated seq nr over previous -> ok
newSequenceNumber = 3;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToAdd = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.add(dataToAdd, null));
// add with same seq nr -> failure
newSequenceNumber = 3;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToAdd = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertFalse(dataStorage1.add(dataToAdd, null));
// add with same data but higher seq nr. -> ok, ignore
newSequenceNumber = 4;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToAdd = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.add(dataToAdd, null));
// remove with with same seq nr as prev. ignored -> failed
newSequenceNumber = 4;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertFalse(dataStorage1.remove(dataToRemove, null));
// remove with with higher seq nr -> ok
newSequenceNumber = 5;
hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedData(data.expirableMessage, data.ttl, data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.remove(dataToRemove, null));
}
@Test
public void testAddAndRemoveMailboxData() throws InterruptedException, NoSuchAlgorithmException, CertificateException,
KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
// sender
MockMessage mockMessage = new MockMessage("MockMessage");
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage(networkNode1.getNodeAddress(),
encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage),
Hash.getHash("aa"));
MailboxMessage expirableMailboxPayload = new MailboxMessage(prefixedSealedAndSignedMessage,
keyRing1.getSignatureKeyPair().getPublic(),
keyRing2.getSignatureKeyPair().getPublic());
ProtectedMailboxData data = dataStorage1.getMailboxDataWithSignedSeqNr(expirableMailboxPayload, storageSignatureKeyPair1, storageSignatureKeyPair2.getPublic());
Assert.assertTrue(dataStorage1.add(data, null));
Thread.sleep(sleepTime);
Assert.assertEquals(1, dataStorage1.getMap().size());
// receiver (storageSignatureKeyPair2)
int newSequenceNumber = data.sequenceNumber + 1;
byte[] hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirableMessage, newSequenceNumber));
byte[] signature;
ProtectedMailboxData dataToRemove;
// wrong sig -> fail
signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong seq nr
signature = Sig.sign(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), data.sequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong signingKey
signature = Sig.sign(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, data.ownerPubKey, newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong peerPubKey
signature = Sig.sign(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair1.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// receiver can remove it (storageSignatureKeyPair2) -> all ok
Assert.assertEquals(1, dataStorage1.getMap().size());
signature = Sig.sign(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertTrue(dataStorage1.removeMailboxData(dataToRemove, null));
Assert.assertEquals(0, dataStorage1.getMap().size());
}
/*@Test
public void testTryToHack() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException {
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Thread.sleep(sleepTime);
Assert.assertEquals(1, dataStorage1.getMap().size());
Assert.assertEquals(1, dataStorage2.getMap().size());
// hackers key pair is storageSignatureKeyPair2
// change seq nr. and signature: fails on both own and peers dataStorage
int newSequenceNumber = data.sequenceNumber + 1;
byte[] hashOfDataAndSeqNr = cryptoService2.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirablePayload, newSequenceNumber));
byte[] signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
ProtectedData dataToAdd = new ProtectedData(data.expirablePayload, data.ttl, data.ownerStoragePubKey, newSequenceNumber, signature);
Assert.assertFalse(dataStorage1.add(dataToAdd, null));
Assert.assertFalse(dataStorage2.add(dataToAdd, null));
// change seq nr. and signature and data pub key. fails on peers dataStorage, succeeds on own dataStorage
newSequenceNumber = data.sequenceNumber + 2;
hashOfDataAndSeqNr = cryptoService2.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirablePayload, newSequenceNumber));
signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToAdd = new ProtectedData(data.expirablePayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature);
Assert.assertTrue(dataStorage2.add(dataToAdd, null));
Assert.assertFalse(dataStorage1.add(dataToAdd, null));
Thread.sleep(sleepTime);
Assert.assertEquals(1, dataStorage2.getMap().size());
Thread.sleep(sleepTime);
Assert.assertEquals(1, dataStorage1.getMap().size());
Assert.assertEquals(data, dataStorage1.getMap().values().stream().findFirst().get());
Assert.assertEquals(dataToAdd, dataStorage2.getMap().values().stream().findFirst().get());
Assert.assertNotEquals(data, dataToAdd);
newSequenceNumber = data.sequenceNumber + 3;
hashOfDataAndSeqNr = cryptoService1.getHash(new P2PDataStorage.DataAndSeqNrPair(data.expirablePayload, newSequenceNumber));
signature = cryptoService1.signStorageData(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
ProtectedData dataToRemove = new ProtectedData(data.expirablePayload, data.ttl, data.ownerStoragePubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.remove(dataToRemove, null));
Assert.assertEquals(0, dataStorage1.getMap().size());
}*/
/* //@Test
public void testTryToHackMailboxData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException {
MockMessage mockMessage = new MockMessage("MockMessage");
SealedAndSignedMessage sealedAndSignedMessage = cryptoService1.encryptAndSignMessage(keyRing1.getPubKeyRing(), mockMessage);
ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(sealedAndSignedMessage,
keyRing1.getStorageSignatureKeyPair().getPublic(),
keyRing2.getStorageSignatureKeyPair().getPublic());
// sender
ProtectedMailboxData data = dataStorage1.getMailboxDataWithSignedSeqNr(expirableMailboxPayload, storageSignatureKeyPair1, storageSignatureKeyPair2.getPublic());
public void testRePublish() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5);
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Thread.sleep(sleepTime);
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
// receiver (storageSignatureKeyPair2)
int newSequenceNumber = data.sequenceNumber + 1;
byte[] hashOfDataAndSeqNr = cryptoService2.getHash(new P2PDataStorage.DataAndSeqNrPair(expirableMailboxPayload, newSequenceNumber));
byte[] signature;
ProtectedMailboxData dataToRemove;
// wrong sig -> fail
signature = cryptoService2.signStorageData(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong seq nr
signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), data.sequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong signingKey
signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, data.ownerStoragePubKey, newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// wrong peerPubKey
signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair1.getPublic());
Assert.assertFalse(dataStorage1.removeMailboxData(dataToRemove, null));
// all ok
data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.rePublish(data, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
signature = cryptoService2.signStorageData(storageSignatureKeyPair2.getPrivate(), hashOfDataAndSeqNr);
dataToRemove = new ProtectedMailboxData(expirableMailboxPayload, data.ttl, storageSignatureKeyPair2.getPublic(), newSequenceNumber, signature, storageSignatureKeyPair2.getPublic());
Assert.assertTrue(dataStorage1.removeMailboxData(dataToRemove, null));
data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.rePublish(data, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 3");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 4");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2);
log.debug("test 5 removed");
Assert.assertEquals(0, dataStorage1.getMap().size());
}
*/
@Test
public void testRePublish() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5);
ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
RefreshTTLBundle refreshTTLBundle = dataStorage1.getRefreshTTLPackage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLBundle, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
refreshTTLBundle = dataStorage1.getRefreshTTLPackage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLBundle, null));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 3");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS);
log.debug("test 4");
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2);
log.debug("test 5 removed");
Assert.assertEquals(0, dataStorage1.getMap().size());
}
*/
}