Add isOwner flag for broadcasting to all peers, remove popup for disconnects of rule violations, reduce max nr of network threads

This commit is contained in:
Manfred Karrer 2016-02-26 14:10:22 +01:00
parent 60c8de796a
commit aecbf7ced9
19 changed files with 130 additions and 153 deletions

View file

@ -556,11 +556,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onBroadcastFailed(String errorMessage) {
}
};
boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener);
boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true, true);
if (!result) {
//TODO remove and add again with a delay to ensure the data will be broadcasted
sendMailboxMessageListener.onFault("Data already exists in our local database");
boolean removeResult = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress());
boolean removeResult = p2PDataStorage.remove(protectedMailboxStorageEntry, networkNode.getNodeAddress(), true);
log.debug("remove result=" + removeResult);
}
} catch (CryptoException e) {
@ -593,7 +593,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
expirableMailboxStoragePayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPubKey);
p2PDataStorage.removeMailboxData(protectedMailboxStorageEntry, networkNode.getNodeAddress());
p2PDataStorage.removeMailboxData(protectedMailboxStorageEntry, networkNode.getNodeAddress(), true);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
@ -616,17 +616,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// Data storage
///////////////////////////////////////////////////////////////////////////////////////////
public boolean addData(StoragePayload storagePayload) {
return addData(storagePayload, false);
}
public boolean addData(StoragePayload storagePayload, boolean forceBroadcast) {
public boolean addData(StoragePayload storagePayload, boolean forceBroadcast, boolean isDataOwner) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), forceBroadcast);
return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, forceBroadcast, isDataOwner);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -636,13 +632,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
public boolean refreshTTL(StoragePayload storagePayload) {
public boolean refreshTTL(StoragePayload storagePayload, boolean isDataOwner) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
RefreshTTLMessage refreshTTLMessage = p2PDataStorage.getRefreshTTLMessage(storagePayload, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.refreshTTL(refreshTTLMessage, networkNode.getNodeAddress());
return p2PDataStorage.refreshTTL(refreshTTLMessage, networkNode.getNodeAddress(), isDataOwner);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -652,13 +648,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
public boolean removeData(StoragePayload storagePayload) {
public boolean removeData(StoragePayload storagePayload, boolean isDataOwner) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) {
try {
ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.remove(protectedStorageEntry, networkNode.getNodeAddress());
return p2PDataStorage.remove(protectedStorageEntry, networkNode.getNodeAddress(), isDataOwner);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;

View file

@ -60,8 +60,8 @@ public class Connection implements MessageListener {
private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data
//TODO decrease limits again after testing
private static final int MSG_THROTTLE_PER_SEC = 100; // With MAX_MSG_SIZE of 100kb results in bandwidth of 100 mbit/sec
private static final int MSG_THROTTLE_PER_10_SEC = 1000; // With MAX_MSG_SIZE of 100kb results in bandwidth of 1000 mbit/sec for 10 sec
private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec
private static final int MSG_THROTTLE_PER_10_SEC = 100; // With MAX_MSG_SIZE of 100kb results in bandwidth of 100 mbit/sec for 10 sec
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
public static int getMaxMsgSize() {
@ -235,8 +235,6 @@ public class Connection implements MessageListener {
if (violated) {
log.error("violatesThrottleLimit 1 ");
log.error("elapsed " + (now - compareValue));
log.error("now " + now);
log.error("compareValue " + compareValue);
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream()
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.toString())
.collect(Collectors.toList()).toString());
@ -252,8 +250,10 @@ public class Connection implements MessageListener {
if (violated) {
log.error("violatesThrottleLimit 2 ");
log.error("compareValue " + compareValue);
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream().map(e -> e.second.toString() + "\n\t").toString());
log.error("elapsed " + (now - compareValue));
log.error("messageTimeStamps: \n\t" + messageTimeStamps.stream()
.map(e -> "\n\tts=" + e.first.toString() + " message=" + e.second.toString())
.collect(Collectors.toList()).toString());
}
}
// we limit to max 50 (MSG_THROTTLE_PER_10SEC) entries

View file

@ -334,7 +334,7 @@ public abstract class NetworkNode implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
void createExecutorService() {
executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 50, 100, 2 * 60);
executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 15, 30, 60);
}
void startServer(ServerSocket serverSocket) {

View file

@ -16,9 +16,7 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -29,13 +27,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class);
private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 5 : 30;
private static final long DELAY_MS = Timer.STRESS_TEST ? 1000 : 2000;
private static boolean USE_DELAY;
public static void useDelay(boolean useDelay) {
USE_DELAY = useDelay;
}
private static final long DELAY_MS = Timer.STRESS_TEST ? 100 : 500;
interface ResultHandler {
void onCompleted(BroadcastHandler broadcastHandler);
@ -100,7 +92,7 @@ public class BroadcastHandler implements PeerManager.Listener {
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler,
@Nullable Listener listener) {
@Nullable Listener listener, boolean isDataOwner) {
this.message = message;
this.resultHandler = resultHandler;
this.listener = listener;
@ -112,9 +104,23 @@ public class BroadcastHandler implements PeerManager.Listener {
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.collect(Collectors.toSet());
if (!receivers.isEmpty()) {
numOfPeers = receivers.size();
numOfCompletedBroadcasts = 0;
log.info("Broadcast message to {} peers.", numOfPeers);
if (isDataOwner) {
// the data owner sends to all and immediately
receivers.stream().forEach(connection -> sendToPeer(connection, message));
numOfPeers = receivers.size();
log.info("Broadcast message to {} peers.", numOfPeers);
} else {
// for relay nodes we limit to 2 recipients and use a delay
List<Connection> list = new ArrayList<>(receivers);
Collections.shuffle(list);
list = list.subList(0, Math.min(2, list.size()));
numOfPeers = list.size();
log.info("Broadcast message to {} peers.", numOfPeers);
list.stream().forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS));
}
long timeoutDelay = TIMEOUT_PER_PEER_SEC * receivers.size();
timeoutTimer = UserThread.runAfter(() -> {
@ -129,13 +135,6 @@ public class BroadcastHandler implements PeerManager.Listener {
"broadcastQueue=" + broadcastQueue);
onFault(errorMessage);
}, timeoutDelay);
if (USE_DELAY) {
receivers.stream().forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), DELAY_MS, DELAY_MS * 2, TimeUnit.MILLISECONDS));
} else {
receivers.stream().forEach(connection -> sendToPeer(connection, message));
}
} else {
onFault("Message not broadcasted because we have no available peers yet.\n\t" +
"message = " + StringUtils.abbreviate(message.toString(), 100), false);

View file

@ -39,12 +39,13 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean isDataOwner) {
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager);
broadcastHandler.broadcast(message, sender, this, listener);
broadcastHandler.broadcast(message, sender, this, listener, isDataOwner);
broadcastHandlers.add(broadcastHandler);
}

