diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorMessageService.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java similarity index 91% rename from core/src/main/java/io/bitsquare/arbitration/ArbitratorMessageService.java rename to core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java index fc206021f4..75380eb1e7 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorMessageService.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java @@ -19,11 +19,12 @@ package io.bitsquare.arbitration; import io.bitsquare.arbitration.listeners.ArbitratorListener; +import io.bitsquare.network.DHTService; import java.util.Locale; import java.util.concurrent.Executor; -public interface ArbitratorMessageService { +public interface ArbitratorService extends DHTService { void setExecutor(Executor executor); void addArbitrator(Arbitrator arbitrator); diff --git a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageModule.java b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageModule.java index 47cbec85b1..d2d6d3dfcf 100644 --- a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageModule.java +++ b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageModule.java @@ -18,17 +18,10 @@ package io.bitsquare.arbitration.tomp2p; import io.bitsquare.arbitration.ArbitratorMessageModule; -import io.bitsquare.arbitration.ArbitratorMessageService; -import io.bitsquare.network.tomp2p.TomP2PNode; +import io.bitsquare.arbitration.ArbitratorService; -import com.google.inject.Injector; -import com.google.inject.Provider; import com.google.inject.Singleton; -import javax.inject.Inject; - -import javafx.application.Platform; - import org.springframework.core.env.Environment; public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule { @@ -39,25 +32,6 @@ public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule { @Override protected void doConfigure() { - bind(ArbitratorMessageService.class).toProvider(ArbitratorMessageServiceProvider.class).in(Singleton.class); - } - - @Override - protected void doClose(Injector injector) { - super.doClose(injector); + bind(ArbitratorService.class).to(TomP2PArbitratorService.class).in(Singleton.class); } } - -class ArbitratorMessageServiceProvider implements Provider { - private final ArbitratorMessageService arbitratorMessageService; - - @Inject - public ArbitratorMessageServiceProvider(TomP2PNode tomP2PNode) { - arbitratorMessageService = new TomP2PArbitratorMessageService(tomP2PNode); - arbitratorMessageService.setExecutor(Platform::runLater); - } - - public ArbitratorMessageService get() { - return arbitratorMessageService; - } -} \ No newline at end of file diff --git a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageService.java b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java similarity index 90% rename from core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageService.java rename to core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java index 8c1ac58367..c9ecd8892b 100644 --- a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorMessageService.java +++ b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java @@ -18,8 +18,9 @@ package io.bitsquare.arbitration.tomp2p; import io.bitsquare.arbitration.Arbitrator; -import io.bitsquare.arbitration.ArbitratorMessageService; +import io.bitsquare.arbitration.ArbitratorService; import io.bitsquare.arbitration.listeners.ArbitratorListener; +import io.bitsquare.network.tomp2p.TomP2PDHTService; import io.bitsquare.network.tomp2p.TomP2PNode; import java.io.IOException; @@ -27,7 +28,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; -import java.util.concurrent.Executor; + +import javax.inject.Inject; import net.tomp2p.dht.FutureGet; import net.tomp2p.dht.FuturePut; @@ -41,29 +43,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TomP2PArbitratorMessageService implements ArbitratorMessageService { - private static final Logger log = LoggerFactory.getLogger(TomP2PArbitratorMessageService.class); +public class TomP2PArbitratorService extends TomP2PDHTService implements ArbitratorService { + private static final Logger log = LoggerFactory.getLogger(TomP2PArbitratorService.class); private static final String ARBITRATORS_ROOT = "ArbitratorsRoot"; - private final TomP2PNode tomP2PNode; private final List arbitratorListeners = new ArrayList<>(); - private Executor executor; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public TomP2PArbitratorMessageService(TomP2PNode tomP2PNode) { - this.tomP2PNode = tomP2PNode; + @Inject + public TomP2PArbitratorService(TomP2PNode tomP2PNode) { + super(tomP2PNode); } - public void setExecutor(Executor executor) { - this.executor = executor; - } - /////////////////////////////////////////////////////////////////////////////////////////// // Arbitrators /////////////////////////////////////////////////////////////////////////////////////////// @@ -73,7 +70,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService try { final Data arbitratorData = new Data(arbitrator); - FuturePut addFuture = tomP2PNode.addProtectedData(locationKey, arbitratorData); + FuturePut addFuture = addProtectedData(locationKey, arbitratorData); addFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -107,7 +104,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService public void removeArbitrator(Arbitrator arbitrator) throws IOException { Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT); final Data arbitratorData = new Data(arbitrator); - FutureRemove removeFuture = tomP2PNode.removeFromDataMap(locationKey, arbitratorData); + FutureRemove removeFuture = removeFromDataMap(locationKey, arbitratorData); removeFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -138,7 +135,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService public void getArbitrators(Locale languageLocale) { Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT); - FutureGet futureGet = tomP2PNode.getDataMap(locationKey); + FutureGet futureGet = getDataMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { diff --git a/core/src/main/java/io/bitsquare/gui/bitsquare.css b/core/src/main/java/io/bitsquare/gui/bitsquare.css index 47f86337b4..5bc912ff02 100644 --- a/core/src/main/java/io/bitsquare/gui/bitsquare.css +++ b/core/src/main/java/io/bitsquare/gui/bitsquare.css @@ -158,7 +158,13 @@ textfield */ -fx-progress-color: dimgrey; } -/* .table-view */ + +/******************************************************************************* + * * + * Table * + * * + ******************************************************************************/ + .table-view .table-cell { -fx-alignment: center; } @@ -175,6 +181,15 @@ textfield */ -fx-fill: black; } +.table-view:focused { + /*-fx-background-color: transparent;*/ +} +.table-view:focused { + -fx-background-color: -fx-box-border, -fx-control-inner-background; + -fx-background-insets: 0, 1; + -fx-padding: 1; +} + .table-view .table-row-cell:selected .table-row-cell:row-selection .table-row-cell:cell-selection .text { -fx-fill: white; } @@ -215,10 +230,12 @@ textfield */ -fx-fill: black; } -#form-header-text { - -fx-font-weight: bold; - -fx-font-size: 14; -} + +/******************************************************************************* + * * + * Icons * + * * + ******************************************************************************/ #non-clickable-icon { -fx-text-fill: #AAAAAA; @@ -233,11 +250,13 @@ textfield */ -fx-text-fill: #666666; } -#form-title { - -fx-font-weight: bold; -} -/* tab pane */ +/******************************************************************************* + * * + * Tab pane * + * * + ******************************************************************************/ + .tab-pane .tab-label { -fx-font-size: 15; } @@ -254,11 +273,15 @@ textfield */ -fx-background-color: transparent; } -/* table-view */ -.table-view:focused { - -fx-background-color: transparent; + +#form-header-text { + -fx-font-weight: bold; + -fx-font-size: 14; } +#form-title { + -fx-font-weight: bold; +} /* scroll-pane */ .scroll-pane { diff --git a/core/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/core/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 964d50a7ae..3460c8fd9b 100644 --- a/core/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/core/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -19,7 +19,7 @@ package io.bitsquare.gui.main; import io.bitsquare.app.UpdateProcess; import io.bitsquare.arbitration.Arbitrator; -import io.bitsquare.arbitration.ArbitratorMessageService; +import io.bitsquare.arbitration.ArbitratorService; import io.bitsquare.arbitration.Reputation; import io.bitsquare.btc.BitcoinNetwork; import io.bitsquare.btc.WalletService; @@ -103,7 +103,7 @@ class MainViewModel implements ViewModel { private final WalletService walletService; private final ClientNode clientNode; private MessageService messageService; - private ArbitratorMessageService arbitratorMessageService; + private ArbitratorService arbitratorService; private final TradeManager tradeManager; private UpdateProcess updateProcess; private final BSFormatter formatter; @@ -112,14 +112,14 @@ class MainViewModel implements ViewModel { @Inject public MainViewModel(User user, WalletService walletService, ClientNode clientNode, MessageService messageService, - ArbitratorMessageService arbitratorMessageService, + ArbitratorService arbitratorService, TradeManager tradeManager, BitcoinNetwork bitcoinNetwork, UpdateProcess updateProcess, BSFormatter formatter, Persistence persistence, AccountSettings accountSettings) { this.user = user; this.walletService = walletService; this.clientNode = clientNode; this.messageService = messageService; - this.arbitratorMessageService = arbitratorMessageService; + this.arbitratorService = arbitratorService; this.tradeManager = tradeManager; this.updateProcess = updateProcess; this.formatter = formatter; @@ -157,9 +157,9 @@ class MainViewModel implements ViewModel { error -> log.error(error.toString()), () -> Platform.runLater(() -> setBitcoinNetworkSyncProgress(1.0))); - Observable messageObservable = clientNode.bootstrap(user.getMessageKeyPair(), messageService); - messageObservable.publish(); - messageObservable.subscribe( + Observable bootstrapStateAsObservable = clientNode.bootstrap(user.getMessageKeyPair(), messageService); + bootstrapStateAsObservable.publish(); + bootstrapStateAsObservable.subscribe( state -> Platform.runLater(() -> setBootstrapState(state)), error -> Platform.runLater(() -> { log.error(error.toString()); @@ -194,7 +194,7 @@ class MainViewModel implements ViewModel { log.trace("updateProcess completed"); }); - Observable allServices = Observable.merge(messageObservable, walletServiceObservable, updateProcessObservable); + Observable allServices = Observable.merge(bootstrapStateAsObservable, walletServiceObservable, updateProcessObservable); allServices.subscribe( next -> { }, @@ -376,7 +376,7 @@ class MainViewModel implements ViewModel { accountSettings.addAcceptedArbitrator(arbitrator); persistence.write(accountSettings); - arbitratorMessageService.addArbitrator(arbitrator); + arbitratorService.addArbitrator(arbitrator); } } } diff --git a/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/browser/ArbitratorBrowserView.java b/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/browser/ArbitratorBrowserView.java index ac80ca4131..87b0c95668 100644 --- a/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/browser/ArbitratorBrowserView.java +++ b/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/browser/ArbitratorBrowserView.java @@ -18,7 +18,7 @@ package io.bitsquare.gui.main.account.arbitrator.browser; import io.bitsquare.arbitration.Arbitrator; -import io.bitsquare.arbitration.ArbitratorMessageService; +import io.bitsquare.arbitration.ArbitratorService; import io.bitsquare.arbitration.listeners.ArbitratorListener; import io.bitsquare.common.viewfx.view.ActivatableView; import io.bitsquare.common.viewfx.view.CachingViewLoader; @@ -55,11 +55,11 @@ public class ArbitratorBrowserView extends ActivatableView implement private final ViewLoader viewLoader; private final AccountSettings accountSettings; private final Persistence persistence; - private final ArbitratorMessageService messageService; + private final ArbitratorService messageService; @Inject public ArbitratorBrowserView(CachingViewLoader viewLoader, AccountSettings accountSettings, Persistence persistence, - ArbitratorMessageService messageService) { + ArbitratorService messageService) { this.viewLoader = viewLoader; this.accountSettings = accountSettings; this.persistence = persistence; diff --git a/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/registration/ArbitratorRegistrationView.java b/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/registration/ArbitratorRegistrationView.java index 9611391644..dcbcef01c0 100644 --- a/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/registration/ArbitratorRegistrationView.java +++ b/core/src/main/java/io/bitsquare/gui/main/account/arbitrator/registration/ArbitratorRegistrationView.java @@ -18,7 +18,7 @@ package io.bitsquare.gui.main.account.arbitrator.registration; import io.bitsquare.arbitration.Arbitrator; -import io.bitsquare.arbitration.ArbitratorMessageService; +import io.bitsquare.arbitration.ArbitratorService; import io.bitsquare.arbitration.Reputation; import io.bitsquare.btc.WalletService; import io.bitsquare.common.viewfx.view.ActivatableView; @@ -84,13 +84,13 @@ public class ArbitratorRegistrationView extends ActivatableView languageList = FXCollections.observableArrayList(); final ObservableList countryList = FXCollections.observableArrayList(); @@ -60,7 +60,7 @@ class RestrictionsDataModel implements Activatable, DataModel { @Inject public RestrictionsDataModel(User user, AccountSettings accountSettings, Persistence persistence, - ArbitratorMessageService messageService) { + ArbitratorService messageService) { this.user = user; this.accountSettings = accountSettings; this.persistence = persistence; diff --git a/core/src/main/java/io/bitsquare/gui/main/portfolio/pending/PendingTradesView.java b/core/src/main/java/io/bitsquare/gui/main/portfolio/pending/PendingTradesView.java index eebc23f434..7cecd3e757 100644 --- a/core/src/main/java/io/bitsquare/gui/main/portfolio/pending/PendingTradesView.java +++ b/core/src/main/java/io/bitsquare/gui/main/portfolio/pending/PendingTradesView.java @@ -504,7 +504,9 @@ public class PendingTradesView extends ActivatableViewAndModel { withdrawAddressTextField.requestFocus(); - scrollPane.setVvalue(scrollPane.getVmax()); + + // delay it once more as it does not get applied at first runLater + Platform.runLater(() -> scrollPane.setVvalue(scrollPane.getVmax())); }); } diff --git a/core/src/main/java/io/bitsquare/network/AddressService.java b/core/src/main/java/io/bitsquare/network/AddressService.java index ea0b3c7975..109b563fd8 100644 --- a/core/src/main/java/io/bitsquare/network/AddressService.java +++ b/core/src/main/java/io/bitsquare/network/AddressService.java @@ -22,10 +22,6 @@ import io.bitsquare.network.listener.GetPeerAddressListener; import java.security.PublicKey; -import java.util.concurrent.Executor; - -public interface AddressService { - void setExecutor(Executor executor); - +public interface AddressService extends DHTService{ void findPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener); } diff --git a/core/src/main/java/io/bitsquare/network/DHTService.java b/core/src/main/java/io/bitsquare/network/DHTService.java new file mode 100644 index 0000000000..d5325851b6 --- /dev/null +++ b/core/src/main/java/io/bitsquare/network/DHTService.java @@ -0,0 +1,43 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.network; + +import java.security.PublicKey; + +import net.tomp2p.dht.FutureGet; +import net.tomp2p.dht.FuturePut; +import net.tomp2p.dht.FutureRemove; +import net.tomp2p.peers.Number160; +import net.tomp2p.storage.Data; + +public interface DHTService extends NetworkService { + + FuturePut putDomainProtectedData(Number160 locationKey, Data data); + + FuturePut putData(Number160 locationKey, Data data); + + FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey); + + FutureGet getData(Number160 locationKey); + + FuturePut addProtectedData(Number160 locationKey, Data data); + + FutureRemove removeFromDataMap(Number160 locationKey, Data data); + + FutureGet getDataMap(Number160 locationKey); +} diff --git a/core/src/main/java/io/bitsquare/network/MessageService.java b/core/src/main/java/io/bitsquare/network/MessageService.java index 86c736c61c..9cae479de1 100644 --- a/core/src/main/java/io/bitsquare/network/MessageService.java +++ b/core/src/main/java/io/bitsquare/network/MessageService.java @@ -20,11 +20,7 @@ package io.bitsquare.network; import io.bitsquare.network.listener.SendMessageListener; -import java.util.concurrent.Executor; - -public interface MessageService extends MessageHandler { - - void setExecutor(Executor executor); +public interface MessageService extends NetworkService, MessageHandler { void sendMessage(Peer peer, Message message, SendMessageListener listener); diff --git a/core/src/main/java/io/bitsquare/network/NetworkService.java b/core/src/main/java/io/bitsquare/network/NetworkService.java new file mode 100644 index 0000000000..f923ad2227 --- /dev/null +++ b/core/src/main/java/io/bitsquare/network/NetworkService.java @@ -0,0 +1,28 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.network; + +import java.util.concurrent.Executor; + +public interface NetworkService { + void setExecutor(Executor executor); + + void bootstrapCompleted(); + + void shutDown(); +} diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PAddressService.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PAddressService.java index 906a3e4dce..e5c5362948 100644 --- a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PAddressService.java +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PAddressService.java @@ -18,50 +18,69 @@ package io.bitsquare.network.tomp2p; import io.bitsquare.network.AddressService; -import io.bitsquare.network.MessageHandler; +import io.bitsquare.network.NetworkException; import io.bitsquare.network.Peer; import io.bitsquare.network.listener.GetPeerAddressListener; +import io.bitsquare.user.User; + +import java.io.IOException; import java.security.PublicKey; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; +import java.util.Timer; +import java.util.TimerTask; + +import javax.inject.Inject; import net.tomp2p.dht.FutureGet; +import net.tomp2p.dht.FuturePut; import net.tomp2p.futures.BaseFuture; import net.tomp2p.futures.BaseFutureAdapter; +import net.tomp2p.futures.BaseFutureListener; import net.tomp2p.peers.Number160; +import net.tomp2p.peers.PeerAddress; +import net.tomp2p.storage.Data; import net.tomp2p.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -/** - * That service delivers direct messaging and DHT functionality from the TomP2P library - * It is the translating domain specific functionality to the messaging layer. - * The TomP2P library codebase shall not be used outside that service. - * That way we limit the dependency of the TomP2P library only to that class (and it's sub components). - *

- */ -public class TomP2PAddressService implements AddressService { +public class TomP2PAddressService extends TomP2PDHTService implements AddressService { private static final Logger log = LoggerFactory.getLogger(TomP2PAddressService.class); + private static final int IP_CHECK_PERIOD = 2 * 60 * 1000; // Cheap call if nothing changes, so set it short to 2 min. + private static final int STORE_ADDRESS_PERIOD = 5 * 60 * 1000; // Save every 5 min. + private static final int ADDRESS_TTL = STORE_ADDRESS_PERIOD * 2; // TTL 10 min. - private final TomP2PNode tomP2PNode; - private final CopyOnWriteArrayList messageHandlers = new CopyOnWriteArrayList<>(); - private Executor executor; + private final Number160 locationKey; + private PeerAddress storedPeerAddress; + private Timer timerForStoreAddress; + private Timer timerForIPCheck; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public TomP2PAddressService(TomP2PNode tomP2PNode) { - this.tomP2PNode = tomP2PNode; + @Inject + public TomP2PAddressService(TomP2PNode tomP2PNode, User user) { + super(tomP2PNode); + + locationKey = Utils.makeSHAHash(user.getMessageKeyPair().getPublic().getEncoded()); } - public void setExecutor(Executor executor) { - this.executor = executor; + @Override + public void bootstrapCompleted() { + setupTimerForIPCheck(); + setupTimerForStoreAddress(); + storeAddress(); + } + + @Override + public void shutDown() { + timerForIPCheck.cancel(); + timerForStoreAddress.cancel(); + removeAddress(); + super.shutDown(); } @@ -72,7 +91,7 @@ public class TomP2PAddressService implements AddressService { @Override public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) { final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded()); - FutureGet futureGet = tomP2PNode.getDomainProtectedData(locationKey, publicKey); + FutureGet futureGet = getDomainProtectedData(locationKey, publicKey); log.trace("findPeerAddress called"); futureGet.addListener(new BaseFutureAdapter() { @Override @@ -90,4 +109,73 @@ public class TomP2PAddressService implements AddressService { }); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void setupTimerForStoreAddress() { + timerForStoreAddress = new Timer(); + timerForStoreAddress.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) + storeAddress(); + } + }, STORE_ADDRESS_PERIOD, STORE_ADDRESS_PERIOD); + } + + private void setupTimerForIPCheck() { + timerForIPCheck = new Timer(); + timerForIPCheck.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) + storeAddress(); + } + }, IP_CHECK_PERIOD, IP_CHECK_PERIOD); + } + + private void storeAddress() { + try { + Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); + // We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates + data.ttlSeconds(ADDRESS_TTL); + log.debug("storePeerAddress " + peerDHT.peerAddress().toString()); + FuturePut futurePut = putDomainProtectedData(locationKey, data); + futurePut.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (future.isSuccess()) { + storedPeerAddress = peerDHT.peerAddress(); + log.debug("storedPeerAddress = " + storedPeerAddress); + } + else { + log.error("storedPeerAddress not successful"); + throw new NetworkException("Storing address was not successful. Reason: " + future.failedReason()); + } + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Exception at storedPeerAddress " + t.toString()); + throw new NetworkException("Exception at storeAddress.", t); + } + }); + } catch (IOException e) { + e.printStackTrace(); + log.error("Exception at storePeerAddress " + e.toString()); + } + } + + private void removeAddress() { + try { + Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); + removeFromDataMap(locationKey, data).awaitUninterruptibly(); + } catch (IOException e) { + e.printStackTrace(); + log.error("Exception at removeAddress " + e.toString()); + } + } + } diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PDHTService.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PDHTService.java new file mode 100644 index 0000000000..1c85731b72 --- /dev/null +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PDHTService.java @@ -0,0 +1,175 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.network.tomp2p; + +import io.bitsquare.network.DHTService; + +import java.security.PublicKey; + +import javax.inject.Inject; + +import net.tomp2p.dht.FutureGet; +import net.tomp2p.dht.FuturePut; +import net.tomp2p.dht.FutureRemove; +import net.tomp2p.peers.Number160; +import net.tomp2p.storage.Data; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TomP2PDHTService extends TomP2PService implements DHTService { + private static final Logger log = LoggerFactory.getLogger(TomP2PDHTService.class); + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + @Inject + public TomP2PDHTService(TomP2PNode tomP2PNode) { + super(tomP2PNode); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // DHT methods + /////////////////////////////////////////////////////////////////////////////////////////// + + // TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in + // the logs + @Override + public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { + log.trace("putDomainProtectedData"); + return peerDHT.put(locationKey).data(data).start(); + } + + @Override + public FuturePut putData(Number160 locationKey, Data data) { + log.trace("putData"); + return peerDHT.put(locationKey).data(data).start(); + } + + @Override + public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { + log.trace("getDomainProtectedData"); + return peerDHT.get(locationKey).start(); + } + + @Override + public FutureGet getData(Number160 locationKey) { + //log.trace("getData"); + return peerDHT.get(locationKey).start(); + } + + @Override + public FuturePut addProtectedData(Number160 locationKey, Data data) { + log.trace("addProtectedData"); + return peerDHT.add(locationKey).data(data).start(); + } + + @Override + public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { + Number160 contentKey = data.hash(); + log.trace("removeFromDataMap with contentKey " + contentKey.toString()); + return peerDHT.remove(locationKey).contentKey(contentKey).start(); + } + + @Override + public FutureGet getDataMap(Number160 locationKey) { + log.trace("getDataMap"); + return peerDHT.get(locationKey).all().start(); + } + + +// +// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { +// log.trace("putDomainProtectedData"); +// data.protectEntry(keyPair); +// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); +// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start(); +// } +// +// // No protection, everybody can write. +// public FuturePut putData(Number160 locationKey, Data data) { +// log.trace("putData"); +// return peerDHT.put(locationKey).data(data).start(); +// } +// +// // Not public readable. Only users with the public key of the peer who stored the data can read that data +// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { +// log.trace("getDomainProtectedData"); +// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded()); +// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start(); +// } +// +// // No protection, everybody can read. +// public FutureGet getData(Number160 locationKey) { +// log.trace("getData"); +// return peerDHT.get(locationKey).start(); +// } +// +// // No domain protection, but entry protection +// public FuturePut addProtectedData(Number160 locationKey, Data data) { +// log.trace("addProtectedData"); +// data.protectEntry(keyPair); +// log.trace("addProtectedData with contentKey " + data.hash().toString()); +// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start(); +// } +// +// // No domain protection, but entry protection +// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { +// log.trace("removeFromDataMap"); +// Number160 contentKey = data.hash(); +// log.trace("removeFromDataMap with contentKey " + contentKey.toString()); +// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start(); +// } +// +// // Public readable +// public FutureGet getDataMap(Number160 locationKey) { +// log.trace("getDataMap"); +// return peerDHT.get(locationKey).all().start(); +// } + + // Send signed payLoad to peer +// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) { +// // use 30 seconds as max idle time before connection get closed +// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000); +// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start(); +// futureDirect.addListener(new BaseFutureListener() { +// @Override +// public void operationComplete(BaseFuture future) throws Exception { +// if (futureDirect.isSuccess()) { +// log.debug("sendMessage completed"); +// } +// else { +// log.error("sendData failed with Reason " + futureDirect.failedReason()); +// } +// } +// +// @Override +// public void exceptionCaught(Throwable t) throws Exception { +// log.error("Exception at sendData " + t.toString()); +// } +// }); +// +// return futureDirect; +// } +// + + +} diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PMessageService.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PMessageService.java index b009418e55..568b3eef5a 100644 --- a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PMessageService.java +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PMessageService.java @@ -24,7 +24,8 @@ import io.bitsquare.network.Peer; import io.bitsquare.network.listener.SendMessageListener; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; + +import javax.inject.Inject; import net.tomp2p.futures.BaseFuture; import net.tomp2p.futures.BaseFutureListener; @@ -33,50 +34,37 @@ import net.tomp2p.futures.FutureDirect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -/** - * That service delivers direct messaging and DHT functionality from the TomP2P library - * It is the translating domain specific functionality to the messaging layer. - * The TomP2P library codebase shall not be used outside that service. - * That way we limit the dependency of the TomP2P library only to that class (and it's sub components). - *

- */ -public class TomP2PMessageService implements MessageService { +public class TomP2PMessageService extends TomP2PService implements MessageService { private static final Logger log = LoggerFactory.getLogger(TomP2PMessageService.class); - private final TomP2PNode tomP2PNode; private final CopyOnWriteArrayList messageHandlers = new CopyOnWriteArrayList<>(); - private Executor executor; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// + @Inject public TomP2PMessageService(TomP2PNode tomP2PNode) { - this.tomP2PNode = tomP2PNode; - } - - @Override - public void setExecutor(Executor executor) { - this.executor = executor; + super(tomP2PNode); } /////////////////////////////////////////////////////////////////////////////////////////// - // Messages + // MessageService implementation /////////////////////////////////////////////////////////////////////////////////////////// @Override public void sendMessage(Peer peer, Message message, SendMessageListener listener) { - if (!(peer instanceof TomP2PPeer)) { - throw new IllegalArgumentException("peer must be of type TomP2PPeer"); - } - FutureDirect futureDirect = tomP2PNode.sendData(((TomP2PPeer) peer).getPeerAddress(), message); + if (!(peer instanceof TomP2PPeer)) + throw new IllegalArgumentException("Peer must be of type TomP2PPeer"); + + FutureDirect futureDirect = peerDHT.peer().sendDirect(((TomP2PPeer) peer).getPeerAddress()).object(message).start(); futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { if (future.isSuccess()) { + log.debug("sendMessage completed"); executor.execute(listener::handleResult); } else { @@ -87,37 +75,34 @@ public class TomP2PMessageService implements MessageService { @Override public void exceptionCaught(Throwable t) throws Exception { + log.error("Exception at sendMessage " + t.toString()); executor.execute(listener::handleFault); } }); } - - /////////////////////////////////////////////////////////////////////////////////////////// - // Event Listeners - /////////////////////////////////////////////////////////////////////////////////////////// - @Override public void addMessageHandler(MessageHandler listener) { if (!messageHandlers.add(listener)) - throw new RuntimeException("Add listener did not change list. Probably listener has been already added."); + throw new IllegalArgumentException("Add listener did not change list. Probably listener has been already added."); } @Override public void removeMessageHandler(MessageHandler listener) { if (!messageHandlers.remove(listener)) - throw new RuntimeException("Try to remove listener which was never added."); + throw new IllegalArgumentException("Try to remove listener which was never added."); } /////////////////////////////////////////////////////////////////////////////////////////// - // Incoming message handler + // MessageHandler implementation /////////////////////////////////////////////////////////////////////////////////////////// @Override public void handleMessage(Message message, Peer sender) { - if (sender instanceof TomP2PPeer) { + if (sender instanceof TomP2PPeer) executor.execute(() -> messageHandlers.stream().forEach(e -> e.handleMessage((Message) message, sender))); - } + else + throw new IllegalArgumentException("Peer must be of type TomP2PPeer"); } } diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNetworkModule.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNetworkModule.java index e8b95fd5ed..c99244d476 100644 --- a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNetworkModule.java +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNetworkModule.java @@ -17,22 +17,17 @@ package io.bitsquare.network.tomp2p; +import io.bitsquare.network.AddressService; import io.bitsquare.network.BootstrapNodes; import io.bitsquare.network.ClientNode; -import io.bitsquare.network.AddressService; import io.bitsquare.network.MessageService; import io.bitsquare.network.NetworkModule; import io.bitsquare.network.Node; import com.google.inject.Injector; -import com.google.inject.Provider; import com.google.inject.Singleton; import com.google.inject.name.Names; -import javax.inject.Inject; - -import javafx.application.Platform; - import org.springframework.core.env.Environment; import static io.bitsquare.network.tomp2p.BootstrappedPeerBuilder.*; @@ -50,10 +45,14 @@ public class TomP2PNetworkModule extends NetworkModule { @Override protected void doConfigure() { + // Used both ClientNode and TomP2PNode for injection bind(ClientNode.class).to(TomP2PNode.class).in(Singleton.class); bind(TomP2PNode.class).in(Singleton.class); - bind(MessageService.class).toProvider(TomP2PMessageServiceProvider.class).in(Singleton.class); - bind(AddressService.class).toProvider(TomP2PAddressServiceProvider.class).in(Singleton.class); + + bind(BootstrappedPeerBuilder.class).in(Singleton.class); + + bind(AddressService.class).to(TomP2PAddressService.class).in(Singleton.class); + bind(MessageService.class).to(TomP2PMessageService.class).in(Singleton.class); bind(int.class).annotatedWith(Names.named(Node.PORT_KEY)).toInstance(env.getProperty(Node.PORT_KEY, int.class, Node.DEFAULT_PORT)); bind(boolean.class).annotatedWith(Names.named(USE_MANUAL_PORT_FORWARDING_KEY)).toInstance( @@ -66,43 +65,14 @@ public class TomP2PNetworkModule extends NetworkModule { ) ); bindConstant().annotatedWith(Names.named(NETWORK_INTERFACE_KEY)).to(env.getProperty(NETWORK_INTERFACE_KEY, NETWORK_INTERFACE_UNSPECIFIED)); - bind(BootstrappedPeerBuilder.class).in(Singleton.class); } @Override protected void doClose(Injector injector) { super.doClose(injector); - // First shut down TomP2PNode to remove address from DHT - injector.getInstance(TomP2PNode.class).shutDown(); + // First shut down AddressService to remove address from DHT + injector.getInstance(AddressService.class).shutDown(); injector.getInstance(BootstrappedPeerBuilder.class).shutDown(); } -} - -class TomP2PMessageServiceProvider implements Provider { - private final MessageService messageService; - - @Inject - public TomP2PMessageServiceProvider(TomP2PNode tomP2PNode) { - messageService = new TomP2PMessageService(tomP2PNode); - messageService.setExecutor(Platform::runLater); - } - - public MessageService get() { - return messageService; - } -} - -class TomP2PAddressServiceProvider implements Provider { - private final AddressService addressService; - - @Inject - public TomP2PAddressServiceProvider(TomP2PNode tomP2PNode) { - addressService = new TomP2PAddressService(tomP2PNode); - addressService.setExecutor(Platform::runLater); - } - - public AddressService get() { - return addressService; - } } \ No newline at end of file diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNode.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNode.java index c8dea9e36e..0d8ff09e30 100644 --- a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNode.java +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PNode.java @@ -23,36 +23,20 @@ import io.bitsquare.network.ClientNode; import io.bitsquare.network.ConnectionType; import io.bitsquare.network.Message; import io.bitsquare.network.MessageHandler; -import io.bitsquare.network.NetworkException; import io.bitsquare.network.Node; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import java.io.IOException; - import java.security.KeyPair; -import java.security.PublicKey; - -import java.util.Timer; -import java.util.TimerTask; import javax.annotation.Nullable; import javax.inject.Inject; -import net.tomp2p.dht.FutureGet; -import net.tomp2p.dht.FuturePut; -import net.tomp2p.dht.FutureRemove; import net.tomp2p.dht.PeerDHT; -import net.tomp2p.futures.BaseFuture; -import net.tomp2p.futures.BaseFutureListener; -import net.tomp2p.futures.FutureDirect; -import net.tomp2p.peers.Number160; import net.tomp2p.peers.PeerAddress; -import net.tomp2p.storage.Data; -import net.tomp2p.utils.Utils; import org.jetbrains.annotations.NotNull; @@ -63,26 +47,12 @@ import rx.Observable; import rx.subjects.BehaviorSubject; import rx.subjects.Subject; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * The fully bootstrapped P2PNode which is responsible himself for his availability in the messaging system. It saves - * for instance the IP address periodically. - * This class is offering generic functionality of TomP2P needed for Bitsquare, like data and domain protection. - * It does not handle any domain aspects of Bitsquare. - */ public class TomP2PNode implements ClientNode { private static final Logger log = LoggerFactory.getLogger(TomP2PNode.class); - private static final int IP_CHECK_PERIOD = 2 * 60 * 1000; // Cheap call if nothing changes, so set it short to 2 min. - private static final int STORE_ADDRESS_PERIOD = 5 * 60 * 1000; // Save every 5 min. - private static final int ADDRESS_TTL = STORE_ADDRESS_PERIOD * 2; // TTL 10 min. - private KeyPair keyPair; - private PeerAddress storedPeerAddress; private PeerDHT peerDHT; private BootstrappedPeerBuilder bootstrappedPeerBuilder; - private Timer timerForStoreAddress; - private Timer timerForIPCheck; + private final Subject bootstrapStateSubject; /////////////////////////////////////////////////////////////////////////////////////////// @@ -92,26 +62,24 @@ public class TomP2PNode implements ClientNode { @Inject public TomP2PNode(BootstrappedPeerBuilder bootstrappedPeerBuilder) { this.bootstrappedPeerBuilder = bootstrappedPeerBuilder; + bootstrapStateSubject = BehaviorSubject.create(); } // for unit testing TomP2PNode(KeyPair keyPair, PeerDHT peerDHT) { - this.keyPair = keyPair; this.peerDHT = peerDHT; peerDHT.peerBean().keyPair(keyPair); + bootstrapStateSubject = BehaviorSubject.create(); } + /////////////////////////////////////////////////////////////////////////////////////////// // Public methods /////////////////////////////////////////////////////////////////////////////////////////// public Observable bootstrap(KeyPair keyPair, MessageHandler messageHandler) { - checkNotNull(keyPair, "keyPair must not be null."); - - this.keyPair = keyPair; bootstrappedPeerBuilder.setKeyPair(keyPair); - Subject bootstrapStateSubject = BehaviorSubject.create(); bootstrappedPeerBuilder.getBootstrapState().addListener((ov, oldValue, newValue) -> { log.debug("BootstrapState changed " + newValue); @@ -124,14 +92,7 @@ public class TomP2PNode implements ClientNode { public void onSuccess(@Nullable PeerDHT peerDHT) { if (peerDHT != null) { TomP2PNode.this.peerDHT = peerDHT; - setupTimerForIPCheck(); - setupTimerForStoreAddress(); setupReplyHandler(messageHandler); - try { - storeAddress(); - } catch (NetworkException e) { - bootstrapStateSubject.onError(e); - } bootstrapStateSubject.onCompleted(); } else { @@ -150,12 +111,13 @@ public class TomP2PNode implements ClientNode { return bootstrapStateSubject.asObservable(); } - void shutDown() { - timerForIPCheck.cancel(); - timerForStoreAddress.cancel(); - removeAddress(); + public Observable getBootstrapStateAsObservable() { + return bootstrapStateSubject.asObservable(); } + public PeerDHT getPeerDHT() { + return peerDHT; + } @Override public ConnectionType getConnectionType() { @@ -189,147 +151,6 @@ public class TomP2PNode implements ClientNode { } - /////////////////////////////////////////////////////////////////////////////////////////// - // Generic DHT methods - /////////////////////////////////////////////////////////////////////////////////////////// - - // TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in - // the logs - public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { - log.trace("putDomainProtectedData"); - return peerDHT.put(locationKey).data(data).start(); - } - - public FuturePut putData(Number160 locationKey, Data data) { - log.trace("putData"); - return peerDHT.put(locationKey).data(data).start(); - } - - public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { - log.trace("getDomainProtectedData"); - return peerDHT.get(locationKey).start(); - } - - public FutureGet getData(Number160 locationKey) { - //log.trace("getData"); - return peerDHT.get(locationKey).start(); - } - - public FuturePut addProtectedData(Number160 locationKey, Data data) { - log.trace("addProtectedData"); - return peerDHT.add(locationKey).data(data).start(); - } - - public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { - Number160 contentKey = data.hash(); - log.trace("removeFromDataMap with contentKey " + contentKey.toString()); - return peerDHT.remove(locationKey).contentKey(contentKey).start(); - } - - public FutureGet getDataMap(Number160 locationKey) { - log.trace("getDataMap"); - return peerDHT.get(locationKey).all().start(); - } - - public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) { - log.trace("sendData"); - FutureDirect futureDirect = peerDHT.peer().sendDirect(peerAddress).object(payLoad).start(); - futureDirect.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - if (future.isSuccess()) { - log.debug("sendMessage completed"); - } - else { - log.error("sendData failed with Reason " + futureDirect.failedReason()); - } - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Exception at sendData " + t.toString()); - } - }); - - return futureDirect; - } - -// -// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { -// log.trace("putDomainProtectedData"); -// data.protectEntry(keyPair); -// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); -// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start(); -// } -// -// // No protection, everybody can write. -// public FuturePut putData(Number160 locationKey, Data data) { -// log.trace("putData"); -// return peerDHT.put(locationKey).data(data).start(); -// } -// -// // Not public readable. Only users with the public key of the peer who stored the data can read that data -// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { -// log.trace("getDomainProtectedData"); -// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded()); -// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start(); -// } -// -// // No protection, everybody can read. -// public FutureGet getData(Number160 locationKey) { -// log.trace("getData"); -// return peerDHT.get(locationKey).start(); -// } -// -// // No domain protection, but entry protection -// public FuturePut addProtectedData(Number160 locationKey, Data data) { -// log.trace("addProtectedData"); -// data.protectEntry(keyPair); -// log.trace("addProtectedData with contentKey " + data.hash().toString()); -// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start(); -// } -// -// // No domain protection, but entry protection -// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { -// log.trace("removeFromDataMap"); -// Number160 contentKey = data.hash(); -// log.trace("removeFromDataMap with contentKey " + contentKey.toString()); -// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start(); -// } -// -// // Public readable -// public FutureGet getDataMap(Number160 locationKey) { -// log.trace("getDataMap"); -// return peerDHT.get(locationKey).all().start(); -// } - - // Send signed payLoad to peer -// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) { -// // use 30 seconds as max idle time before connection get closed -// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000); -// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start(); -// futureDirect.addListener(new BaseFutureListener() { -// @Override -// public void operationComplete(BaseFuture future) throws Exception { -// if (futureDirect.isSuccess()) { -// log.debug("sendMessage completed"); -// } -// else { -// log.error("sendData failed with Reason " + futureDirect.failedReason()); -// } -// } -// -// @Override -// public void exceptionCaught(Throwable t) throws Exception { -// log.error("Exception at sendData " + t.toString()); -// } -// }); -// -// return futureDirect; -// } -// - - /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// @@ -353,79 +174,5 @@ public class TomP2PNode implements ClientNode { }); } - private void setupTimerForStoreAddress() { - timerForStoreAddress = new Timer(); - timerForStoreAddress.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) - try { - storeAddress(); - } catch (NetworkException e) { - e.printStackTrace(); - } - } - }, STORE_ADDRESS_PERIOD, STORE_ADDRESS_PERIOD); - } - private void setupTimerForIPCheck() { - timerForIPCheck = new Timer(); - timerForIPCheck.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) - try { - storeAddress(); - } catch (NetworkException e) { - e.printStackTrace(); - } - } - }, IP_CHECK_PERIOD, IP_CHECK_PERIOD); - } - - private void storeAddress() throws NetworkException { - try { - Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); - Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); - // We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates - data.ttlSeconds(ADDRESS_TTL); - log.debug("storePeerAddress " + peerDHT.peerAddress().toString()); - FuturePut futurePut = putDomainProtectedData(locationKey, data); - futurePut.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - if (future.isSuccess()) { - storedPeerAddress = peerDHT.peerAddress(); - log.debug("storedPeerAddress = " + storedPeerAddress); - } - else { - log.error("storedPeerAddress not successful"); - throw new NetworkException("Storing address was not successful. Reason: " + future.failedReason()); - } - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Exception at storedPeerAddress " + t.toString()); - throw new NetworkException("Exception at storeAddress.", t); - } - }); - } catch (IOException e) { - e.printStackTrace(); - log.error("Exception at storePeerAddress " + e.toString()); - throw new NetworkException("Exception at storeAddress.", e); - } - } - - - private void removeAddress() { - try { - Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); - Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); - removeFromDataMap(locationKey, data).awaitUninterruptibly(2000); // give it max. 2 sec. to remove the address at shut down - } catch (IOException e) { - e.printStackTrace(); - log.error("Exception at removeAddress " + e.toString()); - } - } } diff --git a/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PService.java b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PService.java new file mode 100644 index 0000000000..7c3619cc8b --- /dev/null +++ b/core/src/main/java/io/bitsquare/network/tomp2p/TomP2PService.java @@ -0,0 +1,96 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.network.tomp2p; + +import io.bitsquare.network.BootstrapState; +import io.bitsquare.network.NetworkService; + +import java.util.concurrent.Executor; + +import javax.inject.Inject; + +import javafx.application.Platform; + +import net.tomp2p.dht.PeerDHT; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import rx.Observable; +import rx.Subscriber; + + +/** + * That service delivers direct messaging and DHT functionality from the TomP2P library + * It is the translating domain specific functionality to the messaging layer. + * The TomP2P library codebase shall not be used outside that service. + * That way we limit the dependency of the TomP2P library only to that class (and it's sub components). + *

+ */ +public class TomP2PService implements NetworkService { + private static final Logger log = LoggerFactory.getLogger(TomP2PService.class); + + private final Subscriber subscriber; + + protected Executor executor = Platform::runLater; + protected PeerDHT peerDHT; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + @Inject + public TomP2PService(TomP2PNode tomP2PNode) { + Observable bootstrapStateAsObservable = tomP2PNode.getBootstrapStateAsObservable(); + subscriber = new Subscriber() { + @Override + public void onCompleted() { + executor.execute(() -> { + peerDHT = tomP2PNode.getPeerDHT(); + subscriber.unsubscribe(); + bootstrapCompleted(); + }); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onNext(BootstrapState bootstrapState) { + } + }; + bootstrapStateAsObservable.subscribe(subscriber); + } + + @Override + public void bootstrapCompleted() { + + } + + @Override + public void setExecutor(Executor executor) { + this.executor = executor; + } + + @Override + public void shutDown() { + } + +} diff --git a/core/src/main/java/io/bitsquare/offer/OfferBookService.java b/core/src/main/java/io/bitsquare/offer/OfferBookService.java index 1f007784c2..dbf3884ef3 100644 --- a/core/src/main/java/io/bitsquare/offer/OfferBookService.java +++ b/core/src/main/java/io/bitsquare/offer/OfferBookService.java @@ -19,15 +19,13 @@ package io.bitsquare.offer; import io.bitsquare.common.handlers.FaultHandler; import io.bitsquare.common.handlers.ResultHandler; +import io.bitsquare.network.DHTService; import java.util.List; -import java.util.concurrent.Executor; import javafx.beans.property.LongProperty; -public interface OfferBookService { - - void setExecutor(Executor executor); +public interface OfferBookService extends DHTService { void getOffers(String fiatCode); diff --git a/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java b/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java index c82d4241fd..b42be46640 100644 --- a/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java +++ b/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java @@ -19,6 +19,7 @@ package io.bitsquare.offer.tomp2p; import io.bitsquare.common.handlers.FaultHandler; import io.bitsquare.common.handlers.ResultHandler; +import io.bitsquare.network.tomp2p.TomP2PDHTService; import io.bitsquare.network.tomp2p.TomP2PNode; import io.bitsquare.offer.Offer; import io.bitsquare.offer.OfferBookService; @@ -28,7 +29,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import javafx.beans.property.LongProperty; import javafx.beans.property.SimpleLongProperty; @@ -46,22 +46,16 @@ import net.tomp2p.storage.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TomP2POfferBookService implements OfferBookService { +public class TomP2POfferBookService extends TomP2PDHTService implements OfferBookService { private static final Logger log = LoggerFactory.getLogger(TomP2POfferBookService.class); private final List offerRepositoryListeners = new ArrayList<>(); private final LongProperty invalidationTimestamp = new SimpleLongProperty(0); - private final TomP2PNode tomP2PNode; - private Executor executor; public TomP2POfferBookService(TomP2PNode tomP2PNode) { - this.tomP2PNode = tomP2PNode; - } - - public void setExecutor(Executor executor) { - this.executor = executor; + super(tomP2PNode); } @Override @@ -75,7 +69,7 @@ public class TomP2POfferBookService implements OfferBookService { offerData.ttlSeconds(defaultOfferTTL); log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); - FuturePut futurePut = tomP2PNode.addProtectedData(locationKey, offerData); + FuturePut futurePut = addProtectedData(locationKey, offerData); futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -118,7 +112,7 @@ public class TomP2POfferBookService implements OfferBookService { final Data offerData = new Data(offer); log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); - FutureRemove futureRemove = tomP2PNode.removeFromDataMap(locationKey, offerData); + FutureRemove futureRemove = removeFromDataMap(locationKey, offerData); futureRemove.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -163,7 +157,7 @@ public class TomP2POfferBookService implements OfferBookService { public void getOffers(String currencyCode) { Number160 locationKey = Number160.createHash(currencyCode); log.trace("Get offers from DHT requested for locationKey: " + locationKey); - FutureGet futureGet = tomP2PNode.getDataMap(locationKey); + FutureGet futureGet = getDataMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -227,7 +221,7 @@ public class TomP2POfferBookService implements OfferBookService { private void writeInvalidationTimestampToDHT(String currencyCode) { invalidationTimestamp.set(System.currentTimeMillis()); try { - FuturePut putFuture = tomP2PNode.putData(getInvalidatedLocationKey(currencyCode), + FuturePut putFuture = putData(getInvalidatedLocationKey(currencyCode), new Data(invalidationTimestamp.get())); putFuture.addListener(new BaseFutureListener() { @Override @@ -254,7 +248,7 @@ public class TomP2POfferBookService implements OfferBookService { } public void requestInvalidationTimeStampFromDHT(String currencyCode) { - FutureGet futureGet = tomP2PNode.getData(getInvalidatedLocationKey(currencyCode)); + FutureGet futureGet = getData(getInvalidatedLocationKey(currencyCode)); futureGet.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { diff --git a/core/src/main/java/io/bitsquare/trade/TradeModule.java b/core/src/main/java/io/bitsquare/trade/TradeModule.java index 4fea655910..acab0b98ca 100644 --- a/core/src/main/java/io/bitsquare/trade/TradeModule.java +++ b/core/src/main/java/io/bitsquare/trade/TradeModule.java @@ -30,12 +30,7 @@ public class TradeModule extends BitsquareModule { } @Override - protected final void configure() { - doConfigure(); - + protected void configure() { bind(TradeManager.class).in(Singleton.class); } - - protected void doConfigure() { - } } diff --git a/core/src/test/java/io/bitsquare/network/tomp2p/TomP2PNodeTest.java b/core/src/test/java/io/bitsquare/network/tomp2p/TomP2PNodeTest.java deleted file mode 100644 index bd38baf0d4..0000000000 --- a/core/src/test/java/io/bitsquare/network/tomp2p/TomP2PNodeTest.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * This file is part of Bitsquare. - * - * Bitsquare is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bitsquare is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bitsquare. If not, see . - */ - -package io.bitsquare.network.tomp2p; - -import java.io.IOException; - -import java.security.KeyPair; -import java.security.KeyPairGenerator; - -import java.util.Random; - -import net.tomp2p.connection.Ports; -import net.tomp2p.dht.FutureGet; -import net.tomp2p.dht.FuturePut; -import net.tomp2p.dht.FutureRemove; -import net.tomp2p.dht.PeerBuilderDHT; -import net.tomp2p.dht.PeerDHT; -import net.tomp2p.dht.UtilsDHT2; -import net.tomp2p.futures.FutureDirect; -import net.tomp2p.p2p.PeerBuilder; -import net.tomp2p.peers.Number160; -import net.tomp2p.peers.PeerAddress; -import net.tomp2p.rpc.ObjectDataReply; -import net.tomp2p.storage.Data; -import net.tomp2p.utils.Utils; - -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -import static org.junit.Assert.*; - -// TODO Reactivate tests when TomP2PNode is using original code again. we deactivated the security features atm. -// cause IOException: Not listening to anything. Maybe your binding information is wrong. -// investigate what has broken it, probably from update to latest head -@Ignore -public class TomP2PNodeTest { - - final private static Random rnd = new Random(42L); - - @Test - public void testSendData() throws Exception { - PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort()); - PeerDHT master = peers[0]; - PeerDHT client = peers[1]; - PeerDHT otherPeer = peers[2]; - UtilsDHT2.perfectRouting(peers); - - - for (final PeerDHT peer : peers) { - peer.peer().objectDataReply(new ObjectDataReply() { - @Override - public Object reply(PeerAddress sender, Object request) throws Exception { - return true; - } - }); - } - - final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA"); - keyGen.initialize(1024); - KeyPair keyPairClient = keyGen.genKeyPair(); - KeyPair keyPairOtherPeer = keyGen.genKeyPair(); - - TomP2PNode node; - Number160 locationKey; - Object object; - FutureDirect futureDirect; - - node = new TomP2PNode(keyPairClient, client); - object = "clients data"; - futureDirect = node.sendData(otherPeer.peerAddress(), object); - futureDirect.awaitUninterruptibly(); - - assertTrue(futureDirect.isSuccess()); - // we return true from objectDataReply - assertTrue((Boolean) futureDirect.object()); - - master.shutdown(); - } - - @Test - public void testProtectedPutGet() throws Exception { - PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort()); - PeerDHT master = peers[0]; - PeerDHT client = peers[1]; - PeerDHT otherPeer = peers[2]; - UtilsDHT2.perfectRouting(peers); - - final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA"); - keyGen.initialize(1024); - KeyPair keyPairClient = keyGen.genKeyPair(); - KeyPair keyPairOtherPeer = keyGen.genKeyPair(); - - TomP2PNode node; - Number160 locationKey; - Data data; - FuturePut futurePut; - FutureGet futureGet; - - // otherPeer tries to squat clients location store - // he can do it but as he has not the domain key of the client he cannot do any harm - // he only can store und that path: locationKey.otherPeerDomainKey.data - node = new TomP2PNode(keyPairOtherPeer, otherPeer); - locationKey = Number160.createHash("clients location"); - data = new Data("otherPeer data"); - futurePut = node.putDomainProtectedData(locationKey, data); - futurePut.awaitUninterruptibly(); - assertTrue(futurePut.isSuccess()); - - futureGet = node.getDomainProtectedData(locationKey, keyPairOtherPeer.getPublic()); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - assertEquals("otherPeer data", futureGet.data().object()); - - // client store his data und his domainkey, no problem with previous occupied - // he only can store und that path: locationKey.clientDomainKey.data - node = new TomP2PNode(keyPairClient, client); - locationKey = Number160.createHash("clients location"); - data = new Data("client data"); - futurePut = node.putDomainProtectedData(locationKey, data); - futurePut.awaitUninterruptibly(); - assertTrue(futurePut.isSuccess()); - - futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic()); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - assertEquals("client data", futureGet.data().object()); - - // also other peers can read that data if they know the public key of the client - node = new TomP2PNode(keyPairOtherPeer, otherPeer); - futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic()); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - assertEquals("client data", futureGet.data().object()); - - - // other peer try to use pub key of other peer as domain key hash. - // must fail as he don't have the full key pair (private key of client missing) - locationKey = Number160.createHash("clients location"); - data = new Data("otherPeer data hack"); - - data.protectEntry(keyPairOtherPeer); - // he use the pub key from the client - final Number160 keyHash = Utils.makeSHAHash(keyPairClient.getPublic().getEncoded()); - futurePut = otherPeer.put(locationKey).data(data).keyPair(keyPairOtherPeer).domainKey(keyHash) - .protectDomain().start(); - - futurePut.awaitUninterruptibly(); - assertFalse(futurePut.isSuccess()); - - // he can read his prev. stored data - node = new TomP2PNode(keyPairOtherPeer, otherPeer); - futureGet = node.getDomainProtectedData(locationKey, keyPairOtherPeer.getPublic()); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - assertEquals("otherPeer data", futureGet.data().object()); - - // he can read clients data - futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic()); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - assertEquals("client data", futureGet.data().object()); - - master.shutdown(); - } - - @Test - public void testChangeEntryProtectionKey() throws Exception { - KeyPairGenerator gen = KeyPairGenerator.getInstance("DSA"); - - KeyPair keyPair1 = gen.generateKeyPair(); - KeyPair keyPair2 = gen.generateKeyPair(); - PeerDHT p1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash(1)).ports(4838) - .keyPair(keyPair1).start()).start(); - PeerDHT p2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash(2)).ports(4839) - .keyPair(keyPair2).start()).start(); - - p2.peer().bootstrap().peerAddress(p1.peerAddress()).start().awaitUninterruptibly(); - p1.peer().bootstrap().peerAddress(p2.peerAddress()).start().awaitUninterruptibly(); - - Data data = new Data("test").protectEntry(keyPair1); - FuturePut fp1 = p1.put(Number160.createHash("key1")).sign().data(data).start().awaitUninterruptibly(); - Assert.assertTrue(fp1.isSuccess()); - FuturePut fp2 = p2.put(Number160.createHash("key1")).data(data).start().awaitUninterruptibly(); - Assert.assertTrue(!fp2.isSuccess()); - - Data data2 = new Data().protectEntry(keyPair2); - data2.publicKey(keyPair2.getPublic()); - FuturePut fp3 = - p1.put(Number160.createHash("key1")).sign().putMeta().data(data2).start().awaitUninterruptibly(); - Assert.assertTrue(fp3.isSuccess()); - - FuturePut fp4 = p2.put(Number160.createHash("key1")).sign().data(data).start().awaitUninterruptibly(); - Assert.assertTrue(fp4.isSuccess()); - - p1.shutdown().awaitUninterruptibly(); - p2.shutdown().awaitUninterruptibly(); - } - - - @Test - public void testAddToListGetList() throws Exception { - - PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort()); - PeerDHT master = peers[0]; - PeerDHT client = peers[1]; - PeerDHT otherPeer = peers[2]; - UtilsDHT2.perfectRouting(peers); - - TomP2PNode node; - final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA"); - keyGen.initialize(1024); - KeyPair keyPairClient = keyGen.genKeyPair(); - KeyPair keyPairOtherPeer = keyGen.genKeyPair(); - - Number160 locationKey; - Data data; - FuturePut futurePut; - FutureGet futureGet; - - // client add a value - - KeyPairGenerator gen = KeyPairGenerator.getInstance("DSA"); - KeyPair keyPair1 = gen.generateKeyPair(); - keyPairClient = keyPair1; - - node = new TomP2PNode(keyPairClient, client); - locationKey = Number160.createHash("add to list clients location"); - data = new Data("add to list client data1"); - Data data_1 = data; - futurePut = node.addProtectedData(locationKey, data); - futurePut.awaitUninterruptibly(); - assertTrue(futurePut.isSuccess()); - - data = new Data("add to list client data2"); - Data data_2 = data; - futurePut = node.addProtectedData(locationKey, data); - futurePut.awaitUninterruptibly(); - assertTrue(futurePut.isSuccess()); - - futureGet = node.getDataMap(locationKey); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - boolean foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data1"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - boolean foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data2"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - assertTrue(foundData1); - assertTrue(foundData2); - assertEquals(2, futureGet.dataMap().values().size()); - - - // other peer tried to overwrite that entry - // but will not succeed, instead he will add a new entry. - // TODO investigate why it is not possible to overwrite the entry with that method - // The protection entry with the key does not make any difference as also the client himself cannot overwrite - // any entry - // http://tomp2p.net/doc/P2P-with-TomP2P-1.pdf - // "add(location_key, value) is translated to put(location_key, hash(value), value)" - - // fake content key with content key from previous clients entry - Number160 contentKey = Number160.createHash("add to list client data1"); - - data = new Data("add to list other peer data HACK!"); - data.protectEntry(keyPairOtherPeer); // also with client key it does not work... - futurePut = otherPeer.put(locationKey).data(contentKey, data).keyPair(keyPairOtherPeer).start(); - futurePut.awaitUninterruptibly(); - assertTrue(futurePut.isSuccess()); - - node = new TomP2PNode(keyPairOtherPeer, otherPeer); - futureGet = node.getDataMap(locationKey); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - - foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data1"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data2"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - boolean foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list other peer data HACK!"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - assertTrue(foundData1); - assertTrue(foundData2); - assertTrue(foundData3); - assertEquals(3, futureGet.dataMap().values().size()); - - - // client removes his entry -> OK - node = new TomP2PNode(keyPairClient, client); - FutureRemove futureRemove = node.removeFromDataMap(locationKey, data_1); - futureRemove.awaitUninterruptibly(); - - // We don't test futureRemove.isSuccess() as this API does not fit well to that operation, - // it might change in future to something like foundAndRemoved and notFound - // See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840 - - - futureGet = node.getDataMap(locationKey); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - - foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data1"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data2"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list other peer data HACK!"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - - assertFalse(foundData1); - assertTrue(foundData2); - assertTrue(foundData3); - assertEquals(2, futureGet.dataMap().values().size()); - - - // otherPeer tries to removes client entry -> FAIL - node = new TomP2PNode(keyPairOtherPeer, otherPeer); - futureRemove = node.removeFromDataMap(locationKey, data_2); - futureRemove.awaitUninterruptibly(); - assertFalse(futureRemove.isSuccess()); - - futureGet = node.getDataMap(locationKey); - futureGet.awaitUninterruptibly(); - assertTrue(futureGet.isSuccess()); - - foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data1"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list client data2"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> { - try { - return data1.object().equals("add to list other peer data HACK!"); - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - return false; - }); - - assertFalse(foundData1); - assertTrue(foundData2); - assertTrue(foundData3); - assertEquals(2, futureGet.dataMap().values().size()); - - master.shutdown(); - } - - -}