Refactor TomP2P services

This commit is contained in:
Manfred Karrer 2015-03-19 23:51:21 +01:00
parent 1893953948
commit 3f12247d65
24 changed files with 574 additions and 887 deletions

View File

@ -19,11 +19,12 @@ package io.bitsquare.arbitration;
import io.bitsquare.arbitration.listeners.ArbitratorListener;
import io.bitsquare.network.DHTService;
import java.util.Locale;
import java.util.concurrent.Executor;
public interface ArbitratorMessageService {
public interface ArbitratorService extends DHTService {
void setExecutor(Executor executor);
void addArbitrator(Arbitrator arbitrator);

View File

@ -18,17 +18,10 @@
package io.bitsquare.arbitration.tomp2p;
import io.bitsquare.arbitration.ArbitratorMessageModule;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.arbitration.ArbitratorService;
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import javax.inject.Inject;
import javafx.application.Platform;
import org.springframework.core.env.Environment;
public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
@ -39,25 +32,6 @@ public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
@Override
protected void doConfigure() {
bind(ArbitratorMessageService.class).toProvider(ArbitratorMessageServiceProvider.class).in(Singleton.class);
}
@Override
protected void doClose(Injector injector) {
super.doClose(injector);
bind(ArbitratorService.class).to(TomP2PArbitratorService.class).in(Singleton.class);
}
}
class ArbitratorMessageServiceProvider implements Provider<ArbitratorMessageService> {
private final ArbitratorMessageService arbitratorMessageService;
@Inject
public ArbitratorMessageServiceProvider(TomP2PNode tomP2PNode) {
arbitratorMessageService = new TomP2PArbitratorMessageService(tomP2PNode);
arbitratorMessageService.setExecutor(Platform::runLater);
}
public ArbitratorMessageService get() {
return arbitratorMessageService;
}
}

View File

@ -18,8 +18,9 @@
package io.bitsquare.arbitration.tomp2p;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.listeners.ArbitratorListener;
import io.bitsquare.network.tomp2p.TomP2PDHTService;
import io.bitsquare.network.tomp2p.TomP2PNode;
import java.io.IOException;
@ -27,7 +28,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
@ -41,29 +43,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TomP2PArbitratorMessageService implements ArbitratorMessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PArbitratorMessageService.class);
public class TomP2PArbitratorService extends TomP2PDHTService implements ArbitratorService {
private static final Logger log = LoggerFactory.getLogger(TomP2PArbitratorService.class);
private static final String ARBITRATORS_ROOT = "ArbitratorsRoot";
private final TomP2PNode tomP2PNode;
private final List<ArbitratorListener> arbitratorListeners = new ArrayList<>();
private Executor executor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public TomP2PArbitratorMessageService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
@Inject
public TomP2PArbitratorService(TomP2PNode tomP2PNode) {
super(tomP2PNode);
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Arbitrators
///////////////////////////////////////////////////////////////////////////////////////////
@ -73,7 +70,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
try {
final Data arbitratorData = new Data(arbitrator);
FuturePut addFuture = tomP2PNode.addProtectedData(locationKey, arbitratorData);
FuturePut addFuture = addProtectedData(locationKey, arbitratorData);
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -107,7 +104,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
public void removeArbitrator(Arbitrator arbitrator) throws IOException {
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
final Data arbitratorData = new Data(arbitrator);
FutureRemove removeFuture = tomP2PNode.removeFromDataMap(locationKey, arbitratorData);
FutureRemove removeFuture = removeFromDataMap(locationKey, arbitratorData);
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -138,7 +135,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
public void getArbitrators(Locale languageLocale) {
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
FutureGet futureGet = tomP2PNode.getDataMap(locationKey);
FutureGet futureGet = getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {

View File

@ -158,7 +158,13 @@ textfield */
-fx-progress-color: dimgrey;
}
/* .table-view */
/*******************************************************************************
* *
* Table *
* *
******************************************************************************/
.table-view .table-cell {
-fx-alignment: center;
}
@ -175,6 +181,15 @@ textfield */
-fx-fill: black;
}
.table-view:focused {
/*-fx-background-color: transparent;*/
}
.table-view:focused {
-fx-background-color: -fx-box-border, -fx-control-inner-background;
-fx-background-insets: 0, 1;
-fx-padding: 1;
}
.table-view .table-row-cell:selected .table-row-cell:row-selection .table-row-cell:cell-selection .text {
-fx-fill: white;
}
@ -215,10 +230,12 @@ textfield */
-fx-fill: black;
}
#form-header-text {
-fx-font-weight: bold;
-fx-font-size: 14;
}
/*******************************************************************************
* *
* Icons *
* *
******************************************************************************/
#non-clickable-icon {
-fx-text-fill: #AAAAAA;
@ -233,11 +250,13 @@ textfield */
-fx-text-fill: #666666;
}
#form-title {
-fx-font-weight: bold;
}
/* tab pane */
/*******************************************************************************
* *
* Tab pane *
* *
******************************************************************************/
.tab-pane .tab-label {
-fx-font-size: 15;
}
@ -254,11 +273,15 @@ textfield */
-fx-background-color: transparent;
}
/* table-view */
.table-view:focused {
-fx-background-color: transparent;
#form-header-text {
-fx-font-weight: bold;
-fx-font-size: 14;
}
#form-title {
-fx-font-weight: bold;
}
/* scroll-pane */
.scroll-pane {

View File

@ -19,7 +19,7 @@ package io.bitsquare.gui.main;
import io.bitsquare.app.UpdateProcess;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.Reputation;
import io.bitsquare.btc.BitcoinNetwork;
import io.bitsquare.btc.WalletService;
@ -103,7 +103,7 @@ class MainViewModel implements ViewModel {
private final WalletService walletService;
private final ClientNode clientNode;
private MessageService messageService;
private ArbitratorMessageService arbitratorMessageService;
private ArbitratorService arbitratorService;
private final TradeManager tradeManager;
private UpdateProcess updateProcess;
private final BSFormatter formatter;
@ -112,14 +112,14 @@ class MainViewModel implements ViewModel {
@Inject
public MainViewModel(User user, WalletService walletService, ClientNode clientNode, MessageService messageService,
ArbitratorMessageService arbitratorMessageService,
ArbitratorService arbitratorService,
TradeManager tradeManager, BitcoinNetwork bitcoinNetwork, UpdateProcess updateProcess,
BSFormatter formatter, Persistence persistence, AccountSettings accountSettings) {
this.user = user;
this.walletService = walletService;
this.clientNode = clientNode;
this.messageService = messageService;
this.arbitratorMessageService = arbitratorMessageService;
this.arbitratorService = arbitratorService;
this.tradeManager = tradeManager;
this.updateProcess = updateProcess;
this.formatter = formatter;
@ -157,9 +157,9 @@ class MainViewModel implements ViewModel {
error -> log.error(error.toString()),
() -> Platform.runLater(() -> setBitcoinNetworkSyncProgress(1.0)));
Observable<BootstrapState> messageObservable = clientNode.bootstrap(user.getMessageKeyPair(), messageService);
messageObservable.publish();
messageObservable.subscribe(
Observable<BootstrapState> bootstrapStateAsObservable = clientNode.bootstrap(user.getMessageKeyPair(), messageService);
bootstrapStateAsObservable.publish();
bootstrapStateAsObservable.subscribe(
state -> Platform.runLater(() -> setBootstrapState(state)),
error -> Platform.runLater(() -> {
log.error(error.toString());
@ -194,7 +194,7 @@ class MainViewModel implements ViewModel {
log.trace("updateProcess completed");
});
Observable<?> allServices = Observable.merge(messageObservable, walletServiceObservable, updateProcessObservable);
Observable<?> allServices = Observable.merge(bootstrapStateAsObservable, walletServiceObservable, updateProcessObservable);
allServices.subscribe(
next -> {
},
@ -376,7 +376,7 @@ class MainViewModel implements ViewModel {
accountSettings.addAcceptedArbitrator(arbitrator);
persistence.write(accountSettings);
arbitratorMessageService.addArbitrator(arbitrator);
arbitratorService.addArbitrator(arbitrator);
}
}
}

View File

@ -18,7 +18,7 @@
package io.bitsquare.gui.main.account.arbitrator.browser;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.listeners.ArbitratorListener;
import io.bitsquare.common.viewfx.view.ActivatableView;
import io.bitsquare.common.viewfx.view.CachingViewLoader;
@ -55,11 +55,11 @@ public class ArbitratorBrowserView extends ActivatableView<Pane, Void> implement
private final ViewLoader viewLoader;
private final AccountSettings accountSettings;
private final Persistence persistence;
private final ArbitratorMessageService messageService;
private final ArbitratorService messageService;
@Inject
public ArbitratorBrowserView(CachingViewLoader viewLoader, AccountSettings accountSettings, Persistence persistence,
ArbitratorMessageService messageService) {
ArbitratorService messageService) {
this.viewLoader = viewLoader;
this.accountSettings = accountSettings;
this.persistence = persistence;

View File

@ -18,7 +18,7 @@
package io.bitsquare.gui.main.account.arbitrator.registration;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.Reputation;
import io.bitsquare.btc.WalletService;
import io.bitsquare.common.viewfx.view.ActivatableView;
@ -84,13 +84,13 @@ public class ArbitratorRegistrationView extends ActivatableView<AnchorPane, Void
private final Persistence persistence;
private final WalletService walletService;
private final ArbitratorMessageService messageService;
private final ArbitratorService messageService;
private final User user;
private final BSFormatter formatter;
@Inject
private ArbitratorRegistrationView(Persistence persistence, WalletService walletService,
ArbitratorMessageService messageService, User user, BSFormatter formatter) {
ArbitratorService messageService, User user, BSFormatter formatter) {
this.persistence = persistence;
this.walletService = walletService;
this.messageService = messageService;

View File

@ -18,7 +18,7 @@
package io.bitsquare.gui.main.account.content.irc;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.Reputation;
import io.bitsquare.common.viewfx.model.Activatable;
import io.bitsquare.common.viewfx.model.DataModel;
@ -53,7 +53,7 @@ class IrcAccountDataModel implements Activatable, DataModel {
private final User user;
private final AccountSettings accountSettings;
private final ArbitratorMessageService messageService;
private final ArbitratorService messageService;
private final Persistence persistence;
final StringProperty nickName = new SimpleStringProperty();
@ -68,7 +68,7 @@ class IrcAccountDataModel implements Activatable, DataModel {
@Inject
public IrcAccountDataModel(User user, Persistence persistence, AccountSettings accountSettings,
ArbitratorMessageService messageService) {
ArbitratorService messageService) {
this.persistence = persistence;
this.user = user;
this.accountSettings = accountSettings;

View File

@ -18,7 +18,7 @@
package io.bitsquare.gui.main.account.content.restrictions;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.arbitration.ArbitratorMessageService;
import io.bitsquare.arbitration.ArbitratorService;
import io.bitsquare.arbitration.Reputation;
import io.bitsquare.common.viewfx.model.Activatable;
import io.bitsquare.common.viewfx.model.DataModel;
@ -48,7 +48,7 @@ class RestrictionsDataModel implements Activatable, DataModel {
private final User user;
private final AccountSettings accountSettings;
private final Persistence persistence;
private final ArbitratorMessageService messageService;
private final ArbitratorService messageService;
final ObservableList<Locale> languageList = FXCollections.observableArrayList();
final ObservableList<Country> countryList = FXCollections.observableArrayList();
@ -60,7 +60,7 @@ class RestrictionsDataModel implements Activatable, DataModel {
@Inject
public RestrictionsDataModel(User user, AccountSettings accountSettings, Persistence persistence,
ArbitratorMessageService messageService) {
ArbitratorService messageService) {
this.user = user;
this.accountSettings = accountSettings;
this.persistence = persistence;

View File

@ -504,7 +504,9 @@ public class PendingTradesView extends ActivatableViewAndModel<AnchorPane, Pendi
if (visible)
Platform.runLater(() -> {
withdrawAddressTextField.requestFocus();
scrollPane.setVvalue(scrollPane.getVmax());
// delay it once more as it does not get applied at first runLater
Platform.runLater(() -> scrollPane.setVvalue(scrollPane.getVmax()));
});
}

View File

@ -22,10 +22,6 @@ import io.bitsquare.network.listener.GetPeerAddressListener;
import java.security.PublicKey;
import java.util.concurrent.Executor;
public interface AddressService {
void setExecutor(Executor executor);
public interface AddressService extends DHTService{
void findPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener);
}

View File

@ -0,0 +1,43 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network;
import java.security.PublicKey;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.peers.Number160;
import net.tomp2p.storage.Data;
public interface DHTService extends NetworkService {
FuturePut putDomainProtectedData(Number160 locationKey, Data data);
FuturePut putData(Number160 locationKey, Data data);
FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey);
FutureGet getData(Number160 locationKey);
FuturePut addProtectedData(Number160 locationKey, Data data);
FutureRemove removeFromDataMap(Number160 locationKey, Data data);
FutureGet getDataMap(Number160 locationKey);
}

View File

@ -20,11 +20,7 @@ package io.bitsquare.network;
import io.bitsquare.network.listener.SendMessageListener;
import java.util.concurrent.Executor;
public interface MessageService extends MessageHandler {
void setExecutor(Executor executor);
public interface MessageService extends NetworkService, MessageHandler {
void sendMessage(Peer peer, Message message, SendMessageListener listener);

View File

@ -0,0 +1,28 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network;
import java.util.concurrent.Executor;
public interface NetworkService {
void setExecutor(Executor executor);
void bootstrapCompleted();
void shutDown();
}

View File

@ -18,50 +18,69 @@
package io.bitsquare.network.tomp2p;
import io.bitsquare.network.AddressService;
import io.bitsquare.network.MessageHandler;
import io.bitsquare.network.NetworkException;
import io.bitsquare.network.Peer;
import io.bitsquare.network.listener.GetPeerAddressListener;
import io.bitsquare.user.User;
import java.io.IOException;
import java.security.PublicKey;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.Timer;
import java.util.TimerTask;
import javax.inject.Inject;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
* It is the translating domain specific functionality to the messaging layer.
* The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p/>
*/
public class TomP2PAddressService implements AddressService {
public class TomP2PAddressService extends TomP2PDHTService implements AddressService {
private static final Logger log = LoggerFactory.getLogger(TomP2PAddressService.class);
private static final int IP_CHECK_PERIOD = 2 * 60 * 1000; // Cheap call if nothing changes, so set it short to 2 min.
private static final int STORE_ADDRESS_PERIOD = 5 * 60 * 1000; // Save every 5 min.
private static final int ADDRESS_TTL = STORE_ADDRESS_PERIOD * 2; // TTL 10 min.
private final TomP2PNode tomP2PNode;
private final CopyOnWriteArrayList<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
private Executor executor;
private final Number160 locationKey;
private PeerAddress storedPeerAddress;
private Timer timerForStoreAddress;
private Timer timerForIPCheck;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public TomP2PAddressService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
@Inject
public TomP2PAddressService(TomP2PNode tomP2PNode, User user) {
super(tomP2PNode);
locationKey = Utils.makeSHAHash(user.getMessageKeyPair().getPublic().getEncoded());
}
public void setExecutor(Executor executor) {
this.executor = executor;
@Override
public void bootstrapCompleted() {
setupTimerForIPCheck();
setupTimerForStoreAddress();
storeAddress();
}
@Override
public void shutDown() {
timerForIPCheck.cancel();
timerForStoreAddress.cancel();
removeAddress();
super.shutDown();
}
@ -72,7 +91,7 @@ public class TomP2PAddressService implements AddressService {
@Override
public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
FutureGet futureGet = tomP2PNode.getDomainProtectedData(locationKey, publicKey);
FutureGet futureGet = getDomainProtectedData(locationKey, publicKey);
log.trace("findPeerAddress called");
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
@ -90,4 +109,73 @@ public class TomP2PAddressService implements AddressService {
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void setupTimerForStoreAddress() {
timerForStoreAddress = new Timer();
timerForStoreAddress.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress()))
storeAddress();
}
}, STORE_ADDRESS_PERIOD, STORE_ADDRESS_PERIOD);
}
private void setupTimerForIPCheck() {
timerForIPCheck = new Timer();
timerForIPCheck.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress()))
storeAddress();
}
}, IP_CHECK_PERIOD, IP_CHECK_PERIOD);
}
private void storeAddress() {
try {
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
// We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates
data.ttlSeconds(ADDRESS_TTL);
log.debug("storePeerAddress " + peerDHT.peerAddress().toString());
FuturePut futurePut = putDomainProtectedData(locationKey, data);
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
storedPeerAddress = peerDHT.peerAddress();
log.debug("storedPeerAddress = " + storedPeerAddress);
}
else {
log.error("storedPeerAddress not successful");
throw new NetworkException("Storing address was not successful. Reason: " + future.failedReason());
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Exception at storedPeerAddress " + t.toString());
throw new NetworkException("Exception at storeAddress.", t);
}
});
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at storePeerAddress " + e.toString());
}
}
private void removeAddress() {
try {
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
removeFromDataMap(locationKey, data).awaitUninterruptibly();
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at removeAddress " + e.toString());
}
}
}

View File

@ -0,0 +1,175 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network.tomp2p;
import io.bitsquare.network.DHTService;
import java.security.PublicKey;
import javax.inject.Inject;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.peers.Number160;
import net.tomp2p.storage.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TomP2PDHTService extends TomP2PService implements DHTService {
private static final Logger log = LoggerFactory.getLogger(TomP2PDHTService.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PDHTService(TomP2PNode tomP2PNode) {
super(tomP2PNode);
}
///////////////////////////////////////////////////////////////////////////////////////////
// DHT methods
///////////////////////////////////////////////////////////////////////////////////////////
// TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in
// the logs
@Override
public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
log.trace("putDomainProtectedData");
return peerDHT.put(locationKey).data(data).start();
}
@Override
public FuturePut putData(Number160 locationKey, Data data) {
log.trace("putData");
return peerDHT.put(locationKey).data(data).start();
}
@Override
public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
log.trace("getDomainProtectedData");
return peerDHT.get(locationKey).start();
}
@Override
public FutureGet getData(Number160 locationKey) {
//log.trace("getData");
return peerDHT.get(locationKey).start();
}
@Override
public FuturePut addProtectedData(Number160 locationKey, Data data) {
log.trace("addProtectedData");
return peerDHT.add(locationKey).data(data).start();
}
@Override
public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
Number160 contentKey = data.hash();
log.trace("removeFromDataMap with contentKey " + contentKey.toString());
return peerDHT.remove(locationKey).contentKey(contentKey).start();
}
@Override
public FutureGet getDataMap(Number160 locationKey) {
log.trace("getDataMap");
return peerDHT.get(locationKey).all().start();
}
//
// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
// log.trace("putDomainProtectedData");
// data.protectEntry(keyPair);
// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start();
// }
//
// // No protection, everybody can write.
// public FuturePut putData(Number160 locationKey, Data data) {
// log.trace("putData");
// return peerDHT.put(locationKey).data(data).start();
// }
//
// // Not public readable. Only users with the public key of the peer who stored the data can read that data
// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
// log.trace("getDomainProtectedData");
// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded());
// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start();
// }
//
// // No protection, everybody can read.
// public FutureGet getData(Number160 locationKey) {
// log.trace("getData");
// return peerDHT.get(locationKey).start();
// }
//
// // No domain protection, but entry protection
// public FuturePut addProtectedData(Number160 locationKey, Data data) {
// log.trace("addProtectedData");
// data.protectEntry(keyPair);
// log.trace("addProtectedData with contentKey " + data.hash().toString());
// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start();
// }
//
// // No domain protection, but entry protection
// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
// log.trace("removeFromDataMap");
// Number160 contentKey = data.hash();
// log.trace("removeFromDataMap with contentKey " + contentKey.toString());
// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start();
// }
//
// // Public readable
// public FutureGet getDataMap(Number160 locationKey) {
// log.trace("getDataMap");
// return peerDHT.get(locationKey).all().start();
// }
// Send signed payLoad to peer
// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) {
// // use 30 seconds as max idle time before connection get closed
// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000);
// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start();
// futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
// @Override
// public void operationComplete(BaseFuture future) throws Exception {
// if (futureDirect.isSuccess()) {
// log.debug("sendMessage completed");
// }
// else {
// log.error("sendData failed with Reason " + futureDirect.failedReason());
// }
// }
//
// @Override
// public void exceptionCaught(Throwable t) throws Exception {
// log.error("Exception at sendData " + t.toString());
// }
// });
//
// return futureDirect;
// }
//
}

View File

@ -24,7 +24,8 @@ import io.bitsquare.network.Peer;
import io.bitsquare.network.listener.SendMessageListener;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureListener;
@ -33,50 +34,37 @@ import net.tomp2p.futures.FutureDirect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
* It is the translating domain specific functionality to the messaging layer.
* The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p/>
*/
public class TomP2PMessageService implements MessageService {
public class TomP2PMessageService extends TomP2PService implements MessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PMessageService.class);
private final TomP2PNode tomP2PNode;
private final CopyOnWriteArrayList<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
private Executor executor;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PMessageService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
}
@Override
public void setExecutor(Executor executor) {
this.executor = executor;
super(tomP2PNode);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Messages
// MessageService implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void sendMessage(Peer peer, Message message, SendMessageListener listener) {
if (!(peer instanceof TomP2PPeer)) {
throw new IllegalArgumentException("peer must be of type TomP2PPeer");
}
FutureDirect futureDirect = tomP2PNode.sendData(((TomP2PPeer) peer).getPeerAddress(), message);
if (!(peer instanceof TomP2PPeer))
throw new IllegalArgumentException("Peer must be of type TomP2PPeer");
FutureDirect futureDirect = peerDHT.peer().sendDirect(((TomP2PPeer) peer).getPeerAddress()).object(message).start();
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
log.debug("sendMessage completed");
executor.execute(listener::handleResult);
}
else {
@ -87,37 +75,34 @@ public class TomP2PMessageService implements MessageService {
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Exception at sendMessage " + t.toString());
executor.execute(listener::handleFault);
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Event Listeners
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void addMessageHandler(MessageHandler listener) {
if (!messageHandlers.add(listener))
throw new RuntimeException("Add listener did not change list. Probably listener has been already added.");
throw new IllegalArgumentException("Add listener did not change list. Probably listener has been already added.");
}
@Override
public void removeMessageHandler(MessageHandler listener) {
if (!messageHandlers.remove(listener))
throw new RuntimeException("Try to remove listener which was never added.");
throw new IllegalArgumentException("Try to remove listener which was never added.");
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message handler
// MessageHandler implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void handleMessage(Message message, Peer sender) {
if (sender instanceof TomP2PPeer) {
if (sender instanceof TomP2PPeer)
executor.execute(() -> messageHandlers.stream().forEach(e -> e.handleMessage((Message) message, sender)));
}
else
throw new IllegalArgumentException("Peer must be of type TomP2PPeer");
}
}

View File

@ -17,22 +17,17 @@
package io.bitsquare.network.tomp2p;
import io.bitsquare.network.AddressService;
import io.bitsquare.network.BootstrapNodes;
import io.bitsquare.network.ClientNode;
import io.bitsquare.network.AddressService;
import io.bitsquare.network.MessageService;
import io.bitsquare.network.NetworkModule;
import io.bitsquare.network.Node;
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import javax.inject.Inject;
import javafx.application.Platform;
import org.springframework.core.env.Environment;
import static io.bitsquare.network.tomp2p.BootstrappedPeerBuilder.*;
@ -50,10 +45,14 @@ public class TomP2PNetworkModule extends NetworkModule {
@Override
protected void doConfigure() {
// Used both ClientNode and TomP2PNode for injection
bind(ClientNode.class).to(TomP2PNode.class).in(Singleton.class);
bind(TomP2PNode.class).in(Singleton.class);
bind(MessageService.class).toProvider(TomP2PMessageServiceProvider.class).in(Singleton.class);
bind(AddressService.class).toProvider(TomP2PAddressServiceProvider.class).in(Singleton.class);
bind(BootstrappedPeerBuilder.class).in(Singleton.class);
bind(AddressService.class).to(TomP2PAddressService.class).in(Singleton.class);
bind(MessageService.class).to(TomP2PMessageService.class).in(Singleton.class);
bind(int.class).annotatedWith(Names.named(Node.PORT_KEY)).toInstance(env.getProperty(Node.PORT_KEY, int.class, Node.DEFAULT_PORT));
bind(boolean.class).annotatedWith(Names.named(USE_MANUAL_PORT_FORWARDING_KEY)).toInstance(
@ -66,43 +65,14 @@ public class TomP2PNetworkModule extends NetworkModule {
)
);
bindConstant().annotatedWith(Names.named(NETWORK_INTERFACE_KEY)).to(env.getProperty(NETWORK_INTERFACE_KEY, NETWORK_INTERFACE_UNSPECIFIED));
bind(BootstrappedPeerBuilder.class).in(Singleton.class);
}
@Override
protected void doClose(Injector injector) {
super.doClose(injector);
// First shut down TomP2PNode to remove address from DHT
injector.getInstance(TomP2PNode.class).shutDown();
// First shut down AddressService to remove address from DHT
injector.getInstance(AddressService.class).shutDown();
injector.getInstance(BootstrappedPeerBuilder.class).shutDown();
}
}
class TomP2PMessageServiceProvider implements Provider<MessageService> {
private final MessageService messageService;
@Inject
public TomP2PMessageServiceProvider(TomP2PNode tomP2PNode) {
messageService = new TomP2PMessageService(tomP2PNode);
messageService.setExecutor(Platform::runLater);
}
public MessageService get() {
return messageService;
}
}
class TomP2PAddressServiceProvider implements Provider<AddressService> {
private final AddressService addressService;
@Inject
public TomP2PAddressServiceProvider(TomP2PNode tomP2PNode) {
addressService = new TomP2PAddressService(tomP2PNode);
addressService.setExecutor(Platform::runLater);
}
public AddressService get() {
return addressService;
}
}

View File

@ -23,36 +23,20 @@ import io.bitsquare.network.ClientNode;
import io.bitsquare.network.ConnectionType;
import io.bitsquare.network.Message;
import io.bitsquare.network.MessageHandler;
import io.bitsquare.network.NetworkException;
import io.bitsquare.network.Node;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import javax.inject.Inject;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.jetbrains.annotations.NotNull;
@ -63,26 +47,12 @@ import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* The fully bootstrapped P2PNode which is responsible himself for his availability in the messaging system. It saves
* for instance the IP address periodically.
* This class is offering generic functionality of TomP2P needed for Bitsquare, like data and domain protection.
* It does not handle any domain aspects of Bitsquare.
*/
public class TomP2PNode implements ClientNode {
private static final Logger log = LoggerFactory.getLogger(TomP2PNode.class);
private static final int IP_CHECK_PERIOD = 2 * 60 * 1000; // Cheap call if nothing changes, so set it short to 2 min.
private static final int STORE_ADDRESS_PERIOD = 5 * 60 * 1000; // Save every 5 min.
private static final int ADDRESS_TTL = STORE_ADDRESS_PERIOD * 2; // TTL 10 min.
private KeyPair keyPair;
private PeerAddress storedPeerAddress;
private PeerDHT peerDHT;
private BootstrappedPeerBuilder bootstrappedPeerBuilder;
private Timer timerForStoreAddress;
private Timer timerForIPCheck;
private final Subject<BootstrapState, BootstrapState> bootstrapStateSubject;
///////////////////////////////////////////////////////////////////////////////////////////
@ -92,26 +62,24 @@ public class TomP2PNode implements ClientNode {
@Inject
public TomP2PNode(BootstrappedPeerBuilder bootstrappedPeerBuilder) {
this.bootstrappedPeerBuilder = bootstrappedPeerBuilder;
bootstrapStateSubject = BehaviorSubject.create();
}
// for unit testing
TomP2PNode(KeyPair keyPair, PeerDHT peerDHT) {
this.keyPair = keyPair;
this.peerDHT = peerDHT;
peerDHT.peerBean().keyPair(keyPair);
bootstrapStateSubject = BehaviorSubject.create();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public Observable<BootstrapState> bootstrap(KeyPair keyPair, MessageHandler messageHandler) {
checkNotNull(keyPair, "keyPair must not be null.");
this.keyPair = keyPair;
bootstrappedPeerBuilder.setKeyPair(keyPair);
Subject<BootstrapState, BootstrapState> bootstrapStateSubject = BehaviorSubject.create();
bootstrappedPeerBuilder.getBootstrapState().addListener((ov, oldValue, newValue) -> {
log.debug("BootstrapState changed " + newValue);
@ -124,14 +92,7 @@ public class TomP2PNode implements ClientNode {
public void onSuccess(@Nullable PeerDHT peerDHT) {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setupTimerForIPCheck();
setupTimerForStoreAddress();
setupReplyHandler(messageHandler);
try {
storeAddress();
} catch (NetworkException e) {
bootstrapStateSubject.onError(e);
}
bootstrapStateSubject.onCompleted();
}
else {
@ -150,12 +111,13 @@ public class TomP2PNode implements ClientNode {
return bootstrapStateSubject.asObservable();
}
void shutDown() {
timerForIPCheck.cancel();
timerForStoreAddress.cancel();
removeAddress();
public Observable<BootstrapState> getBootstrapStateAsObservable() {
return bootstrapStateSubject.asObservable();
}
public PeerDHT getPeerDHT() {
return peerDHT;
}
@Override
public ConnectionType getConnectionType() {
@ -189,147 +151,6 @@ public class TomP2PNode implements ClientNode {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Generic DHT methods
///////////////////////////////////////////////////////////////////////////////////////////
// TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in
// the logs
public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
log.trace("putDomainProtectedData");
return peerDHT.put(locationKey).data(data).start();
}
public FuturePut putData(Number160 locationKey, Data data) {
log.trace("putData");
return peerDHT.put(locationKey).data(data).start();
}
public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
log.trace("getDomainProtectedData");
return peerDHT.get(locationKey).start();
}
public FutureGet getData(Number160 locationKey) {
//log.trace("getData");
return peerDHT.get(locationKey).start();
}
public FuturePut addProtectedData(Number160 locationKey, Data data) {
log.trace("addProtectedData");
return peerDHT.add(locationKey).data(data).start();
}
public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
Number160 contentKey = data.hash();
log.trace("removeFromDataMap with contentKey " + contentKey.toString());
return peerDHT.remove(locationKey).contentKey(contentKey).start();
}
public FutureGet getDataMap(Number160 locationKey) {
log.trace("getDataMap");
return peerDHT.get(locationKey).all().start();
}
public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) {
log.trace("sendData");
FutureDirect futureDirect = peerDHT.peer().sendDirect(peerAddress).object(payLoad).start();
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
log.debug("sendMessage completed");
}
else {
log.error("sendData failed with Reason " + futureDirect.failedReason());
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Exception at sendData " + t.toString());
}
});
return futureDirect;
}
//
// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
// log.trace("putDomainProtectedData");
// data.protectEntry(keyPair);
// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start();
// }
//
// // No protection, everybody can write.
// public FuturePut putData(Number160 locationKey, Data data) {
// log.trace("putData");
// return peerDHT.put(locationKey).data(data).start();
// }
//
// // Not public readable. Only users with the public key of the peer who stored the data can read that data
// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
// log.trace("getDomainProtectedData");
// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded());
// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start();
// }
//
// // No protection, everybody can read.
// public FutureGet getData(Number160 locationKey) {
// log.trace("getData");
// return peerDHT.get(locationKey).start();
// }
//
// // No domain protection, but entry protection
// public FuturePut addProtectedData(Number160 locationKey, Data data) {
// log.trace("addProtectedData");
// data.protectEntry(keyPair);
// log.trace("addProtectedData with contentKey " + data.hash().toString());
// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start();
// }
//
// // No domain protection, but entry protection
// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
// log.trace("removeFromDataMap");
// Number160 contentKey = data.hash();
// log.trace("removeFromDataMap with contentKey " + contentKey.toString());
// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start();
// }
//
// // Public readable
// public FutureGet getDataMap(Number160 locationKey) {
// log.trace("getDataMap");
// return peerDHT.get(locationKey).all().start();
// }
// Send signed payLoad to peer
// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) {
// // use 30 seconds as max idle time before connection get closed
// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000);
// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start();
// futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
// @Override
// public void operationComplete(BaseFuture future) throws Exception {
// if (futureDirect.isSuccess()) {
// log.debug("sendMessage completed");
// }
// else {
// log.error("sendData failed with Reason " + futureDirect.failedReason());
// }
// }
//
// @Override
// public void exceptionCaught(Throwable t) throws Exception {
// log.error("Exception at sendData " + t.toString());
// }
// });
//
// return futureDirect;
// }
//
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
@ -353,79 +174,5 @@ public class TomP2PNode implements ClientNode {
});
}
private void setupTimerForStoreAddress() {
timerForStoreAddress = new Timer();
timerForStoreAddress.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress()))
try {
storeAddress();
} catch (NetworkException e) {
e.printStackTrace();
}
}
}, STORE_ADDRESS_PERIOD, STORE_ADDRESS_PERIOD);
}
private void setupTimerForIPCheck() {
timerForIPCheck = new Timer();
timerForIPCheck.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (storedPeerAddress != null && peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress()))
try {
storeAddress();
} catch (NetworkException e) {
e.printStackTrace();
}
}
}, IP_CHECK_PERIOD, IP_CHECK_PERIOD);
}
private void storeAddress() throws NetworkException {
try {
Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
// We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates
data.ttlSeconds(ADDRESS_TTL);
log.debug("storePeerAddress " + peerDHT.peerAddress().toString());
FuturePut futurePut = putDomainProtectedData(locationKey, data);
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
storedPeerAddress = peerDHT.peerAddress();
log.debug("storedPeerAddress = " + storedPeerAddress);
}
else {
log.error("storedPeerAddress not successful");
throw new NetworkException("Storing address was not successful. Reason: " + future.failedReason());
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
log.error("Exception at storedPeerAddress " + t.toString());
throw new NetworkException("Exception at storeAddress.", t);
}
});
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at storePeerAddress " + e.toString());
throw new NetworkException("Exception at storeAddress.", e);
}
}
private void removeAddress() {
try {
Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
removeFromDataMap(locationKey, data).awaitUninterruptibly(2000); // give it max. 2 sec. to remove the address at shut down
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at removeAddress " + e.toString());
}
}
}

