Use Executor instead of Platform::runLater

This commit is contained in:
Manfred Karrer 2015-03-08 23:16:58 +01:00
parent 5cc15e0a41
commit b70868f793
13 changed files with 118 additions and 69 deletions

View file

@ -21,8 +21,10 @@ package io.bitsquare.arbitrator;
import io.bitsquare.arbitrator.listeners.ArbitratorListener; import io.bitsquare.arbitrator.listeners.ArbitratorListener;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.Executor;
public interface ArbitratorMessageService { public interface ArbitratorMessageService {
void setExecutor(Executor executor);
void addArbitrator(Arbitrator arbitrator); void addArbitrator(Arbitrator arbitrator);

View file

@ -19,10 +19,16 @@ package io.bitsquare.arbitrator.tomp2p;
import io.bitsquare.arbitrator.ArbitratorMessageModule; import io.bitsquare.arbitrator.ArbitratorMessageModule;
import io.bitsquare.arbitrator.ArbitratorMessageService; import io.bitsquare.arbitrator.ArbitratorMessageService;
import io.bitsquare.network.tomp2p.TomP2PNode;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import javax.inject.Inject;
import javafx.application.Platform;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule { public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
@ -33,7 +39,7 @@ public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
@Override @Override
protected void doConfigure() { protected void doConfigure() {
bind(ArbitratorMessageService.class).to(TomP2PArbitratorMessageService.class).in(Singleton.class); bind(ArbitratorMessageService.class).toProvider(ArbitratorMessageServiceProvider.class).in(Singleton.class);
} }
@Override @Override
@ -41,3 +47,17 @@ public class TomP2PArbitratorMessageModule extends ArbitratorMessageModule {
super.doClose(injector); super.doClose(injector);
} }
} }
class ArbitratorMessageServiceProvider implements Provider<ArbitratorMessageService> {
private final ArbitratorMessageService arbitratorMessageService;
@Inject
public ArbitratorMessageServiceProvider(TomP2PNode tomP2PNode) {
arbitratorMessageService = new TomP2PArbitratorMessageService(tomP2PNode);
arbitratorMessageService.setExecutor(Platform::runLater);
}
public ArbitratorMessageService get() {
return arbitratorMessageService;
}
}

View file

