update p2p connection and message packages

remove inventor and monitor packages

Co-authored-by: Alva Swanson <alvasw@protonmail.com>
Co-authored-by: Alejandro García <117378669+alejandrogarcia83@users.noreply.github.com>
Co-authored-by: jmacxx <47253594+jmacxx@users.noreply.github.com>
Co-authored-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
woodser 2023-04-24 22:41:10 -04:00
parent 0f41c8d8b8
commit e0db4528da
79 changed files with 1332 additions and 5327 deletions

View file

@ -1,59 +1,63 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
* along with haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.network.p2p;
import haveno.common.config.Config;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.network.p2p.network.BridgeAddressProvider;
import haveno.network.p2p.network.LocalhostNetworkNode;
import haveno.network.p2p.network.NetworkFilter;
import haveno.network.p2p.network.BanFilter;
import haveno.network.p2p.network.NetworkNode;
import haveno.network.p2p.network.NewTor;
import haveno.network.p2p.network.RunningTor;
import haveno.network.p2p.network.TorMode;
import haveno.network.p2p.network.TorNetworkNode;
import javax.annotation.Nullable;
import haveno.common.config.Config;
import haveno.common.proto.network.NetworkProtoResolver;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import java.io.File;
import javax.annotation.Nullable;
public class NetworkNodeProvider implements Provider<NetworkNode> {
private final NetworkNode networkNode;
@Inject
public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver,
BridgeAddressProvider bridgeAddressProvider,
@Nullable NetworkFilter networkFilter,
@Named(Config.USE_LOCALHOST_FOR_P2P) boolean useLocalhostForP2P,
@Named(Config.NODE_PORT) int port,
@Named(Config.TOR_DIR) File torDir,
@Nullable @Named(Config.TORRC_FILE) File torrcFile,
@Named(Config.TORRC_OPTIONS) String torrcOptions,
@Named(Config.TOR_CONTROL_PORT) int controlPort,
@Named(Config.TOR_CONTROL_PASSWORD) String password,
@Nullable @Named(Config.TOR_CONTROL_COOKIE_FILE) File cookieFile,
@Named(Config.TOR_STREAM_ISOLATION) boolean streamIsolation,
@Named(Config.TOR_CONTROL_USE_SAFE_COOKIE_AUTH) boolean useSafeCookieAuthentication) {
BridgeAddressProvider bridgeAddressProvider,
@Nullable BanFilter banFilter,
@Named(Config.MAX_CONNECTIONS) int maxConnections,
@Named(Config.USE_LOCALHOST_FOR_P2P) boolean useLocalhostForP2P,
@Named(Config.NODE_PORT) int port,
@Named(Config.TOR_DIR) File torDir,
@Nullable @Named(Config.TORRC_FILE) File torrcFile,
@Named(Config.TORRC_OPTIONS) String torrcOptions,
@Named(Config.TOR_CONTROL_PORT) int controlPort,
@Named(Config.TOR_CONTROL_PASSWORD) String password,
@Nullable @Named(Config.TOR_CONTROL_COOKIE_FILE) File cookieFile,
@Named(Config.TOR_STREAM_ISOLATION) boolean streamIsolation,
@Named(Config.TOR_CONTROL_USE_SAFE_COOKIE_AUTH) boolean useSafeCookieAuthentication) {
if (useLocalhostForP2P) {
networkNode = new LocalhostNetworkNode(port, networkProtoResolver, networkFilter);
networkNode = new LocalhostNetworkNode(port, networkProtoResolver, banFilter, maxConnections);
} else {
TorMode torMode = getTorMode(bridgeAddressProvider,
torDir,
@ -63,21 +67,21 @@ public class NetworkNodeProvider implements Provider<NetworkNode> {
password,
cookieFile,
useSafeCookieAuthentication);
networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, networkFilter);
networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, banFilter, maxConnections);
}
}
private TorMode getTorMode(BridgeAddressProvider bridgeAddressProvider,
File torDir,
@Nullable File torrcFile,
String torrcOptions,
int controlPort,
String password,
@Nullable File cookieFile,
boolean useSafeCookieAuthentication) {
File torDir,
@Nullable File torrcFile,
String torrcOptions,
int controlPort,
String password,
@Nullable File cookieFile,
boolean useSafeCookieAuthentication) {
return controlPort != Config.UNSPECIFIED_PORT ?
new RunningTor(torDir, controlPort, password, cookieFile, useSafeCookieAuthentication) :
new NewTor(torDir, torrcFile, torrcOptions, bridgeAddressProvider.getBridgeAddresses());
new NewTor(torDir, torrcFile, torrcOptions, bridgeAddressProvider);
}
@Override

View file

@ -357,10 +357,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation

View file

