From 40807b07f84f52e172069c33b3ff2bd2fcd0a730 Mon Sep 17 00:00:00 2001 From: Chris Beams Date: Wed, 5 Nov 2014 11:45:59 +0100 Subject: [PATCH] Introduce OfferRepository interface and TomP2P impl This change begins the process of breaking up the monolithic MessageFacade abstraction into smaller, cohesive Repositories (in the Domain-Driven Design sense of the word) that abstract callers away from networking details. It also begins the process of restructuring the msg.listeners package, such that individual *Listener interfaces are co-located with their respective Repositories --- .../io/bitsquare/app/BitsquareModule.java | 6 + .../gui/main/trade/offerbook/OfferBook.java | 31 +- .../java/io/bitsquare/msg/MessageFacade.java | 19 -- .../io/bitsquare/msg/TomP2PMessageFacade.java | 258 --------------- .../OfferModule.java} | 20 +- .../io/bitsquare/offer/OfferRepository.java | 49 +++ .../offer/TomP2POfferRepository.java | 294 ++++++++++++++++++ .../java/io/bitsquare/trade/TradeManager.java | 10 +- .../createoffer/CreateOfferCoordinator.java | 14 +- .../createoffer/tasks/PublishOfferToDHT.java | 6 +- 10 files changed, 395 insertions(+), 312 deletions(-) rename src/main/java/io/bitsquare/{msg/listeners/OfferBookListener.java => offer/OfferModule.java} (64%) create mode 100644 src/main/java/io/bitsquare/offer/OfferRepository.java create mode 100644 src/main/java/io/bitsquare/offer/TomP2POfferRepository.java diff --git a/src/main/java/io/bitsquare/app/BitsquareModule.java b/src/main/java/io/bitsquare/app/BitsquareModule.java index 9395174c84..da11e8aa60 100644 --- a/src/main/java/io/bitsquare/app/BitsquareModule.java +++ b/src/main/java/io/bitsquare/app/BitsquareModule.java @@ -23,6 +23,7 @@ import io.bitsquare.crypto.CryptoModule; import io.bitsquare.gui.GuiModule; import io.bitsquare.msg.DefaultMessageModule; import io.bitsquare.msg.MessageModule; +import io.bitsquare.offer.OfferModule; import io.bitsquare.persistence.Persistence; import io.bitsquare.settings.Settings; import io.bitsquare.trade.TradeModule; @@ -70,6 +71,7 @@ public class BitsquareModule extends AbstractBitsquareModule { install(bitcoinModule()); install(cryptoModule()); install(tradeModule()); + install(offerModule()); install(guiModule()); bindConstant().annotatedWith(Names.named("appName")).to(appName); @@ -95,6 +97,10 @@ public class BitsquareModule extends AbstractBitsquareModule { return new TradeModule(properties); } + protected OfferModule offerModule() { + return new OfferModule(properties); + } + protected GuiModule guiModule() { return new GuiModule(properties, primaryStage); } diff --git a/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java b/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java index e962e6b451..f9080354ff 100644 --- a/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java +++ b/src/main/java/io/bitsquare/gui/main/trade/offerbook/OfferBook.java @@ -20,9 +20,8 @@ package io.bitsquare.gui.main.trade.offerbook; import io.bitsquare.bank.BankAccount; import io.bitsquare.locale.Country; import io.bitsquare.locale.CurrencyUtil; -import io.bitsquare.msg.MessageFacade; -import io.bitsquare.msg.listeners.OfferBookListener; import io.bitsquare.offer.Offer; +import io.bitsquare.offer.OfferRepository; import io.bitsquare.user.User; import io.bitsquare.util.Utilities; @@ -44,18 +43,18 @@ import static com.google.common.base.Preconditions.checkArgument; * Holds and manages the unsorted and unfiltered offerbook list of both buy and sell offers. * It is handled as singleton by Guice and is used by 2 instances of OfferBookModel (one for Buy one for Sell). * As it is used only by the Buy and Sell UIs we treat it as local UI model. - * It also use OfferBookListener as the lists items class and we don't want to get any dependency out of the package - * for that. + * It also use OfferRepository.Listener as the lists items class and we don't want to get any dependency out of the + * package for that. */ public class OfferBook { private static final Logger log = LoggerFactory.getLogger(OfferBook.class); - private final MessageFacade messageFacade; + private final OfferRepository offerRepository; private final User user; private final ObservableList offerBookListItems = FXCollections.observableArrayList(); - private final OfferBookListener offerBookListener; + private final OfferRepository.Listener offerRepositoryListener; private final ChangeListener bankAccountChangeListener; private final ChangeListener invalidationListener; private String fiatCode; @@ -69,14 +68,14 @@ public class OfferBook { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - OfferBook(MessageFacade messageFacade, User user) { - this.messageFacade = messageFacade; + OfferBook(OfferRepository offerRepository, User user) { + this.offerRepository = offerRepository; this.user = user; bankAccountChangeListener = (observableValue, oldValue, newValue) -> setBankAccount(newValue); invalidationListener = (ov, oldValue, newValue) -> requestOffers(); - offerBookListener = new OfferBookListener() { + offerRepositoryListener = new OfferRepository.Listener() { @Override public void onOfferAdded(Offer offer) { addOfferToOfferBookListItems(offer); @@ -143,15 +142,15 @@ public class OfferBook { private void addListeners() { log.debug("addListeners "); user.currentBankAccountProperty().addListener(bankAccountChangeListener); - messageFacade.addOfferBookListener(offerBookListener); - messageFacade.invalidationTimestampProperty().addListener(invalidationListener); + offerRepository.addListener(offerRepositoryListener); + offerRepository.invalidationTimestampProperty().addListener(invalidationListener); } private void removeListeners() { log.debug("removeListeners "); user.currentBankAccountProperty().removeListener(bankAccountChangeListener); - messageFacade.removeOfferBookListener(offerBookListener); - messageFacade.invalidationTimestampProperty().removeListener(invalidationListener); + offerRepository.removeListener(offerRepositoryListener); + offerRepository.invalidationTimestampProperty().removeListener(invalidationListener); } private void addOfferToOfferBookListItems(Offer offer) { @@ -161,7 +160,7 @@ public class OfferBook { } private void requestOffers() { - messageFacade.getOffers(fiatCode); + offerRepository.getOffers(fiatCode); } @@ -174,11 +173,11 @@ public class OfferBook { addListeners(); setBankAccount(user.getCurrentBankAccount()); pollingTimer = Utilities.setInterval(1000, (animationTimer) -> { - messageFacade.requestInvalidationTimeStampFromDHT(fiatCode); + offerRepository.requestInvalidationTimeStampFromDHT(fiatCode); return null; }); - messageFacade.getOffers(fiatCode); + offerRepository.getOffers(fiatCode); } private void stopPolling() { diff --git a/src/main/java/io/bitsquare/msg/MessageFacade.java b/src/main/java/io/bitsquare/msg/MessageFacade.java index f6d6de4c7a..1dd9f49868 100644 --- a/src/main/java/io/bitsquare/msg/MessageFacade.java +++ b/src/main/java/io/bitsquare/msg/MessageFacade.java @@ -18,22 +18,17 @@ package io.bitsquare.msg; import io.bitsquare.arbitrator.Arbitrator; -import io.bitsquare.msg.listeners.AddOfferListener; import io.bitsquare.msg.listeners.ArbitratorListener; import io.bitsquare.msg.listeners.BootstrapListener; import io.bitsquare.msg.listeners.GetPeerAddressListener; import io.bitsquare.msg.listeners.IncomingMessageListener; -import io.bitsquare.msg.listeners.OfferBookListener; import io.bitsquare.msg.listeners.OutgoingMessageListener; import io.bitsquare.network.Peer; -import io.bitsquare.offer.Offer; import java.security.PublicKey; import java.util.Locale; -import javafx.beans.property.LongProperty; - public interface MessageFacade extends MessageBroker { void sendMessage(Peer peer, Message message, OutgoingMessageListener listener); @@ -46,25 +41,11 @@ public interface MessageFacade extends MessageBroker { void removeIncomingMessageListener(IncomingMessageListener listener); - void addOffer(Offer offer, AddOfferListener addOfferListener); - void addArbitratorListener(ArbitratorListener listener); void getArbitrators(Locale defaultLanguageLocale); - LongProperty invalidationTimestampProperty(); - - void addOfferBookListener(OfferBookListener offerBookListener); - - void requestInvalidationTimeStampFromDHT(String fiatCode); - - void getOffers(String fiatCode); - - void removeOffer(Offer offer); - void init(int clientPort, BootstrapListener bootstrapListener); void getPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener); - - void removeOfferBookListener(OfferBookListener offerBookListener); } diff --git a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java index b974144bf6..9e48c886c7 100644 --- a/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java +++ b/src/main/java/io/bitsquare/msg/TomP2PMessageFacade.java @@ -18,16 +18,13 @@ package io.bitsquare.msg; import io.bitsquare.arbitrator.Arbitrator; -import io.bitsquare.msg.listeners.AddOfferListener; import io.bitsquare.msg.listeners.ArbitratorListener; import io.bitsquare.msg.listeners.BootstrapListener; import io.bitsquare.msg.listeners.GetPeerAddressListener; import io.bitsquare.msg.listeners.IncomingMessageListener; -import io.bitsquare.msg.listeners.OfferBookListener; import io.bitsquare.msg.listeners.OutgoingMessageListener; import io.bitsquare.network.Peer; import io.bitsquare.network.tomp2p.TomP2PPeer; -import io.bitsquare.offer.Offer; import io.bitsquare.user.User; import com.google.common.util.concurrent.FutureCallback; @@ -39,15 +36,12 @@ import java.security.PublicKey; import java.util.ArrayList; import java.util.List; import java.util.Locale; -import java.util.Map; import javax.annotation.Nullable; import javax.inject.Inject; import javafx.application.Platform; -import javafx.beans.property.LongProperty; -import javafx.beans.property.SimpleLongProperty; import net.tomp2p.dht.FutureGet; import net.tomp2p.dht.FuturePut; @@ -58,7 +52,6 @@ import net.tomp2p.futures.BaseFutureAdapter; import net.tomp2p.futures.BaseFutureListener; import net.tomp2p.futures.FutureDirect; import net.tomp2p.peers.Number160; -import net.tomp2p.peers.Number640; import net.tomp2p.storage.Data; import net.tomp2p.utils.Utils; @@ -82,10 +75,8 @@ class TomP2PMessageFacade implements MessageFacade { private final P2PNode p2pNode; private final User user; - private final List offerBookListeners = new ArrayList<>(); private final List arbitratorListeners = new ArrayList<>(); private final List incomingMessageListeners = new ArrayList<>(); - private final LongProperty invalidationTimestamp = new SimpleLongProperty(0); /////////////////////////////////////////////////////////////////////////////////////////// @@ -152,166 +143,6 @@ class TomP2PMessageFacade implements MessageFacade { } - /////////////////////////////////////////////////////////////////////////////////////////// - // Offer - /////////////////////////////////////////////////////////////////////////////////////////// - - public void addOffer(Offer offer, AddOfferListener addOfferListener) { - Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); - try { - final Data offerData = new Data(offer); - - // the offer is default 30 days valid - int defaultOfferTTL = 30 * 24 * 60 * 60; - offerData.ttlSeconds(defaultOfferTTL); - log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + - ", hash: " + offerData.hash().toString() + "]"); - FuturePut futurePut = p2pNode.addProtectedData(locationKey, offerData); - futurePut.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - // deactivate it for the moment until the port forwarding bug is fixed - // if (future.isSuccess()) { - Platform.runLater(() -> { - addOfferListener.onComplete(); - offerBookListeners.stream().forEach(listener -> { - try { - Object offerDataObject = offerData.object(); - if (offerDataObject instanceof Offer) { - log.error("Added offer to DHT with ID: " + ((Offer) offerDataObject).getId()); - listener.onOfferAdded((Offer) offerDataObject); - } - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - log.error("Add offer to DHT failed: " + e.getMessage()); - } - }); - - // TODO will be removed when we don't use polling anymore - writeInvalidationTimestampToDHT(locationKey); - log.trace("Add offer to DHT was successful. Added data: [locationKey: " + locationKey + - ", value: " + offerData + "]"); - }); - /* } - else { - Platform.runLater(() -> { - addOfferListener.onFailed("Add offer to DHT failed.", - new Exception("Add offer to DHT failed. Reason: " + future.failedReason())); - log.error("Add offer to DHT failed. Reason: " + future.failedReason()); - }); - }*/ - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - Platform.runLater(() -> { - addOfferListener.onFailed("Add offer to DHT failed with an exception.", t); - log.error("Add offer to DHT failed with an exception: " + t.getMessage()); - }); - } - }); - } catch (IOException e) { - Platform.runLater(() -> { - addOfferListener.onFailed("Add offer to DHT failed with an exception.", e); - log.error("Add offer to DHT failed with an exception: " + e.getMessage()); - }); - } - } - - //TODO remove is failing, probably due Coin or Fiat class (was working before) - // objects are identical but returned object form network might have some problem with serialisation? - public void removeOffer(Offer offer) { - Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); - try { - final Data offerData = new Data(offer); - log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + - ", hash: " + offerData.hash().toString() + "]"); - FutureRemove futureRemove = p2pNode.removeFromDataMap(locationKey, offerData); - futureRemove.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - // deactivate it for the moment until the port forwarding bug is fixed - // if (future.isSuccess()) { - Platform.runLater(() -> { - offerBookListeners.stream().forEach(offerBookListener -> { - try { - Object offerDataObject = offerData.object(); - if (offerDataObject instanceof Offer) { - log.trace("Remove offer from DHT was successful. Removed data: [key: " + - locationKey + ", " + - "offer: " + (Offer) offerDataObject + "]"); - offerBookListener.onOfferRemoved((Offer) offerDataObject); - } - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - log.error("Remove offer from DHT failed. Error: " + e.getMessage()); - } - }); - writeInvalidationTimestampToDHT(locationKey); - }); - /* } - else { - log.error("Remove offer from DHT failed. Cause: future.isSuccess() = false, locationKey: " + - locationKey + ", Reason: " + future.failedReason()); - }*/ - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Remove offer from DHT failed. Error: " + t.getMessage()); - } - }); - } catch (IOException e) { - e.printStackTrace(); - log.error("Remove offer from DHT failed. Error: " + e.getMessage()); - } - } - - public void getOffers(String currencyCode) { - Number160 locationKey = Number160.createHash(currencyCode); - log.trace("Get offers from DHT requested for locationKey: " + locationKey); - FutureGet futureGet = p2pNode.getDataMap(locationKey); - futureGet.addListener(new BaseFutureAdapter() { - @Override - public void operationComplete(BaseFuture baseFuture) throws Exception { - if (baseFuture.isSuccess()) { - final Map dataMap = futureGet.dataMap(); - final List offers = new ArrayList<>(); - if (dataMap != null) { - for (Data offerData : dataMap.values()) { - try { - Object offerDataObject = offerData.object(); - if (offerDataObject instanceof Offer) { - offers.add((Offer) offerDataObject); - } - } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); - } - } - - Platform.runLater(() -> offerBookListeners.stream().forEach(listener -> - listener.onOffersReceived(offers))); - } - - log.trace("Get offers from DHT was successful. Stored data: [key: " + locationKey - + ", values: " + futureGet.dataMap() + "]"); - } - else { - final Map dataMap = futureGet.dataMap(); - if (dataMap == null || dataMap.size() == 0) { - log.trace("Get offers from DHT delivered empty dataMap."); - Platform.runLater(() -> offerBookListeners.stream().forEach(listener -> - listener.onOffersReceived(new ArrayList<>()))); - } - else { - log.error("Get offers from DHT was not successful with reason:" + baseFuture.failedReason()); - } - } - } - }); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Trade process /////////////////////////////////////////////////////////////////////////////////////////// @@ -452,14 +283,6 @@ class TomP2PMessageFacade implements MessageFacade { // Event Listeners /////////////////////////////////////////////////////////////////////////////////////////// - public void addOfferBookListener(OfferBookListener listener) { - offerBookListeners.add(listener); - } - - public void removeOfferBookListener(OfferBookListener listener) { - offerBookListeners.remove(listener); - } - public void addArbitratorListener(ArbitratorListener listener) { arbitratorListeners.add(listener); } @@ -477,87 +300,6 @@ class TomP2PMessageFacade implements MessageFacade { } - /* - * We store the timestamp of any change of the offer list (add, remove offer) and we poll in intervals for changes. - * If we detect a change we request the offer list from the DHT. - * Polling should be replaced by a push based solution later. - */ - - /////////////////////////////////////////////////////////////////////////////////////////// - // Polling - /////////////////////////////////////////////////////////////////////////////////////////// - - private void writeInvalidationTimestampToDHT(Number160 locationKey) { - invalidationTimestamp.set(System.currentTimeMillis()); - try { - FuturePut putFuture = p2pNode.putData(getInvalidatedLocationKey(locationKey), - new Data(invalidationTimestamp.get())); - putFuture.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - if (putFuture.isSuccess()) - log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" + - invalidationTimestamp.get()); - else - log.error("Update invalidationTimestamp to DHT failed with reason:" + putFuture.failedReason()); - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Update invalidationTimestamp to DHT failed with exception:" + t.getMessage()); - } - }); - } catch (IOException e) { - log.error("Update invalidationTimestamp to DHT failed with exception:" + e.getMessage()); - } - } - - public LongProperty invalidationTimestampProperty() { - return invalidationTimestamp; - } - - public void requestInvalidationTimeStampFromDHT(String currencyCode) { - Number160 locationKey = Number160.createHash(currencyCode); - FutureGet getFuture = p2pNode.getData(getInvalidatedLocationKey(locationKey)); - getFuture.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - if (getFuture.isSuccess()) { - Data data = getFuture.data(); - if (data != null && data.object() instanceof Long) { - final Object object = data.object(); - Platform.runLater(() -> { - Long timeStamp = (Long) object; - //log.trace("Get invalidationTimestamp from DHT was successful. TimeStamp=" + timeStamp); - invalidationTimestamp.set(timeStamp); - }); - } - else { - //log.error("Get invalidationTimestamp from DHT failed. Data = " + data); - } - } - else if (getFuture.data() == null) { - // OK as nothing is set at the moment - // log.trace("Get invalidationTimestamp from DHT returns null. That is ok for the startup."); - } - else { - log.error("Get invalidationTimestamp from DHT failed with reason:" + getFuture.failedReason()); - } - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Get invalidationTimestamp from DHT failed with exception:" + t.getMessage()); - t.printStackTrace(); - } - }); - } - - private Number160 getInvalidatedLocationKey(Number160 locationKey) { - return Number160.createHash(locationKey + "invalidated"); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Incoming message handler /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/main/java/io/bitsquare/msg/listeners/OfferBookListener.java b/src/main/java/io/bitsquare/offer/OfferModule.java similarity index 64% rename from src/main/java/io/bitsquare/msg/listeners/OfferBookListener.java rename to src/main/java/io/bitsquare/offer/OfferModule.java index 8a6edae6dc..dc7b826183 100644 --- a/src/main/java/io/bitsquare/msg/listeners/OfferBookListener.java +++ b/src/main/java/io/bitsquare/offer/OfferModule.java @@ -15,16 +15,20 @@ * along with Bitsquare. If not, see . */ -package io.bitsquare.msg.listeners; +package io.bitsquare.offer; -import io.bitsquare.offer.Offer; +import io.bitsquare.AbstractBitsquareModule; -import java.util.List; +import java.util.Properties; -public interface OfferBookListener { - void onOfferAdded(Offer offer); +public class OfferModule extends AbstractBitsquareModule { - void onOffersReceived(List offers); + public OfferModule(Properties properties) { + super(properties); + } - void onOfferRemoved(Offer offer); -} \ No newline at end of file + @Override + protected void configure() { + bind(OfferRepository.class).to(TomP2POfferRepository.class).asEagerSingleton(); + } +} diff --git a/src/main/java/io/bitsquare/offer/OfferRepository.java b/src/main/java/io/bitsquare/offer/OfferRepository.java new file mode 100644 index 0000000000..a7f42c987b --- /dev/null +++ b/src/main/java/io/bitsquare/offer/OfferRepository.java @@ -0,0 +1,49 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.offer; + +import io.bitsquare.msg.listeners.AddOfferListener; + +import java.util.List; + +import javafx.beans.property.LongProperty; + +public interface OfferRepository { + + void getOffers(String fiatCode); + + void addOffer(Offer offer, AddOfferListener addOfferListener); + + void removeOffer(Offer offer); + + void addListener(Listener listener); + + void removeListener(Listener listener); + + LongProperty invalidationTimestampProperty(); + + void requestInvalidationTimeStampFromDHT(String fiatCode); + + interface Listener { + void onOfferAdded(Offer offer); + + void onOffersReceived(List offers); + + void onOfferRemoved(Offer offer); + } +} diff --git a/src/main/java/io/bitsquare/offer/TomP2POfferRepository.java b/src/main/java/io/bitsquare/offer/TomP2POfferRepository.java new file mode 100644 index 0000000000..8bb753068c --- /dev/null +++ b/src/main/java/io/bitsquare/offer/TomP2POfferRepository.java @@ -0,0 +1,294 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.offer; + +import io.bitsquare.msg.P2PNode; +import io.bitsquare.msg.listeners.AddOfferListener; + +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; + +import javafx.application.Platform; +import javafx.beans.property.LongProperty; +import javafx.beans.property.SimpleLongProperty; + +import net.tomp2p.dht.FutureGet; +import net.tomp2p.dht.FuturePut; +import net.tomp2p.dht.FutureRemove; +import net.tomp2p.futures.BaseFuture; +import net.tomp2p.futures.BaseFutureAdapter; +import net.tomp2p.futures.BaseFutureListener; +import net.tomp2p.peers.Number160; +import net.tomp2p.peers.Number640; +import net.tomp2p.storage.Data; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class TomP2POfferRepository implements OfferRepository { + + private static final Logger log = LoggerFactory.getLogger(TomP2POfferRepository.class); + + private final List offerRepositoryListeners = new ArrayList<>(); + private final LongProperty invalidationTimestamp = new SimpleLongProperty(0); + + private final P2PNode p2pNode; + + @Inject + public TomP2POfferRepository(P2PNode p2pNode) { + this.p2pNode = p2pNode; + } + + @Override + public void addOffer(Offer offer, AddOfferListener addOfferListener) { + Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); + try { + final Data offerData = new Data(offer); + + // the offer is default 30 days valid + int defaultOfferTTL = 30 * 24 * 60 * 60; + offerData.ttlSeconds(defaultOfferTTL); + log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + + ", hash: " + offerData.hash().toString() + "]"); + FuturePut futurePut = p2pNode.addProtectedData(locationKey, offerData); + futurePut.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + // deactivate it for the moment until the port forwarding bug is fixed + // if (future.isSuccess()) { + Platform.runLater(() -> { + addOfferListener.onComplete(); + offerRepositoryListeners.stream().forEach(listener -> { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + log.error("Added offer to DHT with ID: " + ((Offer) offerDataObject).getId()); + listener.onOfferAdded((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + log.error("Add offer to DHT failed: " + e.getMessage()); + } + }); + + writeInvalidationTimestampToDHT(locationKey); + log.trace("Add offer to DHT was successful. Added data: [locationKey: " + locationKey + + ", value: " + offerData + "]"); + }); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + Platform.runLater(() -> { + addOfferListener.onFailed("Add offer to DHT failed with an exception.", t); + log.error("Add offer to DHT failed with an exception: " + t.getMessage()); + }); + } + }); + } catch (IOException e) { + Platform.runLater(() -> { + addOfferListener.onFailed("Add offer to DHT failed with an exception.", e); + log.error("Add offer to DHT failed with an exception: " + e.getMessage()); + }); + } + } + + //TODO remove is failing, probably due Coin or Fiat class (was working before) + // objects are identical but returned object form network might have some problem with serialisation? + public void removeOffer(Offer offer) { + Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); + try { + final Data offerData = new Data(offer); + log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + + ", hash: " + offerData.hash().toString() + "]"); + FutureRemove futureRemove = p2pNode.removeFromDataMap(locationKey, offerData); + futureRemove.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + // deactivate it for the moment until the port forwarding bug is fixed + // if (future.isSuccess()) { + Platform.runLater(() -> { + offerRepositoryListeners.stream().forEach(listener -> { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + log.trace("Remove offer from DHT was successful. Removed data: [key: " + + locationKey + ", " + + "offer: " + (Offer) offerDataObject + "]"); + listener.onOfferRemoved((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + log.error("Remove offer from DHT failed. Error: " + e.getMessage()); + } + }); + writeInvalidationTimestampToDHT(locationKey); + }); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Remove offer from DHT failed. Error: " + t.getMessage()); + } + }); + } catch (IOException e) { + e.printStackTrace(); + log.error("Remove offer from DHT failed. Error: " + e.getMessage()); + } + } + + public void getOffers(String currencyCode) { + Number160 locationKey = Number160.createHash(currencyCode); + log.trace("Get offers from DHT requested for locationKey: " + locationKey); + FutureGet futureGet = p2pNode.getDataMap(locationKey); + futureGet.addListener(new BaseFutureAdapter() { + @Override + public void operationComplete(BaseFuture baseFuture) throws Exception { + if (baseFuture.isSuccess()) { + final Map dataMap = futureGet.dataMap(); + final List offers = new ArrayList<>(); + if (dataMap != null) { + for (Data offerData : dataMap.values()) { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + offers.add((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + } + } + + Platform.runLater(() -> offerRepositoryListeners.stream().forEach(listener -> + listener.onOffersReceived(offers))); + } + + log.trace("Get offers from DHT was successful. Stored data: [key: " + locationKey + + ", values: " + futureGet.dataMap() + "]"); + } + else { + final Map dataMap = futureGet.dataMap(); + if (dataMap == null || dataMap.size() == 0) { + log.trace("Get offers from DHT delivered empty dataMap."); + Platform.runLater(() -> offerRepositoryListeners.stream().forEach(listener -> + listener.onOffersReceived(new ArrayList<>()))); + } + else { + log.error("Get offers from DHT was not successful with reason:" + baseFuture.failedReason()); + } + } + } + }); + } + + @Override + public void addListener(Listener listener) { + offerRepositoryListeners.add(listener); + } + + @Override + public void removeListener(Listener listener) { + offerRepositoryListeners.remove(listener); + } + + /* + * We store the timestamp of any change of the offer list (add, remove offer) and we poll + * in intervals for changes. If we detect a change we request the offer list from the DHT. + * Polling should be replaced by a push based solution later. + */ + + /////////////////////////////////////////////////////////////////////////////////////////// + // Polling + /////////////////////////////////////////////////////////////////////////////////////////// + + private void writeInvalidationTimestampToDHT(Number160 locationKey) { + invalidationTimestamp.set(System.currentTimeMillis()); + try { + FuturePut putFuture = p2pNode.putData(getInvalidatedLocationKey(locationKey), + new Data(invalidationTimestamp.get())); + putFuture.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (putFuture.isSuccess()) + log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" + + invalidationTimestamp.get()); + else + log.error("Update invalidationTimestamp to DHT failed with reason:" + putFuture.failedReason()); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Update invalidationTimestamp to DHT failed with exception:" + t.getMessage()); + } + }); + } catch (IOException e) { + log.error("Update invalidationTimestamp to DHT failed with exception:" + e.getMessage()); + } + } + + public LongProperty invalidationTimestampProperty() { + return invalidationTimestamp; + } + + public void requestInvalidationTimeStampFromDHT(String currencyCode) { + Number160 locationKey = Number160.createHash(currencyCode); + FutureGet getFuture = p2pNode.getData(getInvalidatedLocationKey(locationKey)); + getFuture.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (getFuture.isSuccess()) { + Data data = getFuture.data(); + if (data != null && data.object() instanceof Long) { + final Object object = data.object(); + Platform.runLater(() -> { + Long timeStamp = (Long) object; + //log.trace("Get invalidationTimestamp from DHT was successful. TimeStamp=" + timeStamp); + invalidationTimestamp.set(timeStamp); + }); + } + else { + //log.error("Get invalidationTimestamp from DHT failed. Data = " + data); + } + } + else if (getFuture.data() == null) { + // OK as nothing is set at the moment + // log.trace("Get invalidationTimestamp from DHT returns null. That is ok for the startup."); + } + else { + log.error("Get invalidationTimestamp from DHT failed with reason:" + getFuture.failedReason()); + } + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Get invalidationTimestamp from DHT failed with exception:" + t.getMessage()); + t.printStackTrace(); + } + }); + } + + private Number160 getInvalidatedLocationKey(Number160 locationKey) { + return Number160.createHash(locationKey + "invalidated"); + } + +} diff --git a/src/main/java/io/bitsquare/trade/TradeManager.java b/src/main/java/io/bitsquare/trade/TradeManager.java index 7956036edb..da3704d330 100644 --- a/src/main/java/io/bitsquare/trade/TradeManager.java +++ b/src/main/java/io/bitsquare/trade/TradeManager.java @@ -25,6 +25,7 @@ import io.bitsquare.msg.MessageFacade; import io.bitsquare.network.Peer; import io.bitsquare.offer.Direction; import io.bitsquare.offer.Offer; +import io.bitsquare.offer.OfferRepository; import io.bitsquare.persistence.Persistence; import io.bitsquare.settings.Settings; import io.bitsquare.trade.handlers.ErrorMessageHandler; @@ -78,6 +79,7 @@ public class TradeManager { private final BlockChainFacade blockChainFacade; private final WalletFacade walletFacade; private final CryptoFacade cryptoFacade; + private final OfferRepository offerRepository; //TODO store TakerAsSellerProtocol in trade private final Map takerAsSellerProtocolMap = new HashMap<>(); @@ -99,7 +101,8 @@ public class TradeManager { @Inject public TradeManager(User user, Settings settings, Persistence persistence, MessageFacade messageFacade, - BlockChainFacade blockChainFacade, WalletFacade walletFacade, CryptoFacade cryptoFacade) { + BlockChainFacade blockChainFacade, WalletFacade walletFacade, CryptoFacade cryptoFacade, + OfferRepository offerRepository) { this.user = user; this.settings = settings; this.persistence = persistence; @@ -107,6 +110,7 @@ public class TradeManager { this.blockChainFacade = blockChainFacade; this.walletFacade = walletFacade; this.cryptoFacade = cryptoFacade; + this.offerRepository = offerRepository; Object offersObject = persistence.read(this, "offers"); if (offersObject instanceof Map) { @@ -189,7 +193,7 @@ public class TradeManager { (message, throwable) -> { errorMessageHandler.onFault(message); createOfferCoordinatorMap.remove(offer.getId()); - }); + }, offerRepository); createOfferCoordinatorMap.put(offer.getId(), createOfferCoordinator); createOfferCoordinator.start(); } @@ -210,7 +214,7 @@ public class TradeManager { offers.remove(offer.getId()); persistOffers(); - messageFacade.removeOffer(offer); + offerRepository.removeOffer(offer); } diff --git a/src/main/java/io/bitsquare/trade/protocol/createoffer/CreateOfferCoordinator.java b/src/main/java/io/bitsquare/trade/protocol/createoffer/CreateOfferCoordinator.java index 4e17d6d897..d9e982c158 100644 --- a/src/main/java/io/bitsquare/trade/protocol/createoffer/CreateOfferCoordinator.java +++ b/src/main/java/io/bitsquare/trade/protocol/createoffer/CreateOfferCoordinator.java @@ -20,6 +20,7 @@ package io.bitsquare.trade.protocol.createoffer; import io.bitsquare.btc.WalletFacade; import io.bitsquare.msg.MessageFacade; import io.bitsquare.offer.Offer; +import io.bitsquare.offer.OfferRepository; import io.bitsquare.persistence.Persistence; import io.bitsquare.trade.handlers.FaultHandler; import io.bitsquare.trade.handlers.TransactionResultHandler; @@ -90,22 +91,25 @@ public class CreateOfferCoordinator { private final TransactionResultHandler resultHandler; private final FaultHandler faultHandler; private final Model model; + private final OfferRepository offerRepository; public CreateOfferCoordinator(Persistence persistence, Offer offer, WalletFacade walletFacade, MessageFacade messageFacade, TransactionResultHandler resultHandler, - FaultHandler faultHandler) { - this(offer, walletFacade, messageFacade, resultHandler, faultHandler, new Model(persistence)); + FaultHandler faultHandler, OfferRepository offerRepository) { + this(offer, walletFacade, messageFacade, resultHandler, faultHandler, new Model(persistence), offerRepository); } // for recovery from model public CreateOfferCoordinator(Offer offer, WalletFacade walletFacade, MessageFacade messageFacade, - TransactionResultHandler resultHandler, FaultHandler faultHandler, Model model) { + TransactionResultHandler resultHandler, FaultHandler faultHandler, Model model, + OfferRepository offerRepository) { this.offer = offer; this.walletFacade = walletFacade; this.messageFacade = messageFacade; this.resultHandler = resultHandler; this.faultHandler = faultHandler; this.model = model; + this.offerRepository = offerRepository; model.setState(State.INITED); } @@ -129,7 +133,7 @@ public class CreateOfferCoordinator { private void onOfferFeeTxBroadCasted() { model.setState(State.OFFER_FEE_BROAD_CASTED); - PublishOfferToDHT.run(this::onOfferPublishedToDHT, this::onFailed, messageFacade, offer); + PublishOfferToDHT.run(this::onOfferPublishedToDHT, this::onFailed, offerRepository, offer); } private void onOfferPublishedToDHT() { @@ -159,7 +163,7 @@ public class CreateOfferCoordinator { case OFFER_FEE_BROAD_CASTED: // actually the only replay case here, tx publish was successful but storage to dht failed. // Republish the offer to DHT - PublishOfferToDHT.run(this::onOfferPublishedToDHT, this::onFailed, messageFacade, offer); + PublishOfferToDHT.run(this::onOfferPublishedToDHT, this::onFailed, offerRepository, offer); break; case OFFER_PUBLISHED_TO_DHT: // should be impossible diff --git a/src/main/java/io/bitsquare/trade/protocol/createoffer/tasks/PublishOfferToDHT.java b/src/main/java/io/bitsquare/trade/protocol/createoffer/tasks/PublishOfferToDHT.java index d5611bd519..143a06afed 100644 --- a/src/main/java/io/bitsquare/trade/protocol/createoffer/tasks/PublishOfferToDHT.java +++ b/src/main/java/io/bitsquare/trade/protocol/createoffer/tasks/PublishOfferToDHT.java @@ -17,9 +17,9 @@ package io.bitsquare.trade.protocol.createoffer.tasks; -import io.bitsquare.msg.MessageFacade; import io.bitsquare.msg.listeners.AddOfferListener; import io.bitsquare.offer.Offer; +import io.bitsquare.offer.OfferRepository; import io.bitsquare.trade.handlers.FaultHandler; import io.bitsquare.trade.handlers.ResultHandler; @@ -29,9 +29,9 @@ import org.slf4j.LoggerFactory; public class PublishOfferToDHT { private static final Logger log = LoggerFactory.getLogger(PublishOfferToDHT.class); - public static void run(ResultHandler resultHandler, FaultHandler faultHandler, MessageFacade messageFacade, + public static void run(ResultHandler resultHandler, FaultHandler faultHandler, OfferRepository offerRepository, Offer offer) { - messageFacade.addOffer(offer, new AddOfferListener() { + offerRepository.addOffer(offer, new AddOfferListener() { @Override public void onComplete() { resultHandler.onResult();