View file

@ -41,13 +41,13 @@ public class PeerManager implements ConnectionListener {
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
MIN_CONNECTIONS = Math.max(1, maxConnections - 4);
MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 5;
MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 10;
MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 30;
MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 4;
MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 8;
MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 18;
}
static {
setMaxConnections(12);
setMaxConnections(10);
}
private static final int MAX_REPORTED_PEERS = 1000;

View file

@ -9,7 +9,6 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest;
import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse;
import io.bitsquare.p2p.storage.P2PDataStorage;
@ -20,8 +19,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class GetDataRequestHandler {
private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class);
@ -44,19 +41,18 @@ public class GetDataRequestHandler {
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
private final PeerManager peerManager;
private P2PDataStorage dataStorage;
private final Listener listener;
private Timer timeoutTimer;
private boolean stopped;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public GetDataRequestHandler(NetworkNode networkNode, PeerManager peerManager, P2PDataStorage dataStorage, Listener listener) {
public GetDataRequestHandler(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.dataStorage = dataStorage;
this.listener = listener;
}
@ -89,28 +85,32 @@ public class GetDataRequestHandler {
}
});
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
log.info(errorMessage);
//peerManager.shutDownConnection(connection, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
if (!stopped) {
log.info(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {
log.warn("We have already stopped (handleFault)");
}
}
private void cleanup() {
stopped = true;
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;

View file

@ -157,7 +157,7 @@ public class RequestDataHandler implements MessageListener {
"at that moment");
((GetDataResponse) message).dataSet.stream()
.forEach(protectedData -> dataStorage.add(protectedData,
connection.getPeersNodeAddressOptional().get()));
connection.getPeersNodeAddressOptional().get(), null, false, false));
cleanup();
listener.onComplete();

View file

@ -53,6 +53,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private final Listener listener;
private final Map<NodeAddress, RequestDataHandler> handlerMap = new HashMap<>();
private Map<String, GetDataRequestHandler> getDataRequestHandlers = new HashMap<>();
private Optional<NodeAddress> nodeAddressOfPreliminaryDataRequest = Optional.empty();
private Timer retryTimer;
private boolean dataUpdateRequested;
@ -178,25 +179,33 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, peerManager, dataStorage,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
log.trace("requestDataHandshake completed.\n\tConnection={}", connection);
}
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
final String uid = connection.getUid();
if (!getDataRequestHandlers.containsKey(uid)) {
GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
getDataRequestHandlers.remove(uid);
log.trace("requestDataHandshake completed.\n\tConnection={}", connection);
}
}
});
getDataRequestHandler.handle((GetDataRequest) message, connection);
@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getDataRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getDataRequestHandlers.put(uid, getDataRequestHandler);
getDataRequestHandler.handle((GetDataRequest) message, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started");
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}

