Handle offer removal on disconnect, cleanup

This commit is contained in:
Manfred Karrer 2016-02-17 17:33:53 +01:00
parent db363fac48
commit 17c780639f
69 changed files with 377 additions and 367 deletions

View File

@ -60,6 +60,7 @@ public abstract class AppModule extends AbstractModule {
*
* @param injector the Injector originally initialized with this module
*/
@SuppressWarnings("WeakerAccess")
protected void doClose(Injector injector) {
}
}

View File

@ -27,7 +27,7 @@ import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
import org.slf4j.LoggerFactory;
public class Log {
public static boolean PRINT_TRACE_METHOD = true;
private static boolean PRINT_TRACE_METHOD = true;
private static SizeBasedTriggeringPolicy triggeringPolicy;
private static Logger logbackLogger;

View File

@ -5,11 +5,12 @@ import io.bitsquare.app.Version;
import java.io.Serializable;
import java.util.Arrays;
// Util for comparing byte arrays
public class ByteArray implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
public final byte[] bytes;
private final byte[] bytes;
public ByteArray(byte[] bytes) {
this.bytes = bytes;

View File

@ -55,6 +55,7 @@ public class UserThread {
return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS);
}
@SuppressWarnings("WeakerAccess")
public static Timer runAfterRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) {
return UserThread.runAfter(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit);
}
@ -70,7 +71,7 @@ public class UserThread {
public void run() {
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
try {
UserThread.execute(() -> runnable.run());
UserThread.execute(runnable::run);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing timerTask failed. " + t.getMessage());

View File

@ -40,12 +40,12 @@ public class Encryption {
private static final Logger log = LoggerFactory.getLogger(Encryption.class);
public static final String ASYM_KEY_ALGO = "RSA"; // RSA/NONE/OAEPWithSHA256AndMGF1Padding
public static final String ASYM_CIPHER = "RSA";
private static final String ASYM_CIPHER = "RSA";
public static final String SYM_KEY_ALGO = "AES"; // AES/CTR/NoPadding
public static final String SYM_CIPHER = "AES";
private static final String SYM_KEY_ALGO = "AES"; // AES/CTR/NoPadding
private static final String SYM_CIPHER = "AES";
public static final String HMAC = "HmacSHA256";
private static final String HMAC = "HmacSHA256";
public static KeyPair generateKeyPair() {
long ts = System.currentTimeMillis();
@ -66,7 +66,7 @@ public class Encryption {
// Symmetric
///////////////////////////////////////////////////////////////////////////////////////////
public static byte[] encrypt(byte[] payload, SecretKey secretKey) throws CryptoException {
private static byte[] encrypt(byte[] payload, SecretKey secretKey) throws CryptoException {
try {
Cipher cipher = Cipher.getInstance(SYM_CIPHER, "BC");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
@ -77,7 +77,7 @@ public class Encryption {
}
}
public static byte[] decrypt(byte[] encryptedPayload, SecretKey secretKey) throws CryptoException {
private static byte[] decrypt(byte[] encryptedPayload, SecretKey secretKey) throws CryptoException {
try {
Cipher cipher = Cipher.getInstance(SYM_CIPHER, "BC");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
@ -123,7 +123,7 @@ public class Encryption {
}
private static boolean verifyHmac(byte[] message, byte[] hmac, SecretKey secretKey) throws CryptoException {
private static boolean verifyHmac(byte[] message, byte[] hmac, SecretKey secretKey) {
try {
byte[] hmacTest = getHmac(message, secretKey);
return Arrays.equals(hmacTest, hmac);
@ -144,15 +144,15 @@ public class Encryption {
// Symmetric with Hmac
///////////////////////////////////////////////////////////////////////////////////////////
public static byte[] encryptPayloadWithHmac(Serializable object, SecretKey secretKey) throws CryptoException {
private static byte[] encryptPayloadWithHmac(Serializable object, SecretKey secretKey) throws CryptoException {
return encryptPayloadWithHmac(Utilities.serialize(object), secretKey);
}
public static byte[] encryptPayloadWithHmac(byte[] payload, SecretKey secretKey) throws CryptoException {
private static byte[] encryptPayloadWithHmac(byte[] payload, SecretKey secretKey) throws CryptoException {
return encrypt(getPayloadWithHmac(payload, secretKey), secretKey);
}
public static byte[] decryptPayloadWithHmac(byte[] encryptedPayloadWithHmac, SecretKey secretKey) throws CryptoException {
private static byte[] decryptPayloadWithHmac(byte[] encryptedPayloadWithHmac, SecretKey secretKey) throws CryptoException {
byte[] payloadWithHmac = decrypt(encryptedPayloadWithHmac, secretKey);
String payloadWithHmacAsHex = Hex.toHexString(payloadWithHmac);
// first part is raw message
@ -173,7 +173,7 @@ public class Encryption {
// Asymmetric
///////////////////////////////////////////////////////////////////////////////////////////
public static byte[] encrypt(byte[] payload, PublicKey publicKey) throws CryptoException {
private static byte[] encrypt(byte[] payload, PublicKey publicKey) throws CryptoException {
try {
Cipher cipher = Cipher.getInstance(ASYM_CIPHER, "BC");
cipher.init(Cipher.ENCRYPT_MODE, publicKey);
@ -184,7 +184,7 @@ public class Encryption {
}
}
public static byte[] decrypt(byte[] encryptedPayload, PrivateKey privateKey) throws CryptoException {
private static byte[] decrypt(byte[] encryptedPayload, PrivateKey privateKey) throws CryptoException {
try {
Cipher cipher = Cipher.getInstance(ASYM_CIPHER, "BC");
cipher.init(Cipher.DECRYPT_MODE, privateKey);

View File

@ -136,7 +136,7 @@ public class KeyStorage {
savePrivateKey(keyRing.getEncryptionKeyPair().getPrivate(), KeyEntry.MSG_ENCRYPTION.getFileName());
}
public void savePrivateKey(PrivateKey privateKey, String name) {
private void savePrivateKey(PrivateKey privateKey, String name) {
if (!storageDir.exists())
storageDir.mkdir();

View File

@ -36,7 +36,7 @@ public class Sig {
private static final Logger log = LoggerFactory.getLogger(Sig.class);
public static final String KEY_ALGO = "DSA";
public static final String ALGO = "SHA256withDSA";
private static final String ALGO = "SHA256withDSA";
/**
@ -99,7 +99,6 @@ public class Sig {
* @throws SignatureException
*/
public static boolean verify(PublicKey publicKey, byte[] data, byte[] signature) throws CryptoException {
byte[] sigAsBytes = new byte[0];
try {
Signature sig = Signature.getInstance(ALGO, "BC");
sig.initVerify(publicKey);

View File

@ -74,7 +74,7 @@ class DesktopUtil {
}
if (!Desktop.getDesktop().isSupported(Desktop.Action.BROWSE)) {
logErr("BORWSE is not supported.");
logErr("BROWSE is not supported.");
return false;
}

View File

@ -20,7 +20,7 @@ package io.bitsquare.common.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Profiler {
class Profiler {
private static final Logger log = LoggerFactory.getLogger(Profiler.class);
public static void printSystemLoad(Logger log) {

View File

@ -75,9 +75,7 @@ public class Utilities {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeInSec,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory);
executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler((r, e) -> {
log.warn("RejectedExecutionHandler called");
});
executor.setRejectedExecutionHandler((r, e) -> log.warn("RejectedExecutionHandler called"));
return executor;
}
@ -96,9 +94,7 @@ public class Utilities {
executor.allowCoreThreadTimeOut(true);
executor.setMaximumPoolSize(maximumPoolSize);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRejectedExecutionHandler((r, e) -> {
log.warn("RejectedExecutionHandler called");
});
executor.setRejectedExecutionHandler((r, e) -> log.warn("RejectedExecutionHandler called"));
return executor;
}
@ -292,8 +288,10 @@ public class Utilities {
public static void deleteDirectory(File file) throws IOException {
if (file.isDirectory()) {
for (File c : file.listFiles())
deleteDirectory(c);
File[] files = file.listFiles();
if (files != null)
for (File c : files)
deleteDirectory(c);
}
if (!file.delete())
throw new FileNotFoundException("Failed to delete file: " + file);

View File

@ -15,26 +15,8 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Copyright 2013 Google Inc.
* Copyright 2014 Andreas Schildbach
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.bitsquare.storage;
import com.google.common.io.Files;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
@ -50,13 +32,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Borrowed from BitcoinJ WalletFiles
* A class that handles atomic and optionally delayed writing of a file to disk.
* It can be useful to delay writing of a file to disk on slow devices.
* By coalescing writes and doing serialization
* and disk IO on a background thread performance can be improved.
*/
public class FileManager<T> {
private static final Logger log = LoggerFactory.getLogger(FileManager.class);
@ -166,7 +141,7 @@ public class FileManager<T> {
/**
* Shut down auto-saving.
*/
public void shutDown() {
void shutDown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);

View File

@ -109,7 +109,7 @@ public class Storage<T extends Serializable> {
}
// Save delayed and on a background thread
public void queueUpForSave(T serializable) {
private void queueUpForSave(T serializable) {
if (serializable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");

View File

@ -18,7 +18,7 @@
package io.bitsquare.alert;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.data.StorageMessage;
import io.bitsquare.p2p.storage.messages.StorageMessage;
import java.security.PublicKey;
import java.util.concurrent.TimeUnit;

View File

@ -20,7 +20,7 @@ package io.bitsquare.alert;
import com.google.inject.Inject;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.user.User;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
@ -30,7 +30,6 @@ import org.bitcoinj.core.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigInteger;
import java.security.SignatureException;
@ -61,20 +60,18 @@ public class AlertManager {
alertService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
Serializable data = entry.expirableMessage;
if (data instanceof Alert) {
Alert alert = (Alert) data;
public void onAdded(ProtectedData data) {
if (data.expirableMessage instanceof Alert) {
Alert alert = (Alert) data.expirableMessage;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
}
@Override
public void onRemoved(ProtectedData entry) {
Serializable data = entry.expirableMessage;
if (data instanceof Alert) {
Alert alert = (Alert) data;
public void onRemoved(ProtectedData data) {
if (data.expirableMessage instanceof Alert) {
Alert alert = (Alert) data.expirableMessage;
if (verifySignature(alert))
alertMessageProperty.set(null);
}

View File

@ -20,7 +20,7 @@ package io.bitsquare.arbitration;
import io.bitsquare.app.Version;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.data.StorageMessage;
import io.bitsquare.p2p.storage.messages.StorageMessage;
import java.security.PublicKey;
import java.util.Arrays;

View File

@ -30,7 +30,7 @@ import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.user.User;
import javafx.collections.FXCollections;
import javafx.collections.ObservableMap;
@ -101,12 +101,12 @@ public class ArbitratorManager {
arbitratorService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
public void onAdded(ProtectedData data) {
applyArbitrators();
}
@Override
public void onRemoved(ProtectedData entry) {
public void onRemoved(ProtectedData data) {
applyArbitrators();
}
});
@ -137,7 +137,7 @@ public class ArbitratorManager {
// re-publish periodically
republishArbitratorExecutor = Utilities.getScheduledThreadPoolExecutor("republishArbitrator", 1, 5, 5);
long delay = Arbitrator.TTL / 2;
republishArbitratorExecutor.scheduleAtFixedRate(() -> republishArbitrator(), delay, delay, TimeUnit.MILLISECONDS);
republishArbitratorExecutor.scheduleAtFixedRate(this::republishArbitrator, delay, delay, TimeUnit.MILLISECONDS);
}
applyArbitrators();
@ -150,7 +150,7 @@ public class ArbitratorManager {
Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
if (registeredArbitrator != null) {
addArbitrator(registeredArbitrator,
() -> applyArbitrators(),
this::applyArbitrators,
log::error
);
}
@ -158,7 +158,7 @@ public class ArbitratorManager {
public void applyArbitrators() {
Map<NodeAddress, Arbitrator> map = arbitratorService.getArbitrators();
log.trace("Arbitrators . size=" + (map.values() != null ? map.values().size() : "0"));
log.trace("Arbitrators . size=" + map.values().size());
arbitratorsObservableMap.clear();
Map<NodeAddress, Arbitrator> filtered = map.values().stream()
.filter(e -> isPublicKeyInList(Utils.HEX.encode(e.getRegistrationPubKey()))
@ -176,8 +176,8 @@ public class ArbitratorManager {
// if we don't have any arbitrator anymore we set all matching
if (user.getAcceptedArbitrators().isEmpty()) {
arbitratorsObservableMap.values().stream()
.filter(arbitrator -> user.hasMatchingLanguage(arbitrator))
.forEach(arbitrator -> user.addAcceptedArbitrator(arbitrator));
.filter(user::hasMatchingLanguage)
.forEach(user::addAcceptedArbitrator);
}
}
}
@ -191,7 +191,7 @@ public class ArbitratorManager {
resultHandler.handleResult();
if (arbitratorsObservableMap.size() > 0)
UserThread.runAfter(() -> applyArbitrators(), 100, TimeUnit.MILLISECONDS);
UserThread.runAfter(this::applyArbitrators, 100, TimeUnit.MILLISECONDS);
},
errorMessageHandler::handleErrorMessage);
}

View File

@ -84,8 +84,8 @@ public class ArbitratorService {
public Map<NodeAddress, Arbitrator> getArbitrators() {
Set<Arbitrator> arbitratorSet = p2PService.getDataMap().values().stream()
.filter(e -> e.expirableMessage instanceof Arbitrator)
.map(e -> (Arbitrator) e.expirableMessage)
.filter(data -> data.expirableMessage instanceof Arbitrator)
.map(data -> (Arbitrator) data.expirableMessage)
.collect(Collectors.toSet());
Map<NodeAddress, Arbitrator> map = new HashMap<>();

View File

@ -575,7 +575,7 @@ public class DisputeManager {
}
private boolean isArbitrator(DisputeResult disputeResult) {
return walletService.getArbitratorAddressEntry().getAddressString().equals(disputeResult.getArbitratorAddressAsString());
return disputeResult.getArbitratorAddressAsString().equals(walletService.getArbitratorAddressEntry().getAddressString());
}

View File

@ -57,7 +57,7 @@ class AddressBasedCoinSelector implements CoinSelector {
}
@VisibleForTesting
static void sortOutputs(ArrayList<TransactionOutput> outputs) {
private static void sortOutputs(ArrayList<TransactionOutput> outputs) {
Collections.sort(outputs, (a, b) -> {
int depth1 = a.getParentTransactionDepthInBlocks();
int depth2 = b.getParentTransactionDepthInBlocks();
@ -92,7 +92,7 @@ class AddressBasedCoinSelector implements CoinSelector {
/**
* Sub-classes can override this to just customize whether transactions are usable, but keep age sorting.
*/
protected boolean shouldSelect(Transaction tx) {
private boolean shouldSelect(Transaction tx) {
return isInBlockChainOrPending(tx);
}

View File

@ -31,6 +31,7 @@ import io.bitsquare.btc.exceptions.TransactionVerificationException;
import io.bitsquare.btc.exceptions.WalletException;
import io.bitsquare.user.Preferences;
import org.bitcoinj.core.*;
import org.bitcoinj.crypto.DeterministicKey;
import org.bitcoinj.crypto.TransactionSignature;
import org.bitcoinj.kits.WalletAppKit;
import org.bitcoinj.script.Script;
@ -161,6 +162,7 @@ public class TradeWalletService {
// To be discussed if that introduce any privacy issues.
sendRequest.changeAddress = addressEntry.getAddress();
checkNotNull(wallet, "Wallet must not be null");
wallet.completeTx(sendRequest);
printTxWithInputs("tradingFeeTx", tradingFeeTx);
@ -248,8 +250,9 @@ public class TradeWalletService {
String changeOutputAddress = null;
if (changeOutput != null) {
changeOutputValue = changeOutput.getValue().getValue();
checkNotNull(changeOutput.getAddressFromP2PKHScript(params), "changeOutput.getAddressFromP2PKHScript(params) must not be null");
changeOutputAddress = changeOutput.getAddressFromP2PKHScript(params).toString();
Address addressFromP2PKHScript = changeOutput.getAddressFromP2PKHScript(params);
checkNotNull(addressFromP2PKHScript, "changeOutput.getAddressFromP2PKHScript(params) must not be null");
changeOutputAddress = addressFromP2PKHScript.toString();
}
return new InputsAndChangeOutput(rawInputList, changeOutputValue, changeOutputAddress);
@ -292,7 +295,7 @@ public class TradeWalletService {
log.trace("msOutputAmount " + msOutputAmount.toFriendlyString());
log.trace("takerRawInputs " + takerRawInputs.toString());
log.trace("takerChangeOutputValue " + takerChangeOutputValue);
log.trace("takerChangeAddressString " + takerChangeAddressString != null ? takerChangeAddressString : "");
log.trace("takerChangeAddressString " + takerChangeAddressString);
log.trace("buyerPubKey " + ECKey.fromPublicOnly(buyerPubKey).toString());
log.trace("sellerPubKey " + ECKey.fromPublicOnly(sellerPubKey).toString());
log.trace("arbitratorPubKey " + ECKey.fromPublicOnly(arbitratorPubKey).toString());
@ -499,6 +502,7 @@ public class TradeWalletService {
printTxWithInputs("depositTx", depositTx);
// Broadcast depositTx
checkNotNull(walletAppKit);
ListenableFuture<Transaction> broadcastComplete = walletAppKit.peerGroup().broadcastTransaction(depositTx).future();
Futures.addCallback(broadcastComplete, callback);
}
@ -552,7 +556,9 @@ public class TradeWalletService {
Script redeemScript = getMultiSigRedeemScript(buyerPubKey, sellerPubKey, arbitratorPubKey);
// MS output from prev. tx is index 0
Sha256Hash sigHash = preparedPayoutTx.hashForSignature(0, redeemScript, Transaction.SigHash.ALL, false);
ECKey.ECDSASignature sellerSignature = sellerAddressEntry.getKeyPair().sign(sigHash, aesKey).toCanonicalised();
DeterministicKey keyPair = sellerAddressEntry.getKeyPair();
checkNotNull(keyPair);
ECKey.ECDSASignature sellerSignature = keyPair.sign(sigHash, aesKey).toCanonicalised();
verifyTransaction(preparedPayoutTx);
@ -772,7 +778,9 @@ public class TradeWalletService {
// take care of sorting!
Script redeemScript = getMultiSigRedeemScript(buyerPubKey, sellerPubKey, arbitratorPubKey);
Sha256Hash sigHash = payoutTx.hashForSignature(0, redeemScript, Transaction.SigHash.ALL, false);
ECKey.ECDSASignature tradersSignature = tradersAddressEntry.getKeyPair().sign(sigHash, aesKey).toCanonicalised();
DeterministicKey keyPair = tradersAddressEntry.getKeyPair();
checkNotNull(keyPair);
ECKey.ECDSASignature tradersSignature = keyPair.sign(sigHash, aesKey).toCanonicalised();
TransactionSignature tradersTxSig = new TransactionSignature(tradersSignature, Transaction.SigHash.ALL, false);
TransactionSignature arbitratorTxSig = new TransactionSignature(ECKey.ECDSASignature.decodeFromDER(arbitratorSignature),
@ -805,6 +813,7 @@ public class TradeWalletService {
* @param callback
*/
public void broadcastTx(Transaction tx, FutureCallback<Transaction> callback) {
checkNotNull(walletAppKit);
ListenableFuture<Transaction> future = walletAppKit.peerGroup().broadcastTransaction(tx).future();
Futures.addCallback(future, callback);
}
@ -850,6 +859,7 @@ public class TradeWalletService {
* @throws VerificationException
*/
public Transaction getWalletTx(Sha256Hash txId) throws VerificationException {
checkNotNull(wallet);
return wallet.getTransaction(txId);
}
@ -858,22 +868,27 @@ public class TradeWalletService {
* is old and doesn't have that data.
*/
public int getLastBlockSeenHeight() {
checkNotNull(wallet);
return wallet.getLastBlockSeenHeight();
}
public ListenableFuture<StoredBlock> getBlockHeightFuture(Transaction transaction) {
checkNotNull(walletAppKit);
return walletAppKit.chain().getHeightFuture((int) transaction.getLockTime());
}
public int getBestChainHeight() {
checkNotNull(walletAppKit);
return walletAppKit.chain().getBestChainHeight();
}
public void addBlockChainListener(BlockChainListener blockChainListener) {
checkNotNull(walletAppKit);
walletAppKit.chain().addListener(blockChainListener);
}
public void removeBlockChainListener(BlockChainListener blockChainListener) {
checkNotNull(walletAppKit);
walletAppKit.chain().removeListener(blockChainListener);
}
@ -943,7 +958,7 @@ public class TradeWalletService {
transaction.addOutput(buyerPayoutAmount, new Address(params, buyerAddressString));
transaction.addOutput(sellerPayoutAmount, new Address(params, sellerAddressString));
if (lockTime != 0) {
log.info("We use a locktime of " + lockTime);
log.info("We use a lockTime of " + lockTime);
// When using lockTime we need to set sequenceNumber to 0
transaction.getInputs().stream().forEach(i -> i.setSequenceNumber(0));
transaction.setLockTime(lockTime);
@ -964,6 +979,7 @@ public class TradeWalletService {
private void signInput(Transaction transaction, TransactionInput input, int inputIndex) throws SigningException {
checkNotNull(input.getConnectedOutput(), "input.getConnectedOutput() must not be null");
Script scriptPubKey = input.getConnectedOutput().getScriptPubKey();
checkNotNull(wallet);
ECKey sigKey = input.getOutpoint().getConnectedKey(wallet);
checkNotNull(sigKey, "signInput: sigKey must not be null. input.getOutpoint()=" + input.getOutpoint().toString());
Sha256Hash hash = transaction.hashForSignature(inputIndex, scriptPubKey, Transaction.SigHash.ALL, false);
@ -981,6 +997,7 @@ public class TradeWalletService {
private void checkWalletConsistency() throws WalletException {
try {
log.trace("Check if wallet is consistent before commit.");
checkNotNull(wallet);
checkState(wallet.isConsistent());
} catch (Throwable t) {
t.printStackTrace();

View File

@ -120,9 +120,9 @@ public class WalletService {
Threading.USER_THREAD = UserThread.getExecutor();
Timer timeoutTimer = UserThread.runAfter(() -> {
exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT_SEC + " seconds."));
}, STARTUP_TIMEOUT_SEC);
Timer timeoutTimer = UserThread.runAfter(() ->
exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " +
STARTUP_TIMEOUT_SEC + " seconds.")), STARTUP_TIMEOUT_SEC);
// If seed is non-null it means we are restoring from backup.
walletAppKit = new WalletAppKit(params, walletDir, "Bitsquare") {
@ -190,7 +190,7 @@ public class WalletService {
timeoutTimer.cancel();
// onSetupCompleted in walletAppKit is not the called on the last invocations, so we add a bit of delay
UserThread.runAfter(() -> resultHandler.handleResult(), 100, TimeUnit.MILLISECONDS);
UserThread.runAfter(resultHandler::handleResult, 100, TimeUnit.MILLISECONDS);
}
};
@ -451,8 +451,7 @@ public class WalletService {
public Coin getRequiredFee(String fromAddress,
String toAddress,
Coin amount,
@Nullable KeyParameter aesKey) throws AddressFormatException, IllegalArgumentException,
InsufficientMoneyException {
@Nullable KeyParameter aesKey) throws AddressFormatException, IllegalArgumentException {
Coin fee;
try {
wallet.completeTx(getSendRequest(fromAddress, toAddress, amount, aesKey));
@ -469,7 +468,7 @@ public class WalletService {
String toAddress,
Coin amount,
@Nullable KeyParameter aesKey) throws AddressFormatException,
IllegalArgumentException, InsufficientMoneyException {
IllegalArgumentException {
Coin fee;
try {
wallet.completeTx(getSendRequestForMultipleAddresses(fromAddresses, toAddress, amount, null, aesKey));
@ -482,10 +481,10 @@ public class WalletService {
return fee;
}
public Wallet.SendRequest getSendRequest(String fromAddress,
String toAddress,
Coin amount,
@Nullable KeyParameter aesKey) throws AddressFormatException,
private Wallet.SendRequest getSendRequest(String fromAddress,
String toAddress,
Coin amount,
@Nullable KeyParameter aesKey) throws AddressFormatException,
IllegalArgumentException, InsufficientMoneyException {
Transaction tx = new Transaction(params);
Preconditions.checkArgument(Restrictions.isAboveDust(amount),
@ -505,11 +504,11 @@ public class WalletService {
return sendRequest;
}
public Wallet.SendRequest getSendRequestForMultipleAddresses(Set<String> fromAddresses,
String toAddress,
Coin amount,
@Nullable String changeAddress,
@Nullable KeyParameter aesKey) throws
private Wallet.SendRequest getSendRequestForMultipleAddresses(Set<String> fromAddresses,
String toAddress,
Coin amount,
@Nullable String changeAddress,
@Nullable KeyParameter aesKey) throws
AddressFormatException, IllegalArgumentException, InsufficientMoneyException {
Transaction tx = new Transaction(params);
Preconditions.checkArgument(Restrictions.isAboveDust(amount),
@ -520,9 +519,9 @@ public class WalletService {
sendRequest.aesKey = aesKey;
sendRequest.shuffleOutputs = false;
Set<AddressEntry> addressEntries = fromAddresses.stream()
.map(e -> getAddressEntryByAddress(e))
.filter(e -> e.isPresent())
.map(e -> e.get()).collect(Collectors.toSet());
.map(this::getAddressEntryByAddress)
.filter(Optional::isPresent)
.map(Optional::get).collect(Collectors.toSet());
if (addressEntries.isEmpty())
throw new IllegalArgumentException("No withdrawFromAddresses not found in our wallets.\n\t" +
"fromAddresses=" + fromAddresses);

View File

@ -44,8 +44,8 @@ public class MarketPriceFeed {
}
}
private static long PERIOD_FIAT = 1; // We load only the selected currency on interval. Only the first request we load all
private static long PERIOD_CRYPTO = 10; // We load the full list with 33kb so we don't want to load too often
private static final long PERIOD_FIAT = 1; // We load only the selected currency on interval. Only the first request we load all
private static final long PERIOD_CRYPTO = 10; // We load the full list with 33kb so we don't want to load too often
private final ScheduledThreadPoolExecutor executorService = Utilities.getScheduledThreadPoolExecutor("MarketPriceFeed", 5, 10, 700L);
private final Map<String, MarketPrice> cache = new HashMap<>();

View File

@ -29,7 +29,7 @@ public class Country implements Serializable {
public final String code;
public final String name;
public final Region region;
private final Region region;
public Country(String code, String name, Region region) {
this.code = code;

View File

@ -158,6 +158,7 @@ public class CurrencyUtil {
return !(isCryptoCurrency(currencyCode)) && Currency.getInstance(currencyCode) != null;
}
@SuppressWarnings("WeakerAccess")
public static boolean isCryptoCurrency(String currencyCode) {
return getSortedCryptoCurrencies().stream().filter(e -> e.getCode().equals(currencyCode)).findAny().isPresent();
}

View File

@ -33,6 +33,7 @@ public class FiatCurrency extends TradeCurrency implements Serializable {
this(Currency.getInstance(currencyCode));
}
@SuppressWarnings("WeakerAccess")
public FiatCurrency(Currency currency) {
super(currency.getCurrencyCode(), currency.getDisplayName(Preferences.getDefaultLocale()), currency.getSymbol());
this.currency = currency;

View File

@ -40,11 +40,11 @@ public class PaymentAccount implements Serializable {
protected final Date creationDate;
protected final PaymentMethod paymentMethod;
protected String accountName;
protected final List<TradeCurrency> tradeCurrencies = new ArrayList<>();
final List<TradeCurrency> tradeCurrencies = new ArrayList<>();
protected TradeCurrency selectedTradeCurrency;
@Nullable
protected Country country = null;
protected PaymentAccountContractData contractData;
PaymentAccountContractData contractData;
///////////////////////////////////////////////////////////////////////////////////////////
@ -52,7 +52,7 @@ public class PaymentAccount implements Serializable {
///////////////////////////////////////////////////////////////////////////////////////////
public PaymentAccount(PaymentMethod paymentMethod) {
protected PaymentAccount(PaymentMethod paymentMethod) {
this.paymentMethod = paymentMethod;
id = UUID.randomUUID().toString();
creationDate = new Date();
@ -108,7 +108,7 @@ public class PaymentAccount implements Serializable {
return country;
}
public void setCountry(@Nullable Country country) {
public void setCountry(Country country) {
this.country = country;
contractData.setCountryCode(country.code);
}

View File

@ -38,7 +38,7 @@ public abstract class PaymentAccountContractData implements Serializable {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public PaymentAccountContractData(String paymentMethodName, String id, int maxTradePeriod) {
PaymentAccountContractData(String paymentMethodName, String id, int maxTradePeriod) {
this.paymentMethodName = paymentMethodName;
this.id = id;
this.maxTradePeriod = maxTradePeriod;

View File

@ -44,7 +44,7 @@ public class SepaAccountContractData extends PaymentAccountContractData implemen
super(paymentMethod, id, maxTradePeriod);
Set<String> acceptedCountryCodesAsSet = CountryUtil.getAllSepaCountries().stream().map(e -> e.code).collect(Collectors.toSet());
acceptedCountryCodes = new ArrayList<>(acceptedCountryCodesAsSet);
acceptedCountryCodes.sort((a, b) -> a.compareTo(b));
acceptedCountryCodes.sort(String::compareTo);
}
public void setHolderName(String holderName) {

View File

@ -36,11 +36,11 @@ public abstract class SellerTrade extends Trade implements Serializable {
private static final Logger log = LoggerFactory.getLogger(BuyerAsTakerTrade.class);
public SellerTrade(Offer offer, Coin tradeAmount, NodeAddress tradingPeerNodeAddress, Storage<? extends TradableList> storage) {
SellerTrade(Offer offer, Coin tradeAmount, NodeAddress tradingPeerNodeAddress, Storage<? extends TradableList> storage) {
super(offer, tradeAmount, tradingPeerNodeAddress, storage);
}
public SellerTrade(Offer offer, Storage<? extends TradableList> storage) {
SellerTrade(Offer offer, Storage<? extends TradableList> storage) {
super(offer, storage);
}

View File

@ -554,6 +554,7 @@ abstract public class Trade implements Tradable, Model, Serializable {
this.takeOfferFeeTxId = takeOfferFeeTxId;
}
@org.jetbrains.annotations.Nullable
public String getTakeOfferFeeTxId() {
return takeOfferFeeTxId;
}

View File

@ -25,8 +25,8 @@ import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.JsonExclude;
import io.bitsquare.locale.Country;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.data.RequiresLiveOwner;
import io.bitsquare.p2p.storage.data.StorageMessage;
import io.bitsquare.p2p.storage.messages.RequiresLiveOwnerData;
import io.bitsquare.p2p.storage.messages.StorageMessage;
import io.bitsquare.payment.PaymentMethod;
import io.bitsquare.trade.protocol.availability.OfferAvailabilityModel;
import io.bitsquare.trade.protocol.availability.OfferAvailabilityProtocol;
@ -47,16 +47,14 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public final class Offer implements StorageMessage, RequiresLiveOwner {
public final class Offer implements StorageMessage, RequiresLiveOwnerData {
// That object is sent over the wire, so we need to take care of version compatibility.
@JsonExclude
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@JsonExclude
private static final Logger log = LoggerFactory.getLogger(Offer.class);
//public static final long TTL = TimeUnit.MINUTES.toMillis(10);
//TODO
public static final long TTL = TimeUnit.SECONDS.toMillis(10);
public static final long TTL = TimeUnit.SECONDS.toMillis(60);
public final static String TAC_OFFERER = "When placing that offer I accept that anyone who fulfills my conditions can " +
"take that offer.";
@ -163,7 +161,7 @@ public final class Offer implements StorageMessage, RequiresLiveOwner {
public NodeAddress getOwnerNodeAddress() {
return offererNodeAddress;
}
public void validate() {
checkNotNull(getAmount(), "Amount is null");
checkNotNull(getArbitratorNodeAddresses(), "Arbitrator is null");

View File

@ -21,7 +21,7 @@ import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,6 +47,7 @@ public class OfferBookService {
private final P2PService p2PService;
private final List<OfferBookChangedListener> offerBookChangedListeners = new LinkedList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@ -57,20 +58,18 @@ public class OfferBookService {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
log.debug("OfferBookService.onAdded " + entry);
public void onAdded(ProtectedData data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (entry.expirableMessage instanceof Offer)
listener.onAdded((Offer) entry.expirableMessage);
if (data.expirableMessage instanceof Offer)
listener.onAdded((Offer) data.expirableMessage);
});
}
@Override
public void onRemoved(ProtectedData entry) {
public void onRemoved(ProtectedData data) {
offerBookChangedListeners.stream().forEach(listener -> {
log.debug("OfferBookService.onRemoved " + entry);
if (entry.expirableMessage instanceof Offer)
listener.onRemoved((Offer) entry.expirableMessage);
if (data.expirableMessage instanceof Offer)
listener.onRemoved((Offer) data.expirableMessage);
});
}
});
@ -101,7 +100,7 @@ public class OfferBookService {
result = p2PService.addData(offer);
if (result) {
log.trace("Add offer to network was successful. Offer = " + offer);
log.trace("Add offer to network was successful. Offer ID = " + offer.getId());
resultHandler.handleResult();
} else {
errorMessageHandler.handleErrorMessage("Add offer failed");
@ -110,7 +109,7 @@ public class OfferBookService {
public void removeOffer(Offer offer, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) {
if (p2PService.removeData(offer)) {
log.trace("Remove offer from network was successful. Offer = " + offer);
log.trace("Remove offer from network was successful. Offer ID = " + offer.getId());
if (resultHandler != null)
resultHandler.handleResult();
} else {
@ -121,8 +120,8 @@ public class OfferBookService {
public List<Offer> getOffers() {
return p2PService.getDataMap().values().stream()
.filter(e -> e.expirableMessage instanceof Offer)
.map(e -> (Offer) e.expirableMessage)
.filter(data -> data.expirableMessage instanceof Offer)
.map(data -> (Offer) data.expirableMessage)
.collect(Collectors.toList());
}

View File

@ -146,7 +146,8 @@ public class OpenOfferManager {
if (bootstrapListener != null)
p2PService.removeP2PServiceListener(bootstrapListener);
long period = (long) (Offer.TTL * 0.8); // republish sufficiently before offer would expire
// republish sufficiently before offer would expire
long period = (long) (Offer.TTL * 0.7);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
@ -178,6 +179,7 @@ public class OpenOfferManager {
}
}
@SuppressWarnings("WeakerAccess")
public void shutDown() {
shutDown(null);
}
@ -190,8 +192,7 @@ public class OpenOfferManager {
log.info("remove all open offers at shutDown");
shutDownRequested = true;
// we remove own offers from offerbook when we go offline
//TODO
// openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
if (completeHandler != null)
UserThread.runAfter(completeHandler::run, openOffers.size() * 200 + 300, TimeUnit.MILLISECONDS);
@ -230,7 +231,9 @@ public class OpenOfferManager {
log.warn("Offer was not found in our list of open offers. We still try to remove it from the offerbook.");
errorMessageHandler.handleErrorMessage("Offer was not found in our list of open offers. " +
"We still try to remove it from the offerbook.");
onRemoveOffer(offer);
offerBookService.removeOffer(offer,
() -> offer.setState(Offer.State.REMOVED),
null);
}
}
@ -247,14 +250,6 @@ public class OpenOfferManager {
errorMessageHandler);
}
// That should not be needed, but there are cases where the openOffer is removed but the offer still in the
// offerbook
public void onRemoveOffer(Offer offer) {
offerBookService.removeOffer(offer,
() -> offer.setState(Offer.State.REMOVED),
null);
}
public void reserveOpenOffer(OpenOffer openOffer) {
openOffer.setState(OpenOffer.State.RESERVED);
}
@ -283,7 +278,7 @@ public class OpenOfferManager {
openOffer.setState(OpenOffer.State.CLOSED);
offerBookService.removeOffer(openOffer.getOffer(),
() -> log.trace("Successful removed offer"),
errorMessage -> log.error(errorMessage));
log::error);
});
}

View File

@ -67,8 +67,4 @@ public class OfferAvailabilityModel implements Model {
@Override
public void onComplete() {
}
public PubKeyRing getPubKeyRing() {
return pubKeyRing;
}
}

View File

@ -30,7 +30,7 @@ public abstract class OfferMessage implements DirectMessage {
private final int messageVersion = Version.getP2PMessageVersion();
public final String offerId;
protected OfferMessage(String offerId) {
OfferMessage(String offerId) {
this.offerId = offerId;
}

View File

@ -40,7 +40,7 @@ public class SendOfferAvailabilityRequest extends Task<OfferAvailabilityModel> {
model.p2PService.sendEncryptedDirectMessage(model.getPeerNodeAddress(),
model.offer.getPubKeyRing(),
new OfferAvailabilityRequest(model.offer.getId(), model.getPubKeyRing()),
new OfferAvailabilityRequest(model.offer.getId(), model.pubKeyRing),
new SendDirectMessageListener() {
@Override
public void onArrived() {

View File

@ -64,7 +64,7 @@ public class PlaceOfferProtocol {
model.offerAddedToOfferBook = false;
log.debug("Offer removed from offer book.");
},
errorMessage2 -> log.error(errorMessage2));
log::error);
}
log.error(errorMessage);
}

View File

@ -44,8 +44,8 @@ public class SignPayoutTx extends TradeTask {
// We use the sellers LastBlockSeenHeight, which might be different to the buyers one.
// If lock time is 0 we set lockTimeAsBlockHeight to 0 to mark it as "not set".
// In the tradewallet we apply the locktime only if it is set, otherwise we use the default values for
// transaction locktime and sequence number
// In the tradeWallet we apply the lockTime only if it is set, otherwise we use the default values for
// transaction lockTime and sequence number
long lockTime = trade.getOffer().getPaymentMethod().getLockTime();
long lockTimeAsBlockHeight = 0;
if (lockTime > 0)

View File

@ -386,7 +386,7 @@ public class Preferences implements Serializable {
public boolean showAgain(String key) {
// if we add new and those are not in our stored map we display by default the new popup
if (!getShowAgainMap().containsKey(key)) {
if (!showAgainMap.containsKey(key)) {
showAgainMap.put(key, true);
storage.queueUpForSave(2000);
}

View File

@ -191,7 +191,7 @@ public class User implements Serializable {
storage.queueUpForSave();
}
public void setRegisteredArbitrator(Arbitrator arbitrator) {
public void setRegisteredArbitrator(@org.jetbrains.annotations.Nullable Arbitrator arbitrator) {
this.registeredArbitrator = arbitrator;
storage.queueUpForSave();
}

View File

@ -199,7 +199,7 @@ public class WithdrawalView extends ActivatableView<VBox, Void> {
.show();
}
} catch (AddressFormatException | InsufficientMoneyException e) {
} catch (AddressFormatException e) {
e.printStackTrace();
log.error(e.getMessage());
}

View File

@ -7,7 +7,7 @@ import java.net.URL;
// TODO route over tor
public class HttpClient implements Serializable {
private String baseUrl;
private final String baseUrl;
public HttpClient(String baseUrl) {
this.baseUrl = baseUrl;

View File

@ -24,11 +24,11 @@ import io.bitsquare.p2p.peers.peerexchange.PeerExchangeManager;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.data.ExpirableMessage;
import io.bitsquare.p2p.storage.data.MailboxMessage;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
import javafx.beans.property.*;
import javafx.beans.value.ChangeListener;
import org.fxmisc.easybind.EasyBind;
@ -110,9 +110,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private void init(boolean useLocalhost, int networkId, File storageDir) {
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
UserThread.execute(() -> numConnectedPeers.set(networkNode.getNodeAddressesOfConfirmedConnections().size()));
};
connectionNodeAddressListener = (observable, oldValue, newValue) ->
UserThread.execute(() ->
numConnectedPeers.set(networkNode.getNodeAddressesOfConfirmedConnections().size()));
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
networkNode.addConnectionListener(this);
@ -364,13 +364,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedData entry) {
if (entry instanceof ProtectedMailboxData)
processProtectedMailboxData((ProtectedMailboxData) entry);
public void onAdded(ProtectedData data) {
if (data instanceof ProtectedMailboxData)
processProtectedMailboxData((ProtectedMailboxData) data);
}
@Override
public void onRemoved(ProtectedData entry) {
public void onRemoved(ProtectedData data) {
}

View File

@ -59,9 +59,7 @@ public class Connection implements MessageListener {
private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data
private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec
private static final int MSG_THROTTLE_PER_10SEC = 50; // With MAX_MSG_SIZE of 100kb results in bandwidth of 5 mbit/sec for 10 sec
//private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
//TODO
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(6);
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;
@ -695,6 +693,8 @@ public class Connection implements MessageListener {
messageListener.onMessage(message, connection);
}
} catch (ClassNotFoundException | NoClassDefFoundError e) {
log.warn(e.getMessage());
e.printStackTrace();
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return;
} catch (IOException e) {

View File

@ -6,6 +6,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@ -56,7 +57,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, Message message) {
Log.traceCall("peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + message);
Log.traceCall("peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + StringUtils.abbreviate(message.toString(), 100));
checkNotNull(peersNodeAddress, "peerAddress must not be null");
Optional<Connection> outboundConnectionOptional = lookupOutboundConnection(peersNodeAddress);
@ -127,7 +128,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
Log.traceCall("\n\tmessage=" + message + "\n\tconnection=" + connection);
Log.traceCall("\n\tmessage=" + StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection);
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());

View File

@ -31,20 +31,19 @@ public class Broadcaster {
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
private IntegerProperty numOfBroadcasts = new SimpleIntegerProperty(0);
private final IntegerProperty numOfBroadcasts = new SimpleIntegerProperty(0);
public Broadcaster(NetworkNode networkNode) {
this.networkNode = networkNode;
}
public void broadcast(DataBroadcastMessage message, @Nullable NodeAddress sender) {
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
numOfBroadcasts.set(0);
Set<Connection> receivers = networkNode.getConfirmedConnections();
if (!receivers.isEmpty()) {
log.info("Broadcast message to {} peers. Message: {}", receivers.size(), message);
log.info("Broadcast message to {} peers.", receivers.size());
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> {
@ -70,7 +69,7 @@ public class Broadcaster {
} else {
log.warn("Message not broadcasted because we have no available peers yet.\n\t" +
"That should never happen as broadcast should not be called in such cases.\n" +
"message = {}", message);
"message = {}", StringUtils.abbreviate(message.toString(), 100));
}
}

View File

@ -29,23 +29,23 @@ public class PeerManager implements ConnectionListener, MessageListener {
private static int MAX_CONNECTIONS;
private static int MIN_CONNECTIONS;
private static int MAX_CONNECTIONS_EXTENDED_1;
private static int MAX_CONNECTIONS_EXTENDED_2;
private static int MAX_CONNECTIONS_PEER;
private static int MAX_CONNECTIONS_NON_DIRECT;
private static int MAX_CONNECTIONS_EXTENDED_3;
private boolean printReportedPeersDetails = true;
private static int MAX_CONNECTIONS_ABSOLUTE;
private final boolean printReportedPeersDetails = true;
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
MIN_CONNECTIONS = Math.max(1, maxConnections - 4);
MAX_CONNECTIONS_EXTENDED_1 = MAX_CONNECTIONS + 5;
MAX_CONNECTIONS_EXTENDED_2 = MAX_CONNECTIONS + 10;
MAX_CONNECTIONS_EXTENDED_3 = MAX_CONNECTIONS + 20;
MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 5;
MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 10;
MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 30;
}
static {
setMaxConnections(3);
setMaxConnections(12);
}
private static final int MAX_REPORTED_PEERS = 1000;
@ -98,7 +98,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
public int getMaxConnections() {
return MAX_CONNECTIONS_EXTENDED_3;
return MAX_CONNECTIONS_ABSOLUTE;
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -161,7 +161,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
log.info("We have {} connections open. Our limit is {}", size, limit);
if (size > limit) {
// Only InboundConnection, and PEER type connections
log.info("We have too many connections open. We try to close some.\n\t" +
log.info("We have too many connections open.\n\t" +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
.filter(e -> e instanceof InboundConnection)
@ -169,19 +169,19 @@ public class PeerManager implements ConnectionListener, MessageListener {
.collect(Collectors.toList());
if (candidates.size() == 0) {
log.info("No candidates found. We go to the next level and check if we exceed our " +
"MAX_CONNECTIONS_EXTENDED_1 limit of {}", MAX_CONNECTIONS_EXTENDED_1);
if (size > MAX_CONNECTIONS_EXTENDED_1) {
log.info("Lets try to remove any connection of type PEER.");
log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_PEER limit of {}", MAX_CONNECTIONS_PEER);
if (size > MAX_CONNECTIONS_PEER) {
log.info("Lets try to remove ANY connection of type PEER.");
// Only PEER type connections
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
if (candidates.size() == 0) {
log.info("No candidates found. We go to the next level and check if we exceed our " +
"MAX_CONNECTIONS_EXTENDED_2 limit of {}", MAX_CONNECTIONS_EXTENDED_2);
if (size > MAX_CONNECTIONS_EXTENDED_2) {
log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_NON_DIRECT limit of {}", MAX_CONNECTIONS_NON_DIRECT);
if (size > MAX_CONNECTIONS_NON_DIRECT) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
// All connections except DIRECT_MSG_PEER
candidates = allConnections.stream()
@ -189,9 +189,9 @@ public class PeerManager implements ConnectionListener, MessageListener {
.collect(Collectors.toList());
if (candidates.size() == 0) {
log.info("No candidates found. We go to the next level and check if we exceed our " +
"MAX_CONNECTIONS_EXTENDED_3 limit of {}", MAX_CONNECTIONS_EXTENDED_3);
if (size > MAX_CONNECTIONS_EXTENDED_3) {
log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_ABSOLUTE limit of {}", MAX_CONNECTIONS_ABSOLUTE);
if (size > MAX_CONNECTIONS_ABSOLUTE) {
log.info("Lets try to remove any connection.");
// All connections
candidates = allConnections.stream()
@ -225,7 +225,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void removeSuperfluousSeedNodes() {
Log.traceCall();
Set<Connection> allConnections = networkNode.getAllConnections();
if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) {
if (allConnections.size() > MAX_CONNECTIONS_PEER) {
List<Connection> candidates = allConnections.stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
@ -266,8 +266,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void removeTooOldReportedPeers() {
Log.traceCall();
Set<ReportedPeer> reportedPeersToRemove = reportedPeers.stream()
.filter(reportedPeer -> reportedPeer.date != null &&
new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.collect(Collectors.toSet());
reportedPeersToRemove.forEach(this::removeReportedPeer);
}
@ -280,7 +279,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
printReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + PeerManager.MAX_CONNECTIONS_EXTENDED_3 + 10)) {
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + PeerManager.MAX_CONNECTIONS_ABSOLUTE + 10)) {
reportedPeers.addAll(reportedPeersToAdd);
purgeReportedPeersIfExceeds();
@ -311,7 +310,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
}
public void printReportedPeers(HashSet<ReportedPeer> reportedPeers) {
private void printReportedPeers(HashSet<ReportedPeer> reportedPeers) {
if (printReportedPeersDetails) {
StringBuilder result = new StringBuilder("We received now reportedPeers:");
reportedPeers.stream().forEach(e -> result.append("\n\t").append(e));
@ -351,8 +350,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void removeTooOldPersistedPeers() {
Log.traceCall();
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream()
.filter(reportedPeer -> reportedPeer.date != null &&
new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.filter(reportedPeer -> new Date().getTime() - reportedPeer.date.getTime() > MAX_AGE)
.collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removePersistedPeer);
}
@ -447,7 +445,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = getReportedPeers().size();
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_EXTENDED_3;
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE;
if (size > limit) {
log.trace("We have already {} reported peers which exceeds our limit of {}." +
"We remove random peers from the reported peers list.", size, limit);
@ -466,7 +464,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private void purgePersistedPeersIfExceeds() {
Log.traceCall();
int size = getPersistedPeers().size();
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_EXTENDED_3;
int limit = MAX_PERSISTED_PEERS - MAX_CONNECTIONS_ABSOLUTE;
if (size > limit) {
log.trace("We have already {} persisted peers which exceeds our limit of {}." +
"We remove random peers from the persisted peers list.", size, limit);
@ -478,7 +476,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
removePersistedPeer(toRemove);
}
} else {
log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_REPORTED_PEERS);
log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_PERSISTED_PEERS);
}
}
@ -487,7 +485,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
return list.remove(new Random().nextInt(list.size()));
}
public Set<ReportedPeer> getConnectedPeers() {
private Set<ReportedPeer> getConnectedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream()
@ -495,7 +493,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
.collect(Collectors.toSet());
}
public HashSet<ReportedPeer> getConnectedPeersNonSeedNodes() {
private HashSet<ReportedPeer> getConnectedPeersNonSeedNodes() {
return new HashSet<>(getConnectedPeers().stream()
.filter(e -> !isSeedNode(e))
.collect(Collectors.toSet()));

View File

@ -243,7 +243,6 @@ public class RequestDataManager implements MessageListener {
!peerManager.isSelf(e))
.collect(Collectors.toList())
.stream()
.filter(e -> e.date != null)
.sorted((o1, o2) -> o2.date.compareTo(o1.date))
.map(e -> e.nodeAddress)
.collect(Collectors.toList());

View File

@ -2,7 +2,7 @@ package io.bitsquare.p2p.peers.getdata.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
import java.util.HashSet;

View File

@ -5,6 +5,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
@ -16,10 +17,13 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Random;
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
@Nullable
private Connection connection;
@ -31,6 +35,8 @@ class KeepAliveHandler implements MessageListener {
void onComplete();
void onFault(String errorMessage, Connection connection);
void onFault(String errorMessage, NodeAddress nodeAddress);
}
@ -90,6 +96,34 @@ class KeepAliveHandler implements MessageListener {
});
}
public void sendPing(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
Ping ping = new Ping(nonce);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, ping);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (connection != null) {
KeepAliveHandler.this.connection = connection;
connection.addMessageListener(KeepAliveHandler.this);
}
log.trace("Send " + ping + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending ping to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\tping=" + ping +
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
listener.onFault(errorMessage, nodeAddress);
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation

View File

@ -8,6 +8,7 @@ import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.keepalive.messages.Ping;
@ -18,15 +19,14 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class KeepAliveManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
//private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
//TODO
private static final int INTERVAL_SEC = 3;
private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -115,23 +115,28 @@ public class KeepAliveManager implements MessageListener {
networkNode.getConfirmedConnections().stream()
.filter(connection -> connection instanceof OutboundConnection)
.forEach(connection -> {
if (!maintenanceHandlerMap.containsKey(connection.getUid())) {
if (!maintenanceHandlerMap.containsKey(getKey(connection))) {
KeepAliveHandler keepAliveHandler = new KeepAliveHandler(networkNode, peerManager, new KeepAliveHandler.Listener() {
@Override
public void onComplete() {
maintenanceHandlerMap.remove(connection.getUid());
maintenanceHandlerMap.remove(getKey(connection));
}
@Override
public void onFault(String errorMessage, Connection connection) {
maintenanceHandlerMap.remove(connection.getUid());
maintenanceHandlerMap.remove(getKey(connection));
}
@Override
public void onFault(String errorMessage, NodeAddress nodeAddress) {
maintenanceHandlerMap.remove(nodeAddress.getFullAddress());
}
});
maintenanceHandlerMap.put(connection.getUid(), keepAliveHandler);
maintenanceHandlerMap.put(getKey(connection), keepAliveHandler);
keepAliveHandler.sendPing(connection);
} else {
log.warn("Connection with id {} has not completed and is still in our map. " +
"We will try to ping that peer at the next schedule.", connection.getUid());
"We will try to ping that peer at the next schedule.", getKey(connection));
}
});
@ -142,4 +147,14 @@ public class KeepAliveManager implements MessageListener {
"maintenanceHandlerMap size={}, peerManager.getMaxConnections()={}", size, peerManager.getMaxConnections());
}
}
private String getKey(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
return connection.getPeersNodeAddressOptional().get().getFullAddress();
} else {
// TODO not sure if that can be the case, but handle it otherwise we get an exception
log.warn("!connection.getPeersNodeAddressOptional().isPresent(). That should not happen.");
return "null";
}
}
}

View File

@ -25,6 +25,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
private static final long RETRY_DELAY_SEC = 60;
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 3;
private static final long REQUEST_PERIODICALLY_INTERVAL_MINUTES = 10;
private final NetworkNode networkNode;
@ -91,12 +92,18 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connectToMorePeersTimer == null)
boolean lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections || connectToMorePeersTimer == null) {
long delaySec = lostAllConnections ? RETRY_DELAY_AFTER_ALL_CON_LOST_SEC : RETRY_DELAY_SEC;
if (lostAllConnections && connectToMorePeersTimer != null)
connectToMorePeersTimer.cancel();
connectToMorePeersTimer = UserThread.runAfter(() -> {
log.trace("ConnectToMorePeersTimer called from onDisconnect code path");
stopConnectToMorePeersTimer();
requestWithAvailablePeers();
}, RETRY_DELAY_SEC);
}, delaySec);
}
}
@Override
@ -115,7 +122,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
if (peerManager.isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
GetPeersRequestHandler getPeersRequestHandler = new GetPeersRequestHandler(networkNode,
peerManager,
new GetPeersRequestHandler.Listener() {

View File

@ -1,9 +1,7 @@
package io.bitsquare.p2p.storage;
import io.bitsquare.p2p.storage.data.ProtectedData;
public interface HashMapChangedListener {
void onAdded(ProtectedData entry);
void onAdded(ProtectedData data);
void onRemoved(ProtectedData entry);
void onRemoved(ProtectedData data);
}

View File

@ -8,16 +8,13 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.Hash;
import io.bitsquare.common.crypto.Sig;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.storage.data.*;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import io.bitsquare.p2p.storage.messages.DataBroadcastMessage;
import io.bitsquare.p2p.storage.messages.RemoveDataMessage;
import io.bitsquare.p2p.storage.messages.RemoveMailboxDataMessage;
import io.bitsquare.p2p.storage.messages.*;
import io.bitsquare.storage.Storage;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -28,7 +25,10 @@ import java.io.File;
import java.io.Serializable;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -39,14 +39,12 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = new Random().nextInt(60) + (int) TimeUnit.MINUTES.toSeconds(10); // 10-11 min.
//TODO
// public static int CHECK_TTL_INTERVAL_SEC = 10;
public static int CHECK_TTL_INTERVAL_SEC = 30;
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedData> map = new ConcurrentHashMap<>();
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>();
private HashMap<ByteArray, Tuple2<Integer, Long>> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage;
private final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor;
@ -63,40 +61,41 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
storage = new Storage<>(storageDir);
removeExpiredEntriesExecutor = Utilities.getScheduledThreadPoolExecutor("removeExpiredEntries", 1, 10, 5);
log.debug("CHECK_TTL_INTERVAL_SEC " + CHECK_TTL_INTERVAL_SEC);
init();
}
private void init() {
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted("SequenceNumberMap");
HashMap<ByteArray, Tuple2<Integer, Long>> persisted = storage.initAndGetPersisted("SequenceNumberMap");
if (persisted != null)
sequenceNumberMap = persisted;
sequenceNumberMap = getPurgedSequenceNumberMap(persisted);
removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(this::removeExpiredEntries), CHECK_TTL_INTERVAL_SEC, CHECK_TTL_INTERVAL_SEC, TimeUnit.SECONDS);
removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(() -> {
log.trace("removeExpiredEntries");
// The moment when an object becomes expired will not be synchronous in the network and we could
// get add messages after the object has expired. To avoid repeated additions of already expired
// object when we get it sent from new peers, we dont remove the sequence number from the map.
// That way an ADD message for an already expired data will fail because the sequence number
// is equal and not larger.
Map<ByteArray, ProtectedData> temp = new HashMap<>(map);
Set<ProtectedData> toRemoveSet = new HashSet<>();
temp.entrySet().stream()
.filter(entry -> entry.getValue().isExpired())
.forEach(entry -> {
ByteArray hashOfPayload = entry.getKey();
toRemoveSet.add(map.get(hashOfPayload));
map.remove(hashOfPayload);
});
toRemoveSet.stream().forEach(
protectedDataToRemove -> hashMapChangedListeners.stream().forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
if (sequenceNumberMap.size() > 1000)
sequenceNumberMap = getPurgedSequenceNumberMap(sequenceNumberMap);
}), CHECK_TTL_INTERVAL_SEC, CHECK_TTL_INTERVAL_SEC, TimeUnit.SECONDS);
}
private void removeExpiredEntries() {
Log.traceCall();
// The moment when an object becomes expired will not be synchronous in the network and we could
// get add messages after the object has expired. To avoid repeated additions of already expired
// object when we get it sent from new peers, we dont remove the sequence number from the map.
// That way an ADD message for an already expired data will fail because the sequence number
// is equal and not larger.
Map<ByteArray, ProtectedData> temp = new HashMap<>(map);
Set<ProtectedData> protectedDataToRemoveSet = new HashSet<>();
temp.entrySet().stream()
.filter(entry -> entry.getValue().isExpired())
.forEach(entry -> {
ByteArray hashOfPayload = entry.getKey();
ProtectedData protectedDataToRemove = map.get(hashOfPayload);
protectedDataToRemoveSet.add(protectedDataToRemove);
map.remove(hashOfPayload);
});
protectedDataToRemoveSet.stream().forEach(
protectedDataToRemove -> hashMapChangedListeners.stream().forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
@ -119,65 +118,40 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
map.values().stream()
.filter(protectedData -> protectedData.expirableMessage instanceof RequiresLiveOwner)
.forEach(protectedData -> removeRequiresLiveOwnerDataOnDisconnect(protectedData, ((RequiresLiveOwner) protectedData.expirableMessage).getOwnerNodeAddress()));
}
if (connection.getPeersNodeAddressOptional().isPresent()) {
map.values().stream()
.forEach(protectedData -> {
ExpirableMessage expirableMessage = protectedData.expirableMessage;
if (expirableMessage instanceof RequiresLiveOwnerData) {
RequiresLiveOwnerData requiresLiveOwnerData = (RequiresLiveOwnerData) expirableMessage;
NodeAddress ownerNodeAddress = requiresLiveOwnerData.getOwnerNodeAddress();
if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
public boolean removeRequiresLiveOwnerDataOnDisconnect(ProtectedData protectedData, NodeAddress owner) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirableMessage);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
//broadcast(new RemoveDataMessage(protectedData), owner);
// sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
sequenceNumberMap.remove(hashOfPayload);
storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
// Check if we have the data (e.g. Offer)
ByteArray hashOfPayload = getHashAsByteArray(expirableMessage);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
}
}
});
}
return containsKey;
}
// If the data owner gets disconnected we remove his data. Used for offers to get clean up when the peer is in
// sleep/hibernate mode or closes the app without proper shutdown (crash).
// We don't want to wait the until the TTL period is over so we add that method to improve usability
public boolean removeLocalDataOnDisconnectedDataOwner(ExpirableMessage expirableMessage) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(expirableMessage);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
map.remove(hashOfPayload);
log.trace("Data removed from our map.");
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Data set after removeProtectedExpirableData: (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
// sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
// storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
return containsKey;
}
@Override
@ -207,6 +181,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private boolean doAdd(ProtectedData protectedData, @Nullable NodeAddress sender, boolean rePublish) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirableMessage);
boolean result = checkPublicKeys(protectedData, true)
&& checkSignature(protectedData)
@ -222,10 +197,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
// Republished data have a larger sequence number. We set the rePublish flag to enable broadcasting
// even we had the data with the old seq nr. already
if (sequenceNumberMap.containsKey(hashOfPayload) &&
protectedData.sequenceNumber > sequenceNumberMap.get(hashOfPayload))
protectedData.sequenceNumber > sequenceNumberMap.get(hashOfPayload).first)
rePublish = true;
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
sequenceNumberMap.put(hashOfPayload, new Tuple2<>(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
@ -262,7 +237,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
broadcast(new RemoveDataMessage(protectedData), sender);
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
sequenceNumberMap.put(hashOfPayload, new Tuple2<>(protectedData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("remove failed");
@ -287,7 +262,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender);
sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber);
sequenceNumberMap.put(hashOfData, new Tuple2<>(protectedMailboxData.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("removeMailboxData failed");
@ -302,11 +277,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
public ProtectedData getDataWithSignedSeqNr(ExpirableMessage payload, KeyPair ownerStoragePubKey)
throws CryptoException {
Log.traceCall();
ByteArray hashOfData = getHashAsByteArray(payload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
sequenceNumber = sequenceNumberMap.get(hashOfData) + 1;
sequenceNumber = sequenceNumberMap.get(hashOfData).first + 1;
else
sequenceNumber = 0;
@ -318,11 +292,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
public ProtectedMailboxData getMailboxDataWithSignedSeqNr(MailboxMessage expirableMailboxPayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
Log.traceCall();
ByteArray hashOfData = getHashAsByteArray(expirableMailboxPayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
sequenceNumber = sequenceNumberMap.get(hashOfData) + 1;
sequenceNumber = sequenceNumberMap.get(hashOfData).first + 1;
else
sequenceNumber = 0;
@ -342,7 +315,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
private void doRemoveProtectedExpirableData(ProtectedData protectedData, ByteArray hashOfPayload) {
Log.traceCall();
map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our peers.");
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData));
@ -356,20 +328,22 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {
Log.traceCall();
int newSequenceNumber = data.sequenceNumber;
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData);
if (sequenceNumberMap.containsKey(hashOfData) && newSequenceNumber < storedSequenceNumber) {
log.trace("Sequence number is invalid. newSequenceNumber="
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
return false;
if (sequenceNumberMap.containsKey(hashOfData)) {
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).first;
if (newSequenceNumber < storedSequenceNumber) {
log.warn("Sequence number is invalid. newSequenceNumber="
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
return false;
} else {
return true;
}
} else {
return true;
}
}
private boolean checkSignature(ProtectedData data) {
Log.traceCall();
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(data.expirableMessage, data.sequenceNumber));
try {
boolean result = Sig.verify(data.ownerPubKey, hashOfDataAndSeqNr, data.signature);
@ -385,7 +359,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
private boolean checkPublicKeys(ProtectedData data, boolean isAddOperation) {
Log.traceCall();
boolean result = false;
if (data.expirableMessage instanceof MailboxMessage) {
MailboxMessage expirableMailboxPayload = (MailboxMessage) data.expirableMessage;
@ -403,7 +376,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
private boolean checkIfStoredDataPubKeyMatchesNewDataPubKey(ProtectedData data, ByteArray hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
boolean result = storedData.ownerPubKey.equals(data.ownerPubKey);
if (!result)
@ -413,7 +385,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
}
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, ByteArray hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
if (storedData instanceof ProtectedMailboxData) {
ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData;
@ -434,8 +405,18 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
broadcaster.broadcast(message, sender);
}
private ByteArray getHashAsByteArray(ExpirableMessage payload) {
return new ByteArray(Hash.getHash(payload));
private ByteArray getHashAsByteArray(ExpirableMessage data) {
return new ByteArray(Hash.getHash(data));
}
private HashMap<ByteArray, Tuple2<Integer, Long>> getPurgedSequenceNumberMap(HashMap<ByteArray, Tuple2<Integer, Long>> persisted) {
HashMap<ByteArray, Tuple2<Integer, Long>> purged = new HashMap<>();
long maxAgeTs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10);
persisted.entrySet().stream().forEach(entry -> {
if (entry.getValue().second > maxAgeTs)
purged.put(entry.getKey(), entry.getValue());
});
return purged;
}

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -1,6 +1,6 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
public final class AddDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage.messages;
import java.io.Serializable;

View File

@ -1,8 +1,9 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.crypto.PrefixedSealedAndSignedMessage;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.ProtectedData;
import java.security.PublicKey;
import java.util.concurrent.TimeUnit;
@ -30,7 +31,7 @@ public final class MailboxMessage implements ExpirableMessage {
* Used for check if the add operation is permitted.
* senderStoragePublicKey has to be equal to the ownerPubKey of the ProtectedData
*
* @see io.bitsquare.p2p.storage.data.ProtectedData#ownerPubKey
* @see ProtectedData#ownerPubKey
* @see io.bitsquare.p2p.storage.P2PDataStorage#add(ProtectedData, NodeAddress)
*/
public final PublicKey senderPubKeyForAddOperation;
@ -39,7 +40,7 @@ public final class MailboxMessage implements ExpirableMessage {
* Used for check if the remove operation is permitted.
* senderStoragePublicKey has to be equal to the ownerPubKey of the ProtectedData
*
* @see io.bitsquare.p2p.storage.data.ProtectedData#ownerPubKey
* @see ProtectedData#ownerPubKey
* @see io.bitsquare.p2p.storage.P2PDataStorage#remove(ProtectedData, NodeAddress)
*/
public final PublicKey receiverPubKeyForRemoveOperation;

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
public final class RemoveDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.ProtectedMailboxData;
public final class RemoveMailboxDataMessage extends DataBroadcastMessage {
// That object is sent over the wire, so we need to take care of version compatibility.

View File

@ -1,4 +1,4 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.p2p.NodeAddress;
@ -10,7 +10,7 @@ import java.io.Serializable;
* This is used for the offers to avoid dead offers in case the offerer is in sleep/hibernate mode or the app has
* terminated without sending the remove message (e.g. in case of a crash).
*/
public interface RequiresLiveOwner extends Serializable {
public interface RequiresLiveOwnerData extends Serializable {
/**
* @return NodeAddress of the data owner
*/

View File

@ -1,6 +1,7 @@
package io.bitsquare.p2p.storage.data;
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.ProtectedData;
import java.security.PublicKey;
@ -19,7 +20,7 @@ public interface StorageMessage extends ExpirableMessage {
* OwnerPubKey has to be equal to the ownerPubKey of the ProtectedData
*
* @return The public key of the data owner.
* @see io.bitsquare.p2p.storage.data.ProtectedData#ownerPubKey
* @see ProtectedData#ownerPubKey
* @see io.bitsquare.p2p.storage.P2PDataStorage#add(ProtectedData, NodeAddress)
* @see io.bitsquare.p2p.storage.P2PDataStorage#remove(ProtectedData, NodeAddress)
*/

View File

@ -11,7 +11,7 @@ import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.seed.SeedNode;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.ProtectedData;
import io.bitsquare.p2p.storage.mocks.MockData;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.*;
@ -25,6 +25,7 @@ import java.security.cert.CertificateException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
// TorNode created. Took 6 sec.
// Hidden service created. Took 40-50 sec.
@ -157,7 +158,11 @@ public class P2PServiceTest {
Assert.assertEquals(1, p2PService3.getDataMap().size());
// try to manipulate seq nr. -> fails
ProtectedData origProtectedData = p2PService3.getDataMap().values().stream().findFirst().get();
Set<ProtectedData> dataSet = p2PService3.getDataMap().values().stream()
.filter(data -> data instanceof ProtectedData)
.map(data -> (ProtectedData) data)
.collect(Collectors.toSet());
ProtectedData origProtectedData = dataSet.stream().findFirst().get();
ProtectedData protectedDataManipulated = new ProtectedData(origProtectedData.expirableMessage, origProtectedData.ttl, origProtectedData.ownerPubKey, origProtectedData.sequenceNumber + 1, origProtectedData.signature);
Assert.assertFalse(p2PService3.removeData(protectedDataManipulated.expirableMessage));
Thread.sleep(sleepTime);

View File

@ -3,7 +3,7 @@ package io.bitsquare.p2p.mocks;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.messaging.MailboxMessage;
import io.bitsquare.p2p.storage.data.ExpirableMessage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
public final class MockMailboxMessage implements MailboxMessage, ExpirableMessage {
private final int messageVersion = Version.getP2PMessageVersion();

View File

@ -2,7 +2,7 @@ package io.bitsquare.p2p.mocks;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.storage.data.ExpirableMessage;
import io.bitsquare.p2p.storage.messages.ExpirableMessage;
public final class MockMessage implements Message, ExpirableMessage {
public final String msg;

View File

@ -11,9 +11,7 @@ import io.bitsquare.p2p.mocks.MockMessage;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.storage.data.MailboxMessage;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.MailboxMessage;
import io.bitsquare.p2p.storage.mocks.MockData;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.*;

View File

@ -1,6 +1,6 @@
package io.bitsquare.p2p.storage.mocks;
import io.bitsquare.p2p.storage.data.StorageMessage;
import io.bitsquare.p2p.storage.messages.StorageMessage;
import java.security.PublicKey;

View File

@ -3,7 +3,6 @@ package io.bitsquare.p2p.seed;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.bitsquare.app.BitsquareEnvironment;
import io.bitsquare.common.UserThread;
import io.bitsquare.trade.offer.OfferBookService;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,8 +14,7 @@ import java.util.concurrent.ThreadFactory;
public class SeedNodeMain {
private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class);
public static final boolean USE_DETAILED_LOGGING = true;
private OfferBookService offerBookService;
private static final boolean USE_DETAILED_LOGGING = true;
private SeedNode seedNode;
@ -26,7 +24,7 @@ public class SeedNodeMain {
new SeedNodeMain(args);
}
public SeedNodeMain(String[] args) throws InterruptedException {
private SeedNodeMain(String[] args) throws InterruptedException {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("SeedNodeMain")
.setDaemon(true)
@ -50,10 +48,6 @@ public class SeedNodeMain {
seedNode = new SeedNode(BitsquareEnvironment.defaultUserDataDir());
seedNode.processArgs(args);
seedNode.createAndStartP2PService(USE_DETAILED_LOGGING);
// We need the offerbook service to handle the case when the offerer is in sleep/hibernate mode and
// we want to remove his offers and not wait until TTL is over.
offerBookService = new OfferBookService(seedNode.getSeedNodeP2PService());
} catch (Throwable t) {
log.error("Executing task failed. " + t.getMessage());
t.printStackTrace();