View File

@ -0,0 +1,96 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network.tomp2p;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.NetworkService;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.PeerDHT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
* It is the translating domain specific functionality to the messaging layer.
* The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p/>
*/
public class TomP2PService implements NetworkService {
private static final Logger log = LoggerFactory.getLogger(TomP2PService.class);
private final Subscriber<BootstrapState> subscriber;
protected Executor executor = Platform::runLater;
protected PeerDHT peerDHT;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PService(TomP2PNode tomP2PNode) {
Observable<BootstrapState> bootstrapStateAsObservable = tomP2PNode.getBootstrapStateAsObservable();
subscriber = new Subscriber<BootstrapState>() {
@Override
public void onCompleted() {
executor.execute(() -> {
peerDHT = tomP2PNode.getPeerDHT();
subscriber.unsubscribe();
bootstrapCompleted();
});
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(BootstrapState bootstrapState) {
}
};
bootstrapStateAsObservable.subscribe(subscriber);
}
@Override
public void bootstrapCompleted() {
}
@Override
public void setExecutor(Executor executor) {
this.executor = executor;
}
@Override
public void shutDown() {
}
}

View File

@ -19,15 +19,13 @@ package io.bitsquare.offer;
import io.bitsquare.common.handlers.FaultHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.network.DHTService;
import java.util.List;
import java.util.concurrent.Executor;
import javafx.beans.property.LongProperty;
public interface OfferBookService {
void setExecutor(Executor executor);
public interface OfferBookService extends DHTService {
void getOffers(String fiatCode);

View File

@ -19,6 +19,7 @@ package io.bitsquare.offer.tomp2p;
import io.bitsquare.common.handlers.FaultHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.network.tomp2p.TomP2PDHTService;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.Offer;
import io.bitsquare.offer.OfferBookService;
@ -28,7 +29,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleLongProperty;
@ -46,22 +46,16 @@ import net.tomp2p.storage.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TomP2POfferBookService implements OfferBookService {
public class TomP2POfferBookService extends TomP2PDHTService implements OfferBookService {
private static final Logger log = LoggerFactory.getLogger(TomP2POfferBookService.class);
private final List<Listener> offerRepositoryListeners = new ArrayList<>();
private final LongProperty invalidationTimestamp = new SimpleLongProperty(0);
private final TomP2PNode tomP2PNode;
private Executor executor;
public TomP2POfferBookService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
}
public void setExecutor(Executor executor) {
this.executor = executor;
super(tomP2PNode);
}
@Override
@ -75,7 +69,7 @@ public class TomP2POfferBookService implements OfferBookService {
offerData.ttlSeconds(defaultOfferTTL);
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
FuturePut futurePut = tomP2PNode.addProtectedData(locationKey, offerData);
FuturePut futurePut = addProtectedData(locationKey, offerData);
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -118,7 +112,7 @@ public class TomP2POfferBookService implements OfferBookService {
final Data offerData = new Data(offer);
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
FutureRemove futureRemove = tomP2PNode.removeFromDataMap(locationKey, offerData);
FutureRemove futureRemove = removeFromDataMap(locationKey, offerData);
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -163,7 +157,7 @@ public class TomP2POfferBookService implements OfferBookService {
public void getOffers(String currencyCode) {
Number160 locationKey = Number160.createHash(currencyCode);
log.trace("Get offers from DHT requested for locationKey: " + locationKey);
FutureGet futureGet = tomP2PNode.getDataMap(locationKey);
FutureGet futureGet = getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -227,7 +221,7 @@ public class TomP2POfferBookService implements OfferBookService {
private void writeInvalidationTimestampToDHT(String currencyCode) {
invalidationTimestamp.set(System.currentTimeMillis());
try {
FuturePut putFuture = tomP2PNode.putData(getInvalidatedLocationKey(currencyCode),
FuturePut putFuture = putData(getInvalidatedLocationKey(currencyCode),
new Data(invalidationTimestamp.get()));
putFuture.addListener(new BaseFutureListener<BaseFuture>() {
@Override
@ -254,7 +248,7 @@ public class TomP2POfferBookService implements OfferBookService {
}
public void requestInvalidationTimeStampFromDHT(String currencyCode) {
FutureGet futureGet = tomP2PNode.getData(getInvalidatedLocationKey(currencyCode));
FutureGet futureGet = getData(getInvalidatedLocationKey(currencyCode));
futureGet.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {

View File

@ -30,12 +30,7 @@ public class TradeModule extends BitsquareModule {
}
@Override
protected final void configure() {
doConfigure();
protected void configure() {
bind(TradeManager.class).in(Singleton.class);
}
protected void doConfigure() {
}
}

View File

@ -1,421 +0,0 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network.tomp2p;
import java.io.IOException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.util.Random;
import net.tomp2p.connection.Ports;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.dht.PeerBuilderDHT;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.dht.UtilsDHT2;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.p2p.PeerBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.rpc.ObjectDataReply;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
// TODO Reactivate tests when TomP2PNode is using original code again. we deactivated the security features atm.
// cause IOException: Not listening to anything. Maybe your binding information is wrong.
// investigate what has broken it, probably from update to latest head
@Ignore
public class TomP2PNodeTest {
final private static Random rnd = new Random(42L);
@Test
public void testSendData() throws Exception {
PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort());
PeerDHT master = peers[0];
PeerDHT client = peers[1];
PeerDHT otherPeer = peers[2];
UtilsDHT2.perfectRouting(peers);
for (final PeerDHT peer : peers) {
peer.peer().objectDataReply(new ObjectDataReply() {
@Override
public Object reply(PeerAddress sender, Object request) throws Exception {
return true;
}
});
}
final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA");
keyGen.initialize(1024);
KeyPair keyPairClient = keyGen.genKeyPair();
KeyPair keyPairOtherPeer = keyGen.genKeyPair();
TomP2PNode node;
Number160 locationKey;
Object object;
FutureDirect futureDirect;
node = new TomP2PNode(keyPairClient, client);
object = "clients data";
futureDirect = node.sendData(otherPeer.peerAddress(), object);
futureDirect.awaitUninterruptibly();
assertTrue(futureDirect.isSuccess());
// we return true from objectDataReply
assertTrue((Boolean) futureDirect.object());
master.shutdown();
}
@Test
public void testProtectedPutGet() throws Exception {
PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort());
PeerDHT master = peers[0];
PeerDHT client = peers[1];
PeerDHT otherPeer = peers[2];
UtilsDHT2.perfectRouting(peers);
final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA");
keyGen.initialize(1024);
KeyPair keyPairClient = keyGen.genKeyPair();
KeyPair keyPairOtherPeer = keyGen.genKeyPair();
TomP2PNode node;
Number160 locationKey;
Data data;
FuturePut futurePut;
FutureGet futureGet;
// otherPeer tries to squat clients location store
// he can do it but as he has not the domain key of the client he cannot do any harm
// he only can store und that path: locationKey.otherPeerDomainKey.data
node = new TomP2PNode(keyPairOtherPeer, otherPeer);
locationKey = Number160.createHash("clients location");
data = new Data("otherPeer data");
futurePut = node.putDomainProtectedData(locationKey, data);
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
futureGet = node.getDomainProtectedData(locationKey, keyPairOtherPeer.getPublic());
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("otherPeer data", futureGet.data().object());
// client store his data und his domainkey, no problem with previous occupied
// he only can store und that path: locationKey.clientDomainKey.data
node = new TomP2PNode(keyPairClient, client);
locationKey = Number160.createHash("clients location");
data = new Data("client data");
futurePut = node.putDomainProtectedData(locationKey, data);
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic());
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("client data", futureGet.data().object());
// also other peers can read that data if they know the public key of the client
node = new TomP2PNode(keyPairOtherPeer, otherPeer);
futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic());
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("client data", futureGet.data().object());
// other peer try to use pub key of other peer as domain key hash.
// must fail as he don't have the full key pair (private key of client missing)
locationKey = Number160.createHash("clients location");
data = new Data("otherPeer data hack");
data.protectEntry(keyPairOtherPeer);
// he use the pub key from the client
final Number160 keyHash = Utils.makeSHAHash(keyPairClient.getPublic().getEncoded());
futurePut = otherPeer.put(locationKey).data(data).keyPair(keyPairOtherPeer).domainKey(keyHash)
.protectDomain().start();
futurePut.awaitUninterruptibly();
assertFalse(futurePut.isSuccess());
// he can read his prev. stored data
node = new TomP2PNode(keyPairOtherPeer, otherPeer);
futureGet = node.getDomainProtectedData(locationKey, keyPairOtherPeer.getPublic());
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("otherPeer data", futureGet.data().object());
// he can read clients data
futureGet = node.getDomainProtectedData(locationKey, keyPairClient.getPublic());
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
assertEquals("client data", futureGet.data().object());
master.shutdown();
}
@Test
public void testChangeEntryProtectionKey() throws Exception {
KeyPairGenerator gen = KeyPairGenerator.getInstance("DSA");
KeyPair keyPair1 = gen.generateKeyPair();
KeyPair keyPair2 = gen.generateKeyPair();
PeerDHT p1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash(1)).ports(4838)
.keyPair(keyPair1).start()).start();
PeerDHT p2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash(2)).ports(4839)
.keyPair(keyPair2).start()).start();
p2.peer().bootstrap().peerAddress(p1.peerAddress()).start().awaitUninterruptibly();
p1.peer().bootstrap().peerAddress(p2.peerAddress()).start().awaitUninterruptibly();
Data data = new Data("test").protectEntry(keyPair1);
FuturePut fp1 = p1.put(Number160.createHash("key1")).sign().data(data).start().awaitUninterruptibly();
Assert.assertTrue(fp1.isSuccess());
FuturePut fp2 = p2.put(Number160.createHash("key1")).data(data).start().awaitUninterruptibly();
Assert.assertTrue(!fp2.isSuccess());
Data data2 = new Data().protectEntry(keyPair2);
data2.publicKey(keyPair2.getPublic());
FuturePut fp3 =
p1.put(Number160.createHash("key1")).sign().putMeta().data(data2).start().awaitUninterruptibly();
Assert.assertTrue(fp3.isSuccess());
FuturePut fp4 = p2.put(Number160.createHash("key1")).sign().data(data).start().awaitUninterruptibly();
Assert.assertTrue(fp4.isSuccess());
p1.shutdown().awaitUninterruptibly();
p2.shutdown().awaitUninterruptibly();
}
@Test
public void testAddToListGetList() throws Exception {
PeerDHT[] peers = UtilsDHT2.createNodes(3, rnd, new Ports().tcpPort());
PeerDHT master = peers[0];
PeerDHT client = peers[1];
PeerDHT otherPeer = peers[2];
UtilsDHT2.perfectRouting(peers);
TomP2PNode node;
final KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA");
keyGen.initialize(1024);
KeyPair keyPairClient = keyGen.genKeyPair();
KeyPair keyPairOtherPeer = keyGen.genKeyPair();
Number160 locationKey;
Data data;
FuturePut futurePut;
FutureGet futureGet;
// client add a value
KeyPairGenerator gen = KeyPairGenerator.getInstance("DSA");
KeyPair keyPair1 = gen.generateKeyPair();
keyPairClient = keyPair1;
node = new TomP2PNode(keyPairClient, client);
locationKey = Number160.createHash("add to list clients location");
data = new Data("add to list client data1");
Data data_1 = data;
futurePut = node.addProtectedData(locationKey, data);
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
data = new Data("add to list client data2");
Data data_2 = data;
futurePut = node.addProtectedData(locationKey, data);
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
futureGet = node.getDataMap(locationKey);
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
boolean foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data1");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
boolean foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data2");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
assertTrue(foundData1);
assertTrue(foundData2);
assertEquals(2, futureGet.dataMap().values().size());
// other peer tried to overwrite that entry
// but will not succeed, instead he will add a new entry.
// TODO investigate why it is not possible to overwrite the entry with that method
// The protection entry with the key does not make any difference as also the client himself cannot overwrite
// any entry
// http://tomp2p.net/doc/P2P-with-TomP2P-1.pdf
// "add(location_key, value) is translated to put(location_key, hash(value), value)"
// fake content key with content key from previous clients entry
Number160 contentKey = Number160.createHash("add to list client data1");
data = new Data("add to list other peer data HACK!");
data.protectEntry(keyPairOtherPeer); // also with client key it does not work...
futurePut = otherPeer.put(locationKey).data(contentKey, data).keyPair(keyPairOtherPeer).start();
futurePut.awaitUninterruptibly();
assertTrue(futurePut.isSuccess());
node = new TomP2PNode(keyPairOtherPeer, otherPeer);
futureGet = node.getDataMap(locationKey);
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data1");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data2");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
boolean foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list other peer data HACK!");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
assertTrue(foundData1);
assertTrue(foundData2);
assertTrue(foundData3);
assertEquals(3, futureGet.dataMap().values().size());
// client removes his entry -> OK
node = new TomP2PNode(keyPairClient, client);
FutureRemove futureRemove = node.removeFromDataMap(locationKey, data_1);
futureRemove.awaitUninterruptibly();
// We don't test futureRemove.isSuccess() as this API does not fit well to that operation,
// it might change in future to something like foundAndRemoved and notFound
// See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840
futureGet = node.getDataMap(locationKey);
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data1");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data2");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list other peer data HACK!");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
assertFalse(foundData1);
assertTrue(foundData2);
assertTrue(foundData3);
assertEquals(2, futureGet.dataMap().values().size());
// otherPeer tries to removes client entry -> FAIL
node = new TomP2PNode(keyPairOtherPeer, otherPeer);
futureRemove = node.removeFromDataMap(locationKey, data_2);
futureRemove.awaitUninterruptibly();
assertFalse(futureRemove.isSuccess());
futureGet = node.getDataMap(locationKey);
futureGet.awaitUninterruptibly();
assertTrue(futureGet.isSuccess());
foundData1 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data1");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
foundData2 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list client data2");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
foundData3 = futureGet.dataMap().values().stream().anyMatch(data1 -> {
try {
return data1.object().equals("add to list other peer data HACK!");
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
}
return false;
});
assertFalse(foundData1);
assertTrue(foundData2);
assertTrue(foundData3);
assertEquals(2, futureGet.dataMap().values().size());
master.shutdown();
}
}