View file

@ -42,7 +42,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 30;
public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 60;
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedStorageEntry> map = new ConcurrentHashMap<>();
@ -113,13 +113,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection);
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> {
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress);
add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false, false);
} else if (message instanceof RemoveDataMessage) {
remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress);
remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress, false);
} else if (message instanceof RemoveMailboxDataMessage) {
removeMailboxData(((RemoveMailboxDataMessage) message).protectedMailboxStorageEntry, peersNodeAddress);
removeMailboxData(((RemoveMailboxDataMessage) message).protectedMailboxStorageEntry, peersNodeAddress, false);
} else if (message instanceof RefreshTTLMessage) {
refreshTTL((RefreshTTLMessage) message, peersNodeAddress);
refreshTTL((RefreshTTLMessage) message, peersNodeAddress, false);
}
});
}
@ -171,19 +171,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) {
return add(protectedStorageEntry, sender, null, false);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean forceBroadcast) {
return add(protectedStorageEntry, sender, null, forceBroadcast);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
return add(protectedStorageEntry, sender, listener, false);
}
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, boolean forceBroadcast) {
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean forceBroadcast, boolean isDataOwner) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload());
@ -209,7 +198,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.info("Data set after doAdd: size=" + map.values().size());
if (!containsKey || forceBroadcast)
broadcast(new AddDataMessage(protectedStorageEntry), sender, listener);
broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner);
else
log.trace("Not broadcasting data as we had it already in our map.");
@ -220,7 +209,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return result;
}
public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAddress sender) {
public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) {
Log.traceCall();
byte[] hashOfDataAndSeqNr = refreshTTLMessage.hashOfDataAndSeqNr;
@ -235,7 +224,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
PublicKey ownerPubKey = ((StoragePayload) storedData.getStoragePayload()).getOwnerPubKey();
PublicKey ownerPubKey = storedData.getStoragePayload().getOwnerPubKey();
boolean result = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature) &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload);
@ -256,7 +245,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.trace(sb.toString());
log.info("Data set after refreshTTL: size=" + map.values().size());
broadcast(refreshTTLMessage, sender, null);
broadcast(refreshTTLMessage, sender, null, isDataOwner);
} else {
log.warn("Checks for refreshTTL failed");
}
@ -268,7 +257,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) {
public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload());
boolean containsKey = map.containsKey(hashOfPayload);
@ -284,7 +273,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (result) {
doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner);
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
@ -294,7 +283,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return result;
}
public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender) {
public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) {
Log.traceCall();
ByteArray hashOfData = getHashAsByteArray(protectedMailboxStorageEntry.getStoragePayload());
boolean containsKey = map.containsKey(hashOfData);
@ -310,7 +299,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
if (result) {
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner);
sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
@ -473,8 +462,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) {
broadcaster.broadcast(message, sender, listener);
private void broadcast(BroadcastMessage message, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean isDataOwner) {
broadcaster.broadcast(message, sender, listener, isDataOwner);
}
private ByteArray getHashAsByteArray(ExpirablePayload data) {

View file

@ -99,14 +99,14 @@ public class ProtectedDataStorageTest {
//@Test
public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertTrue(dataStorage1.add(data, null, null, true, true));
Assert.assertEquals(1, dataStorage1.getMap().size());
int newSequenceNumber = data.sequenceNumber + 1;
byte[] hashOfDataAndSeqNr = Hash.getHash(new P2PDataStorage.DataAndSeqNrPair(data.getStoragePayload(), newSequenceNumber));
byte[] signature = Sig.sign(storageSignatureKeyPair1.getPrivate(), hashOfDataAndSeqNr);
ProtectedStorageEntry dataToRemove = new ProtectedStorageEntry(data.getStoragePayload(), data.ownerPubKey, newSequenceNumber, signature);
Assert.assertTrue(dataStorage1.remove(dataToRemove, null));
Assert.assertTrue(dataStorage1.remove(dataToRemove, null, true));
Assert.assertEquals(0, dataStorage1.getMap().size());
}
@ -115,7 +115,7 @@ public class ProtectedDataStorageTest {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
log.debug("data.date " + data.timeStamp);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertTrue(dataStorage1.add(data, null, null, true, true));
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
@ -163,20 +163,20 @@ public class ProtectedDataStorageTest {
public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null));
Assert.assertTrue(dataStorage1.add(data, null, null, true, true));
Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size());
RefreshTTLMessage refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null));
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null, true));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 2");
Assert.assertEquals(1, dataStorage1.getMap().size());
refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null));
Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null, true));
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 3");
Assert.assertEquals(1, dataStorage1.getMap().size());