Merge pull request #388 from ivilata/ReviewNetworkCode

Review network code
This commit is contained in:
Manfred Karrer 2016-04-20 12:55:42 +02:00
commit 9f224dcac3
13 changed files with 55 additions and 30 deletions

View File

@ -7,6 +7,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
// Helps configure listener objects that are run by the `UserThread` each second
// and can do per second, per minute and delayed second actions.
public class Clock {
private static final Logger log = LoggerFactory.getLogger(Clock.class);

View File

@ -7,6 +7,7 @@ import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
// Runs all listener objects periodically in a short interval.
public class MasterTimer {
private final static Logger log = LoggerFactory.getLogger(MasterTimer.class);
private static final java.util.Timer timer = new java.util.Timer();

View File

@ -27,6 +27,7 @@ import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
// Helps run delayed and periodic actions in the caller thread.
public class UserThread {
private static final Logger log = LoggerFactory.getLogger(UserThread.class);
private static Class<? extends Timer> timerClass;

View File

@ -14,6 +14,8 @@ import java.util.List;
public class FileUtil {
private static final Logger log = LoggerFactory.getLogger(FileUtil.class);
/** Number of copies to keep in backup directory. */
private static final int KEPT_BACKUPS = 10;
public static void rollingBackup(File dir, String fileName) {
if (dir.exists()) {
@ -30,7 +32,7 @@ public class FileUtil {
File backupFileDir = new File(Paths.get(backupDir.getAbsolutePath(), dirName).toString());
if (!backupFileDir.exists())
if (!backupFileDir.mkdir())
log.warn("make backupFileDir failed");
log.warn("make backupFileDir failed.\nBackupFileDir=" + backupFileDir.getAbsolutePath());
File backupFile = new File(Paths.get(backupFileDir.getAbsolutePath(), new Date().getTime() + "_" + fileName).toString());
@ -39,7 +41,7 @@ public class FileUtil {
pruneBackup(backupFileDir);
} catch (IOException e) {
log.error("Backup key failed " + e.getMessage());
log.error("Backup key failed: " + e.getMessage());
e.printStackTrace();
}
}
@ -51,7 +53,7 @@ public class FileUtil {
File[] files = backupDir.listFiles();
if (files != null) {
List<File> filesList = Arrays.asList(files);
if (filesList.size() > 10) {
if (filesList.size() > KEPT_BACKUPS) {
filesList.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
File file = filesList.get(0);
if (file.isFile()) {

View File

@ -57,7 +57,7 @@ public final class DecryptedMsgWithPubKey implements Persistable {
@Override
public String toString() {
return "DecryptedMsgWithPubKey{" +
", message=" + message +
"message=" + message +
", signaturePubKey.hashCode()=" + signaturePubKey.hashCode() +
'}';
}

View File

@ -22,14 +22,15 @@ public class HttpClient {
URL url = new URL(baseUrl + param);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(10000);
connection.setReadTimeout(10000);
connection.setConnectTimeout(10_000);
connection.setReadTimeout(10_000);
if (connection.getResponseCode() == 200) {
return convertInputStreamToString(connection.getInputStream());
} else {
String error = convertInputStreamToString(connection.getErrorStream());
connection.getErrorStream().close();
throw new HttpException(convertInputStreamToString(connection.getErrorStream()));
throw new HttpException(error);
}
} finally {
if (connection != null)

View File

@ -30,7 +30,7 @@ public final class NodeAddress implements Persistable, Payload {
return hostName + ":" + port;
}
// We use just a few chars form or address to blur the potential receiver for sent messages
// We use just a few chars from the full address to blur the potential receiver for sent messages
public byte[] getAddressPrefixHash() {
if (addressPrefixHash == null)
addressPrefixHash = Hash.getHash(getFullAddress().substring(0, Math.min(2, getFullAddress().length())));

View File

@ -109,8 +109,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.torDir = torDir;
this.clock = clock;
optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService);
optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing);
optionalEncryptionService = Optional.ofNullable(encryptionService);
optionalKeyRing = Optional.ofNullable(keyRing);
init(useLocalhost, networkId, storageDir);
}

View File

@ -167,14 +167,16 @@ public class Connection implements MessageListener {
try {
Log.traceCall();
// Throttle outbound messages
if (System.currentTimeMillis() - lastSendTimeStamp < 20) {
log.info("We got 2 sendMessage requests in less then 20 ms. We set the thread to sleep " +
"for 50 ms to avoid that we flood our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, System.currentTimeMillis(), (System.currentTimeMillis() - lastSendTimeStamp));
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < 20) {
log.info("We got 2 sendMessage requests in less than 20 ms. We set the thread to sleep " +
"for 50 ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, now, elapsed);
Thread.sleep(50);
}
lastSendTimeStamp = System.currentTimeMillis();
lastSendTimeStamp = now;
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
int size = ByteArrayUtils.objectToByteArray(message).length;
@ -572,7 +574,7 @@ public class Connection implements MessageListener {
@Override
public String toString() {
return "SharedSpace{" +
", socket=" + socket +
"socket=" + socket +
", ruleViolations=" + ruleViolations +
'}';
}
@ -583,7 +585,9 @@ public class Connection implements MessageListener {
// InputHandler
///////////////////////////////////////////////////////////////////////////////////////////
// Runs in same thread as Connection
// Runs in same thread as Connection, receives a message, performs several checks on it
// (including throttling limits, validity and statistics)
// and delivers it to the message listener given in the constructor.
private static class InputHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
@ -629,8 +633,8 @@ public class Connection implements MessageListener {
long now = System.currentTimeMillis();
long elapsed = now - lastReadTimeStamp;
if (elapsed < 10) {
log.info("We got 2 messages received in less then 10 ms. We set the thread to sleep " +
"for 20 ms to avoid that we get flooded from our peer. lastReadTimeStamp={}, now={}, elapsed={}",
log.info("We got 2 messages received in less than 10 ms. We set the thread to sleep " +
"for 20 ms to avoid getting flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, now, elapsed);
Thread.sleep(20);
}
@ -728,13 +732,13 @@ public class Connection implements MessageListener {
if (!(message instanceof KeepAliveMessage))
connection.statistic.updateLastActivityTimestamp();
// First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service
// published, so does not know its address. As the IncomingConnection does not has the
// First a seed node gets a message from a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not have its hidden service
// published, so it does not know its address. As the IncomingConnection does not have the
// peersNodeAddress set that connection cannot be used for outgoing messages until we
// get the address set.
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
// after the HS is published we get the peers address set.
// after the HS is published we get the peer's address set.
// There are only those messages used for new connections to a peer:
// 1. PreliminaryDataRequest
@ -796,4 +800,4 @@ public class Connection implements MessageListener {
'}';
}
}
}
}

View File

@ -60,6 +60,8 @@ public abstract class NetworkNode implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
// Calls this (and other registered) setup listener's ``onTorNodeReady()`` and ``onHiddenServicePublished``
// when the events happen.
abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, Message message) {

View File

@ -25,6 +25,8 @@ import static com.google.common.base.Preconditions.checkArgument;
public class SeedNode {
private static final Logger log = LoggerFactory.getLogger(SeedNode.class);
public static final int MAX_CONNECTIONS_LIMIT = 1000;
public static final int MAX_CONNECTIONS_DEFAULT = 50;
private NodeAddress mySeedNodeAddress = new NodeAddress("localhost:8001");
private boolean useLocalhost = false;
@ -66,11 +68,11 @@ public class SeedNode {
String arg2 = args[2];
int maxConnections = Integer.parseInt(arg2);
log.info("From processArgs: maxConnections=" + maxConnections);
checkArgument(maxConnections < 1000, "maxConnections seems to be a bit too high...");
checkArgument(maxConnections < MAX_CONNECTIONS_LIMIT, "maxConnections seems to be a bit too high...");
PeerManager.setMaxConnections(maxConnections);
} else {
// we keep default a higher connection size for seed nodes
PeerManager.setMaxConnections(50);
PeerManager.setMaxConnections(MAX_CONNECTIONS_DEFAULT);
}
if (args.length > 3) {
String arg3 = args[3];

View File

@ -11,9 +11,10 @@ import java.util.stream.Collectors;
public class SeedNodesRepository {
private static final Logger log = LoggerFactory.getLogger(SeedNodesRepository.class);
// mainnet use port 8000
// testnet use port 8001
// regtest use port 8002
// Addresses are used if their port match the network id:
// - mainnet uses port 8000
// - testnet uses port 8001
// - regtest uses port 8002
private Set<NodeAddress> torSeedNodeAddresses = Sets.newHashSet(
// In alpha we change the network with new releases. That will be faded out once we become backwards compatible (Beta)
@ -51,6 +52,10 @@ public class SeedNodesRepository {
new NodeAddress("mfla72c4igh5ta2t.onion:8002")
);
// Addresses are used if the last digit of their port match the network id:
// - mainnet use port ends in 0
// - testnet use port ends in 1
// - regtest use port ends in 2
private Set<NodeAddress> localhostSeedNodeAddresses = Sets.newHashSet(
// mainnet
new NodeAddress("localhost:2000"),

View File

@ -40,6 +40,8 @@ import java.util.concurrent.TimeUnit;
// Run in UserThread
public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
/** How many days to keep an entry before it is purged. */
public static final int PURGE_AGE_DAYS = 10;
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 60;
@ -433,6 +435,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return checkSignature(protectedStorageEntry.ownerPubKey, hashOfDataAndSeqNr, protectedStorageEntry.signature);
}
// Check that the pubkey of the storage entry matches the allowed pubkey for the addition or removal operation
// in the contained mailbox message, or the pubkey of other kinds of messages.
private boolean checkPublicKeys(ProtectedStorageEntry protectedStorageEntry, boolean isAddOperation) {
boolean result;
if (protectedStorageEntry.getStoragePayload() instanceof MailboxStoragePayload) {
@ -493,9 +497,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
return new ByteArray(Hash.getHash(data));
}
// Get a new map with entries older than PURGE_AGE_DAYS purged from the given map.
private HashMap<ByteArray, MapValue> getPurgedSequenceNumberMap(HashMap<ByteArray, MapValue> persisted) {
HashMap<ByteArray, MapValue> purged = new HashMap<>();
long maxAgeTs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10);
long maxAgeTs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(PURGE_AGE_DAYS);
persisted.entrySet().stream().forEach(entry -> {
if (entry.getValue().timeStamp > maxAgeTs)
purged.put(entry.getKey(), entry.getValue());