add local storage for seed map

This commit is contained in:
Manfred Karrer 2015-10-28 22:57:34 +01:00
parent 9ef8b42509
commit 0d9e0d0f31
9 changed files with 34 additions and 23 deletions

View file

@ -17,6 +17,6 @@
package io.bitsquare.common.handlers; package io.bitsquare.common.handlers;
public interface ResultHandler extends Runnable { public interface ResultHandler {
void handleResult(); void handleResult();
} }

View file

@ -157,7 +157,7 @@ public class ArbitratorManager {
Arbitrator registeredArbitrator = user.getRegisteredArbitrator(); Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
if (registeredArbitrator != null) { if (registeredArbitrator != null) {
addArbitrator(registeredArbitrator, addArbitrator(registeredArbitrator,
this::applyArbitrators, () -> applyArbitrators(),
log::error log::error
); );
} }

View file

@ -85,7 +85,8 @@ public class P2PService {
@Named(ProgramArguments.TOR_DIR) File torDir, @Named(ProgramArguments.TOR_DIR) File torDir,
@Named(ProgramArguments.USE_LOCALHOST) boolean useLocalhost, @Named(ProgramArguments.USE_LOCALHOST) boolean useLocalhost,
EncryptionService encryptionService, EncryptionService encryptionService,
KeyRing keyRing) { KeyRing keyRing,
@Named("storage.dir") File storageDir) {
this.encryptionService = encryptionService; this.encryptionService = encryptionService;
this.keyRing = keyRing; this.keyRing = keyRing;
@ -105,7 +106,7 @@ public class P2PService {
// storage layer // storage layer
dataStorage = new ProtectedExpirableDataStorage(routing, encryptionService); dataStorage = new ProtectedExpirableDataStorage(routing, storageDir);
// Listeners // Listeners

View file

@ -85,7 +85,7 @@ public class SeedNode {
seedNodesRepository.setTorSeedNodeAddresses(seedNodes); seedNodesRepository.setTorSeedNodeAddresses(seedNodes);
} }
p2PService = new P2PService(seedNodesRepository, port, new File("bitsquare_seed_node_" + port), useLocalhost, encryptionService, keyRing); p2PService = new P2PService(seedNodesRepository, port, new File("bitsquare_seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy"));
p2PService.start(listener); p2PService.start(listener);
} }

View file

