Update p2p package #756 (#884)

This commit is contained in:
nsec1 2024-05-15 07:52:36 -03:00 committed by GitHub
parent a723c0d86b
commit 1b864368e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 262 additions and 105 deletions

View file

@ -18,6 +18,7 @@
package haveno.network;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import haveno.common.util.SingleThreadExecutorUtils;
import haveno.common.util.Utilities;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.core.NetworkParameters;
@ -32,7 +33,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -83,9 +83,9 @@ public class Socks5DnsDiscovery extends MultiplexingDiscovery {
// Attempted workaround for reported bugs on Linux in which gethostbyname does not appear to be properly
// thread safe and can cause segfaults on some libc versions.
if (Utilities.isLinux())
return Executors.newSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups"));
return SingleThreadExecutorUtils.getSingleThreadExecutor(new ContextPropagatingThreadFactory("DNS seed lookups"));
else
return Executors.newFixedThreadPool(seeds.size(), new DaemonThreadFactory("DNS seed lookups"));
return Utilities.getFixedThreadPoolExecutor(seeds.size(), new DaemonThreadFactory("DNS seed lookups"));
}
/**

View file

@ -40,11 +40,11 @@ public abstract class BootstrapListener implements P2PServiceListener {
}
@Override
public void onDataReceived() {
public void onUpdatedDataReceived() {
}
@Override
public abstract void onUpdatedDataReceived();
public abstract void onDataReceived();
@Override
public void onRequestCustomBridges() {

View file

@ -189,6 +189,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private void doShutDown() {
log.info("P2PService doShutDown started");
if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
@ -298,7 +299,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onUpdatedDataReceived() {
applyIsBootstrapped(P2PServiceListener::onUpdatedDataReceived);
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
}
@Override
@ -313,7 +314,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onDataReceived() {
p2pServiceListeners.forEach(P2PServiceListener::onDataReceived);
applyIsBootstrapped(P2PServiceListener::onDataReceived);
}
private void applyIsBootstrapped(Consumer<P2PServiceListener> listenerHandler) {

View file

@ -44,7 +44,7 @@ public enum CloseConnectionReason {
// illegal requests
RULE_VIOLATION(true, false),
PEER_BANNED(true, false),
PEER_BANNED(false, false),
INVALID_CLASS_RECEIVED(false, false),
MANDATORY_CAPABILITIES_NOT_SUPPORTED(false, false);

View file

@ -47,6 +47,16 @@ public class ConnectionState implements MessageListener {
@Setter
private static int expectedRequests = 6;
// We have 2 GetDataResponses and 3 GetHashResponses. If node is a lite node it also has a GetBlocksResponse if
// blocks are missing.
private static final int MIN_EXPECTED_RESPONSES = 5;
private static int expectedInitialDataResponses = MIN_EXPECTED_RESPONSES;
// If app runs in LiteNode mode there is one more expected request for the getBlocks request, so we increment standard value.
public static void incrementExpectedInitialDataResponses() {
expectedInitialDataResponses += 1;
}
private final Connection connection;
@Getter
@ -121,7 +131,7 @@ public class ConnectionState implements MessageListener {
}
private void maybeResetInitialDataExchangeType() {
if (numInitialDataResponses >= expectedRequests) {
if (numInitialDataResponses >= expectedInitialDataResponses) {
// We have received the expected messages from initial data requests. We delay a bit the reset
// to give time for processing the response and more tolerance to edge cases where we expect more responses.
// Reset to PEER does not mean disconnection as well, but just that this connection has lower priority and
@ -165,7 +175,7 @@ public class ConnectionState implements MessageListener {
",\n numInitialDataResponses=" + numInitialDataResponses +
",\n lastInitialDataMsgTimeStamp=" + lastInitialDataMsgTimeStamp +
",\n isSeedNode=" + isSeedNode +
",\n expectedRequests=" + expectedRequests +
",\n expectedInitialDataResponses=" + expectedInitialDataResponses +
"\n}";
}
}

View file

@ -361,6 +361,7 @@ public abstract class NetworkNode implements MessageListener {
}
public void shutDown(Runnable shutDownCompleteHandler) {
log.info("NetworkNode shutdown started");
if (!shutDownInProgress) {
shutDownInProgress = true;
if (server != null) {

View file

@ -18,12 +18,16 @@
package haveno.network.p2p.network;
import java.io.File;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.berndpruenster.netlayer.tor.ExternalTor;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import java.net.ConnectException;
import java.net.UnknownHostException;
/**
* This class creates a brand new instance of the Tor onion router.
*
@ -61,24 +65,47 @@ public class RunningTor extends TorMode {
@Override
public Tor getTor() throws TorCtlException {
long ts1 = new Date().getTime();
boolean retry = true;
long twoMinutesInMilli = 1000 * 60 * 2;
log.info("Connecting to running tor");
while (retry && ((new Date().getTime() - ts1) <= twoMinutesInMilli)) {
retry = false;
try {
log.info("Connecting to running tor");
Tor result;
if (!password.isEmpty())
result = new ExternalTor(controlHost, controlPort, password);
else if (cookieFile != null && cookieFile.exists())
result = new ExternalTor(controlHost, controlPort, cookieFile, useSafeCookieAuthentication);
else
result = new ExternalTor(controlHost, controlPort);
Tor result;
if (!password.isEmpty())
result = new ExternalTor(controlHost, controlPort, password);
else if (cookieFile != null && cookieFile.exists())
result = new ExternalTor(controlHost, controlPort, cookieFile, useSafeCookieAuthentication);
else
result = new ExternalTor(controlHost, controlPort);
log.info(
"\n################################################################\n"
+ "Connecting to Tor successful after {} ms. Start publishing hidden service.\n"
+ "################################################################",
(new Date().getTime() - ts1)); // takes usually a few seconds
boolean isTorBootstrapped = result.control.waitUntilBootstrapped();
if (!isTorBootstrapped) {
log.error("Couldn't bootstrap Tor.");
}
return result;
log.info(
"\n################################################################\n"
+ "Connecting to Tor successful after {} ms. Start publishing hidden service.\n"
+ "################################################################",
(new Date().getTime() - ts1)); // takes usually a few seconds
return result;
} catch (Exception e) {
// netlayer throws UnknownHostException when tor docker container is not ready yet.
// netlayer throws ConnectException before tor container bind to control port.
if (e instanceof UnknownHostException || e instanceof ConnectException) {
log.warn("Couldn't connect to Tor control port. Retrying...", e);
retry = true;
}
log.error("Couldn't connect to Tor.", e);
}
}
return null;
}
@Override

View file

@ -70,7 +70,7 @@ public class Statistic {
totalReceivedBytesPerSec.set(((double) totalReceivedBytes.get()) / passed);
}, 1);
// We log statistics every 5 minutes
// We log statistics every 60 minutes
UserThread.runPeriodically(() -> {
String ls = System.lineSeparator();
log.info("Accumulated network statistics:" + ls +
@ -79,14 +79,14 @@ public class Statistic {
"Number of sent messages per sec: {};" + ls +
"Bytes received: {}" + ls +
"Number of received messages/Received messages: {} / {};" + ls +
"Number of received messages per sec: {};" + ls,
"Number of received messages per sec: {}" + ls,
Utilities.readableFileSize(totalSentBytes.get()),
numTotalSentMessages.get(), totalSentMessages,
numTotalSentMessagesPerSec.get(),
Utilities.readableFileSize(totalReceivedBytes.get()),
numTotalReceivedMessages.get(), totalReceivedMessages,
numTotalReceivedMessagesPerSec.get());
}, TimeUnit.MINUTES.toSeconds(5));
}, TimeUnit.MINUTES.toSeconds(60));
}
public static LongProperty totalSentBytesProperty() {
@ -236,6 +236,30 @@ public class Statistic {
return roundTripTime;
}
public static long getTotalSentBytes() {
return totalSentBytes.get();
}
public static double getTotalSentBytesPerSec() {
return totalSentBytesPerSec.get();
}
public static long getTotalReceivedBytes() {
return totalReceivedBytes.get();
}
public static double getTotalReceivedBytesPerSec() {
return totalReceivedBytesPerSec.get();
}
public static double numTotalReceivedMessagesPerSec() {
return numTotalReceivedMessagesPerSec.get();
}
public static double getNumTotalSentMessagesPerSec() {
return numTotalSentMessagesPerSec.get();
}
@Override
public String toString() {
return "Statistic{" +

View file

@ -74,6 +74,7 @@ public class BroadcastHandler implements PeerManager.Listener {
private final NetworkNode networkNode;
private final PeerManager peerManager;
@Nullable
private final ResultHandler resultHandler;
private final String uid;
@ -276,6 +277,9 @@ public class BroadcastHandler implements PeerManager.Listener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();
if (stopped.get()) {
return;
}

View file

@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
@ -51,6 +52,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private Runnable shutDownResultHandler;
private final ListeningExecutorService executor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@ -90,6 +92,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
}
private void doShutDown() {
log.info("Broadcaster doShutDown started");
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) {
timer.stop();

View file

@ -77,6 +77,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
private static final boolean PRINT_REPORTED_PEERS_DETAILS = true;
private Timer printStatisticsTimer;
private boolean shutDownRequested;
private int numOnConnections;
///////////////////////////////////////////////////////////////////////////////////////////
@ -167,7 +168,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
};
clockWatcher.addListener(clockWatcherListener);
printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(5));
printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(60));
}
public void shutDown() {
@ -209,6 +210,8 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
doHouseKeeping();
numOnConnections++;
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
@ -224,14 +227,15 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.info("onDisconnect called: nodeAddress={}, closeConnectionReason={}",
log.debug("onDisconnect called: nodeAddress={}, closeConnectionReason={}",
connection.getPeersNodeAddressOptional(), closeConnectionReason);
handleConnectionFault(connection);
boolean previousLostAllConnections = lostAllConnections;
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections) {
// At start-up we ignore if we would lose a connection and would fall back to no connections
if (lostAllConnections && numOnConnections > 2) {
stopped = true;
if (!shutDownRequested) {
@ -553,7 +557,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
if (!candidates.isEmpty()) {
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection to peer {}",
log.info("checkMaxConnections: Num candidates (inbound/peer) for shut down={}. We close oldest connection to peer {}",
candidates.size(), connection.getPeersNodeAddressOptional());
if (!connection.isStopped()) {
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN,

View file

@ -37,16 +37,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class GetDataRequestHandler {
private static final long TIMEOUT = 180;
private static final long TIMEOUT = 240;
private static final int MAX_ENTRIES = 10000;
private static final int MAX_ENTRIES = 5000;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onComplete(int serializedSize);
void onFault(String errorMessage, Connection connection);
}
@ -94,15 +94,11 @@ public class GetDataRequestHandler {
connection.getCapabilities());
if (wasPersistableNetworkPayloadsTruncated.get()) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.info("The getDataResponse for peer {} got truncated.", connectionInfo);
}
if (wasProtectedStorageEntriesTruncated.get()) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.info("The getDataResponse for peer {} got truncated.", connectionInfo);
}
log.info("The getDataResponse to peer with {} contains {} ProtectedStorageEntries and {} PersistableNetworkPayloads",
@ -126,8 +122,8 @@ public class GetDataRequestHandler {
if (!stopped) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
listener.onComplete(getDataResponse.toProtoNetworkEnvelope().getSerializedSize());
cleanup();
listener.onComplete();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}
@ -136,7 +132,7 @@ public class GetDataRequestHandler {
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getDataRequest to " + connection +
String errorMessage = "Sending getDataResponse to " + connection +
" failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." +
"Exception: " + throwable.getMessage();
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection);

View file

@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
class RequestDataHandler implements MessageListener {
private static final long TIMEOUT = 180;
private static final long TIMEOUT = 240;
private NodeAddress peersNodeAddress;
private String getDataRequestType;
@ -69,7 +69,7 @@ class RequestDataHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onComplete(boolean wasTruncated);
@SuppressWarnings("UnusedParameters")
void onFault(String errorMessage, @SuppressWarnings("SameParameterValue") @Nullable Connection connection);
@ -138,7 +138,7 @@ class RequestDataHandler implements MessageListener {
}
getDataRequestType = getDataRequest.getClass().getSimpleName();
log.info("We send a {} to peer {}. ", getDataRequestType, nodeAddress);
log.info("\n\n>> We send a {} to peer {}\n", getDataRequestType, nodeAddress);
networkNode.addMessageListener(this);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
//noinspection UnstableApiUsage
@ -197,8 +197,7 @@ class RequestDataHandler implements MessageListener {
connection.getPeersNodeAddressOptional().get());
cleanup();
listener.onComplete();
// firstRequest = false;
listener.onComplete(getDataResponse.isWasTruncated());
} else {
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
"handshake (timeout causes connection close but peer might have sent a msg before " +
@ -239,7 +238,7 @@ class RequestDataHandler implements MessageListener {
StringBuilder sb = new StringBuilder();
String sep = System.lineSeparator();
sb.append(sep).append("#################################################################").append(sep);
sb.append("Connected to node: ").append(peersNodeAddress.getFullAddress()).append(sep);
sb.append("Data provided by node: ").append(peersNodeAddress.getFullAddress()).append(sep);
int items = dataSet.size() + persistableNetworkPayloadSet.size();
sb.append("Received ").append(items).append(" instances from a ")
.append(getDataRequestType).append(sep);
@ -249,7 +248,7 @@ class RequestDataHandler implements MessageListener {
.append(" / ")
.append(Utilities.readableFileSize(value.second.get()))
.append(sep));
sb.append("#################################################################");
sb.append("#################################################################\n");
log.info(sb.toString());
}

View file

@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -55,8 +56,10 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private static int NUM_SEEDS_FOR_PRELIMINARY_REQUEST = 2;
// how many seeds additional to the first responding PreliminaryGetDataRequest seed we request the GetUpdatedDataRequest from
private static int NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 1;
private static int MAX_REPEATED_REQUESTS = 30;
private boolean isPreliminaryDataRequest = true;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -75,6 +78,12 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
}
public interface ResponseListener {
void onSuccess(int serializedSize);
void onFault();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
@ -84,6 +93,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final List<NodeAddress> seedNodeAddresses;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();
// As we use Guice injection we cannot set the listener in our constructor but the P2PService calls the setListener
// in it's constructor so we can guarantee it is not null.
@ -94,8 +104,9 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private Optional<NodeAddress> nodeAddressOfPreliminaryDataRequest = Optional.empty();
private Timer retryTimer;
private boolean dataUpdateRequested;
private boolean allDataReceived;
private boolean stopped;
private int numRepeatedRequests = 0;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -124,6 +135,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
if (seedNodeRepository.isSeedNode(myAddress)) {
NUM_SEEDS_FOR_PRELIMINARY_REQUEST = 3;
NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 2;
MAX_REPEATED_REQUESTS = 100;
}
}
});
@ -199,6 +211,10 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
return nodeAddressOfPreliminaryDataRequest;
}
public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
@ -266,9 +282,11 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
public void onComplete(int serializedSize) {
getDataRequestHandlers.remove(uid);
log.trace("requestDataHandshake completed.\n\tConnection={}", connection);
responseListeners.forEach(listener -> listener.onSuccess(serializedSize));
}
@Override
@ -278,6 +296,8 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
responseListeners.forEach(ResponseListener::onFault);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
@ -313,7 +333,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
RequestDataHandler requestDataHandler = new RequestDataHandler(networkNode, dataStorage, peerManager,
new RequestDataHandler.Listener() {
@Override
public void onComplete() {
public void onComplete(boolean wasTruncated) {
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
nodeAddress);
stopRetryTimer();
@ -336,7 +356,27 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
checkNotNull(listener).onUpdatedDataReceived();
}
checkNotNull(listener).onDataReceived();
if (wasTruncated) {
if (numRepeatedRequests < MAX_REPEATED_REQUESTS) {
// If we had allDataReceived already set to true but get a response with truncated flag,
// we still repeat the request to that node for higher redundancy. Otherwise, one seed node
// providing incomplete data would stop others to fill the gaps.
log.info("DataResponse did not contain all data, so we repeat request until we got all data");
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2);
} else if (!allDataReceived) {
allDataReceived = true;
log.warn("\n#################################################################\n" +
"Loading initial data from {} did not complete after 20 repeated requests. \n" +
"#################################################################\n", nodeAddress);
checkNotNull(listener).onDataReceived();
}
} else if (!allDataReceived) {
allDataReceived = true;
log.info("\n\n#################################################################\n" +
"Loading initial data from {} completed\n" +
"#################################################################\n", nodeAddress);
checkNotNull(listener).onDataReceived();
}
}
@Override
@ -377,6 +417,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
});
handlerMap.put(nodeAddress, requestDataHandler);
numRepeatedRequests++;
requestDataHandler.requestData(nodeAddress, isPreliminaryDataRequest);
} else {
log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress + "\n" +

View file

@ -130,7 +130,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.info("onDisconnect closeConnectionReason={}, nodeAddressOpt={}", closeConnectionReason, connection.getPeersNodeAddressOptional());
log.debug("onDisconnect closeConnectionReason={}, nodeAddressOpt={}", closeConnectionReason, connection.getPeersNodeAddressOptional());
closeHandler(connection);
if (retryTimer == null) {

View file

@ -22,6 +22,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import haveno.common.crypto.CryptoException;
import haveno.common.crypto.Sig;
import haveno.common.proto.network.GetDataResponsePriority;
import haveno.common.proto.network.NetworkPayload;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.proto.persistable.PersistablePayload;
@ -141,6 +142,10 @@ public class ProtectedStorageEntry implements NetworkPayload, PersistablePayload
(clock.millis() - creationTimeStamp) > ((ExpirablePayload) protectedStoragePayload).getTTL();
}
public GetDataResponsePriority getGetDataResponsePriority() {
return protectedStoragePayload.getGetDataResponsePriority();
}
/*
* Returns true if the Entry is valid for an add operation. For non-mailbox Entrys, the entry owner must
* match the payload owner.

View file

@ -27,8 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
/**

View file

@ -79,7 +79,7 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
"As our historical store is a newer version we add the data to our result map." :
"As the requester version is not older as our historical store we do not " +
"add the data to the result map.";
log.info("The requester had version {}. Our historical data store has version {}.\n{}",
log.trace("The requester had version {}. Our historical data store has version {}.\n{}",
requestersVersion, storeVersion, details);
return newVersion;
})
@ -141,7 +141,7 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
@Override
protected void readFromResources(String postFix, Runnable completeHandler) {
readStore(persisted -> {
log.info("We have created the {} store for the live data and filled it with {} entries from the persisted data.",
log.debug("We have created the {} store for the live data and filled it with {} entries from the persisted data.",
getFileName(), getMapOfLiveData().size());
// Now we add our historical data stores.
@ -181,7 +181,7 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
persistenceManager.readPersisted(fileName, persisted -> {
storesByVersion.put(version, persisted);
allHistoricalPayloads.putAll(persisted.getMap());
log.info("We have read from {} {} historical items.", fileName, persisted.getMap().size());
log.debug("We have read from {} {} historical items.", fileName, persisted.getMap().size());
pruneStore(persisted, version);
completeHandler.run();
},
@ -195,11 +195,11 @@ public abstract class HistoricalDataStoreService<T extends PersistableNetworkPay
mapOfLiveData.keySet().removeAll(historicalStore.getMap().keySet());
int postLive = mapOfLiveData.size();
if (preLive > postLive) {
log.info("We pruned data from our live data store which are already contained in the historical data store with version {}. " +
log.debug("We pruned data from our live data store which are already contained in the historical data store with version {}. " +
"The live map had {} entries before pruning and has {} entries afterwards.",
version, preLive, postLive);
} else {
log.info("No pruning from historical data store with version {} was applied", version);
log.debug("No pruning from historical data store with version {} was applied", version);
}
requestPersistence();
}

View file

@ -109,18 +109,18 @@ public abstract class StoreService<T extends PersistableEnvelope> {
File destinationFile = new File(Paths.get(absolutePathOfStorageDir, fileName).toString());
if (!destinationFile.exists()) {
try {
log.info("We copy resource to file: resourceFileName={}, destinationFile={}", resourceFileName, destinationFile);
log.debug("We copy resource to file: resourceFileName={}, destinationFile={}", resourceFileName, destinationFile);
FileUtil.resourceToFile(resourceFileName, destinationFile);
return true;
} catch (ResourceNotFoundException e) {
log.info("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
log.debug("Could not find resourceFile " + resourceFileName + ". That is expected if none is provided yet.");
} catch (Throwable e) {
log.error("Could not copy resourceFile " + resourceFileName + " to " +
destinationFile.getAbsolutePath() + ".\n" + e.getMessage());
e.printStackTrace();
}
} else {
log.info("No resource file was copied. {} exists already.", fileName);
log.debug("No resource file was copied. {} exists already.", fileName);
}
return false;
}