Refactor tomp2p network classes

This commit is contained in:
Manfred Karrer 2015-03-08 23:04:33 +01:00
parent 694bc8810d
commit 5cc15e0a41
46 changed files with 560 additions and 344 deletions

View File

@ -41,7 +41,7 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import static io.bitsquare.app.BitsquareEnvironment.*;
import static io.bitsquare.msg.tomp2p.TomP2PMessageModule.*;
import static io.bitsquare.network.tomp2p.TomP2PNetworkModule.*;
import static io.bitsquare.network.Node.*;
import static java.util.Arrays.asList;

View File

@ -19,16 +19,20 @@ package io.bitsquare.app.gui;
import io.bitsquare.BitsquareModule;
import io.bitsquare.account.AccountSettings;
import io.bitsquare.arbitrator.ArbitratorMessageModule;
import io.bitsquare.arbitrator.tomp2p.TomP2PArbitratorMessageModule;
import io.bitsquare.btc.BitcoinModule;
import io.bitsquare.crypto.CryptoModule;
import io.bitsquare.gui.GuiModule;
import io.bitsquare.msg.MessageModule;
import io.bitsquare.msg.tomp2p.TomP2PMessageModule;
import io.bitsquare.network.NetworkModule;
import io.bitsquare.network.tomp2p.TomP2PNetworkModule;
import io.bitsquare.offer.OfferModule;
import io.bitsquare.offer.tomp2p.TomP2POfferModule;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.settings.Preferences;
import io.bitsquare.trade.TradeMessageModule;
import io.bitsquare.trade.TradeModule;
import io.bitsquare.trade.tomp2p.TomP2PTradeMessageModule;
import io.bitsquare.user.User;
import com.google.inject.Injector;
@ -64,16 +68,22 @@ class BitsquareAppModule extends BitsquareModule {
bind(Environment.class).toInstance(env);
bind(UpdateProcess.class).asEagerSingleton();
install(messageModule());
install(networkModule());
install(bitcoinModule());
install(cryptoModule());
install(tradeModule());
install(tradeMessageModule());
install(offerModule());
install(arbitratorMessageModule());
install(guiModule());
}
protected MessageModule messageModule() {
return new TomP2PMessageModule(env);
protected ArbitratorMessageModule arbitratorMessageModule() {
return new TomP2PArbitratorMessageModule(env);
}
protected NetworkModule networkModule() {
return new TomP2PNetworkModule(env);
}
protected BitcoinModule bitcoinModule() {
@ -88,6 +98,10 @@ class BitsquareAppModule extends BitsquareModule {
return new TradeModule(env);
}
protected TradeMessageModule tradeMessageModule() {
return new TomP2PTradeMessageModule(env);
}
protected OfferModule offerModule() {
return new TomP2POfferModule(env);
}

View File

@ -15,34 +15,23 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg;
package io.bitsquare.arbitrator;
import io.bitsquare.BitsquareModule;
import com.google.inject.Injector;
import org.springframework.core.env.Environment;
public abstract class MessageModule extends BitsquareModule {
public abstract class ArbitratorMessageModule extends BitsquareModule {
protected MessageModule(Environment env) {
protected ArbitratorMessageModule(Environment env) {
super(env);
}
@Override
protected final void configure() {
bind(MessageService.class).to(messageService()).asEagerSingleton();
doConfigure();
}
protected void doConfigure() {
}
protected abstract Class<? extends MessageService> messageService();
@Override
protected void doClose(Injector injector) {
injector.getInstance(MessageService.class).shutDown();
}
}

View File

@ -0,0 +1,33 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.arbitrator;
import io.bitsquare.arbitrator.listeners.ArbitratorListener;
import java.util.Locale;
public interface ArbitratorMessageService {
void addArbitrator(Arbitrator arbitrator);
void addArbitratorListener(ArbitratorListener listener);
void getArbitrators(Locale defaultLanguageLocale);
}

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
package io.bitsquare.arbitrator.listeners;
import io.bitsquare.arbitrator.Arbitrator;

View File

@ -0,0 +1,43 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.arbitrator.tomp2p;
import io.bitsquare.arbitrator.ArbitratorMessageModule;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.springframework.core.env.Environment;
public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
public TomP2PArbitratorMessageModule(Environment env) {
super(env);
}
@Override
protected void doConfigure() {
bind(ArbitratorMessageService.class).to(TomP2PArbitratorMessageService.class).in(Singleton.class);
}
@Override
protected void doClose(Injector injector) {
super.doClose(injector);
}
}

View File

@ -15,24 +15,15 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.tomp2p;
package io.bitsquare.arbitrator.tomp2p;
import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.msg.Message;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.ArbitratorListener;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.msg.listeners.IncomingMessageListener;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Peer;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import io.bitsquare.user.User;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.arbitrator.listeners.ArbitratorListener;
import io.bitsquare.network.tomp2p.TomP2PNode;
import java.io.IOException;
import java.security.PublicKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@ -46,35 +37,20 @@ import net.tomp2p.dht.FuturePut;
import net.tomp2p.dht.FutureRemove;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.peers.Number160;
import net.tomp2p.storage.Data;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
* It is the translating domain specific functionality to the messaging layer.
* The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p>
* TODO: improve callbacks that Platform.runLater is not necessary. We call usually that methods form teh UI thread.
*/
public class TomP2PMessageService implements MessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PMessageService.class);
public class TomP2PArbitratorMessageService implements ArbitratorMessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PArbitratorMessageService.class);
private static final String ARBITRATORS_ROOT = "ArbitratorsRoot";
private final TomP2PNode p2pNode;
private final User user;
private final TomP2PNode tomP2PNode;
private final List<ArbitratorListener> arbitratorListeners = new ArrayList<>();
private final List<IncomingMessageListener> incomingMessageListeners = new ArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -82,77 +58,8 @@ public class TomP2PMessageService implements MessageService {
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PMessageService(User user, TomP2PNode p2pNode) {
this.user = user;
this.p2pNode = p2pNode;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Public Methods
///////////////////////////////////////////////////////////////////////////////////////////
public Observable<BootstrapState> init() {
return p2pNode.bootstrap(this, user.getMessageKeyPair());
}
public void shutDown() {
if (p2pNode != null)
p2pNode.shutDown();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Find peer address by publicKey
///////////////////////////////////////////////////////////////////////////////////////////
public void getPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
FutureGet futureGet = p2pNode.getDomainProtectedData(locationKey, publicKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture baseFuture) throws Exception {
if (baseFuture.isSuccess() && futureGet.data() != null) {
final Peer peer = (Peer) futureGet.data().object();
Platform.runLater(() -> listener.onResult(peer));
}
else {
log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason());
Platform.runLater(listener::onFailed);
}
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Trade process
///////////////////////////////////////////////////////////////////////////////////////////
public void sendMessage(Peer peer, Message message,
OutgoingMessageListener listener) {
if (!(peer instanceof TomP2PPeer)) {
throw new IllegalArgumentException("peer must be of type TomP2PPeer");
}
FutureDirect futureDirect = p2pNode.sendData(((TomP2PPeer) peer).getPeerAddress(), message);
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
Platform.runLater(listener::onResult);
}
else {
log.error("sendMessage failed with reason " + futureDirect.failedReason());
Platform.runLater(listener::onFailed);
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
Platform.runLater(listener::onFailed);
}
});
public TomP2PArbitratorMessageService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
}
@ -165,7 +72,7 @@ public class TomP2PMessageService implements MessageService {
try {
final Data arbitratorData = new Data(arbitrator);
FuturePut addFuture = p2pNode.addProtectedData(locationKey, arbitratorData);
FuturePut addFuture = tomP2PNode.addProtectedData(locationKey, arbitratorData);
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -199,7 +106,7 @@ public class TomP2PMessageService implements MessageService {
public void removeArbitrator(Arbitrator arbitrator) throws IOException {
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
final Data arbitratorData = new Data(arbitrator);
FutureRemove removeFuture = p2pNode.removeFromDataMap(locationKey, arbitratorData);
FutureRemove removeFuture = tomP2PNode.removeFromDataMap(locationKey, arbitratorData);
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -230,7 +137,7 @@ public class TomP2PMessageService implements MessageService {
public void getArbitrators(Locale languageLocale) {
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
FutureGet futureGet = p2pNode.getDataMap(locationKey);
FutureGet futureGet = tomP2PNode.getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -275,23 +182,4 @@ public class TomP2PMessageService implements MessageService {
arbitratorListeners.remove(listener);
}
public void addIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.add(listener);
}
public void removeIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message handler
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void handleMessage(Object message, Peer sender) {
if (message instanceof Message) {
Platform.runLater(() -> incomingMessageListeners.stream().forEach(e ->
e.onMessage((Message) message, sender)));
}
}
}

View File

@ -20,6 +20,7 @@ package io.bitsquare.gui.main;
import io.bitsquare.account.AccountSettings;
import io.bitsquare.app.gui.UpdateProcess;
import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.arbitrator.Reputation;
import io.bitsquare.bank.BankAccount;
import io.bitsquare.bank.BankAccountType;
@ -28,11 +29,12 @@ import io.bitsquare.btc.WalletService;
import io.bitsquare.gui.util.BSFormatter;
import io.bitsquare.locale.CountryUtil;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.msg.MessageService;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.ClientNode;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.user.User;
import io.bitsquare.util.DSAKeyUtil;
@ -102,7 +104,9 @@ class MainViewModel implements ViewModel {
private final User user;
private final WalletService walletService;
private final MessageService messageService;
private final ClientNode clientNode;
private TradeMessageService tradeMessageService;
private ArbitratorMessageService arbitratorMessageService;
private final TradeManager tradeManager;
private UpdateProcess updateProcess;
private final BSFormatter formatter;
@ -110,12 +114,15 @@ class MainViewModel implements ViewModel {
private AccountSettings accountSettings;
@Inject
public MainViewModel(User user, WalletService walletService, MessageService messageService,
public MainViewModel(User user, WalletService walletService, ClientNode clientNode, TradeMessageService tradeMessageService,
ArbitratorMessageService arbitratorMessageService,
TradeManager tradeManager, BitcoinNetwork bitcoinNetwork, UpdateProcess updateProcess,
BSFormatter formatter, Persistence persistence, AccountSettings accountSettings) {
this.user = user;
this.walletService = walletService;
this.messageService = messageService;
this.clientNode = clientNode;
this.tradeMessageService = tradeMessageService;
this.arbitratorMessageService = arbitratorMessageService;
this.tradeManager = tradeManager;
this.updateProcess = updateProcess;
this.formatter = formatter;
@ -160,7 +167,7 @@ class MainViewModel implements ViewModel {
error -> log.error(error.toString()),
() -> Platform.runLater(() -> setBitcoinNetworkSyncProgress(1.0)));
Observable<BootstrapState> messageObservable = messageService.init();
Observable<BootstrapState> messageObservable = clientNode.bootstrap(user.getMessageKeyPair(), tradeMessageService);
messageObservable.publish();
messageObservable.subscribe(
state -> Platform.runLater(() -> setBootstrapState(state)),
@ -377,7 +384,7 @@ class MainViewModel implements ViewModel {
accountSettings.addAcceptedArbitrator(arbitrator);
persistence.write(accountSettings);
messageService.addArbitrator(arbitrator);
arbitratorMessageService.addArbitrator(arbitrator);
}
}
}

View File

@ -21,8 +21,8 @@ import io.bitsquare.account.AccountSettings;
import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.gui.main.account.arbitrator.profile.ArbitratorProfileView;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.ArbitratorListener;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.arbitrator.listeners.ArbitratorListener;
import io.bitsquare.persistence.Persistence;
import java.util.ArrayList;
@ -56,11 +56,11 @@ public class ArbitratorBrowserView extends ActivatableView<Pane, Void> implement
private final ViewLoader viewLoader;
private final AccountSettings accountSettings;
private final Persistence persistence;
private final MessageService messageService;
private final ArbitratorMessageService messageService;
@Inject
public ArbitratorBrowserView(CachingViewLoader viewLoader, AccountSettings accountSettings, Persistence persistence,
MessageService messageService) {
ArbitratorMessageService messageService) {
this.viewLoader = viewLoader;
this.accountSettings = accountSettings;
this.persistence = persistence;

View File

@ -24,7 +24,7 @@ import io.bitsquare.gui.components.confidence.ConfidenceProgressIndicator;
import io.bitsquare.gui.util.BSFormatter;
import io.bitsquare.locale.BSResources;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.msg.MessageService;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.user.User;
import io.bitsquare.util.DSAKeyUtil;
@ -85,13 +85,13 @@ public class ArbitratorRegistrationView extends ActivatableView<AnchorPane, Void
private final Persistence persistence;
private final WalletService walletService;
private final MessageService messageService;
private final ArbitratorMessageService messageService;
private final User user;
private final BSFormatter formatter;
@Inject
private ArbitratorRegistrationView(Persistence persistence, WalletService walletService,
MessageService messageService, User user, BSFormatter formatter) {
ArbitratorMessageService messageService, User user, BSFormatter formatter) {
this.persistence = persistence;
this.walletService = walletService;
this.messageService = messageService;

View File

@ -25,7 +25,7 @@ import io.bitsquare.bank.BankAccountType;
import io.bitsquare.locale.CountryUtil;
import io.bitsquare.locale.CurrencyUtil;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.msg.MessageService;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.user.User;
import io.bitsquare.util.DSAKeyUtil;
@ -55,7 +55,7 @@ class IrcAccountDataModel implements Activatable, DataModel {
private final User user;
private final AccountSettings accountSettings;
private final MessageService messageService;
private final ArbitratorMessageService messageService;
private final Persistence persistence;
final StringProperty nickName = new SimpleStringProperty();
@ -70,7 +70,7 @@ class IrcAccountDataModel implements Activatable, DataModel {
@Inject
public IrcAccountDataModel(User user, Persistence persistence, AccountSettings accountSettings,
MessageService messageService) {
ArbitratorMessageService messageService) {
this.persistence = persistence;
this.user = user;
this.accountSettings = accountSettings;

View File

@ -24,7 +24,7 @@ import io.bitsquare.locale.Country;
import io.bitsquare.locale.CountryUtil;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.locale.Region;
import io.bitsquare.msg.MessageService;
import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.user.User;
import io.bitsquare.util.DSAKeyUtil;
@ -50,7 +50,7 @@ class RestrictionsDataModel implements Activatable, DataModel {
private final User user;
private final AccountSettings accountSettings;
private final Persistence persistence;
private final MessageService messageService;
private final ArbitratorMessageService messageService;
final ObservableList<Locale> languageList = FXCollections.observableArrayList();
final ObservableList<Country> countryList = FXCollections.observableArrayList();
@ -62,7 +62,7 @@ class RestrictionsDataModel implements Activatable, DataModel {
@Inject
public RestrictionsDataModel(User user, AccountSettings accountSettings, Persistence persistence,
MessageService messageService) {
ArbitratorMessageService messageService) {
this.user = user;
this.accountSettings = accountSettings;
this.persistence = persistence;

View File

@ -21,15 +21,15 @@ import io.bitsquare.btc.AddressEntry;
import io.bitsquare.btc.FeePolicy;
import io.bitsquare.btc.WalletService;
import io.bitsquare.btc.listeners.BalanceListener;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.settings.Preferences;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestIsOfferAvailableMessage;
import org.bitcoinj.core.Coin;
@ -69,7 +69,7 @@ class TakeOfferDataModel implements Activatable, DataModel {
private final TradeManager tradeManager;
private final WalletService walletService;
private MessageService messageService;
private TradeMessageService tradeMessageService;
private final Preferences preferences;
private final Persistence persistence;
@ -98,12 +98,12 @@ class TakeOfferDataModel implements Activatable, DataModel {
private boolean isActivated;
@Inject
public TakeOfferDataModel(TradeManager tradeManager, WalletService walletService, MessageService messageService,
public TakeOfferDataModel(TradeManager tradeManager, WalletService walletService, TradeMessageService tradeMessageService,
Preferences preferences,
Persistence persistence) {
this.tradeManager = tradeManager;
this.walletService = walletService;
this.messageService = messageService;
this.tradeMessageService = tradeMessageService;
this.preferences = preferences;
this.persistence = persistence;
@ -154,7 +154,7 @@ class TakeOfferDataModel implements Activatable, DataModel {
// TODO: Should be moved to a domain and handled with add/remove listeners instead of isActivated
// or maybe with rx?
private void getPeerAddress(Offer offer) {
messageService.getPeerAddress(offer.getMessagePublicKey(), new GetPeerAddressListener() {
tradeMessageService.getPeerAddress(offer.getMessagePublicKey(), new GetPeerAddressListener() {
@Override
public void onResult(Peer peer) {
if (isActivated)
@ -170,7 +170,7 @@ class TakeOfferDataModel implements Activatable, DataModel {
}
private void isOfferAvailable(Peer peer, String offerId) {
messageService.sendMessage(peer, new RequestIsOfferAvailableMessage(offerId),
tradeMessageService.sendMessage(peer, new RequestIsOfferAvailableMessage(offerId),
new OutgoingMessageListener() {
@Override
public void onResult() {

View File

@ -17,10 +17,16 @@
package io.bitsquare.network;
import java.security.KeyPair;
import rx.Observable;
public interface ClientNode {
ConnectionType getConnectionType();
Node getAddress();
Node getBootstrapNodeAddress();
public Observable<BootstrapState> bootstrap(KeyPair keyPair, MessageBroker messageBroker);
}

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg;
package io.bitsquare.network;
public interface Message {
}

View File

@ -15,9 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg;
import io.bitsquare.network.Peer;
package io.bitsquare.network;
/**
* Interface for the object handling incoming messages.

View File

@ -0,0 +1,37 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.network;
import io.bitsquare.BitsquareModule;
import org.springframework.core.env.Environment;
public abstract class NetworkModule extends BitsquareModule {
protected NetworkModule(Environment env) {
super(env);
}
@Override
protected final void configure() {
doConfigure();
}
protected void doConfigure() {
}
}

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.tomp2p;
package io.bitsquare.network.tomp2p;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Node;
@ -180,7 +180,7 @@ public class BootstrappedPeerBuilder {
return settableFuture;
}
void shutDown() {
public void shutDown() {
if (peerDHT != null)
peerDHT.shutdown();
}

View File

@ -15,54 +15,48 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.tomp2p;
package io.bitsquare.network.tomp2p;
import io.bitsquare.msg.MessageModule;
import io.bitsquare.msg.MessageService;
import io.bitsquare.network.BootstrapNodes;
import io.bitsquare.network.ClientNode;
import io.bitsquare.network.NetworkModule;
import io.bitsquare.network.Node;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import javax.inject.Singleton;
import org.springframework.core.env.Environment;
import static io.bitsquare.msg.tomp2p.BootstrappedPeerBuilder.*;
public class TomP2PMessageModule extends MessageModule {
import static io.bitsquare.network.tomp2p.BootstrappedPeerBuilder.*;
public class TomP2PNetworkModule extends NetworkModule {
public static final String BOOTSTRAP_NODE_NAME_KEY = "bootstrap.node.name";
public static final String BOOTSTRAP_NODE_IP_KEY = "bootstrap.node.ip";
public static final String BOOTSTRAP_NODE_PORT_KEY = "bootstrap.node.port";
public static final String NETWORK_INTERFACE_KEY = BootstrappedPeerBuilder.NETWORK_INTERFACE_KEY;
public static final String USE_MANUAL_PORT_FORWARDING_KEY = BootstrappedPeerBuilder.USE_MANUAL_PORT_FORWARDING_KEY;
public TomP2PMessageModule(Environment env) {
public TomP2PNetworkModule(Environment env) {
super(env);
}
@Override
protected void doConfigure() {
bind(int.class).annotatedWith(Names.named(Node.PORT_KEY)).toInstance(
env.getProperty(Node.PORT_KEY, int.class, Node.DEFAULT_PORT));
bind(ClientNode.class).to(TomP2PNode.class).in(Singleton.class);
bind(TomP2PNode.class).in(Singleton.class);
bind(int.class).annotatedWith(Names.named(Node.PORT_KEY)).toInstance(env.getProperty(Node.PORT_KEY, int.class, Node.DEFAULT_PORT));
bind(boolean.class).annotatedWith(Names.named(USE_MANUAL_PORT_FORWARDING_KEY)).toInstance(
env.getProperty(USE_MANUAL_PORT_FORWARDING_KEY, boolean.class, false));
bind(TomP2PNode.class).in(Singleton.class);
bind(ClientNode.class).to(TomP2PNode.class);
bind(Node.class).annotatedWith(Names.named(BOOTSTRAP_NODE_KEY)).toInstance(
Node.at(
env.getProperty(BOOTSTRAP_NODE_NAME_KEY, BootstrapNodes.DEFAULT.getName()),
Node.at(env.getProperty(BOOTSTRAP_NODE_NAME_KEY, BootstrapNodes.DEFAULT.getName()),
env.getProperty(BOOTSTRAP_NODE_IP_KEY, BootstrapNodes.DEFAULT.getIp()),
env.getProperty(BOOTSTRAP_NODE_PORT_KEY, int.class, BootstrapNodes.DEFAULT.getPort())
)
);
bindConstant().annotatedWith(Names.named(NETWORK_INTERFACE_KEY)).to(
env.getProperty(NETWORK_INTERFACE_KEY, NETWORK_INTERFACE_UNSPECIFIED));
bindConstant().annotatedWith(Names.named(NETWORK_INTERFACE_KEY)).to(env.getProperty(NETWORK_INTERFACE_KEY, NETWORK_INTERFACE_UNSPECIFIED));
bind(BootstrappedPeerBuilder.class).asEagerSingleton();
}
@ -72,9 +66,4 @@ public class TomP2PMessageModule extends MessageModule {
injector.getInstance(BootstrappedPeerBuilder.class).shutDown();
}
@Override
protected Class<? extends MessageService> messageService() {
return TomP2PMessageService.class;
}
}

View File

@ -15,16 +15,15 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.tomp2p;
package io.bitsquare.network.tomp2p;
import io.bitsquare.BitsquareException;
import io.bitsquare.msg.MessageBroker;
import io.bitsquare.network.MessageBroker;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.ClientNode;
import io.bitsquare.network.ConnectionType;
import io.bitsquare.network.NetworkException;
import io.bitsquare.network.Node;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -75,10 +74,7 @@ public class TomP2PNode implements ClientNode {
private static final Logger log = LoggerFactory.getLogger(TomP2PNode.class);
private KeyPair keyPair;
private MessageBroker messageBroker;
private PeerAddress storedPeerAddress;
private PeerDHT peerDHT;
private BootstrappedPeerBuilder bootstrappedPeerBuilder;
@ -97,20 +93,15 @@ public class TomP2PNode implements ClientNode {
this.keyPair = keyPair;
this.peerDHT = peerDHT;
peerDHT.peerBean().keyPair(keyPair);
messageBroker = (message, peerAddress) -> {
};
}
///////////////////////////////////////////////////////////////////////////////////////////
// Public methods
///////////////////////////////////////////////////////////////////////////////////////////
public Observable<BootstrapState> bootstrap(MessageBroker messageBroker, KeyPair keyPair) {
public Observable<BootstrapState> bootstrap(KeyPair keyPair, MessageBroker messageBroker) {
checkNotNull(keyPair, "keyPair must not be null.");
checkNotNull(messageBroker, "messageBroker must not be null.");
this.messageBroker = messageBroker;
this.keyPair = keyPair;
bootstrappedPeerBuilder.setKeyPair(keyPair);
@ -128,7 +119,7 @@ public class TomP2PNode implements ClientNode {
if (peerDHT != null) {
TomP2PNode.this.peerDHT = peerDHT;
setupTimerForIPCheck();
setupReplyHandler();
setupReplyHandler(messageBroker);
try {
storeAddress();
} catch (NetworkException e) {
@ -152,10 +143,6 @@ public class TomP2PNode implements ClientNode {
return bootstrapStateSubject.asObservable();
}
public void shutDown() {
if (peerDHT != null)
peerDHT.shutdown();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Generic DHT methods
@ -302,7 +289,7 @@ public class TomP2PNode implements ClientNode {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void setupReplyHandler() {
private void setupReplyHandler(MessageBroker messageBroker) {
peerDHT.peer().objectDataReply((sender, request) -> {
log.debug("handleMessage peerAddress " + sender);
log.debug("handleMessage message " + request);

View File

@ -17,7 +17,7 @@
package io.bitsquare.offer.tomp2p;
import io.bitsquare.msg.tomp2p.TomP2PNode;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook;
import io.bitsquare.util.handlers.FaultHandler;
@ -30,8 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleLongProperty;
@ -55,12 +53,11 @@ public class TomP2POfferBook implements RemoteOfferBook {
private final List<Listener> offerRepositoryListeners = new ArrayList<>();
private final LongProperty invalidationTimestamp = new SimpleLongProperty(0);
private final TomP2PNode p2pNode;
private final TomP2PNode tomP2PNode;
private Executor executor;
@Inject
public TomP2POfferBook(TomP2PNode p2pNode) {
this.p2pNode = p2pNode;
public TomP2POfferBook(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode;
}
public void setExecutor(Executor executor) {
@ -78,7 +75,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
offerData.ttlSeconds(defaultOfferTTL);
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
FuturePut futurePut = p2pNode.addProtectedData(locationKey, offerData);
FuturePut futurePut = tomP2PNode.addProtectedData(locationKey, offerData);
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -121,7 +118,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
final Data offerData = new Data(offer);
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
FutureRemove futureRemove = p2pNode.removeFromDataMap(locationKey, offerData);
FutureRemove futureRemove = tomP2PNode.removeFromDataMap(locationKey, offerData);
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -166,7 +163,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
public void getOffers(String currencyCode) {
Number160 locationKey = Number160.createHash(currencyCode);
log.trace("Get offers from DHT requested for locationKey: " + locationKey);
FutureGet futureGet = p2pNode.getDataMap(locationKey);
FutureGet futureGet = tomP2PNode.getDataMap(locationKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
@ -230,7 +227,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
private void writeInvalidationTimestampToDHT(String currencyCode) {
invalidationTimestamp.set(System.currentTimeMillis());
try {
FuturePut putFuture = p2pNode.putData(getInvalidatedLocationKey(currencyCode),
FuturePut putFuture = tomP2PNode.putData(getInvalidatedLocationKey(currencyCode),
new Data(invalidationTimestamp.get()));
putFuture.addListener(new BaseFutureListener<BaseFuture>() {
@Override
@ -257,7 +254,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
}
public void requestInvalidationTimeStampFromDHT(String currencyCode) {
FutureGet futureGet = p2pNode.getData(getInvalidatedLocationKey(currencyCode));
FutureGet futureGet = tomP2PNode.getData(getInvalidatedLocationKey(currencyCode));
futureGet.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {

View File

@ -17,7 +17,7 @@
package io.bitsquare.offer.tomp2p;
import io.bitsquare.msg.tomp2p.TomP2PNode;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.OfferModule;
import io.bitsquare.offer.RemoteOfferBook;
@ -45,8 +45,8 @@ class RemoteOfferBookProvider implements Provider<RemoteOfferBook> {
private final TomP2POfferBook remoteOfferBook;
@Inject
public RemoteOfferBookProvider(TomP2PNode p2pNode) {
remoteOfferBook = new TomP2POfferBook(p2pNode);
public RemoteOfferBookProvider(TomP2PNode tomP2PNode) {
remoteOfferBook = new TomP2POfferBook(tomP2PNode);
remoteOfferBook.setExecutor(Platform::runLater);
}

View File

@ -22,15 +22,14 @@ import io.bitsquare.bank.BankAccount;
import io.bitsquare.btc.BlockChainService;
import io.bitsquare.btc.WalletService;
import io.bitsquare.crypto.SignatureService;
import io.bitsquare.msg.Message;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.Message;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Direction;
import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.protocol.placeoffer.PlaceOfferProtocol;
import io.bitsquare.trade.protocol.trade.TradeMessage;
import io.bitsquare.trade.protocol.trade.offerer.BuyerAcceptsOfferProtocol;
@ -80,7 +79,7 @@ public class TradeManager {
private final User user;
private final AccountSettings accountSettings;
private final Persistence persistence;
private final MessageService messageService;
private final TradeMessageService tradeMessageService;
private final BlockChainService blockChainService;
private final WalletService walletService;
private final SignatureService signatureService;
@ -105,13 +104,13 @@ public class TradeManager {
@Inject
public TradeManager(User user, AccountSettings accountSettings, Persistence persistence,
MessageService messageService, BlockChainService blockChainService,
TradeMessageService tradeMessageService, BlockChainService blockChainService,
WalletService walletService, SignatureService signatureService,
RemoteOfferBook remoteOfferBook) {
this.user = user;
this.accountSettings = accountSettings;
this.persistence = persistence;
this.messageService = messageService;
this.tradeMessageService = tradeMessageService;
this.blockChainService = blockChainService;
this.walletService = walletService;
this.signatureService = signatureService;
@ -132,7 +131,7 @@ public class TradeManager {
closedTrades.putAll((Map<String, Trade>) closedTradesObject);
}
messageService.addIncomingMessageListener(this::onIncomingTradeMessage);
tradeMessageService.addIncomingMessageListener(this::onIncomingTradeMessage);
}
@ -141,7 +140,7 @@ public class TradeManager {
///////////////////////////////////////////////////////////////////////////////////////////
public void cleanup() {
messageService.removeIncomingMessageListener(this::onIncomingTradeMessage);
tradeMessageService.removeIncomingMessageListener(this::onIncomingTradeMessage);
}
@ -250,7 +249,7 @@ public class TradeManager {
BuyerAcceptsOfferProtocol buyerAcceptsOfferProtocol = new BuyerAcceptsOfferProtocol(trade,
sender,
messageService,
tradeMessageService,
walletService,
blockChainService,
signatureService,
@ -380,7 +379,7 @@ public class TradeManager {
};
SellerTakesOfferProtocol sellerTakesOfferProtocol = new SellerTakesOfferProtocol(
trade, listener, messageService, walletService, blockChainService, signatureService,
trade, listener, tradeMessageService, walletService, blockChainService, signatureService,
user);
takerAsSellerProtocolMap.put(trade.getId(), sellerTakesOfferProtocol);
sellerTakesOfferProtocol.start();
@ -429,7 +428,7 @@ public class TradeManager {
boolean isOfferOpen = getTrade(tradeId) == null;
RespondToIsOfferAvailableMessage replyMessage =
new RespondToIsOfferAvailableMessage(tradeId, isOfferOpen);
messageService.sendMessage(sender, replyMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(sender, replyMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("RespondToTakeOfferRequestMessage successfully arrived at peer");

View File

@ -17,10 +17,12 @@
package io.bitsquare.trade;
import io.bitsquare.network.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TradeMessage {
public class TradeMessage implements Message {
private static final Logger log = LoggerFactory.getLogger(TradeMessage.class);
public TradeMessage() {

View File

@ -0,0 +1,37 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade;
import io.bitsquare.BitsquareModule;
import org.springframework.core.env.Environment;
public abstract class TradeMessageModule extends BitsquareModule {
protected TradeMessageModule(Environment env) {
super(env);
}
@Override
protected final void configure() {
doConfigure();
}
protected void doConfigure() {
}
}

View File

@ -15,39 +15,24 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg;
package io.bitsquare.trade;
import io.bitsquare.arbitrator.Arbitrator;
import io.bitsquare.msg.listeners.ArbitratorListener;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.msg.listeners.IncomingMessageListener;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Message;
import io.bitsquare.network.MessageBroker;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.IncomingMessageListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import java.security.PublicKey;
import java.util.Locale;
import rx.Observable;
public interface MessageService extends MessageBroker {
public interface TradeMessageService extends MessageBroker {
void sendMessage(Peer peer, Message message, OutgoingMessageListener listener);
void shutDown();
void addArbitrator(Arbitrator arbitrator);
void addIncomingMessageListener(IncomingMessageListener listener);
void removeIncomingMessageListener(IncomingMessageListener listener);
void addArbitratorListener(ArbitratorListener listener);
void getArbitrators(Locale defaultLanguageLocale);
Observable<BootstrapState> init();
void getPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener);
}

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
package io.bitsquare.trade.listeners;
import io.bitsquare.network.Peer;

View File

@ -15,9 +15,9 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
package io.bitsquare.trade.listeners;
import io.bitsquare.msg.Message;
import io.bitsquare.network.Message;
import io.bitsquare.network.Peer;
public interface IncomingMessageListener {

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
package io.bitsquare.trade.listeners;
public interface OutgoingMessageListener {
void onFailed();

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.listeners;
package io.bitsquare.trade.listeners;
import net.tomp2p.peers.PeerAddress;

View File

@ -17,7 +17,7 @@
package io.bitsquare.trade.protocol.trade;
import io.bitsquare.msg.Message;
import io.bitsquare.network.Message;
public interface TradeMessage extends Message {
public String getTradeId();

View File

@ -22,11 +22,11 @@ import io.bitsquare.btc.BlockChainService;
import io.bitsquare.btc.FeePolicy;
import io.bitsquare.btc.WalletService;
import io.bitsquare.crypto.SignatureService;
import io.bitsquare.msg.MessageService;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.trade.Contract;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.protocol.trade.offerer.tasks.CreateDepositTx;
import io.bitsquare.trade.protocol.trade.offerer.tasks.HandleTakeOfferRequest;
import io.bitsquare.trade.protocol.trade.offerer.tasks.RequestTakerDepositPayment;
@ -96,7 +96,7 @@ public class BuyerAcceptsOfferProtocol {
// provided
private final Trade trade;
private final Peer peer;
private final MessageService messageService;
private final TradeMessageService tradeMessageService;
private final WalletService walletService;
private final BlockChainService blockChainService;
private final SignatureService signatureService;
@ -139,7 +139,7 @@ public class BuyerAcceptsOfferProtocol {
public BuyerAcceptsOfferProtocol(Trade trade,
Peer peer,
MessageService messageService,
TradeMessageService tradeMessageService,
WalletService walletService,
BlockChainService blockChainService,
SignatureService signatureService,
@ -148,7 +148,7 @@ public class BuyerAcceptsOfferProtocol {
this.trade = trade;
this.peer = peer;
this.listener = listener;
this.messageService = messageService;
this.tradeMessageService = tradeMessageService;
this.walletService = walletService;
this.blockChainService = blockChainService;
this.signatureService = signatureService;
@ -175,7 +175,7 @@ public class BuyerAcceptsOfferProtocol {
public void start() {
log.debug("start called " + step++);
state = State.HandleTakeOfferRequest;
HandleTakeOfferRequest.run(this::onResultHandleTakeOfferRequest, this::onFault, peer, messageService,
HandleTakeOfferRequest.run(this::onResultHandleTakeOfferRequest, this::onFault, peer, tradeMessageService,
trade.getState(), tradeId);
}
@ -239,7 +239,7 @@ public class BuyerAcceptsOfferProtocol {
RequestTakerDepositPayment.run(this::onResultRequestTakerDepositPayment,
this::onFault,
peer,
messageService,
tradeMessageService,
tradeId,
bankAccount,
accountId,
@ -338,7 +338,7 @@ public class BuyerAcceptsOfferProtocol {
listener.onDepositTxPublished(depositTransaction);
state = State.SendDepositTxIdToTaker;
SendDepositTxIdToTaker.run(this::onResultSendDepositTxIdToTaker, this::onFault, peer, messageService,
SendDepositTxIdToTaker.run(this::onResultSendDepositTxIdToTaker, this::onFault, peer, tradeMessageService,
tradeId, depositTransaction);
}
@ -381,7 +381,7 @@ public class BuyerAcceptsOfferProtocol {
SendSignedPayoutTx.run(this::onResultSendSignedPayoutTx,
this::onFault,
peer,
messageService,
tradeMessageService,
walletService,
tradeId,
peersPayoutAddress,

View File

@ -17,10 +17,10 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.protocol.trade.offerer.messages.RespondToTakeOfferRequestMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -31,7 +31,7 @@ public class HandleTakeOfferRequest {
private static final Logger log = LoggerFactory.getLogger(HandleTakeOfferRequest.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
MessageService messageService, Trade.State tradeState, String tradeId) {
TradeMessageService tradeMessageService, Trade.State tradeState, String tradeId) {
log.trace("Run task");
boolean isTradeIsOpen = tradeState == Trade.State.OPEN;
if (!isTradeIsOpen) {
@ -39,7 +39,7 @@ public class HandleTakeOfferRequest {
}
RespondToTakeOfferRequestMessage tradeMessage =
new RespondToTakeOfferRequestMessage(tradeId, isTradeIsOpen);
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("RespondToTakeOfferRequestMessage successfully arrived at peer");

View File

@ -18,8 +18,8 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.bank.BankAccount;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.RequestTakerDepositPaymentMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -34,7 +34,7 @@ public class RequestTakerDepositPayment {
public static void run(ResultHandler resultHandler,
ExceptionHandler exceptionHandler,
Peer peer,
MessageService messageService,
TradeMessageService tradeMessageService,
String tradeId,
BankAccount bankAccount,
String accountId,
@ -44,7 +44,7 @@ public class RequestTakerDepositPayment {
log.trace("Run task");
RequestTakerDepositPaymentMessage tradeMessage = new RequestTakerDepositPaymentMessage(
tradeId, bankAccount, accountId, offererPubKey, preparedOffererDepositTxAsHex, offererTxOutIndex);
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("RequestTakerDepositPaymentMessage successfully arrived at peer");

View File

@ -17,8 +17,8 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.DepositTxPublishedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -34,12 +34,12 @@ public class SendDepositTxIdToTaker {
private static final Logger log = LoggerFactory.getLogger(SendDepositTxIdToTaker.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
MessageService messageService, String tradeId, Transaction depositTransaction) {
TradeMessageService tradeMessageService, String tradeId, Transaction depositTransaction) {
log.trace("Run task");
DepositTxPublishedMessage tradeMessage =
new DepositTxPublishedMessage(tradeId, Utils.HEX.encode(depositTransaction.bitcoinSerialize()));
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("DepositTxPublishedMessage successfully arrived at peer");

View File

@ -18,8 +18,8 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.btc.WalletService;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.BankTransferInitedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -39,7 +39,7 @@ public class SendSignedPayoutTx {
public static void run(ResultHandler resultHandler,
ExceptionHandler exceptionHandler,
Peer peer,
MessageService messageService,
TradeMessageService tradeMessageService,
WalletService walletService,
String tradeId,
String takerPayoutAddress,
@ -68,7 +68,7 @@ public class SendSignedPayoutTx {
takerPaybackAmount,
offererPayoutAddress);
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("BankTransferInitedMessage successfully arrived at peer");

View File

@ -21,11 +21,11 @@ import io.bitsquare.bank.BankAccount;
import io.bitsquare.btc.BlockChainService;
import io.bitsquare.btc.WalletService;
import io.bitsquare.crypto.SignatureService;
import io.bitsquare.msg.MessageService;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.trade.Contract;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.protocol.trade.offerer.messages.BankTransferInitedMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.DepositTxPublishedMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.RequestTakerDepositPaymentMessage;
@ -86,7 +86,7 @@ public class SellerTakesOfferProtocol {
// provided data
private final Trade trade;
private final SellerTakesOfferProtocolListener listener;
private final MessageService messageService;
private final TradeMessageService tradeMessageService;
private final WalletService walletService;
private final BlockChainService blockChainService;
private final SignatureService signatureService;
@ -133,14 +133,14 @@ public class SellerTakesOfferProtocol {
public SellerTakesOfferProtocol(Trade trade,
SellerTakesOfferProtocolListener listener,
MessageService messageService,
TradeMessageService tradeMessageService,
WalletService walletService,
BlockChainService blockChainService,
SignatureService signatureService,
User user) {
this.trade = trade;
this.listener = listener;
this.messageService = messageService;
this.tradeMessageService = tradeMessageService;
this.walletService = walletService;
this.blockChainService = blockChainService;
this.signatureService = signatureService;
@ -164,10 +164,13 @@ public class SellerTakesOfferProtocol {
state = State.Init;
}
// 1. GetPeerAddress
// Async
// In case of an error: No rollback activity needed
public void start() {
log.debug("start called " + step++);
state = State.GetPeerAddress;
GetPeerAddress.run(this::onResultGetPeerAddress, this::onFault, messageService, peersMessagePublicKey);
GetPeerAddress.run(this::onResultGetPeerAddress, this::onFault, tradeMessageService, peersMessagePublicKey);
}
public void onResultGetPeerAddress(Peer peer) {
@ -175,7 +178,7 @@ public class SellerTakesOfferProtocol {
this.peer = peer;
state = State.RequestTakeOffer;
RequestTakeOffer.run(this::onResultRequestTakeOffer, this::onFault, peer, messageService, tradeId);
RequestTakeOffer.run(this::onResultRequestTakeOffer, this::onFault, peer, tradeMessageService, tradeId);
}
public void onResultRequestTakeOffer() {
@ -211,7 +214,7 @@ public class SellerTakesOfferProtocol {
state = State.SendTakeOfferFeePayedTxId;
SendTakeOfferFeePayedTxId.run(this::onResultSendTakeOfferFeePayedTxId, this::onFault, peer,
messageService, tradeId, takeOfferFeeTxId, tradeAmount, pubKeyForThatTrade);
tradeMessageService, tradeId, takeOfferFeeTxId, tradeAmount, pubKeyForThatTrade);
}
public void onResultSendTakeOfferFeePayedTxId() {
@ -291,7 +294,7 @@ public class SellerTakesOfferProtocol {
SendSignedTakerDepositTxAsHex.run(this::onResultSendSignedTakerDepositTxAsHex,
this::onFault,
peer,
messageService,
tradeMessageService,
walletService,
bankAccount,
accountId,
@ -385,7 +388,7 @@ public class SellerTakesOfferProtocol {
listener.onPayoutTxPublished(trade, transaction);
state = State.SendPayoutTxToOfferer;
SendPayoutTxToOfferer.run(this::onResultSendPayoutTxToOfferer, this::onFault, peer, messageService,
SendPayoutTxToOfferer.run(this::onResultSendPayoutTxToOfferer, this::onFault, peer, tradeMessageService,
tradeId, payoutTxAsHex);
}

View File

@ -17,9 +17,9 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.GetPeerAddressListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.util.handlers.ExceptionHandler;
import java.security.PublicKey;
@ -29,11 +29,11 @@ import org.slf4j.LoggerFactory;
public class GetPeerAddress {
private static final Logger log = LoggerFactory.getLogger(GetPeerAddress.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler,
MessageService messageService, PublicKey messagePublicKey) {
log.trace("Run task");
messageService.getPeerAddress(messagePublicKey, new GetPeerAddressListener() {
TradeMessageService tradeMessageService, PublicKey messagePublicKey) {
log.trace("Run GetPeerAddress task");
tradeMessageService.getPeerAddress(messagePublicKey, new GetPeerAddressListener() {
@Override
public void onResult(Peer peer) {
log.trace("Received peer = " + peer.toString());
@ -42,8 +42,8 @@ public class GetPeerAddress {
@Override
public void onFailed() {
log.error("Lookup for peer address faultHandler.onFault.");
exceptionHandler.handleException(new Exception("Lookup for peer address faultHandler.onFault."));
log.error("Lookup for peer address failed.");
exceptionHandler.handleException(new Exception("Lookup for peer address failed."));
}
});
}

View File

@ -17,8 +17,8 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestTakeOfferMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -31,9 +31,9 @@ public class RequestTakeOffer {
private static final Logger log = LoggerFactory.getLogger(RequestTakeOffer.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
MessageService messageService, String tradeId) {
TradeMessageService tradeMessageService, String tradeId) {
log.trace("Run task");
messageService.sendMessage(peer, new RequestTakeOfferMessage(tradeId),
tradeMessageService.sendMessage(peer, new RequestTakeOfferMessage(tradeId),
new OutgoingMessageListener() {
@Override
public void onResult() {

View File

@ -17,8 +17,8 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.PayoutTxPublishedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -31,10 +31,10 @@ public class SendPayoutTxToOfferer {
private static final Logger log = LoggerFactory.getLogger(SendPayoutTxToOfferer.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
MessageService messageService, String tradeId, String payoutTxAsHex) {
TradeMessageService tradeMessageService, String tradeId, String payoutTxAsHex) {
log.trace("Run task");
PayoutTxPublishedMessage tradeMessage = new PayoutTxPublishedMessage(tradeId, payoutTxAsHex);
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("PayoutTxPublishedMessage successfully arrived at peer");

View File

@ -19,8 +19,8 @@ package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.bank.BankAccount;
import io.bitsquare.btc.WalletService;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestOffererPublishDepositTxMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -40,7 +40,7 @@ public class SendSignedTakerDepositTxAsHex {
public static void run(ResultHandler resultHandler,
ExceptionHandler exceptionHandler,
Peer peer,
MessageService messageService,
TradeMessageService tradeMessageService,
WalletService walletService,
BankAccount bankAccount,
String accountId,
@ -68,7 +68,7 @@ public class SendSignedTakerDepositTxAsHex {
walletService.getAddressInfoByTradeID(tradeId).getAddressString(),
takerTxOutIndex,
offererTxOutIndex);
messageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("RequestOffererDepositPublicationMessage successfully arrived at peer");

View File

@ -17,8 +17,8 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.msg.MessageService;
import io.bitsquare.msg.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.TakeOfferFeePayedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -35,7 +35,7 @@ public class SendTakeOfferFeePayedTxId {
public static void run(ResultHandler resultHandler,
ExceptionHandler exceptionHandler,
Peer peer,
MessageService messageService,
TradeMessageService tradeMessageService,
String tradeId,
String takeOfferFeeTxId,
Coin tradeAmount,
@ -44,7 +44,7 @@ public class SendTakeOfferFeePayedTxId {
TakeOfferFeePayedMessage msg = new TakeOfferFeePayedMessage(tradeId, takeOfferFeeTxId, tradeAmount,
pubKeyForThatTradeAsHex);
messageService.sendMessage(peer, msg, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, msg, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("TakeOfferFeePayedMessage successfully arrived at peer");

View File

@ -0,0 +1,43 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.tomp2p;
import io.bitsquare.trade.TradeMessageModule;
import io.bitsquare.trade.TradeMessageService;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.springframework.core.env.Environment;
public class TomP2PTradeMessageModule extends TradeMessageModule {
public TomP2PTradeMessageModule(Environment env) {
super(env);
}
@Override
protected void doConfigure() {
bind(TradeMessageService.class).to(TomP2PTradeMessageService.class).in(Singleton.class);
}
@Override
protected void doClose(Injector injector) {
super.doClose(injector);
}
}

View File

@ -0,0 +1,156 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.tomp2p;
import io.bitsquare.network.Message;
import io.bitsquare.network.Peer;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.IncomingMessageListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.user.User;
import java.security.PublicKey;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.FutureGet;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureAdapter;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.peers.Number160;
import net.tomp2p.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* That service delivers direct messaging and DHT functionality from the TomP2P library
* It is the translating domain specific functionality to the messaging layer.
* The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p/>
* TODO: improve callbacks that Platform.runLater is not necessary. We call usually that methods form teh UI thread.
*/
public class TomP2PTradeMessageService implements TradeMessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PTradeMessageService.class);
private final TomP2PNode tomP2PNode;
private final User user;
private final List<IncomingMessageListener> incomingMessageListeners = new ArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PTradeMessageService(User user, TomP2PNode tomP2PNode) {
this.user = user;
this.tomP2PNode = tomP2PNode;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Find peer address by publicKey
///////////////////////////////////////////////////////////////////////////////////////////
public void getPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
FutureGet futureGet = tomP2PNode.getDomainProtectedData(locationKey, publicKey);
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture baseFuture) throws Exception {
if (baseFuture.isSuccess() && futureGet.data() != null) {
final Peer peer = (Peer) futureGet.data().object();
Platform.runLater(() -> listener.onResult(peer));
}
else {
log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason());
Platform.runLater(listener::onFailed);
}
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Trade process
///////////////////////////////////////////////////////////////////////////////////////////
public void sendMessage(Peer peer, Message message,
OutgoingMessageListener listener) {
if (!(peer instanceof TomP2PPeer)) {
throw new IllegalArgumentException("peer must be of type TomP2PPeer");
}
FutureDirect futureDirect = tomP2PNode.sendData(((TomP2PPeer) peer).getPeerAddress(), message);
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
Platform.runLater(listener::onResult);
}
else {
log.error("sendMessage failed with reason " + futureDirect.failedReason());
Platform.runLater(listener::onFailed);
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
Platform.runLater(listener::onFailed);
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Event Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.add(listener);
}
public void removeIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message handler
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void handleMessage(Object message, Peer sender) {
if (message instanceof Message) {
Platform.runLater(() -> incomingMessageListeners.stream().forEach(e ->
e.onMessage((Message) message, sender)));
}
}
}

View File

@ -15,7 +15,7 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.msg.tomp2p;
package io.bitsquare.network.tomp2p;
import java.io.IOException;

View File

@ -25,17 +25,18 @@ import io.bitsquare.btc.UserAgent;
import io.bitsquare.btc.WalletService;
import io.bitsquare.locale.CountryUtil;
import io.bitsquare.locale.LanguageUtil;
import io.bitsquare.msg.tomp2p.BootstrappedPeerBuilder;
import io.bitsquare.msg.tomp2p.TomP2PMessageService;
import io.bitsquare.msg.tomp2p.TomP2PNode;
import io.bitsquare.network.BootstrapState;
import io.bitsquare.network.Node;
import io.bitsquare.network.tomp2p.BootstrappedPeerBuilder;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.Direction;
import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook;
import io.bitsquare.offer.tomp2p.TomP2POfferBook;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.trade.tomp2p.TomP2PTradeMessageService;
import io.bitsquare.user.User;
import io.bitsquare.util.DSAKeyUtil;
import io.bitsquare.util.handlers.FaultHandler;
@ -80,11 +81,13 @@ public class PlaceOfferProtocolTest {
private static final Logger log = LoggerFactory.getLogger(PlaceOfferProtocolTest.class);
private WalletService walletService;
private TomP2PMessageService messageService;
private TradeMessageService tradeMessageService;
private RemoteOfferBook remoteOfferBook;
private final File dir = new File("./temp");
private final static String OFFER_ID = "offerID";
private Address address;
private TomP2PNode tomP2PNode;
private BootstrappedPeerBuilder bootstrappedPeerBuilder;
@Before
public void setup() throws InterruptedException {
@ -98,11 +101,11 @@ public class PlaceOfferProtocolTest {
Node bootstrapNode = Node.at("localhost", "127.0.0.1");
User user = new User();
user.applyPersistedUser(null);
BootstrappedPeerBuilder bootstrappedPeerBuilder = new BootstrappedPeerBuilder(Node.DEFAULT_PORT, false, bootstrapNode, "<unspecified>");
TomP2PNode p2pNode = new TomP2PNode(bootstrappedPeerBuilder);
messageService = new TomP2PMessageService(user, p2pNode);
bootstrappedPeerBuilder = new BootstrappedPeerBuilder(Node.DEFAULT_PORT, false, bootstrapNode, "<unspecified>");
tomP2PNode = new TomP2PNode(bootstrappedPeerBuilder);
tradeMessageService = new TomP2PTradeMessageService(user, tomP2PNode);
Observable<BootstrapState> messageObservable = messageService.init();
Observable<BootstrapState> messageObservable = tomP2PNode.bootstrap(user.getMessageKeyPair(), tradeMessageService);
messageObservable.publish();
messageObservable.subscribe(
state -> log.trace("state changed: " + state),
@ -112,7 +115,7 @@ public class PlaceOfferProtocolTest {
() -> {
log.trace("message completed");
remoteOfferBook = new TomP2POfferBook(p2pNode);
remoteOfferBook = new TomP2POfferBook(tomP2PNode);
remoteOfferBook.setExecutor(Threading.SAME_THREAD);
}
);
@ -161,7 +164,7 @@ public class PlaceOfferProtocolTest {
@After
public void shutDown() throws IOException, InterruptedException {
walletService.shutDown();
messageService.shutDown();
bootstrappedPeerBuilder.shutDown();
}
@Test