@ -3,16 +3,17 @@ package io.bitsquare.p2p.storage;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoUtil; import io.bitsquare.common.crypto.CryptoUtil;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.IllegalRequest; import io.bitsquare.p2p.network.IllegalRequest;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.routing.Routing; import io.bitsquare.p2p.routing.Routing;
import io.bitsquare.p2p.storage.data.*; import io.bitsquare.p2p.storage.data.*;
import io.bitsquare.p2p.storage.messages.*; import io.bitsquare.p2p.storage.messages.*;
import io.bitsquare.storage.Storage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.math.BigInteger; import java.math.BigInteger;
import java.security.*; import java.security.*;
import java.util.List; import java.util.List;
@ -29,10 +30,10 @@ public class ProtectedExpirableDataStorage {
public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000; public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000;
private final Routing routing; private final Routing routing;
private final EncryptionService encryptionService;
private final Map<BigInteger, ProtectedData> map = new ConcurrentHashMap<>(); private final Map<BigInteger, ProtectedData> map = new ConcurrentHashMap<>();
private final List<HashSetChangedListener> hashSetChangedListeners = new CopyOnWriteArrayList<>(); private final List<HashSetChangedListener> hashSetChangedListeners = new CopyOnWriteArrayList<>();
private final Map<BigInteger, Integer> sequenceNumberMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<BigInteger, Integer> sequenceNumberMap = new ConcurrentHashMap<>();
private final Storage<ConcurrentHashMap> storage;
private boolean authenticated; private boolean authenticated;
private final Timer timer = new Timer(); private final Timer timer = new Timer();
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
@ -42,9 +43,15 @@ public class ProtectedExpirableDataStorage {
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public ProtectedExpirableDataStorage(Routing routing, EncryptionService encryptionService) { public ProtectedExpirableDataStorage(Routing routing, File storageDir) {
this.routing = routing; this.routing = routing;
this.encryptionService = encryptionService;
storage = new Storage<>(storageDir);
ConcurrentHashMap<BigInteger, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
if (persisted != null) {
sequenceNumberMap = persisted;
}
addMessageListener((message, connection) -> { addMessageListener((message, connection) -> {
if (message instanceof DataMessage) { if (message instanceof DataMessage) {
@ -102,11 +109,12 @@ public class ProtectedExpirableDataStorage {
&& (!containsKey || checkIfStoredDataMatchesNewData(protectedData, hashOfPayload)) && (!containsKey || checkIfStoredDataMatchesNewData(protectedData, hashOfPayload))
&& doAddProtectedExpirableData(protectedData, hashOfPayload, sender); && doAddProtectedExpirableData(protectedData, hashOfPayload, sender);
if (result) if (result) {
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
else storage.queueUpForSave();
} else {
log.debug("add failed"); log.debug("add failed");
}
return result; return result;
} }
@ -121,11 +129,12 @@ public class ProtectedExpirableDataStorage {
&& checkIfStoredDataMatchesNewData(protectedData, hashOfPayload) && checkIfStoredDataMatchesNewData(protectedData, hashOfPayload)
&& doRemoveProtectedExpirableData(protectedData, hashOfPayload, sender); && doRemoveProtectedExpirableData(protectedData, hashOfPayload, sender);
if (result) if (result) {
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
else storage.queueUpForSave();
} else {
log.debug("remove failed"); log.debug("remove failed");
}
return result; return result;
} }
@ -141,11 +150,12 @@ public class ProtectedExpirableDataStorage {
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData, hashOfData) && checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData, hashOfData)
&& doRemoveProtectedExpirableData(protectedMailboxData, hashOfData, sender); && doRemoveProtectedExpirableData(protectedMailboxData, hashOfData, sender);
if (result) if (result) {
sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber); sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber);
else storage.queueUpForSave();
} else {
log.debug("removeMailboxData failed"); log.debug("removeMailboxData failed");
}
return result; return result;
} }

View file

@ -117,7 +117,7 @@ public class TestUtils {
seedNodesRepository.setTorSeedNodeAddresses(seedNodes); seedNodesRepository.setTorSeedNodeAddresses(seedNodes);
} }
P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost, encryptionService, keyRing); P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy"));
p2PService.start(new P2PServiceListener() { p2PService.start(new P2PServiceListener() {
@Override @Override
public void onAllDataReceived() { public void onAllDataReceived() {

View file

@ -58,7 +58,7 @@ public class ProtectedDataStorageTest {
encryptionService1 = new EncryptionService(keyRing1); encryptionService1 = new EncryptionService(keyRing1);
networkNode1 = TestUtils.getAndStartSeedNode(8001, encryptionService1, keyRing1, useClearNet, seedNodes).getP2PService().getNetworkNode(); networkNode1 = TestUtils.getAndStartSeedNode(8001, encryptionService1, keyRing1, useClearNet, seedNodes).getP2PService().getNetworkNode();
routing1 = new Routing(networkNode1, seedNodes); routing1 = new Routing(networkNode1, seedNodes);
dataStorage1 = new ProtectedExpirableDataStorage(routing1, encryptionService1); dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy"));
// for mailbox // for mailbox
keyRing2 = new KeyRing(new KeyStorage(new File("temp_keyStorage2"))); keyRing2 = new KeyRing(new KeyStorage(new File("temp_keyStorage2")));
@ -100,7 +100,7 @@ public class ProtectedDataStorageTest {
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException { public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException {
ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10; ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10;
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here // CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
dataStorage1 = new ProtectedExpirableDataStorage(routing1, encryptionService1); dataStorage1 = new ProtectedExpirableDataStorage(routing1, new File("dummy"));
mockData.ttl = 50; mockData.ttl = 50;
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1); ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);