@ -27,10 +27,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.FutureGet; import net.tomp2p.dht.FutureGet;
import net.tomp2p.dht.FuturePut; import net.tomp2p.dht.FuturePut;
@ -51,17 +48,21 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
private final TomP2PNode tomP2PNode; private final TomP2PNode tomP2PNode;
private final List<ArbitratorListener> arbitratorListeners = new ArrayList<>(); private final List<ArbitratorListener> arbitratorListeners = new ArrayList<>();
private Executor executor;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PArbitratorMessageService(TomP2PNode tomP2PNode) { public TomP2PArbitratorMessageService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode; this.tomP2PNode = tomP2PNode;
} }
public void setExecutor(Executor executor) {
this.executor = executor;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Arbitrators // Arbitrators
@ -76,7 +77,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() { addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override @Override
public void operationComplete(BaseFuture future) throws Exception { public void operationComplete(BaseFuture future) throws Exception {
Platform.runLater(() -> arbitratorListeners.stream().forEach(listener -> executor.execute(() -> arbitratorListeners.stream().forEach(listener ->
{ {
try { try {
Object arbitratorDataObject = arbitratorData.object(); Object arbitratorDataObject = arbitratorData.object();
@ -110,7 +111,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() { removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override @Override
public void operationComplete(BaseFuture future) throws Exception { public void operationComplete(BaseFuture future) throws Exception {
Platform.runLater(() -> arbitratorListeners.stream().forEach(listener -> executor.execute(() -> arbitratorListeners.stream().forEach(listener ->
{ {
for (Data arbitratorData : removeFuture.dataMap().values()) { for (Data arbitratorData : removeFuture.dataMap().values()) {
try { try {
@ -141,7 +142,7 @@ public class TomP2PArbitratorMessageService implements ArbitratorMessageService
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() { futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override @Override
public void operationComplete(BaseFuture future) throws Exception { public void operationComplete(BaseFuture future) throws Exception {
Platform.runLater(() -> arbitratorListeners.stream().forEach(listener -> executor.execute(() -> arbitratorListeners.stream().forEach(listener ->
{ {
List<Arbitrator> arbitrators = new ArrayList<>(); List<Arbitrator> arbitrators = new ArrayList<>();
for (Data arbitratorData : futureGet.dataMap().values()) { for (Data arbitratorData : futureGet.dataMap().values()) {

View file

@ -21,7 +21,7 @@ import io.bitsquare.bank.BankAccount;
import io.bitsquare.locale.Country; import io.bitsquare.locale.Country;
import io.bitsquare.locale.CurrencyUtil; import io.bitsquare.locale.CurrencyUtil;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import io.bitsquare.user.User; import io.bitsquare.user.User;
import io.bitsquare.util.Utilities; import io.bitsquare.util.Utilities;
@ -50,11 +50,11 @@ public class OfferBook {
private static final Logger log = LoggerFactory.getLogger(OfferBook.class); private static final Logger log = LoggerFactory.getLogger(OfferBook.class);
private final RemoteOfferBook remoteOfferBook; private final OfferBookService offerBookService;
private final User user; private final User user;
private final ObservableList<OfferBookListItem> offerBookListItems = FXCollections.observableArrayList(); private final ObservableList<OfferBookListItem> offerBookListItems = FXCollections.observableArrayList();
private final RemoteOfferBook.Listener remoteOfferBookListener; private final OfferBookService.Listener remoteOfferBookListener;
private final ChangeListener<BankAccount> bankAccountChangeListener; private final ChangeListener<BankAccount> bankAccountChangeListener;
private final ChangeListener<Number> invalidationListener; private final ChangeListener<Number> invalidationListener;
private String fiatCode; private String fiatCode;
@ -68,14 +68,14 @@ public class OfferBook {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Inject @Inject
OfferBook(RemoteOfferBook remoteOfferBook, User user) { OfferBook(OfferBookService offerBookService, User user) {
this.remoteOfferBook = remoteOfferBook; this.offerBookService = offerBookService;
this.user = user; this.user = user;
bankAccountChangeListener = (observableValue, oldValue, newValue) -> setBankAccount(newValue); bankAccountChangeListener = (observableValue, oldValue, newValue) -> setBankAccount(newValue);
invalidationListener = (ov, oldValue, newValue) -> requestOffers(); invalidationListener = (ov, oldValue, newValue) -> requestOffers();
remoteOfferBookListener = new RemoteOfferBook.Listener() { remoteOfferBookListener = new OfferBookService.Listener() {
@Override @Override
public void onOfferAdded(Offer offer) { public void onOfferAdded(Offer offer) {
addOfferToOfferBookListItems(offer); addOfferToOfferBookListItems(offer);
@ -142,15 +142,15 @@ public class OfferBook {
private void addListeners() { private void addListeners() {
log.debug("addListeners "); log.debug("addListeners ");
user.currentBankAccountProperty().addListener(bankAccountChangeListener); user.currentBankAccountProperty().addListener(bankAccountChangeListener);
remoteOfferBook.addListener(remoteOfferBookListener); offerBookService.addListener(remoteOfferBookListener);
remoteOfferBook.invalidationTimestampProperty().addListener(invalidationListener); offerBookService.invalidationTimestampProperty().addListener(invalidationListener);
} }
private void removeListeners() { private void removeListeners() {
log.debug("removeListeners "); log.debug("removeListeners ");
user.currentBankAccountProperty().removeListener(bankAccountChangeListener); user.currentBankAccountProperty().removeListener(bankAccountChangeListener);
remoteOfferBook.removeListener(remoteOfferBookListener); offerBookService.removeListener(remoteOfferBookListener);
remoteOfferBook.invalidationTimestampProperty().removeListener(invalidationListener); offerBookService.invalidationTimestampProperty().removeListener(invalidationListener);
} }
private void addOfferToOfferBookListItems(Offer offer) { private void addOfferToOfferBookListItems(Offer offer) {
@ -160,7 +160,7 @@ public class OfferBook {
} }
private void requestOffers() { private void requestOffers() {
remoteOfferBook.getOffers(fiatCode); offerBookService.getOffers(fiatCode);
} }
@ -173,11 +173,11 @@ public class OfferBook {
addListeners(); addListeners();
setBankAccount(user.getCurrentBankAccount().get()); setBankAccount(user.getCurrentBankAccount().get());
pollingTimer = Utilities.setInterval(3000, (animationTimer) -> { pollingTimer = Utilities.setInterval(3000, (animationTimer) -> {
remoteOfferBook.requestInvalidationTimeStampFromDHT(fiatCode); offerBookService.requestInvalidationTimeStampFromDHT(fiatCode);
return null; return null;
}); });
remoteOfferBook.getOffers(fiatCode); offerBookService.getOffers(fiatCode);
} }
private void stopPolling() { private void stopPolling() {

View file

@ -25,7 +25,7 @@ import java.util.concurrent.Executor;
import javafx.beans.property.LongProperty; import javafx.beans.property.LongProperty;
public interface RemoteOfferBook { public interface OfferBookService {
void setExecutor(Executor executor); void setExecutor(Executor executor);

View file

@ -19,7 +19,7 @@ package io.bitsquare.offer.tomp2p;
import io.bitsquare.network.tomp2p.TomP2PNode; import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import io.bitsquare.util.handlers.FaultHandler; import io.bitsquare.util.handlers.FaultHandler;
import io.bitsquare.util.handlers.ResultHandler; import io.bitsquare.util.handlers.ResultHandler;
@ -46,9 +46,9 @@ import net.tomp2p.storage.Data;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class TomP2POfferBook implements RemoteOfferBook { public class TomP2POfferBookService implements OfferBookService {
private static final Logger log = LoggerFactory.getLogger(TomP2POfferBook.class); private static final Logger log = LoggerFactory.getLogger(TomP2POfferBookService.class);
private final List<Listener> offerRepositoryListeners = new ArrayList<>(); private final List<Listener> offerRepositoryListeners = new ArrayList<>();
private final LongProperty invalidationTimestamp = new SimpleLongProperty(0); private final LongProperty invalidationTimestamp = new SimpleLongProperty(0);
@ -56,7 +56,7 @@ public class TomP2POfferBook implements RemoteOfferBook {
private final TomP2PNode tomP2PNode; private final TomP2PNode tomP2PNode;
private Executor executor; private Executor executor;
public TomP2POfferBook(TomP2PNode tomP2PNode) { public TomP2POfferBookService(TomP2PNode tomP2PNode) {
this.tomP2PNode = tomP2PNode; this.tomP2PNode = tomP2PNode;
} }

View file

@ -19,7 +19,7 @@ package io.bitsquare.offer.tomp2p;
import io.bitsquare.network.tomp2p.TomP2PNode; import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.OfferModule; import io.bitsquare.offer.OfferModule;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import com.google.inject.Provider; import com.google.inject.Provider;
@ -37,20 +37,20 @@ public class TomP2POfferModule extends OfferModule {
@Override @Override
protected void configure() { protected void configure() {
bind(RemoteOfferBook.class).toProvider(RemoteOfferBookProvider.class).asEagerSingleton(); bind(OfferBookService.class).toProvider(OfferBookServiceProvider.class).asEagerSingleton();
} }
} }
class RemoteOfferBookProvider implements Provider<RemoteOfferBook> { class OfferBookServiceProvider implements Provider<OfferBookService> {
private final TomP2POfferBook remoteOfferBook; private final OfferBookService offerBookService;
@Inject @Inject
public RemoteOfferBookProvider(TomP2PNode tomP2PNode) { public OfferBookServiceProvider(TomP2PNode tomP2PNode) {
remoteOfferBook = new TomP2POfferBook(tomP2PNode); offerBookService = new TomP2POfferBookService(tomP2PNode);
remoteOfferBook.setExecutor(Platform::runLater); offerBookService.setExecutor(Platform::runLater);
} }
public RemoteOfferBook get() { public OfferBookService get() {
return remoteOfferBook; return offerBookService;
} }
} }

View file

@ -26,7 +26,7 @@ import io.bitsquare.network.Message;
import io.bitsquare.network.Peer; import io.bitsquare.network.Peer;
import io.bitsquare.offer.Direction; import io.bitsquare.offer.Direction;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import io.bitsquare.persistence.Persistence; import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.handlers.TransactionResultHandler; import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.trade.listeners.OutgoingMessageListener; import io.bitsquare.trade.listeners.OutgoingMessageListener;
@ -83,7 +83,7 @@ public class TradeManager {
private final BlockChainService blockChainService; private final BlockChainService blockChainService;
private final WalletService walletService; private final WalletService walletService;
private final SignatureService signatureService; private final SignatureService signatureService;
private final RemoteOfferBook remoteOfferBook; private final OfferBookService offerBookService;
//TODO store TakerAsSellerProtocol in trade //TODO store TakerAsSellerProtocol in trade
private final Map<String, SellerTakesOfferProtocol> takerAsSellerProtocolMap = new HashMap<>(); private final Map<String, SellerTakesOfferProtocol> takerAsSellerProtocolMap = new HashMap<>();
@ -106,7 +106,7 @@ public class TradeManager {
public TradeManager(User user, AccountSettings accountSettings, Persistence persistence, public TradeManager(User user, AccountSettings accountSettings, Persistence persistence,
TradeMessageService tradeMessageService, BlockChainService blockChainService, TradeMessageService tradeMessageService, BlockChainService blockChainService,
WalletService walletService, SignatureService signatureService, WalletService walletService, SignatureService signatureService,
RemoteOfferBook remoteOfferBook) { OfferBookService offerBookService) {
this.user = user; this.user = user;
this.accountSettings = accountSettings; this.accountSettings = accountSettings;
this.persistence = persistence; this.persistence = persistence;
@ -114,7 +114,7 @@ public class TradeManager {
this.blockChainService = blockChainService; this.blockChainService = blockChainService;
this.walletService = walletService; this.walletService = walletService;
this.signatureService = signatureService; this.signatureService = signatureService;
this.remoteOfferBook = remoteOfferBook; this.offerBookService = offerBookService;
Object offersObject = persistence.read(this, "offers"); Object offersObject = persistence.read(this, "offers");
if (offersObject instanceof Map) { if (offersObject instanceof Map) {
@ -175,7 +175,7 @@ public class TradeManager {
PlaceOfferProtocol placeOfferProtocol = new PlaceOfferProtocol( PlaceOfferProtocol placeOfferProtocol = new PlaceOfferProtocol(
offer, offer,
walletService, walletService,
remoteOfferBook, offerBookService,
(transaction) -> { (transaction) -> {
saveOffer(offer); saveOffer(offer);
resultHandler.handleResult(transaction); resultHandler.handleResult(transaction);
@ -192,7 +192,7 @@ public class TradeManager {
} }
public void requestRemoveOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void requestRemoveOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
remoteOfferBook.removeOffer(offer, offerBookService.removeOffer(offer,
() -> { () -> {
if (offers.containsKey(offer.getId())) { if (offers.containsKey(offer.getId())) {
offers.remove(offer.getId()); offers.remove(offer.getId());

View file

@ -19,14 +19,18 @@ package io.bitsquare.trade;
import io.bitsquare.network.Message; import io.bitsquare.network.Message;
import io.bitsquare.network.MessageBroker; import io.bitsquare.network.MessageBroker;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.listeners.GetPeerAddressListener; import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.IncomingMessageListener; import io.bitsquare.trade.listeners.IncomingMessageListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener; import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.network.Peer;
import java.security.PublicKey; import java.security.PublicKey;
import java.util.concurrent.Executor;
public interface TradeMessageService extends MessageBroker { public interface TradeMessageService extends MessageBroker {
void setExecutor(Executor executor);
void sendMessage(Peer peer, Message message, OutgoingMessageListener listener); void sendMessage(Peer peer, Message message, OutgoingMessageListener listener);

View file

@ -19,7 +19,7 @@ package io.bitsquare.trade.protocol.placeoffer;
import io.bitsquare.btc.WalletService; import io.bitsquare.btc.WalletService;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import io.bitsquare.trade.handlers.TransactionResultHandler; import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.util.handlers.FaultHandler; import io.bitsquare.util.handlers.FaultHandler;
@ -43,16 +43,16 @@ public class PlaceOfferProtocol {
private final WalletService walletService; private final WalletService walletService;
private final TransactionResultHandler resultHandler; private final TransactionResultHandler resultHandler;
private final FaultHandler faultHandler; private final FaultHandler faultHandler;
private final RemoteOfferBook remoteOfferBook; private final OfferBookService offerBookService;
private int repeatAddOfferCallCounter = 0; private int repeatAddOfferCallCounter = 0;
public PlaceOfferProtocol(Offer offer, WalletService walletService, RemoteOfferBook remoteOfferBook, TransactionResultHandler resultHandler, public PlaceOfferProtocol(Offer offer, WalletService walletService, OfferBookService offerBookService, TransactionResultHandler resultHandler,
FaultHandler faultHandler) { FaultHandler faultHandler) {
this.offer = offer; this.offer = offer;
this.walletService = walletService; this.walletService = walletService;
this.resultHandler = resultHandler; this.resultHandler = resultHandler;
this.faultHandler = faultHandler; this.faultHandler = faultHandler;
this.remoteOfferBook = remoteOfferBook; this.offerBookService = offerBookService;
} }
public void placeOffer() { public void placeOffer() {
@ -129,7 +129,7 @@ public class PlaceOfferProtocol {
// need to write data before storage, otherwise hash is different when removing offer! // need to write data before storage, otherwise hash is different when removing offer!
offer.setOfferFeePaymentTxID(transaction.getHashAsString()); offer.setOfferFeePaymentTxID(transaction.getHashAsString());
remoteOfferBook.addOffer(offer, offerBookService.addOffer(offer,
() -> { () -> {
resultHandler.handleResult(transaction); resultHandler.handleResult(transaction);
}, },

View file

@ -17,12 +17,19 @@
package io.bitsquare.trade.tomp2p; package io.bitsquare.trade.tomp2p;
import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.trade.TradeMessageModule; import io.bitsquare.trade.TradeMessageModule;
import io.bitsquare.trade.TradeMessageService; import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.user.User;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import javax.inject.Inject;
import javafx.application.Platform;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
public class TomP2PTradeMessageModule extends TradeMessageModule { public class TomP2PTradeMessageModule extends TradeMessageModule {
@ -33,7 +40,7 @@ public class TomP2PTradeMessageModule extends TradeMessageModule {
@Override @Override
protected void doConfigure() { protected void doConfigure() {
bind(TradeMessageService.class).to(TomP2PTradeMessageService.class).in(Singleton.class); bind(TradeMessageService.class).toProvider(TomP2PTradeMessageServiceProvider.class).in(Singleton.class);
} }
@Override @Override
@ -41,3 +48,17 @@ public class TomP2PTradeMessageModule extends TradeMessageModule {
super.doClose(injector); super.doClose(injector);
} }
} }
class TomP2PTradeMessageServiceProvider implements Provider<TradeMessageService> {
private final TradeMessageService tradeMessageService;
@Inject
public TomP2PTradeMessageServiceProvider(User user, TomP2PNode tomP2PNode) {
tradeMessageService = new TomP2PTradeMessageService(user, tomP2PNode);
tradeMessageService.setExecutor(Platform::runLater);
}
public TradeMessageService get() {
return tradeMessageService;
}
}

View file

@ -31,10 +31,7 @@ import java.security.PublicKey;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javafx.application.Platform;
import net.tomp2p.dht.FutureGet; import net.tomp2p.dht.FutureGet;
import net.tomp2p.futures.BaseFuture; import net.tomp2p.futures.BaseFuture;
@ -54,26 +51,30 @@ import org.slf4j.LoggerFactory;
* The TomP2P library codebase shall not be used outside that service. * The TomP2P library codebase shall not be used outside that service.
* That way we limit the dependency of the TomP2P library only to that class (and it's sub components). * That way we limit the dependency of the TomP2P library only to that class (and it's sub components).
* <p/> * <p/>
* TODO: improve callbacks that Platform.runLater is not necessary. We call usually that methods form teh UI thread. * TODO: improve callbacks that executor.execute is not necessary. We call usually that methods form teh UI thread.
*/ */
public class TomP2PTradeMessageService implements TradeMessageService { public class TomP2PTradeMessageService implements TradeMessageService {
private static final Logger log = LoggerFactory.getLogger(TomP2PTradeMessageService.class); private static final Logger log = LoggerFactory.getLogger(TomP2PTradeMessageService.class);
private final TomP2PNode tomP2PNode; private final TomP2PNode tomP2PNode;
private final User user; private final User user;
private final List<IncomingMessageListener> incomingMessageListeners = new ArrayList<>(); private final List<IncomingMessageListener> incomingMessageListeners = new ArrayList<>();
private Executor executor;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PTradeMessageService(User user, TomP2PNode tomP2PNode) { public TomP2PTradeMessageService(User user, TomP2PNode tomP2PNode) {
this.user = user; this.user = user;
this.tomP2PNode = tomP2PNode; this.tomP2PNode = tomP2PNode;
} }
public void setExecutor(Executor executor) {
this.executor = executor;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -89,11 +90,11 @@ public class TomP2PTradeMessageService implements TradeMessageService {
public void operationComplete(BaseFuture baseFuture) throws Exception { public void operationComplete(BaseFuture baseFuture) throws Exception {
if (baseFuture.isSuccess() && futureGet.data() != null) { if (baseFuture.isSuccess() && futureGet.data() != null) {
final Peer peer = (Peer) futureGet.data().object(); final Peer peer = (Peer) futureGet.data().object();
Platform.runLater(() -> listener.onResult(peer)); executor.execute(() -> listener.onResult(peer));
} }
else { else {
log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason()); log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason());
Platform.runLater(listener::onFailed); executor.execute(listener::onFailed);
} }
} }
}); });
@ -113,17 +114,17 @@ public class TomP2PTradeMessageService implements TradeMessageService {
@Override @Override
public void operationComplete(BaseFuture future) throws Exception { public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
Platform.runLater(listener::onResult); executor.execute(listener::onResult);
} }
else { else {
log.error("sendMessage failed with reason " + futureDirect.failedReason()); log.error("sendMessage failed with reason " + futureDirect.failedReason());
Platform.runLater(listener::onFailed); executor.execute(listener::onFailed);
} }
} }
@Override @Override
public void exceptionCaught(Throwable t) throws Exception { public void exceptionCaught(Throwable t) throws Exception {
Platform.runLater(listener::onFailed); executor.execute(listener::onFailed);
} }
}); });
} }
@ -149,7 +150,7 @@ public class TomP2PTradeMessageService implements TradeMessageService {
@Override @Override
public void handleMessage(Object message, Peer sender) { public void handleMessage(Object message, Peer sender) {
if (message instanceof Message) { if (message instanceof Message) {
Platform.runLater(() -> incomingMessageListeners.stream().forEach(e -> executor.execute(() -> incomingMessageListeners.stream().forEach(e ->
e.onMessage((Message) message, sender))); e.onMessage((Message) message, sender)));
} }
} }

View file

@ -31,8 +31,8 @@ import io.bitsquare.network.tomp2p.BootstrappedPeerBuilder;
import io.bitsquare.network.tomp2p.TomP2PNode; import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.offer.Direction; import io.bitsquare.offer.Direction;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.RemoteOfferBook; import io.bitsquare.offer.OfferBookService;
import io.bitsquare.offer.tomp2p.TomP2POfferBook; import io.bitsquare.offer.tomp2p.TomP2POfferBookService;
import io.bitsquare.persistence.Persistence; import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.TradeMessageService; import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.handlers.TransactionResultHandler; import io.bitsquare.trade.handlers.TransactionResultHandler;
@ -82,7 +82,7 @@ public class PlaceOfferProtocolTest {
private WalletService walletService; private WalletService walletService;
private TradeMessageService tradeMessageService; private TradeMessageService tradeMessageService;
private RemoteOfferBook remoteOfferBook; private OfferBookService offerBookService;
private final File dir = new File("./temp"); private final File dir = new File("./temp");
private final static String OFFER_ID = "offerID"; private final static String OFFER_ID = "offerID";
private Address address; private Address address;
@ -115,8 +115,8 @@ public class PlaceOfferProtocolTest {
() -> { () -> {
log.trace("message completed"); log.trace("message completed");
remoteOfferBook = new TomP2POfferBook(tomP2PNode); offerBookService = new TomP2POfferBookService(tomP2PNode);
remoteOfferBook.setExecutor(Threading.SAME_THREAD); offerBookService.setExecutor(Threading.SAME_THREAD);
} }
); );
bootstrappedPeerBuilder.start(); bootstrappedPeerBuilder.start();
@ -222,7 +222,7 @@ public class PlaceOfferProtocolTest {
CountDownLatch countDownLatch = new CountDownLatch(2); CountDownLatch countDownLatch = new CountDownLatch(2);
try { try {
Offer offer = getOffer(); Offer offer = getOffer();
remoteOfferBook.addListener(new RemoteOfferBook.Listener() { offerBookService.addListener(new OfferBookService.Listener() {
@Override @Override
public void onOfferAdded(Offer offer1) { public void onOfferAdded(Offer offer1) {
assertEquals("Offer matching", offer.getId(), offer1.getId()); assertEquals("Offer matching", offer.getId(), offer1.getId());
@ -296,7 +296,7 @@ public class PlaceOfferProtocolTest {
InterruptedException { InterruptedException {
return new PlaceOfferProtocol(offer, return new PlaceOfferProtocol(offer,
walletService, walletService,
remoteOfferBook, offerBookService,
resultHandler, resultHandler,
faultHandler); faultHandler);
} }