@ -20,8 +20,6 @@ package haveno.network.p2p.mailbox;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import haveno.common.UserThread;
@ -34,6 +32,7 @@ import haveno.common.persistence.PersistenceManager;
import haveno.common.proto.ProtobufferException;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.persistable.PersistedDataHost;
import haveno.common.util.Tuple2;
import haveno.common.util.Utilities;
import haveno.network.crypto.EncryptionService;
import haveno.network.p2p.DecryptedMessageWithPubKey;
@ -64,6 +63,7 @@ import javax.inject.Singleton;
import java.security.PublicKey;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
@ -76,6 +76,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@ -119,6 +120,8 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
private final Map<String, MailboxItem> mailboxItemsByUid = new HashMap<>();
private boolean isBootstrapped;
private boolean allServicesInitialized;
private boolean initAfterBootstrapped;
@Inject
public MailboxMessageService(NetworkNode networkNode,
@ -151,50 +154,69 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
@Override
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
log.trace("## readPersisted persisted {}", persisted.size());
Map<String, Long> numItemsPerDay = new HashMap<>();
// We sort by creation date and limit to max 3000 entries, so oldest items get skipped even if TTL
// is not reached to cap the memory footprint. 3000 items is about 10 MB.
Map<String, Tuple2<AtomicLong, List<Integer>>> numItemsPerDay = new HashMap<>();
AtomicLong totalSize = new AtomicLong();
// We sort by creation date and limit to max 3000 entries, so the oldest items get skipped even if TTL
// is not reached. 3000 items is about 60 MB with max size of 20kb supported for storage.
persisted.stream()
.sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed())
.limit(3000)
.filter(e -> !e.isExpired(clock))
.filter(e -> !mailboxItemsByUid.containsKey(e.getUid()))
.limit(3000)
.forEach(mailboxItem -> {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry();
int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize();
// Usual size is 3-4kb. A few are about 15kb and very few are larger and about 100kb or
// more (probably attachments in disputes)
// We ignore those large data to reduce memory footprint.
if (serializedSize < 20000) {
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, 0L);
numItemsPerDay.put(day, numItemsPerDay.get(day) + 1);
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, new Tuple2<>(new AtomicLong(0), new ArrayList<>()));
Tuple2<AtomicLong, List<Integer>> tuple = numItemsPerDay.get(day);
tuple.first.getAndIncrement();
tuple.second.add(serializedSize);
String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
// We only keep small items, to reduce the potential impact of missed remove messages.
// E.g. if a seed at a longer restart period missed the remove messages, then when loading from
// persisted data the messages, they would add those again and distribute then later at requests to peers.
// Those outdated messages would then stay in the network until TTL triggers removal.
// By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive.
if (serializedSize < 20000) {
mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem);
mailboxMessageList.add(mailboxItem);
totalSize.getAndAdd(serializedSize);
// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
//todo check if listeners are called too early
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);
log.trace("## readPersisted uid={}\nhash={}\nisMine={}\ndate={}\nsize={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()),
mailboxItem.isMine(),
date,
serializedSize);
} else {
log.info("We ignore this large persisted mailboxItem. If still valid we will reload it from seed nodes at getData requests.\n" +
"Size={}; date={}; sender={}", Utilities.readableFileSize(serializedSize), date,
mailboxItem.getProtectedMailboxStorageEntry().getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getSenderNodeAddress());
}
});
List<Map.Entry<String, Long>> perDay = numItemsPerDay.entrySet().stream()
List<String> perDay = numItemsPerDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(entry -> {
Tuple2<AtomicLong, List<Integer>> tuple = entry.getValue();
List<Integer> sizes = tuple.second;
long sum = sizes.stream().mapToLong(s -> s).sum();
List<String> largeItems = sizes.stream()
.filter(s -> s > 20000)
.map(Utilities::readableFileSize)
.collect(Collectors.toList());
String largeMsgInfo = largeItems.isEmpty() ? "" : "; Large messages: " + largeItems;
return entry.getKey() + ": Num messages: " + tuple.first + "; Total size: " +
Utilities.readableFileSize(sum) + largeMsgInfo;
})
.collect(Collectors.toList());
log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}", mailboxMessageList.size(), Joiner.on("\n").join(perDay));
log.info("We loaded {} persisted mailbox messages with {}.\nPer day distribution:\n{}",
mailboxMessageList.size(),
Utilities.readableFileSize(totalSize.get()),
Joiner.on("\n").join(perDay));
requestPersistence();
completeHandler.run();
},
@ -206,6 +228,12 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
// API
///////////////////////////////////////////////////////////////////////////////////////////
// We wait until all services are ready to avoid some edge cases as in https://github.com/bisq-network/bisq/issues/6367
public void onAllServicesInitialized() {
allServicesInitialized = true;
init();
}
// We don't listen on requestDataManager directly as we require the correct
// order of execution. The p2pService is handling the correct order of execution and we get called
// directly from there.
@ -217,11 +245,18 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
// second stage starup for MailboxMessageService ... apply existing messages to their modules
public void initAfterBootstrapped() {
// Only now we start listening and processing. The p2PDataStorage is our cache for data we have received
// after the hidden service was ready.
addHashMapChangedListener();
onAdded(p2PDataStorage.getMap().values());
maybeRepublishMailBoxMessages();
initAfterBootstrapped = true;
init();
}
private void init() {
if (allServicesInitialized && initAfterBootstrapped) {
// Only now we start listening and processing. The p2PDataStorage is our cache for data we have received
// after the hidden service was ready.
addHashMapChangedListener();
onAdded(p2PDataStorage.getMap().values());
maybeRepublishMailBoxMessages();
}
}
@ -373,15 +408,21 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
// We run the batch processing of all mailbox messages we have received at startup in a thread to not block the UI.
// For about 1000 messages decryption takes about 1 sec.
private void threadedBatchProcessMailboxEntries(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("processMailboxEntry-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis();
ListenableFuture<Set<MailboxItem>> future = executor.submit(() -> {
var mailboxItems = getMailboxItems(protectedMailboxStorageEntries);
log.trace("Batch processing of {} mailbox entries took {} ms",
protectedMailboxStorageEntries.size(),
System.currentTimeMillis() - ts);
return mailboxItems;
});
SettableFuture<Set<MailboxItem>> future = SettableFuture.create();
new Thread(() -> {
try {
var mailboxItems = getMailboxItems(protectedMailboxStorageEntries);
log.info("Batch processing of {} mailbox entries took {} ms",
protectedMailboxStorageEntries.size(),
System.currentTimeMillis() - ts);
future.set(mailboxItems);
} catch (Throwable throwable) {
future.setException(throwable);
}
}, "processMailboxEntry-" + new Random().nextInt(1000)).start();
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) {
@ -456,7 +497,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
mailboxMessage.getClass().getSimpleName(), uid, sender);
decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender));
if (isBootstrapped) {
if (allServicesInitialized && isBootstrapped) {
// After we notified our listeners we remove the data immediately from the network.
// In case the client has not been ready it need to take it via getMailBoxMessages.
// We do not remove the data from our local map at that moment. This has to be called explicitely from the

View file

@ -19,10 +19,10 @@ package haveno.network.p2p.network;
import haveno.network.p2p.NodeAddress;
import java.util.function.Function;
import java.util.function.Predicate;
public interface NetworkFilter {
public interface BanFilter {
boolean isPeerBanned(NodeAddress nodeAddress);
void setBannedNodeFunction(Function<NodeAddress, Boolean> isNodeAddressBanned);
void setBannedNodePredicate(Predicate<NodeAddress> isNodeAddressBanned);
}

View file

@ -17,20 +17,6 @@
package haveno.network.p2p.network;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.InvalidProtocolBufferException;
import haveno.common.Proto;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
import haveno.common.app.Capability;
import haveno.common.app.HasCapabilities;
import haveno.common.app.Version;
import haveno.common.config.Config;
import haveno.common.proto.ProtobufferException;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.util.Utilities;
import haveno.network.p2p.BundleOfEnvelopes;
import haveno.network.p2p.CloseConnectionMessage;
import haveno.network.p2p.ExtendedDataSizePermission;
@ -41,43 +27,63 @@ import haveno.network.p2p.peers.keepalive.messages.KeepAliveMessage;
import haveno.network.p2p.storage.P2PDataStorage;
import haveno.network.p2p.storage.messages.AddDataMessage;
import haveno.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage;
import haveno.network.p2p.storage.messages.RemoveDataMessage;
import haveno.network.p2p.storage.payload.CapabilityRequiringPayload;
import haveno.network.p2p.storage.payload.PersistableNetworkPayload;
import haveno.network.p2p.storage.payload.ProtectedStoragePayload;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import haveno.common.Proto;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
import haveno.common.app.HasCapabilities;
import haveno.common.app.Version;
import haveno.common.config.Config;
import haveno.common.proto.ProtobufferException;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.util.SingleThreadExecutorUtils;
import haveno.common.util.Utilities;
import com.google.protobuf.InvalidProtocolBufferException;
import javax.inject.Inject;
import com.google.common.util.concurrent.Uninterruptibles;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.OptionalDataException;
import java.io.StreamCorruptedException;
import java.lang.ref.WeakReference;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.lang.ref.WeakReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@ -101,27 +107,32 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(180);
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240);
private static final int SHUTDOWN_TIMEOUT = 100;
public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
}
public static int getMaxPermittedMessageSize() {
return MAX_PERMITTED_MESSAGE_SIZE;
}
public static int getShutdownTimeout() {
return SHUTDOWN_TIMEOUT;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final Socket socket;
// private final MessageListener messageListener;
private final ConnectionListener connectionListener;
@Nullable
private final NetworkFilter networkFilter;
private final BanFilter banFilter;
@Getter
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "Connection.java executor-service"));
// holder of state shared between InputHandler and Connection
private final ExecutorService executorService;
@Getter
private final Statistic statistic;
@Getter
@ -130,7 +141,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final ConnectionStatistics connectionStatistics;
// set in init
private SynchronizedProtoOutputStream protoOutputStream;
private ProtoOutputStream protoOutputStream;
// mutable data, set from other threads but not changed internally.
@Getter
@ -153,21 +164,23 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final Capabilities capabilities = new Capabilities();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
Connection(Socket socket,
MessageListener messageListener,
ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
MessageListener messageListener,
ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress,
NetworkProtoResolver networkProtoResolver,
@Nullable BanFilter banFilter) {
this.socket = socket;
this.connectionListener = connectionListener;
this.networkFilter = networkFilter;
uid = UUID.randomUUID().toString();
this.banFilter = banFilter;
this.uid = UUID.randomUUID().toString();
this.executorService = SingleThreadExecutorUtils.getSingleThreadExecutor("Executor service for connection with uid " + uid);
statistic = new Statistic();
addMessageListener(messageListener);
@ -189,11 +202,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
protoOutputStream = new SynchronizedProtoOutputStream(socket.getOutputStream(), statistic);
protoInputStream = socket.getInputStream();
// We create a thread for handling inputStream data
singleThreadExecutor.submit(this);
executorService.submit(this);
if (peersNodeAddress != null) {
setPeersNodeAddress(peersNodeAddress);
if (networkFilter != null && networkFilter.isPeerBanned(peersNodeAddress)) {
if (banFilter != null && banFilter.isPeerBanned(peersNodeAddress)) {
log.warn("We created an outbound connection with a banned peer");
reportInvalidRequest(RuleViolation.PEER_BANNED);
}
}
@ -212,12 +226,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
void sendMessage(NetworkEnvelope networkEnvelope) {
long ts = System.currentTimeMillis();
log.debug(">> Send networkEnvelope of type: {}", networkEnvelope.getClass().getSimpleName());
@ -226,14 +235,16 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return;
}
if (networkFilter != null &&
if (banFilter != null &&
peersNodeAddressOptional.isPresent() &&
networkFilter.isPeerBanned(peersNodeAddressOptional.get())) {
banFilter.isPeerBanned(peersNodeAddressOptional.get())) {
log.warn("We tried to send a message to a banned peer. message={}",
networkEnvelope.getClass().getSimpleName());
reportInvalidRequest(RuleViolation.PEER_BANNED);
return;
}
if (!noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) {
if (!testCapability(networkEnvelope)) {
log.debug("Capability for networkEnvelope is required but not supported");
return;
}
@ -244,62 +255,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
long elapsed = now - lastSendTimeStamp;
if (elapsed < getSendMsgThrottleTrigger()) {
log.debug("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
getSendMsgThrottleTrigger(), getSendMsgThrottleSleep(), lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
// check if BundleOfEnvelopes is supported
if (getCapabilities().containsAll(new Capabilities(Capability.BUNDLE_OF_ENVELOPES))) {
synchronized (lock) {
// check if current envelope fits size
// - no? create new envelope
int size = !queueOfBundles.isEmpty() ? queueOfBundles.element().toProtoNetworkEnvelope().getSerializedSize() + networkEnvelopeSize : 0;
if (queueOfBundles.isEmpty() || size > MAX_PERMITTED_MESSAGE_SIZE * 0.9) {
// - no? create a bucket
queueOfBundles.add(new BundleOfEnvelopes());
// - and schedule it for sending
lastSendTimeStamp += getSendMsgThrottleSleep();
bundleSender.schedule(() -> {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes bundle = queueOfBundles.poll();
if (bundle != null && !stopped) {
NetworkEnvelope envelope;
int msgSize;
if (bundle.getEnvelopes().size() == 1) {
envelope = bundle.getEnvelopes().get(0);
msgSize = envelope.toProtoNetworkEnvelope().getSerializedSize();
} else {
envelope = bundle;
msgSize = networkEnvelopeSize;
}
try {
protoOutputStream.writeEnvelope(envelope);
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessageSent(envelope, this)));
UserThread.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, msgSize));
} catch (Throwable t) {
log.error("Sending envelope of class {} to address {} " +
"failed due {}",
envelope.getClass().getSimpleName(),
this.getPeersNodeAddressOptional(),
t.toString());
log.error("envelope: {}", envelope);
}
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
}
// - yes? add to bucket
queueOfBundles.element().add(networkEnvelope);
}
return;
}
Thread.sleep(getSendMsgThrottleSleep());
}
@ -312,44 +271,57 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
} catch (Throwable t) {
handleException(t);
throw new RuntimeException(t);
}
}
// TODO: If msg is BundleOfEnvelopes we should check each individual message for capability and filter out those
// which fail.
public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
boolean result;
if (msg instanceof AddDataMessage) {
final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) msg).getProtectedStorageEntry()).getProtectedStoragePayload();
result = !(protectedStoragePayload instanceof CapabilityRequiringPayload);
if (!result)
result = capabilities.containsAll(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities());
} else if (msg instanceof AddPersistableNetworkPayloadMessage) {
final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) msg).getPersistableNetworkPayload();
result = !(persistableNetworkPayload instanceof CapabilityRequiringPayload);
if (!result)
result = capabilities.containsAll(((CapabilityRequiringPayload) persistableNetworkPayload).getRequiredCapabilities());
} else if (msg instanceof CapabilityRequiringPayload) {
result = capabilities.containsAll(((CapabilityRequiringPayload) msg).getRequiredCapabilities());
} else {
result = true;
public boolean testCapability(NetworkEnvelope networkEnvelope) {
if (networkEnvelope instanceof BundleOfEnvelopes) {
// We remove elements in the list which fail the capability test
BundleOfEnvelopes bundleOfEnvelopes = (BundleOfEnvelopes) networkEnvelope;
updateBundleOfEnvelopes(bundleOfEnvelopes);
// If the bundle is empty we dont send the networkEnvelope
return !bundleOfEnvelopes.getEnvelopes().isEmpty();
}
return extractCapabilityRequiringPayload(networkEnvelope)
.map(this::testCapability)
.orElse(true);
}
private boolean testCapability(CapabilityRequiringPayload capabilityRequiringPayload) {
boolean result = capabilities.containsAll(capabilityRequiringPayload.getRequiredCapabilities());
if (!result) {
if (capabilities.size() > 1) {
Proto data = msg;
if (msg instanceof AddDataMessage) {
data = ((AddDataMessage) msg).getProtectedStorageEntry().getProtectedStoragePayload();
}
// Monitoring nodes have only one capability set, we don't want to log those
log.debug("We did not send the message because the peer does not support our required capabilities. " +
"messageClass={}, peer={}, peers supportedCapabilities={}",
data.getClass().getSimpleName(), peersNodeAddressOptional, capabilities);
}
log.debug("We did not send {} because capabilities are not supported.",
capabilityRequiringPayload.getClass().getSimpleName());
}
return result;
}
private void updateBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes) {
List<NetworkEnvelope> toRemove = bundleOfEnvelopes.getEnvelopes().stream()
.filter(networkEnvelope -> !testCapability(networkEnvelope))
.collect(Collectors.toList());
bundleOfEnvelopes.getEnvelopes().removeAll(toRemove);
}
private Optional<CapabilityRequiringPayload> extractCapabilityRequiringPayload(Proto proto) {
Proto candidate = proto;
// Lets check if our networkEnvelope is a wrapped data structure
if (proto instanceof AddDataMessage) {
candidate = (((AddDataMessage) proto).getProtectedStorageEntry()).getProtectedStoragePayload();
} else if (proto instanceof RemoveDataMessage) {
candidate = (((RemoveDataMessage) proto).getProtectedStorageEntry()).getProtectedStoragePayload();
} else if (proto instanceof AddPersistableNetworkPayloadMessage) {
candidate = (((AddPersistableNetworkPayloadMessage) proto).getPersistableNetworkPayload());
}
if (candidate instanceof CapabilityRequiringPayload) {
return Optional.of((CapabilityRequiringPayload) candidate);
}
return Optional.empty();
}
public void addMessageListener(MessageListener messageListener) {
boolean isNewEntry = messageListeners.add(messageListener);
if (!isNewEntry)
@ -434,9 +406,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
List<NetworkEnvelope> networkEnvelopes = bundleOfEnvelopes.getEnvelopes();
for (NetworkEnvelope networkEnvelope : networkEnvelopes) {
// If SendersNodeAddressMessage we do some verifications and apply if successful, otherwise we return false.
if (networkEnvelope instanceof SendersNodeAddressMessage &&
!processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope)) {
continue;
if (networkEnvelope instanceof SendersNodeAddressMessage) {
boolean isValid = processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope);
if (!isValid) {
log.warn("Received an invalid {} at processing BundleOfEnvelopes", networkEnvelope.getClass().getSimpleName());
continue;
}
}
if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage) {
@ -461,7 +436,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
messageListeners.forEach(listener -> listener.onMessage(envelope, connection))));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Setters
///////////////////////////////////////////////////////////////////////////////////////////
@ -481,7 +455,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
peersNodeAddressProperty.set(peerNodeAddress);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
@ -499,8 +472,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
log.debug("shutDown: nodeAddressOpt={}, closeConnectionReason={}",
this.peersNodeAddressOptional.orElse(null), closeConnectionReason);
log.debug("shutDown: peersNodeAddressOptional={}, closeConnectionReason={}",
peersNodeAddressOptional, closeConnectionReason);
connectionState.shutDown();
@ -522,7 +495,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
stopped = true;
//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
@ -544,38 +516,33 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
UserThread.execute(() -> {
connectionListener.onDisconnect(closeConnectionReason, this);
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
protoOutputStream.onConnectionShutdown();
socket.close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected {}", e.getMessage());
} catch (IOException e) {
log.error("Exception at shutdown. " + e.getMessage());
e.printStackTrace();
} finally {
capabilitiesListeners.clear();
try {
socket.close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected {}", e.getMessage());
protoInputStream.close();
} catch (IOException e) {
log.error("Exception at shutdown. " + e.getMessage());
log.error(e.getMessage());
e.printStackTrace();
} finally {
protoOutputStream.onConnectionShutdown();
capabilitiesListeners.clear();
try {
protoInputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
e.printStackTrace();
}
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(bundleSender, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this.toString());
// Use UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
});
Utilities.shutdownAndAwaitTermination(executorService, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this);
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
}
@Override
@ -623,7 +590,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
'}';
}
///////////////////////////////////////////////////////////////////////////////////////////
// SharedSpace
///////////////////////////////////////////////////////////////////////////////////////////
@ -633,9 +599,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
* Runs in same thread as Connection
*/
public boolean reportInvalidRequest(RuleViolation ruleViolation) {
log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, this);
log.info("We got reported the ruleViolation {} at connection with address{} and uid {}", ruleViolation, this.getPeersNodeAddressProperty(), this.getUid());
int numRuleViolations;
numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0);
@ -643,14 +608,13 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
ruleViolations.put(ruleViolation, numRuleViolations);
if (numRuleViolations >= ruleViolation.maxTolerance) {
log.warn("We close connection as we received too many corrupt requests.\n" +
"numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" +
"corruptRequests={}\n\t" +
"connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), this);
log.warn("We close connection as we received too many corrupt requests. " +
"ruleViolations={} " +
"connection with address{} and uid {}", ruleViolations, peersNodeAddressProperty, uid);
this.ruleViolation = ruleViolation;
if (ruleViolation == RuleViolation.PEER_BANNED) {
log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", getPeersNodeAddressOptional());
log.debug("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}",
getPeersNodeAddressOptional());
shutDown(CloseConnectionReason.PEER_BANNED);
} else if (ruleViolation == RuleViolation.INVALID_CLASS) {
log.warn("We close connection due RuleViolation.INVALID_CLASS");
@ -682,23 +646,22 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.info("SocketException (expected if connection lost). closeConnectionReason={}; connection={}", closeConnectionReason, this);
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.info("Shut down caused by exception {} on connection={}", e.toString(), this);
log.info("Shut down caused by exception {} on connection={}", e, this);
} else if (e instanceof EOFException) {
closeConnectionReason = CloseConnectionReason.TERMINATED;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
log.warn("Shut down caused by exception {} on connection={}", e, this);
} else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
log.warn("Shut down caused by exception {} on connection={}", e.toString(), this);
log.warn("Shut down caused by exception {} on connection={}", e, this);
} else {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket: {}\n\t" +
"peer={}\n\t" +
"Exception={}",
"peer={}\n\t" +
"Exception={}",
socket.toString(),
this.peersNodeAddressOptional,
e.toString());
e.printStackTrace();
}
shutDown(closeConnectionReason);
}
@ -718,7 +681,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
setPeersNodeAddress(senderNodeAddress);
}
if (networkFilter != null && networkFilter.isPeerBanned(senderNodeAddress)) {
if (banFilter != null && banFilter.isPeerBanned(senderNodeAddress)) {
log.warn("We got a message from a banned peer. message={}", sendersNodeAddressMessage.getClass().getSimpleName());
reportInvalidRequest(RuleViolation.PEER_BANNED);
return false;
}
@ -742,10 +706,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
@Override
public void run() {
try {
Thread.currentThread().setName("InputHandler");
Thread.currentThread().setName("InputHandler-" + Utilities.toTruncatedString(uid, 15));
while (!stopped && !Thread.currentThread().isInterrupted()) {
if (!threadNameSet && getPeersNodeAddressOptional().isPresent()) {
Thread.currentThread().setName("InputHandler-" + getPeersNodeAddressOptional().get().getFullAddress());
Thread.currentThread().setName("InputHandler-" + Utilities.toTruncatedString(getPeersNodeAddressOptional().get().getFullAddress(), 15));
threadNameSet = true;
}
try {
@ -769,8 +733,11 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
if (proto == null) {
if (stopped) {
return;
}
if (protoInputStream.read() == -1) {
log.warn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown."); // TODO (woodser): why is this warning printing on shutdown?
log.warn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
} else {
log.warn("proto is null. protoInputStream.read()=" + protoInputStream.read());
}
@ -778,9 +745,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return;
}
if (networkFilter != null &&
if (banFilter != null &&
peersNodeAddressOptional.isPresent() &&
networkFilter.isPeerBanned(peersNodeAddressOptional.get())) {
banFilter.isPeerBanned(peersNodeAddressOptional.get())) {
log.warn("We got a message from a banned peer. proto={}", Utilities.toTruncatedString(proto));
reportInvalidRequest(RuleViolation.PEER_BANNED);
return;
}
@ -789,8 +757,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
long now = System.currentTimeMillis();
long elapsed = now - lastReadTimeStamp;
if (elapsed < 10) {
log.info("We got 2 network messages received in less than 10 ms. We set the thread to sleep " +
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
log.debug("We got 2 network messages received in less than 10 ms. We set the thread to sleep " +
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, now, elapsed);
Thread.sleep(20);
}
@ -837,7 +805,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!proto.getMessageVersion().equals(Version.getP2PMessageVersion())
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
log.warn("RuleViolation.WRONG_NETWORK_ID. version of message={}, app version={}, " +
"proto.toTruncatedString={}", proto.getMessageVersion(),
"proto.toTruncatedString={}", proto.getMessageVersion(),
Version.getP2PMessageVersion(),
Utilities.toTruncatedString(proto.toString()));
return;
@ -855,7 +823,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (CloseConnectionReason.PEER_BANNED.name().equals(proto.getCloseConnectionMessage().getReason())) {
log.warn("We got shut down because we are banned by the other peer. " +
"(InputHandler.run CloseConnectionMessage). Peer: {}", getPeersNodeAddressOptional());
"(InputHandler.run CloseConnectionMessage). Peer: {}",
getPeersNodeAddressOptional());
}
shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
return;
@ -866,9 +835,16 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// If SendersNodeAddressMessage we do some verifications and apply if successful,
// otherwise we return false.
if (networkEnvelope instanceof SendersNodeAddressMessage &&
!processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope)) {
return;
if (networkEnvelope instanceof SendersNodeAddressMessage) {
boolean isValid = processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope);
if (!isValid) {
return;
}
}
if (!(networkEnvelope instanceof SendersNodeAddressMessage) && peersNodeAddressOptional.isEmpty()) {
log.info("We got a {} from a peer with yet unknown address on connection with uid={}",
networkEnvelope.getClass().getSimpleName(), uid);
}
onMessage(networkEnvelope, this);
@ -880,7 +856,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
reportInvalidRequest(RuleViolation.INVALID_CLASS);
} catch (ProtobufferException | NoClassDefFoundError | InvalidProtocolBufferException e) {
log.error(e.getMessage());
e.printStackTrace();
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
} catch (Throwable t) {
handleException(t);

View file

@ -21,7 +21,4 @@ public interface ConnectionListener {
void onConnection(Connection connection);
void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection);
//TODO is never called, can be removed
void onError(Throwable throwable);
}

View file

@ -18,16 +18,17 @@
package haveno.network.p2p.network;
import haveno.common.proto.network.NetworkProtoResolver;
import org.jetbrains.annotations.Nullable;
import java.net.Socket;
import org.jetbrains.annotations.Nullable;
public class InboundConnection extends Connection {
public InboundConnection(Socket socket,
MessageListener messageListener,
ConnectionListener connectionListener,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
super(socket, messageListener, connectionListener, null, networkProtoResolver, networkFilter);
MessageListener messageListener,
ConnectionListener connectionListener,
NetworkProtoResolver networkProtoResolver,
@Nullable BanFilter banFilter) {
super(socket, messageListener, connectionListener, null, networkProtoResolver, banFilter);
}
}

View file

@ -17,17 +17,22 @@
package haveno.network.p2p.network;
import haveno.network.p2p.NodeAddress;
import haveno.common.UserThread;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.network.p2p.NodeAddress;
import org.jetbrains.annotations.Nullable;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.Nullable;
// Run in UserThread
public class LocalhostNetworkNode extends NetworkNode {
@ -44,15 +49,15 @@ public class LocalhostNetworkNode extends NetworkNode {
LocalhostNetworkNode.simulateTorDelayHiddenService = simulateTorDelayHiddenService;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public LocalhostNetworkNode(int port,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
super(port, networkProtoResolver, networkFilter);
NetworkProtoResolver networkProtoResolver,
@Nullable BanFilter banFilter,
int maxConnections) {
super(port, networkProtoResolver, banFilter, maxConnections);
}
@Override
@ -60,8 +65,6 @@ public class LocalhostNetworkNode extends NetworkNode {
if (setupListener != null)
addSetupListener(setupListener);
createExecutorService();
// simulate tor connection delay
UserThread.runAfter(() -> {
nodeAddressProperty.set(new NodeAddress("localhost", servicePort));

View file

@ -17,41 +17,50 @@
package haveno.network.p2p.network;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import haveno.network.p2p.NodeAddress;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.util.Utilities;
import haveno.network.p2p.NodeAddress;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkNotNull;
// Run in UserThread
@ -62,13 +71,14 @@ public abstract class NetworkNode implements MessageListener {
final int servicePort;
private final NetworkProtoResolver networkProtoResolver;
@Nullable
private final NetworkFilter networkFilter;
private final BanFilter banFilter;
private final CopyOnWriteArraySet<InboundConnection> inBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
ListeningExecutorService executorService;
private final ListeningExecutorService connectionExecutor;
private final ListeningExecutorService sendMessageExecutor;
private Server server;
private volatile boolean shutDownInProgress;
@ -76,31 +86,44 @@ public abstract class NetworkNode implements MessageListener {
private final CopyOnWriteArraySet<OutboundConnection> outBoundConnections = new CopyOnWriteArraySet<>();
protected final ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
NetworkNode(int servicePort,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
NetworkProtoResolver networkProtoResolver,
@Nullable BanFilter banFilter,
int maxConnections) {
this.servicePort = servicePort;
this.networkProtoResolver = networkProtoResolver;
this.networkFilter = networkFilter;
this.banFilter = banFilter;
connectionExecutor = Utilities.getListeningExecutorService("NetworkNode.connection",
maxConnections * 2,
maxConnections * 3,
30,
30);
sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage",
maxConnections * 2,
maxConnections * 3,
30,
30);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
// Calls this (and other registered) setup listener's ``onTorNodeReady()`` and ``onHiddenServicePublished``
// Calls this (and other registered) setup listener's ``onTorNodeReady()`` and
// ``onHiddenServicePublished``
// when the events happen.
public abstract void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope) {
NetworkEnvelope networkEnvelope) {
log.debug("Send {} to {}. Message details: {}",
networkEnvelope.getClass().getSimpleName(), peersNodeAddress, Utilities.toTruncatedString(networkEnvelope));
networkEnvelope.getClass().getSimpleName(), peersNodeAddress,
Utilities.toTruncatedString(networkEnvelope));
checkNotNull(peersNodeAddress, "peerAddress must not be null");
@ -114,100 +137,91 @@ public abstract class NetworkNode implements MessageListener {
log.debug("We have not found any connection for peerAddress {}.\n\t" +
"We will create a new outbound connection.", peersNodeAddress);
final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress());
SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = connectionExecutor.submit(() -> {
Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-"
+ Utilities.toTruncatedString(peersNodeAddress.getFullAddress(), 15));
if (peersNodeAddress.equals(getNodeAddress())) {
log.warn("We are sending a message to ourselves");
}
OutboundConnection outboundConnection;
try {
// can take a while when using tor
long startTs = System.currentTimeMillis();
// can take a while when using tor
long startTs = System.currentTimeMillis();
log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress());
log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress());
Socket socket = createSocket(peersNodeAddress);
long duration = System.currentTimeMillis() - startTs;
log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(),
duration);
Socket socket = createSocket(peersNodeAddress);
long duration = System.currentTimeMillis() - startTs;
log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(),
duration);
if (duration > CREATE_SOCKET_TIMEOUT)
throw new TimeoutException("A timeout occurred when creating a socket.");
if (duration > CREATE_SOCKET_TIMEOUT)
throw new TimeoutException("A timeout occurred when creating a socket.");
// Tor needs sometimes quite long to create a connection. To avoid that we get too many double-
// sided connections we check again if we still don't have any connection for that node address.
Connection existingConnection = getInboundConnection(peersNodeAddress);
if (existingConnection == null)
existingConnection = getOutboundConnection(peersNodeAddress);
// Tor needs sometimes quite long to create a connection. To avoid that we get
// too many
// connections with the same peer we check again if we still don't have any
// connection for that node address.
Connection existingConnection = getInboundConnection(peersNodeAddress);
if (existingConnection == null)
existingConnection = getOutboundConnection(peersNodeAddress);
if (existingConnection != null) {
log.debug("We found in the meantime a connection for peersNodeAddress {}, " +
"so we use that for sending the message.\n" +
"That can happen if Tor needs long for creating a new outbound connection.\n" +
"We might have got a new inbound or outbound connection.",
peersNodeAddress.getFullAddress());
if (existingConnection != null) {
log.debug("We found in the meantime a connection for peersNodeAddress {}, " +
"so we use that for sending the message.\n" +
"That can happen if Tor needs long for creating a new outbound connection.\n" +
"We might have got a new inbound or outbound connection.",
peersNodeAddress.getFullAddress());
try {
socket.close();
} catch (Throwable throwable) {
try {
socket.close();
} catch (Throwable throwable) {
if (!shutDownInProgress) {
log.error("Error at closing socket " + throwable);
}
existingConnection.sendMessage(networkEnvelope);
return existingConnection;
} else {
final ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
if (!connection.isStopped()) {
outBoundConnections.add((OutboundConnection) connection);
printOutBoundConnections();
connectionListeners.forEach(e -> e.onConnection(connection));
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) {
//noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection);
}
existingConnection.sendMessage(networkEnvelope);
return existingConnection;
} else {
ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
if (!connection.isStopped()) {
outBoundConnections.add((OutboundConnection) connection);
printOutBoundConnections();
connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection));
connectionListeners.forEach(e -> e.onConnection(connection));
}
@Override
public void onError(Throwable throwable) {
log.error("new OutboundConnection.ConnectionListener.onError " + throwable.getMessage());
connectionListeners.forEach(e -> e.onError(throwable));
}
};
outboundConnection = new OutboundConnection(socket,
NetworkNode.this,
connectionListener,
peersNodeAddress,
networkProtoResolver,
networkFilter);
if (log.isDebugEnabled()) {
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + networkEnvelope
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
}
// can take a while when using tor
outboundConnection.sendMessage(networkEnvelope);
return outboundConnection;
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) {
// noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection);
printOutBoundConnections();
connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
};
outboundConnection = new OutboundConnection(socket,
NetworkNode.this,
connectionListener,
peersNodeAddress,
networkProtoResolver,
banFilter);
if (log.isDebugEnabled()) {
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + networkEnvelope
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
}
} catch (Throwable throwable) {
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
log.warn("Executing task failed. " + throwable.getMessage());
}
throw throwable;
// can take a while when using tor
outboundConnection.sendMessage(networkEnvelope);
return outboundConnection;
}
});
@ -218,7 +232,12 @@ public abstract class NetworkNode implements MessageListener {
public void onFailure(@NotNull Throwable throwable) {
log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString());
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());
@ -267,25 +286,49 @@ public abstract class NetworkNode implements MessageListener {
return null;
}
public SettableFuture<Connection> sendMessage(Connection connection, NetworkEnvelope networkEnvelope) {
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
}
return sendMessage(connection, networkEnvelope, sendMessageExecutor);
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
}
}, MoreExecutors.directExecutor());
public SettableFuture<Connection> sendMessage(Connection connection,
NetworkEnvelope networkEnvelope,
ListeningExecutorService executor) {
SettableFuture<Connection> resultFuture = SettableFuture.create();
try {
ListenableFuture<Connection> future = executor.submit(() -> {
String id = connection.getPeersNodeAddressOptional().isPresent() ?
connection.getPeersNodeAddressOptional().get().getFullAddress() :
connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + Utilities.toTruncatedString(id, 15));
connection.sendMessage(networkEnvelope);
return connection;
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());
} catch (RejectedExecutionException exception) {
log.error("RejectedExecutionException at sendMessage: ", exception);
UserThread.execute(() -> {
if (!resultFuture.setException(exception)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
return resultFuture;
}
@ -316,7 +359,6 @@ public abstract class NetworkNode implements MessageListener {
.collect(Collectors.toSet());
}
public void shutDown(Runnable shutDownCompleteHandler) {
if (!shutDownInProgress) {
shutDownInProgress = true;
@ -344,7 +386,7 @@ public abstract class NetworkNode implements MessageListener {
log.info("Shutdown completed due timeout");
shutDownCompleteHandler.run();
}
}, 3);
}, 1500, TimeUnit.MILLISECONDS);
allConnections.forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN,
() -> {
@ -353,6 +395,8 @@ public abstract class NetworkNode implements MessageListener {
if (shutdownCompleted.get() == numConnections) {
log.info("Shutdown completed with all connections closed");
timeoutHandler.stop();
connectionExecutor.shutdownNow();
sendMessageExecutor.shutdownNow();
if (shutDownCompleteHandler != null) {
shutDownCompleteHandler.run();
}
@ -361,7 +405,6 @@ public abstract class NetworkNode implements MessageListener {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// SetupListener
///////////////////////////////////////////////////////////////////////////////////////////
@ -372,17 +415,15 @@ public abstract class NetworkNode implements MessageListener {
log.warn("Try to add a setupListener which was already added.");
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection));
messageListeners.stream().forEach(e -> e.onMessage(networkEnvelope, connection));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
@ -390,8 +431,8 @@ public abstract class NetworkNode implements MessageListener {
public void addConnectionListener(ConnectionListener connectionListener) {
boolean isNewEntry = connectionListeners.add(connectionListener);
if (!isNewEntry)
log.warn("Try to add a connectionListener which was already added.\n\tconnectionListener={}\n\tconnectionListeners={}"
, connectionListener, connectionListeners);
log.warn("Try to add a connectionListener which was already added.\n\tconnectionListener={}\n\tconnectionListeners={}",
connectionListener, connectionListeners);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
@ -414,48 +455,36 @@ public abstract class NetworkNode implements MessageListener {
"That might happen because of async behaviour of CopyOnWriteArraySet");
}
///////////////////////////////////////////////////////////////////////////////////////////
// Protected
///////////////////////////////////////////////////////////////////////////////////////////
void createExecutorService() {
if (executorService == null)
executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 15, 30, 60);
}
void startServer(ServerSocket serverSocket) {
final ConnectionListener connectionListener = new ConnectionListener() {
ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
if (!connection.isStopped()) {
inBoundConnections.add((InboundConnection) connection);
printInboundConnections();
connectionListeners.forEach(e -> e.onConnection(connection));
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.trace("onDisconnect at server socket connectionListener\n\tconnection={}", connection);
//noinspection SuspiciousMethodCalls
// noinspection SuspiciousMethodCalls
inBoundConnections.remove(connection);
printInboundConnections();
connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
@Override
public void onError(Throwable throwable) {
log.error("server.ConnectionListener.onError " + throwable.getMessage());
connectionListeners.forEach(e -> e.onError(throwable));
connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
};
server = new Server(serverSocket,
NetworkNode.this,
connectionListener,
networkProtoResolver,
networkFilter);
executorService.submit(server);
banFilter);
server.start();
}
private Optional<OutboundConnection> lookupOutBoundConnection(NodeAddress peersNodeAddress) {
@ -463,13 +492,14 @@ public abstract class NetworkNode implements MessageListener {
printOutBoundConnections();
return outBoundConnections.stream()
.filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get()))
.findAny();
}
private void printOutBoundConnections() {
StringBuilder sb = new StringBuilder("outBoundConnections size()=")
.append(outBoundConnections.size()).append("\n\toutBoundConnections=");
outBoundConnections.forEach(e -> sb.append(e).append("\n\t"));
outBoundConnections.stream().forEach(e -> sb.append(e).append("\n\t"));
log.debug(sb.toString());
}
@ -478,13 +508,14 @@ public abstract class NetworkNode implements MessageListener {
printInboundConnections();
return inBoundConnections.stream()
.filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get()))
.findAny();
}
private void printInboundConnections() {
StringBuilder sb = new StringBuilder("inBoundConnections size()=")
.append(inBoundConnections.size()).append("\n\tinBoundConnections=");
inBoundConnections.forEach(e -> sb.append(e).append("\n\t"));
inBoundConnections.stream().forEach(e -> sb.append(e).append("\n\t"));
log.debug(sb.toString());
}

View file

@ -17,13 +17,6 @@
package haveno.network.p2p.network;
import lombok.extern.slf4j.Slf4j;
import org.berndpruenster.netlayer.tor.NativeTor;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import org.berndpruenster.netlayer.tor.Torrc;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@ -33,6 +26,15 @@ import java.util.Date;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;
import org.berndpruenster.netlayer.tor.NativeTor;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import org.berndpruenster.netlayer.tor.Torrc;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
/**
* This class creates a brand new instance of the Tor onion router.
*
@ -49,19 +51,20 @@ public class NewTor extends TorMode {
private final File torrcFile;
private final String torrcOptions;
private final Collection<String> bridgeEntries;
private final BridgeAddressProvider bridgeAddressProvider;
public NewTor(File torWorkingDirectory, @Nullable File torrcFile, String torrcOptions, Collection<String> bridgeEntries) {
public NewTor(File torWorkingDirectory, @Nullable File torrcFile, String torrcOptions, BridgeAddressProvider bridgeAddressProvider) {
super(torWorkingDirectory);
this.torrcFile = torrcFile;
this.torrcOptions = torrcOptions;
this.bridgeEntries = bridgeEntries;
this.bridgeAddressProvider = bridgeAddressProvider;
}
@Override
public Tor getTor() throws IOException, TorCtlException {
long ts1 = new Date().getTime();
Collection<String> bridgeEntries = bridgeAddressProvider.getBridgeAddresses();
if (bridgeEntries != null)
log.info("Using bridges: {}", bridgeEntries.stream().collect(Collectors.joining(",")));
@ -115,5 +118,4 @@ public class NewTor extends TorMode {
public String getHiddenServiceDirectory() {
return "";
}
}

View file

@ -29,7 +29,7 @@ public class OutboundConnection extends Connection {
ConnectionListener connectionListener,
NodeAddress peersNodeAddress,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
super(socket, messageListener, connectionListener, peersNodeAddress, networkProtoResolver, networkFilter);
@Nullable BanFilter banFilter) {
super(socket, messageListener, connectionListener, peersNodeAddress, networkProtoResolver, banFilter);
}
}

View file

@ -18,76 +18,85 @@
package haveno.network.p2p.network;
import haveno.common.proto.network.NetworkProtoResolver;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
// Runs in UserThread
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jetbrains.annotations.Nullable;
class Server implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Server.class);
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
@Nullable
private final NetworkFilter networkFilter;
private final BanFilter banFilter;
// accessed from different threads
private final ServerSocket serverSocket;
private final int localPort;
private final Set<Connection> connections = new CopyOnWriteArraySet<>();
private volatile boolean stopped;
private final NetworkProtoResolver networkProtoResolver;
private final Thread serverThread = new Thread(this);
public Server(ServerSocket serverSocket,
MessageListener messageListener,
ConnectionListener connectionListener,
NetworkProtoResolver networkProtoResolver,
@Nullable NetworkFilter networkFilter) {
MessageListener messageListener,
ConnectionListener connectionListener,
NetworkProtoResolver networkProtoResolver,
@Nullable BanFilter banFilter) {
this.networkProtoResolver = networkProtoResolver;
this.serverSocket = serverSocket;
this.localPort = serverSocket.getLocalPort();
this.messageListener = messageListener;
this.connectionListener = connectionListener;
this.networkFilter = networkFilter;
this.banFilter = banFilter;
}
public void start() {
serverThread.setName("Server-" + localPort);
serverThread.start();
}
@Override
public void run() {
try {
// Thread created by NetworkNode
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
log.debug("Ready to accept new clients on port " + serverSocket.getLocalPort());
while (isServerActive()) {
log.debug("Ready to accept new clients on port " + localPort);
final Socket socket = serverSocket.accept();
if (!stopped && !Thread.currentThread().isInterrupted()) {
log.debug("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
if (isServerActive()) {
log.debug("Accepted new client on localPort/port " + socket.getLocalPort() + "/"
+ socket.getPort());
InboundConnection connection = new InboundConnection(socket,
messageListener,
connectionListener,
networkProtoResolver,
networkFilter);
banFilter);
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"Server created new inbound connection:"
+ "\nlocalPort/port={}/{}"
+ "\nconnection.uid={}", serverSocket.getLocalPort(), socket.getPort(), connection.getUid()
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
+ "\nconnection.uid={}", serverSocket.getLocalPort(), socket.getPort(),
connection.getUid()
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
if (!stopped)
if (isServerActive())
connections.add(connection);
else
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
}
}
} catch (IOException e) {
if (!stopped)
if (isServerActive())
e.printStackTrace();
}
} catch (Throwable t) {
@ -97,14 +106,15 @@ class Server implements Runnable {
}
public void shutDown() {
if (!stopped) {
stopped = true;
connections.stream().forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
log.info("Server shutdown started");
if (isServerActive()) {
serverThread.interrupt();
connections.forEach(connection -> connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
try {
if (!serverSocket.isClosed())
if (!serverSocket.isClosed()) {
serverSocket.close();
}
} catch (SocketException e) {
log.debug("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) {
@ -116,4 +126,8 @@ class Server implements Runnable {
log.warn("stopped already called ast shutdown");
}
}
private boolean isServerActive() {
return !serverThread.isInterrupted();
}
}

View file

@ -17,75 +17,65 @@
package haveno.network.p2p.network;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import haveno.network.p2p.NodeAddress;
import haveno.network.utils.Utils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.proto.network.NetworkProtoResolver;
import haveno.common.util.Utilities;
import haveno.network.p2p.NodeAddress;
import haveno.network.utils.Utils;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import haveno.common.util.SingleThreadExecutorUtils;
import org.berndpruenster.netlayer.tor.HiddenServiceSocket;
import org.berndpruenster.netlayer.tor.Tor;
import org.berndpruenster.netlayer.tor.TorCtlException;
import org.berndpruenster.netlayer.tor.TorSocket;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import java.security.SecureRandom;
import java.net.Socket;
import java.io.IOException;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
// Run in UserThread
@Slf4j
public class TorNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class);
private static final int MAX_RESTART_ATTEMPTS = 5;
private static final long SHUT_DOWN_TIMEOUT = 5;
private static final long SHUT_DOWN_TIMEOUT = 2;
private HiddenServiceSocket hiddenServiceSocket;
private Timer shutDownTimeoutTimer;
private int restartCounter;
@SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> allShutDown;
private Tor tor;
private TorMode torMode;
private boolean streamIsolation;
private Socks5Proxy socksProxy;
private ListenableFuture<Void> torStartupFuture;
private boolean shutDownInProgress;
private final ExecutorService executor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public TorNetworkNode(int servicePort,
NetworkProtoResolver networkProtoResolver,
boolean useStreamIsolation,
TorMode torMode,
@Nullable NetworkFilter networkFilter) {
super(servicePort, networkProtoResolver, networkFilter);
NetworkProtoResolver networkProtoResolver,
boolean useStreamIsolation,
TorMode torMode,
@Nullable BanFilter banFilter,
int maxConnections) {
super(servicePort, networkProtoResolver, banFilter, maxConnections);
this.torMode = torMode;
this.streamIsolation = useStreamIsolation;
createExecutorService();
}
executor = SingleThreadExecutorUtils.getSingleThreadExecutor("StartTor");
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
@ -98,7 +88,6 @@ public class TorNetworkNode extends NetworkNode {
if (setupListener != null)
addSetupListener(setupListener);
// Create the tor node (takes about 6 sec.)
createTorAndHiddenService(Utils.findFreeSystemPort(), servicePort);
}
@ -106,200 +95,105 @@ public class TorNetworkNode extends NetworkNode {
protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException {
checkArgument(peerNodeAddress.getHostName().endsWith(".onion"), "PeerAddress is not an onion address");
// If streamId is null stream isolation gets deactivated.
// Hidden services use stream isolation by default so we pass null.
// Hidden services use stream isolation by default, so we pass null.
return new TorSocket(peerNodeAddress.getHostName(), peerNodeAddress.getPort(), null);
}
// TODO handle failure more cleanly
public Socks5Proxy getSocksProxy() {
try {
String stream = null;
if (streamIsolation) {
// create a random string
byte[] bytes = new byte[512]; // note that getProxy does Sha256 that string anyways
byte[] bytes = new byte[512]; // tor.getProxy creates a Sha256 hash
new SecureRandom().nextBytes(bytes);
stream = Base64.getEncoder().encodeToString(bytes);
}
if (socksProxy == null || streamIsolation) {
tor = Tor.getDefault();
// ask for the connection
socksProxy = tor != null ? tor.getProxy(stream) : null;
}
return socksProxy;
} catch (TorCtlException e) {
log.error("TorCtlException at getSocksProxy: " + e.toString());
e.printStackTrace();
return null;
} catch (Throwable t) {
log.error("Error at getSocksProxy: " + t.toString());
log.error("Error at getSocksProxy", t);
return null;
}
}
public void shutDown(@Nullable Runnable shutDownCompleteHandler) {
if (allShutDown != null) {
log.warn("We got called shutDown again and ignore it.");
log.info("TorNetworkNode shutdown started");
if (shutDownInProgress) {
log.warn("We got shutDown already called");
return;
}
// this one is executed synchronously
BooleanProperty networkNodeShutDown = networkNodeShutDown();
// this one is committed as a thread to the executor
BooleanProperty torNetworkNodeShutDown = torNetworkNodeShutDown();
BooleanProperty shutDownTimerTriggered = shutDownTimerTriggered();
// Need to store allShutDown to not get garbage collected
allShutDown = EasyBind.combine(torNetworkNodeShutDown, networkNodeShutDown, shutDownTimerTriggered,
(a, b, c) -> (a && b) || c);
allShutDown.subscribe((observable, oldValue, newValue) -> {
if (newValue) {
shutDownTimeoutTimer.stop();
long ts = System.currentTimeMillis();
try {
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.debug("Shutdown executorService done after {} ms.", System.currentTimeMillis() - ts);
} catch (Throwable t) {
log.error("Shutdown executorService failed with exception: {}", t.getMessage());
t.printStackTrace();
} finally {
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
shutDownInProgress = true;
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
executor.shutdownNow();
}, SHUT_DOWN_TIMEOUT);
super.shutDown(() -> {
try {
tor = Tor.getDefault();
if (tor != null) {
tor.shutdown();
tor = null;
log.info("Tor shutdown completed");
}
executor.shutdownNow();
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception", e);
} finally {
shutDownTimeoutTimer.stop();
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}
});
}
private BooleanProperty torNetworkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
try {
tor = Tor.getDefault();
if (tor != null) {
log.info("Tor has been created already so we can shut it down.");
tor.shutdown();
tor = null;
log.info("Tor shut down completed");
} else {
log.info("Tor has not been created yet. We cancel the torStartupFuture.");
if (torStartupFuture != null) {
torStartupFuture.cancel(true);
}
log.info("torStartupFuture cancelled");
}
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception: {}", e.getMessage());
e.printStackTrace();
} finally {
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> done.set(true));
}
return done;
}
private BooleanProperty networkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> super.shutDown(() -> done.set(true)));
return done;
}
private BooleanProperty shutDownTimerTriggered() {
BooleanProperty done = new SimpleBooleanProperty();
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
done.set(true);
}, SHUT_DOWN_TIMEOUT);
return done;
}
///////////////////////////////////////////////////////////////////////////////////////////
// shutdown, restart
///////////////////////////////////////////////////////////////////////////////////////////
private void restartTor(String errorMessage) {
log.info("Restarting Tor");
restartCounter++;
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
UserThread.execute(() -> {
setupListeners.forEach(SetupListener::onRequestCustomBridges);
});
log.warn("We stop tor as starting tor with the default bridges failed. We request user to add custom bridges.");
shutDown(null);
} else {
String msg = "We tried to restart Tor " + restartCounter +
" times, but it continued to fail with error message:\n" +
errorMessage + "\n\n" +
"Please check your internet connection and firewall and try to start again.";
log.error(msg);
throw new RuntimeException(msg);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// create tor
// Create tor and hidden service
///////////////////////////////////////////////////////////////////////////////////////////
private void createTorAndHiddenService(int localPort, int servicePort) {
torStartupFuture = executorService.submit(() -> {
executor.submit(() -> {
try {
// get tor
Tor.setDefault(torMode.getTor());
// start hidden service
long ts2 = new Date().getTime();
long ts = System.currentTimeMillis();
hiddenServiceSocket = new HiddenServiceSocket(localPort, torMode.getHiddenServiceDirectory(), servicePort);
nodeAddressProperty.set(new NodeAddress(hiddenServiceSocket.getServiceName() + ":" + hiddenServiceSocket.getHiddenServicePort()));
UserThread.execute(() -> setupListeners.forEach(SetupListener::onTorNodeReady));
hiddenServiceSocket.addReadyListener(socket -> {
try {
log.info("\n################################################################\n" +
"Tor hidden service published after {} ms. Socket={}\n" +
"################################################################",
(new Date().getTime() - ts2), socket); //takes usually 30-40 sec
new Thread() {
@Override
public void run() {
try {
nodeAddressProperty.set(new NodeAddress(hiddenServiceSocket.getServiceName() + ":" + hiddenServiceSocket.getHiddenServicePort()));
startServer(socket);
UserThread.execute(() -> setupListeners.forEach(SetupListener::onHiddenServicePublished));
} catch (final Exception e1) {
log.error(e1.toString());
e1.printStackTrace();
}
}
}.start();
} catch (final Exception e) {
log.error(e.toString());
e.printStackTrace();
}
log.info("\n################################################################\n" +
"Tor hidden service published after {} ms. Socket={}\n" +
"################################################################",
System.currentTimeMillis() - ts, socket);
UserThread.execute(() -> {
nodeAddressProperty.set(new NodeAddress(hiddenServiceSocket.getServiceName() + ":"
+ hiddenServiceSocket.getHiddenServicePort()));
startServer(socket);
setupListeners.forEach(SetupListener::onHiddenServicePublished);
});
return null;
});
} catch (TorCtlException e) {
String msg = e.getCause() != null ? e.getCause().toString() : e.toString();
log.error("Tor node creation failed: {}", msg);
log.error("Starting tor node failed", e);
if (e.getCause() instanceof IOException) {
// Since we cannot connect to Tor, we cannot do nothing.
// Furthermore, we have no hidden services started yet, so there is no graceful
// shutdown needed either
UserThread.execute(() -> setupListeners.forEach(s -> s.onSetupFailed(new RuntimeException(msg))));
UserThread.execute(() -> setupListeners.forEach(s -> s.onSetupFailed(new RuntimeException(e.getMessage()))));
} else {
restartTor(e.getMessage());
UserThread.execute(() -> setupListeners.forEach(SetupListener::onRequestCustomBridges));
log.warn("We shutdown as starting tor with the default bridges failed. We request user to add custom bridges.");
shutDown(null);
}
} catch (IOException e) {
log.error("Could not connect to running Tor: {}", e.getMessage());
// Since we cannot connect to Tor, we cannot do nothing.
// Furthermore, we have no hidden services started yet, so there is no graceful
// shutdown needed either
log.error("Could not connect to running Tor", e);
UserThread.execute(() -> setupListeners.forEach(s -> s.onSetupFailed(new RuntimeException(e.getMessage()))));
} catch (Throwable ignore) {
}
return null;
});
Futures.addCallback(torStartupFuture, Utilities.failureCallback(throwable ->
UserThread.execute(() -> log.error("Hidden service creation failed: " + throwable))
), MoreExecutors.directExecutor());
}
}

View file

@ -17,32 +17,43 @@
package haveno.network.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.network.p2p.BundleOfEnvelopes;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.network.NetworkNode;
import haveno.network.p2p.storage.messages.BroadcastMessage;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import haveno.common.Timer;
import haveno.common.UserThread;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Slf4j
public class BroadcastHandler implements PeerManager.Listener {
private static final long BASE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(120);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -57,7 +68,6 @@ public class BroadcastHandler implements PeerManager.Listener {
void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Instance fields
///////////////////////////////////////////////////////////////////////////////////////////
@ -67,10 +77,14 @@ public class BroadcastHandler implements PeerManager.Listener {
private final ResultHandler resultHandler;
private final String uid;
private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private final AtomicBoolean stopped = new AtomicBoolean();
private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
@Nullable
private Timer timeoutTimer;
private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -85,12 +99,17 @@ public class BroadcastHandler implements PeerManager.Listener {
peerManager.addListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested) {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
if (broadcastRequests.isEmpty()) {
return;
}
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);
@ -98,42 +117,42 @@ public class BroadcastHandler implements PeerManager.Listener {
if (shutDownRequested) {
delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
} else {
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeersForBroadcast = confirmedConnections.size();
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and with shorter delay
numPeersForBroadcast.set(confirmedConnections.size());
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
delay = 100;
}
}
setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);
int iterations = numPeersForBroadcast;
int iterations = numPeersForBroadcast.get();
for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay;
Connection connection = confirmedConnections.get(i);
UserThread.runAfterRandomDelay(() -> {
if (stopped) {
if (stopped.get()) {
return;
}
// We use broadcastRequests which have excluded the requests for messages the connection has
// originated to avoid sending back the message we received. We also remove messages not satisfying
// capability checks.
List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection = getBroadcastRequestsForConnection(connection, broadcastRequests);
List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection = getBroadcastRequestsForConnection(
connection, broadcastRequests);
// Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
@ -142,24 +161,27 @@ public class BroadcastHandler implements PeerManager.Listener {
if (connection.isStopped()) {
// Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
}
sendToPeer(connection, broadcastRequestsForConnection);
try {
sendToPeer(connection, broadcastRequestsForConnection, executor);
} catch (RejectedExecutionException e) {
log.error("RejectedExecutionException at broadcast ", e);
cleanup();
}
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}
public void cancel() {
stopped = true;
cleanup();
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -177,7 +199,6 @@ public class BroadcastHandler implements PeerManager.Listener {
public void onAwakeFromStandby() {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@ -192,22 +213,23 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastRequests,
int delay,
boolean shutDownRequested) {
int delay,
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> {
if (stopped) {
if (stopped.get()) {
return;
}
timeoutTriggered = true;
timeoutTriggered.set(true);
numOfFailedBroadcasts.incrementAndGet();
log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" +
"numOfCompletedBroadcasts={}\n" +
"numOfFailedBroadcasts={}",
"numPeersForBroadcast={}\n" +
"numOfCompletedBroadcasts={}\n" +
"numOfFailedBroadcasts={}",
timeoutDelay / 1000d,
numPeersForBroadcast,
numOfCompletedBroadcasts,
@ -221,27 +243,30 @@ public class BroadcastHandler implements PeerManager.Listener {
}
// We exclude the requests containing a message we received from that connection
// Also we filter out messages which requires a capability but peer does not support it.
// Also we filter out messages which requires a capability but peer does not
// support it.
private List<Broadcaster.BroadcastRequest> getBroadcastRequestsForConnection(Connection connection,
List<Broadcaster.BroadcastRequest> broadcastRequests) {
List<Broadcaster.BroadcastRequest> broadcastRequests) {
return broadcastRequests.stream()
.filter(broadcastRequest -> !connection.getPeersNodeAddressOptional().isPresent() ||
!connection.getPeersNodeAddressOptional().get().equals(broadcastRequest.getSender()))
.filter(broadcastRequest -> connection.noCapabilityRequiredOrCapabilityIsSupported(broadcastRequest.getMessage()))
.filter(broadcastRequest -> connection.testCapability(broadcastRequest.getMessage()))
.collect(Collectors.toList());
}
private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) {
private void sendToPeer(Connection connection,
List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection,
ListeningExecutorService executor) {
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);
sendMessageFutures.add(future);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
numOfCompletedBroadcasts.incrementAndGet();
if (stopped) {
if (stopped.get()) {
return;
}
@ -251,11 +276,10 @@ public class BroadcastHandler implements PeerManager.Listener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage());
numOfFailedBroadcasts++;
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();
if (stopped) {
if (stopped.get()) {
return;
}
@ -277,43 +301,56 @@ public class BroadcastHandler implements PeerManager.Listener {
}
private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3));
// We use equal checks to avoid duplicated listener calls as it would be the
// case with >= checks.
if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we
// consider the message was sufficiently broadcast.
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.filter(Objects::nonNull)
.forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
} else {
// We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts;
int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get();
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
// We did not reach resilience level and timeout prevents to reach it later
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget;
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
.filter(Objects::nonNull)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get()));
}
}
}
private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) {
cleanup();
}
}
private void cleanup() {
stopped = true;
if (stopped.get()) {
return;
}
stopped.set(true);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
sendMessageFutures.stream()
.filter(future -> !future.isCancelled() && !future.isDone())
.forEach(future -> future.cancel(true));
sendMessageFutures.clear();
peerManager.removeListener(this);
resultHandler.onCompleted(this);
}

View file

@ -17,22 +17,32 @@
package haveno.network.p2p.peers;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.NetworkNode;
import haveno.network.p2p.storage.messages.BroadcastMessage;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.util.Utilities;
import javax.inject.Inject;
import javax.inject.Named;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
@Slf4j
public class Broadcaster implements BroadcastHandler.ResultHandler {
@ -45,28 +55,40 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private Timer timer;
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
private final ListeningExecutorService executor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public Broadcaster(NetworkNode networkNode, PeerManager peerManager) {
public Broadcaster(NetworkNode networkNode,
PeerManager peerManager,
@Named(Config.MAX_CONNECTIONS) int maxConnections) {
this.networkNode = networkNode;
this.peerManager = peerManager;
ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster",
maxConnections * 3,
maxConnections * 4,
30,
30);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
}
public void shutDown(Runnable resultHandler) {
log.info("Broadcaster shutdown started");
shutDownRequested = true;
shutDownResultHandler = resultHandler;
if (broadcastRequests.isEmpty()) {
doShutDown();
} else {
// We set delay of broadcasts and timeout to very low values,
// so we can expect that we get onCompleted called very fast and trigger the doShutDown from there.
// so we can expect that we get onCompleted called very fast and trigger the
// doShutDown from there.
maybeBroadcastBundle();
}
executor.shutdown();
}
public void flush() {
@ -81,26 +103,19 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
shutDownResultHandler.run();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void broadcast(BroadcastMessage message,
@Nullable NodeAddress sender) {
@Nullable NodeAddress sender) {
broadcast(message, sender, null);
}
public void broadcast(BroadcastMessage message,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener) {
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener) {
broadcastRequests.add(new BroadcastRequest(message, sender, listener));
// Keep that log on INFO for better debugging if the feature works as expected. Later it can
// be remove or set to DEBUG
log.debug("Broadcast requested for {}. We queue it up for next bundled broadcast.",
message.getClass().getSimpleName());
if (timer == null) {
timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
@ -108,19 +123,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private void maybeBroadcastBundle() {
if (!broadcastRequests.isEmpty()) {
log.debug("Broadcast bundled requests of {} messages. Message types: {}",
broadcastRequests.size(),
broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList()));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor);
broadcastRequests.clear();
if (timer != null) {
timer.stop();
}
timer = null;
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// BroadcastHandler.ResultHandler implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -133,7 +147,6 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// BroadcastRequest class
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -252,10 +252,6 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
maybeRemoveBannedPeer(closeConnectionReason, connection);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Connection

View file

@ -221,10 +221,6 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation

View file

@ -53,14 +53,19 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
private final boolean isGetUpdatedDataResponse;
private final Capabilities supportedCapabilities;
// Added at v1.9.6
private final boolean wasTruncated;
public GetDataResponse(@NotNull Set<ProtectedStorageEntry> dataSet,
@NotNull Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse) {
boolean isGetUpdatedDataResponse,
boolean wasTruncated) {
this(dataSet,
persistableNetworkPayloadSet,
requestNonce,
isGetUpdatedDataResponse,
wasTruncated,
Capabilities.app,
Version.getP2PMessageVersion());
}
@ -73,6 +78,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
@NotNull Set<PersistableNetworkPayload> persistableNetworkPayloadSet,
int requestNonce,
boolean isGetUpdatedDataResponse,
boolean wasTruncated,
@NotNull Capabilities supportedCapabilities,
String messageVersion) {
super(messageVersion);
@ -81,6 +87,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
this.persistableNetworkPayloadSet = persistableNetworkPayloadSet;
this.requestNonce = requestNonce;
this.isGetUpdatedDataResponse = isGetUpdatedDataResponse;
this.wasTruncated = wasTruncated;
this.supportedCapabilities = supportedCapabilities;
}
@ -102,6 +109,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
.collect(Collectors.toList()))
.setRequestNonce(requestNonce)
.setIsGetUpdatedDataResponse(isGetUpdatedDataResponse)
.setWasTruncated(wasTruncated)
.addAllSupportedCapabilities(Capabilities.toIntList(supportedCapabilities));
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
@ -114,7 +122,10 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
String messageVersion) {
log.info("Received a GetDataResponse with {}", Utilities.readableFileSize(proto.getSerializedSize()));
boolean wasTruncated = proto.getWasTruncated();
log.info("Received a GetDataResponse with {} {}",
Utilities.readableFileSize(proto.getSerializedSize()),
wasTruncated ? " (was truncated)" : "");
Set<ProtectedStorageEntry> dataSet = proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet());
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()
@ -123,6 +134,7 @@ public final class GetDataResponse extends NetworkEnvelope implements SupportedC
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
wasTruncated,
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}

View file

@ -135,10 +135,6 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
closeHandler(connection);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation

View file

@ -147,10 +147,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
}
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation

View file

@ -17,24 +17,6 @@
package haveno.network.p2p.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.name.Named;
import com.google.protobuf.ByteString;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
import haveno.common.crypto.CryptoException;
import haveno.common.crypto.Hash;
import haveno.common.crypto.Sig;
import haveno.common.persistence.PersistenceManager;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.network.NetworkPayload;
import haveno.common.proto.persistable.PersistablePayload;
import haveno.common.proto.persistable.PersistedDataHost;
import haveno.common.util.Hex;
import haveno.common.util.Tuple2;
import haveno.common.util.Utilities;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.CloseConnectionReason;
import haveno.network.p2p.network.Connection;
@ -72,20 +54,43 @@ import haveno.network.p2p.storage.persistence.ProtectedDataStoreService;
import haveno.network.p2p.storage.persistence.RemovedPayloadsService;
import haveno.network.p2p.storage.persistence.ResourceDataStoreService;
import haveno.network.p2p.storage.persistence.SequenceNumberMap;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
import haveno.common.crypto.CryptoException;
import haveno.common.crypto.Hash;
import haveno.common.crypto.Sig;
import haveno.common.persistence.PersistenceManager;
import haveno.common.proto.network.GetDataResponsePriority;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.proto.network.NetworkPayload;
import haveno.common.proto.persistable.PersistablePayload;
import haveno.common.proto.persistable.PersistedDataHost;
import haveno.common.util.Hex;
import haveno.common.util.Tuple2;
import haveno.common.util.Utilities;
import com.google.protobuf.ByteString;
import com.google.inject.name.Named;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import java.security.KeyPair;
import java.security.PublicKey;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -101,9 +106,20 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j
public class P2PDataStorage implements MessageListener, ConnectionListener, PersistedDataHost {
/**
@ -118,7 +134,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private boolean initialRequestApplied = false;
private final Broadcaster broadcaster;
private final AppendOnlyDataStoreService appendOnlyDataStoreService;
@VisibleForTesting
final AppendOnlyDataStoreService appendOnlyDataStoreService;
private final ProtectedDataStoreService protectedDataStoreService;
private final ResourceDataStoreService resourceDataStoreService;
@ -143,6 +160,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Don't convert to local variable as it might get GC'ed.
private MonadicBinding<Boolean> readFromResourcesCompleteBinding;
@Setter
private Predicate<ProtectedStoragePayload> filterPredicate; // Set from FilterManager
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -150,14 +169,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Inject
public P2PDataStorage(NetworkNode networkNode,
Broadcaster broadcaster,
AppendOnlyDataStoreService appendOnlyDataStoreService,
ProtectedDataStoreService protectedDataStoreService,
ResourceDataStoreService resourceDataStoreService,
PersistenceManager<SequenceNumberMap> persistenceManager,
RemovedPayloadsService removedPayloadsService,
Clock clock,
@Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) {
Broadcaster broadcaster,
AppendOnlyDataStoreService appendOnlyDataStoreService,
ProtectedDataStoreService protectedDataStoreService,
ResourceDataStoreService resourceDataStoreService,
PersistenceManager<SequenceNumberMap> persistenceManager,
RemovedPayloadsService removedPayloadsService,
Clock clock,
@Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) {
this.broadcaster = broadcaster;
this.appendOnlyDataStoreService = appendOnlyDataStoreService;
this.protectedDataStoreService = protectedDataStoreService;
@ -173,7 +192,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
this.persistenceManager.initialize(sequenceNumberMap, PersistenceManager.Source.PRIVATE_LOW_PRIO);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PersistedDataHost
///////////////////////////////////////////////////////////////////////////////////////////
@ -181,9 +199,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Override
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
completeHandler.run();
},
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(persisted.getMap()));
completeHandler.run();
},
completeHandler);
}
@ -236,10 +254,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
map.put(hashOfPayload, protectedStorageEntry);
log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
//log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap());
}
///////////////////////////////////////////////////////////////////////////////////////////
// RequestData API
///////////////////////////////////////////////////////////////////////////////////////////
@ -266,18 +283,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// PersistedStoragePayload items don't get removed, so we don't have an issue with the case that
// an object gets removed in between PreliminaryGetDataRequest and the GetUpdatedDataRequest and we would
// miss that event if we do not load the full set or use some delta handling.
Map<ByteArray, PersistableNetworkPayload> mapForDataRequest = getMapForDataRequest();
Set<byte[]> excludedKeys = getKeysAsByteSet(mapForDataRequest);
log.trace("## getKnownPayloadHashes map of PersistableNetworkPayloads={}, excludedKeys={}",
printPersistableNetworkPayloadMap(mapForDataRequest),
excludedKeys.stream().map(Utilities::encodeToHex).toArray());
Set<byte[]> excludedKeysFromProtectedStorageEntryMap = getKeysAsByteSet(map);
log.trace("## getKnownPayloadHashes map of ProtectedStorageEntrys={}, excludedKeys={}",
printMap(),
excludedKeysFromProtectedStorageEntryMap.stream().map(Utilities::encodeToHex).toArray());
excludedKeys.addAll(excludedKeysFromProtectedStorageEntryMap);
return excludedKeys;
}
@ -300,30 +308,40 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// mapForDataResponse contains the filtered by version data from HistoricalDataStoreService as well as all other
// maps of the remaining appendOnlyDataStoreServices.
Map<ByteArray, PersistableNetworkPayload> mapForDataResponse = getMapForDataResponse(getDataRequest.getVersion());
Set<PersistableNetworkPayload> filteredPersistableNetworkPayloads =
filterKnownHashes(
mapForDataResponse,
Function.identity(),
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
wasPersistableNetworkPayloadsTruncated);
// Give a bit of tolerance for message overhead
double maxSize = Connection.getMaxPermittedMessageSize() * 0.6;
// 25% of space is allocated for PersistableNetworkPayloads
long limit = Math.round(maxSize * 0.25);
Set<PersistableNetworkPayload> filteredPersistableNetworkPayloads = filterKnownHashes(
mapForDataResponse,
Function.identity(),
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
limit,
wasPersistableNetworkPayloadsTruncated,
true);
log.info("{} PersistableNetworkPayload entries remained after filtered by excluded keys. " +
"Original map had {} entries.",
"Original map had {} entries.",
filteredPersistableNetworkPayloads.size(), mapForDataResponse.size());
log.trace("## buildGetDataResponse filteredPersistableNetworkPayloadHashes={}",
filteredPersistableNetworkPayloads.stream()
.map(e -> Utilities.encodeToHex(e.getHash()))
.toArray());
Set<ProtectedStorageEntry> filteredProtectedStorageEntries =
filterKnownHashes(
map,
ProtectedStorageEntry::getProtectedStoragePayload,
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
wasProtectedStorageEntriesTruncated);
// We give 75% space to ProtectedStorageEntries as they contain MailBoxMessages and those can be larger.
limit = Math.round(maxSize * 0.75);
Set<ProtectedStorageEntry> filteredProtectedStorageEntries = filterKnownHashes(
map,
ProtectedStorageEntry::getProtectedStoragePayload,
excludedKeysAsByteArray,
peerCapabilities,
maxEntriesPerType,
limit,
wasProtectedStorageEntriesTruncated,
false);
log.info("{} ProtectedStorageEntry entries remained after filtered by excluded keys. " +
"Original map had {} entries.",
filteredProtectedStorageEntries.size(), map.size());
@ -332,14 +350,15 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
.map(e -> get32ByteHashAsByteArray((e.getProtectedStoragePayload())))
.toArray());
boolean wasTruncated = wasPersistableNetworkPayloadsTruncated.get() || wasProtectedStorageEntriesTruncated.get();
return new GetDataResponse(
filteredProtectedStorageEntries,
filteredPersistableNetworkPayloads,
getDataRequest.getNonce(),
getDataRequest instanceof GetUpdatedDataRequest);
getDataRequest instanceof GetUpdatedDataRequest,
wasTruncated);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils for collecting the exclude hashes
///////////////////////////////////////////////////////////////////////////////////////////
@ -358,7 +377,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
serviceMap = service.getMap();
}
map.putAll(serviceMap);
log.info("We added {} entries from {} to the excluded key set of our request",
log.debug("We added {} entries from {} to the excluded key set of our request",
serviceMap.size(), service.getClass().getSimpleName());
});
return map;
@ -388,56 +407,134 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
*/
static private <T extends NetworkPayload> Set<T> filterKnownHashes(
Map<ByteArray, T> toFilter,
Function<T, ? extends NetworkPayload> objToPayload,
Function<T, ? extends NetworkPayload> asPayload,
Set<ByteArray> knownHashes,
Capabilities peerCapabilities,
int maxEntries,
AtomicBoolean outTruncated) {
long limit,
AtomicBoolean outTruncated,
boolean isPersistableNetworkPayload) {
log.info("Filter {} data based on {} knownHashes",
isPersistableNetworkPayload ? "PersistableNetworkPayload" : "ProtectedStorageEntry",
knownHashes.size());
log.info("Num knownHashes {}", knownHashes.size());
AtomicLong totalSize = new AtomicLong();
AtomicBoolean exceededSizeLimit = new AtomicBoolean();
Set<Map.Entry<ByteArray, T>> entries = toFilter.entrySet();
List<T> dateSortedTruncatablePayloads = entries.stream()
.filter(entry -> entry.getValue() instanceof DateSortedTruncatablePayload)
Map<String, AtomicInteger> numItemsByClassName = new HashMap<>();
entries.forEach(entry -> {
String name = asPayload.apply(entry.getValue()).getClass().getSimpleName();
numItemsByClassName.putIfAbsent(name, new AtomicInteger());
numItemsByClassName.get(name).incrementAndGet();
});
log.info("numItemsByClassName: {}", numItemsByClassName);
// Map.Entry.value can be ProtectedStorageEntry or PersistableNetworkPayload. We call it item in the steam iterations.
List<T> filteredItems = entries.stream()
.filter(entry -> !knownHashes.contains(entry.getKey()))
.map(Map.Entry::getValue)
.filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
.sorted(Comparator.comparing(payload -> ((DateSortedTruncatablePayload) payload).getDate()))
.filter(item -> shouldTransmitPayloadToPeer(peerCapabilities, asPayload.apply(item)))
.collect(Collectors.toList());
log.info("Num filtered dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
if (!dateSortedTruncatablePayloads.isEmpty()) {
int maxItems = ((DateSortedTruncatablePayload) dateSortedTruncatablePayloads.get(0)).maxItems();
if (dateSortedTruncatablePayloads.size() > maxItems) {
int fromIndex = dateSortedTruncatablePayloads.size() - maxItems;
int toIndex = dateSortedTruncatablePayloads.size();
dateSortedTruncatablePayloads = dateSortedTruncatablePayloads.subList(fromIndex, toIndex);
log.info("Num truncated dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
List<T> resultItems = new ArrayList<>();
// Truncation follows this rules
// 1. Add all payloads with GetDataResponsePriority.MID
// 2. Add all payloads with GetDataResponsePriority.LOW && !DateSortedTruncatablePayload until exceededSizeLimit is reached
// 3. if(!exceededSizeLimit) Add all payloads with GetDataResponsePriority.LOW && DateSortedTruncatablePayload until
// exceededSizeLimit is reached and truncate by maxItems (sorted by date). We add the sublist to our resultItems in
// reverse order so in case we cut off at next step we cut off oldest items.
// 4. We truncate list if resultList size > maxEntries
// 5. Add all payloads with GetDataResponsePriority.HIGH
// 1. Add all payloads with GetDataResponsePriority.MID
List<T> midPrioItems = filteredItems.stream()
.filter(item -> item.getGetDataResponsePriority() == GetDataResponsePriority.MID)
.collect(Collectors.toList());
resultItems.addAll(midPrioItems);
log.info("Number of items with GetDataResponsePriority.MID: {}", midPrioItems.size());
// 2. Add all payloads with GetDataResponsePriority.LOW && !DateSortedTruncatablePayload until exceededSizeLimit is reached
List<T> lowPrioItems = filteredItems.stream()
.filter(item -> item.getGetDataResponsePriority() == GetDataResponsePriority.LOW)
.filter(item -> !(asPayload.apply(item) instanceof DateSortedTruncatablePayload))
.filter(item -> {
if (exceededSizeLimit.get()) {
return false;
}
if (totalSize.addAndGet(item.toProtoMessage().getSerializedSize()) > limit) {
exceededSizeLimit.set(true);
return false;
}
return true;
})
.collect(Collectors.toList());
resultItems.addAll(lowPrioItems);
log.info("Number of items with GetDataResponsePriority.LOW and !DateSortedTruncatablePayload: {}. Exceeded size limit: {}", lowPrioItems.size(), exceededSizeLimit.get());
// 3. if(!exceededSizeLimit) Add all payloads with GetDataResponsePriority.LOW && DateSortedTruncatablePayload until
// exceededSizeLimit is reached and truncate by maxItems (sorted by date). We add the sublist to our resultItems in
// reverse order so in case we cut off at next step we cut off oldest items.
if (!exceededSizeLimit.get()) {
List<T> dateSortedItems = filteredItems.stream()
.filter(item -> item.getGetDataResponsePriority() == GetDataResponsePriority.LOW)
.filter(item -> asPayload.apply(item) instanceof DateSortedTruncatablePayload)
.filter(item -> {
if (exceededSizeLimit.get()) {
return false;
}
if (totalSize.addAndGet(item.toProtoMessage().getSerializedSize()) > limit) {
exceededSizeLimit.set(true);
return false;
}
return true;
})
.sorted(Comparator.comparing(item -> ((DateSortedTruncatablePayload) asPayload.apply(item)).getDate()))
.collect(Collectors.toList());
if (!dateSortedItems.isEmpty()) {
int maxItems = ((DateSortedTruncatablePayload) asPayload.apply(dateSortedItems.get(0))).maxItems();
int size = dateSortedItems.size();
if (size > maxItems) {
int fromIndex = size - maxItems;
dateSortedItems = dateSortedItems.subList(fromIndex, size);
outTruncated.set(true);
log.info("Num truncated dateSortedItems {}", size);
log.info("Removed oldest {} dateSortedItems as we exceeded {}", fromIndex, maxItems);
}
}
}
log.info("Number of items with GetDataResponsePriority.LOW and DateSortedTruncatablePayload: {}. Was truncated: {}", dateSortedItems.size(), outTruncated.get());
List<T> filteredResults = entries.stream()
.filter(entry -> !(entry.getValue() instanceof DateSortedTruncatablePayload))
.filter(entry -> !knownHashes.contains(entry.getKey()))
.map(Map.Entry::getValue)
.filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
.collect(Collectors.toList());
log.info("Num filtered non-dateSortedTruncatablePayloads {}", filteredResults.size());
// The non-dateSortedTruncatablePayloads have higher prio, so we added dateSortedTruncatablePayloads
// after those so in case we need to truncate we first truncate the dateSortedTruncatablePayloads.
filteredResults.addAll(dateSortedTruncatablePayloads);
if (filteredResults.size() > maxEntries) {
filteredResults = filteredResults.subList(0, maxEntries);
outTruncated.set(true);
log.info("Num truncated filteredResults {}", filteredResults.size());
// We reverse sorting so in case we get truncated we cut off the older items
Comparator<T> comparator = Comparator.comparing(item -> ((DateSortedTruncatablePayload) asPayload.apply(item)).getDate());
dateSortedItems.sort(comparator.reversed());
resultItems.addAll(dateSortedItems);
} else {
log.info("Num filteredResults {}", filteredResults.size());
log.info("No dateSortedItems added as we exceeded already the exceededSizeLimit of {}", limit);
}
return new HashSet<>(filteredResults);
// 4. We truncate list if resultList size > maxEntries
int size = resultItems.size();
if (size > maxEntries) {
resultItems = resultItems.subList(0, maxEntries);
outTruncated.set(true);
log.info("Removed last {} items as we exceeded {}", size - maxEntries, maxEntries);
}
outTruncated.set(outTruncated.get() || exceededSizeLimit.get());
// 5. Add all payloads with GetDataResponsePriority.HIGH
List<T> highPrioItems = filteredItems.stream()
.filter(item -> item.getGetDataResponsePriority() == GetDataResponsePriority.HIGH)
.collect(Collectors.toList());
resultItems.addAll(highPrioItems);
log.info("Number of items with GetDataResponsePriority.HIGH: {}", highPrioItems.size());
log.info("Number of result items we send to requester: {}", resultItems.size());
return new HashSet<>(resultItems);
}
public Collection<PersistableNetworkPayload> getPersistableNetworkPayloadCollection() {
return getMapForDataRequest().values();
}
private Set<byte[]> getKeysAsByteSet(Map<ByteArray, ? extends PersistablePayload> map) {
return map.keySet().stream()
@ -474,30 +571,36 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
* or domain listeners.
*/
public void processGetDataResponse(GetDataResponse getDataResponse, NodeAddress sender) {
final Set<ProtectedStorageEntry> dataSet = getDataResponse.getDataSet();
Set<ProtectedStorageEntry> protectedStorageEntries = getDataResponse.getDataSet();
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet();
long ts = System.currentTimeMillis();
protectedStorageEntries.forEach(protectedStorageEntry -> {
// We rebroadcast high priority data after a delay for better resilience
if (protectedStorageEntry.getProtectedStoragePayload().getGetDataResponsePriority() == GetDataResponsePriority.HIGH) {
UserThread.runAfter(() -> {
log.info("Rebroadcast {}", protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName());
broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, null);
}, 60);
}
long ts2 = System.currentTimeMillis();
dataSet.forEach(e -> {
// We don't broadcast here (last param) as we are only connected to the seed node and would be pointless
addProtectedStorageEntry(e, sender, null, false);
addProtectedStorageEntry(protectedStorageEntry, sender, null, false);
});
log.info("Processing {} protectedStorageEntries took {} ms.", dataSet.size(), this.clock.millis() - ts2);
log.info("Processing {} protectedStorageEntries took {} ms.", protectedStorageEntries.size(), this.clock.millis() - ts);
ts2 = this.clock.millis();
ts = this.clock.millis();
persistableNetworkPayloadSet.forEach(e -> {
if (e instanceof ProcessOncePersistableNetworkPayload) {
// We use an optimized method as many checks are not required in that case to avoid
// performance issues.
// Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min).
// Usually we only get about a few hundred or max. a few 1000 items. 82645 is all
// trade stats stats and all account age witness data.
// trade stats and all account age witness data.
// We only apply it once from first response
if (!initialRequestApplied) {
if (!initialRequestApplied || getDataResponse.isWasTruncated()) {
addPersistableNetworkPayloadFromInitialRequest(e);
}
} else {
// We don't broadcast here as we are only connected to the seed node and would be pointless
@ -505,7 +608,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
});
log.info("Processing {} persistableNetworkPayloads took {} ms.",
persistableNetworkPayloadSet.size(), this.clock.millis() - ts2);
persistableNetworkPayloadSet.size(), this.clock.millis() - ts);
// We only process PersistableNetworkPayloads implementing ProcessOncePersistableNetworkPayload once. It can cause performance
// issues and since the data is rarely out of sync it is not worth it to apply them from multiple peers during
@ -529,10 +632,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// object when we get it sent from new peers, we dont remove the sequence number from the map.
// That way an ADD message for an already expired data will fail because the sequence number
// is equal and not larger as expected.
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList =
map.entrySet().stream()
.filter(entry -> entry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
ArrayList<Map.Entry<ByteArray, ProtectedStorageEntry>> toRemoveList = map.entrySet().stream()
.filter(entry -> entry.getValue().isExpired(this.clock))
.collect(Collectors.toCollection(ArrayList::new));
// Batch processing can cause performance issues, so do all of the removes first, then update the listeners
// to let them know about the removes.
@ -554,14 +656,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
removeExpiredEntriesTimer = UserThread.runPeriodically(this::removeExpiredEntries, CHECK_TTL_INTERVAL_SEC);
}
// Domain access should use the concrete appendOnlyDataStoreService if available. The Historical data store require
// care which data should be accessed (live data or all data).
@VisibleForTesting
Map<ByteArray, PersistableNetworkPayload> getAppendOnlyDataStoreMap() {
return appendOnlyDataStoreService.getMap();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -586,7 +680,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -624,12 +717,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
});
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Client API
///////////////////////////////////////////////////////////////////////////////////////////
@ -665,7 +752,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
ByteArray hashAsByteArray = new ByteArray(payload.getHash());
boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap().containsKey(hashAsByteArray);
boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap(payload).containsKey(hashAsByteArray);
// Store already knows about this payload. Ignore it unless the caller specifically requests a republish.
if (payloadHashAlreadyInStore && !reBroadcast) {
@ -682,13 +769,16 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
// Add the payload and publish the state update to the appendOnlyDataStoreListeners
boolean wasAdded = false;
if (!payloadHashAlreadyInStore) {
appendOnlyDataStoreService.put(hashAsByteArray, payload);
appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload));
wasAdded = appendOnlyDataStoreService.put(hashAsByteArray, payload);
if (wasAdded) {
appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload));
}
}
// Broadcast the payload if requested by caller
if (allowBroadcast)
if (allowBroadcast && wasAdded)
broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender);
return true;
@ -731,7 +821,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
//log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
// We do that check early as it is a very common case for returning, so we return early
// If we have seen a more recent operation for this payload and we have a payload locally, ignore it
@ -776,6 +866,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return false;
}
// Test against filterPredicate set from FilterManager
if (filterPredicate != null &&
!filterPredicate.test(protectedStorageEntry.getProtectedStoragePayload())) {
log.debug("filterPredicate test failed. hashOfPayload={}", hashOfPayload);
return false;
}
// This is an updated entry. Record it and signal listeners.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));
@ -784,7 +881,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
requestPersistence();
log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
//log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap());
// Optionally, broadcast the add/update depending on the calling environment
if (allowBroadcast) {
@ -812,7 +909,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
log.trace("## call republishProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
//log.trace("## call republishProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap());
if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) {
log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " +
@ -839,42 +936,48 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage,
@Nullable NodeAddress sender) {
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
ProtectedStorageEntry storedData = map.get(hashOfPayload);
try {
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
ProtectedStorageEntry storedData = map.get(hashOfPayload);
if (storedData == null) {
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
if (storedData == null) {
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
return false;
}
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry(
storedEntry.getProtectedStoragePayload(),
storedEntry.getOwnerPubKey(),
refreshTTLMessage.getSequenceNumber(),
refreshTTLMessage.getSignature(),
this.clock);
// If we have seen a more recent operation for this payload, we ignore the current one
if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
return false;
// Verify the updated ProtectedStorageEntry is well formed and valid for update
if (!updatedEntry.isValidForAddOperation())
return false;
// Update the hash map with the updated entry
map.put(hashOfPayload, updatedEntry);
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis()));
requestPersistence();
// Always broadcast refreshes
broadcaster.broadcast(refreshTTLMessage, sender);
} catch (IllegalArgumentException e) {
log.error("refreshTTL failed, missing data: {}", e.toString());
e.printStackTrace();
return false;
}
ProtectedStorageEntry storedEntry = map.get(hashOfPayload);
ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry(
storedEntry.getProtectedStoragePayload(),
storedEntry.getOwnerPubKey(),
refreshTTLMessage.getSequenceNumber(),
refreshTTLMessage.getSignature(),
this.clock);
// If we have seen a more recent operation for this payload, we ignore the current one
if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload))
return false;
// Verify the updated ProtectedStorageEntry is well formed and valid for update
if (!updatedEntry.isValidForAddOperation())
return false;
// Update the hash map with the updated entry
map.put(hashOfPayload, updatedEntry);
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis()));
requestPersistence();
// Always broadcast refreshes
broadcaster.broadcast(refreshTTLMessage, sender);
return true;
}
@ -1012,9 +1115,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ByteArray hashOfPayload = entry.getKey();
ProtectedStorageEntry protectedStorageEntry = entry.getValue();
log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
//log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap());
map.remove(hashOfPayload);
log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
//log.trace("## removeFromMapAndDataStore: map after remove={}", printMap());
// We inform listeners even the entry was not found in our map
removedProtectedStorageEntries.add(protectedStorageEntry);
@ -1038,20 +1141,18 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + " / hashOfData=" + hashOfData.toString());*/
return true;
} else if (newSequenceNumber == storedSequenceNumber) {
String msg;
if (newSequenceNumber == 0) {
msg = "Sequence number is equal to the stored one and both are 0." +
"That is expected for network_messages which never got updated (mailbox msg).";
log.debug("Sequence number is equal to the stored one and both are 0." +
"That is expected for network_messages which never got updated (mailbox msg).");
} else {
msg = "Sequence number is equal to the stored one. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber;
log.debug("Sequence number is equal to the stored one. sequenceNumber = {} / storedSequenceNumber={}",
newSequenceNumber, storedSequenceNumber);
}
log.debug(msg);
return false;
} else {
log.debug("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message.");
log.debug("Sequence number is invalid. sequenceNumber = {} / storedSequenceNumber={} " +
"That can happen if the data owner gets an old delayed data storage message.",
newSequenceNumber, storedSequenceNumber);
return false;
}
} else {
@ -1131,7 +1232,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return Hash.getSha256Hash(data.toProtoMessage().toByteArray());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Static class
///////////////////////////////////////////////////////////////////////////////////////////
@ -1161,7 +1261,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
}
/**
* Used as key object in map for cryptographic hash of stored data as byte[] as primitive data type cannot be
* used as key
@ -1171,6 +1270,19 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// That object is saved to disc. We need to take care of changes to not break deserialization.
public final byte[] bytes;
public ByteArray(byte[] bytes) {
this.bytes = bytes;
verifyBytesNotEmpty();
}
public void verifyBytesNotEmpty() {
if (this.bytes == null)
throw new IllegalArgumentException("Cannot create P2PDataStorage.ByteArray with null byte[] array argument.");
if (this.bytes.length == 0)
throw new IllegalArgumentException("Cannot create P2PDataStorage.ByteArray with empty byte[] array argument.");
}
@Override
public String toString() {
return "ByteArray{" +
@ -1178,11 +1290,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
'}';
}
public ByteArray(byte[] bytes) {
this.bytes = bytes;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Protobuffer
///////////////////////////////////////////////////////////////////////////////////////////
@ -1196,7 +1303,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return new ByteArray(proto.getBytes().toByteArray());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Util
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -24,11 +24,15 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.inject.Inject;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Used for PersistableNetworkPayload data which gets appended to a map storage.
@ -72,21 +76,25 @@ public class AppendOnlyDataStoreService {
services.forEach(service -> service.readFromResourcesSync(postFix));
}
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap() {
return services.stream()
.flatMap(service -> {
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = service instanceof HistoricalDataStoreService ?
((HistoricalDataStoreService) service).getMapOfAllData() :
service.getMap();
return map.entrySet().stream();
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap(PersistableNetworkPayload payload) {
return findService(payload)
.map(service -> service instanceof HistoricalDataStoreService ?
((HistoricalDataStoreService<?>) service).getMapOfAllData() :
service.getMap())
.orElse(new HashMap<>());
}
public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) {
services.stream()
public boolean put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) {
Optional<MapStoreService<? extends PersistableNetworkPayloadStore<? extends PersistableNetworkPayload>, PersistableNetworkPayload>> optionalService = findService(payload);
optionalService.ifPresent(service -> service.putIfAbsent(hashAsByteArray, payload));
return optionalService.isPresent();
}
@NotNull
private Optional<MapStoreService<? extends PersistableNetworkPayloadStore<? extends PersistableNetworkPayload>, PersistableNetworkPayload>> findService(
PersistableNetworkPayload payload) {
return services.stream()
.filter(service -> service.canHandle(payload))
.forEach(service -> service.putIfAbsent(hashAsByteArray, payload));
.findAny();
}
}