From ec83feabfa5d9b984bd04ed54a4bc367fe359d25 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 20 Mar 2015 14:25:17 +0100 Subject: [PATCH] Re-enable dht data protection --- .../tomp2p/TomP2PArbitratorService.java | 11 +- .../trade/takeoffer/TakeOfferViewModel.java | 2 +- .../offer/tomp2p/TomP2POfferBookService.java | 11 +- .../java/io/bitsquare/p2p/BaseP2PService.java | 3 +- .../java/io/bitsquare/p2p/DHTService.java | 14 +- .../p2p/tomp2p/TomP2PAddressService.java | 8 +- .../p2p/tomp2p/TomP2PDHTService.java | 267 +++++++++++------- .../p2p/tomp2p/TomP2PMailboxService.java | 256 +++++++++++++++++ .../io/bitsquare/p2p/tomp2p/TomP2PNode.java | 18 +- .../bitsquare/p2p/tomp2p/TomP2PService.java | 30 +- core/src/main/resources/logback.xml | 2 +- .../placeoffer/PlaceOfferProtocolTest.java | 2 +- 12 files changed, 463 insertions(+), 161 deletions(-) create mode 100644 core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java diff --git a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java index 35fabdf4e3..8943c0e92f 100644 --- a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java +++ b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java @@ -22,6 +22,7 @@ import io.bitsquare.arbitration.ArbitratorService; import io.bitsquare.arbitration.listeners.ArbitratorListener; import io.bitsquare.p2p.tomp2p.TomP2PDHTService; import io.bitsquare.p2p.tomp2p.TomP2PNode; +import io.bitsquare.user.User; import java.io.IOException; @@ -56,8 +57,8 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public TomP2PArbitratorService(TomP2PNode tomP2PNode) { - super(tomP2PNode); + public TomP2PArbitratorService(TomP2PNode tomP2PNode, User user) { + super(tomP2PNode, user); } @@ -70,7 +71,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra try { final Data arbitratorData = new Data(arbitrator); - FuturePut addFuture = addProtectedData(locationKey, arbitratorData); + FuturePut addFuture = addProtectedDataToMap(locationKey, arbitratorData); addFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -104,7 +105,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra public void removeArbitrator(Arbitrator arbitrator) throws IOException { Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT); final Data arbitratorData = new Data(arbitrator); - FutureRemove removeFuture = removeFromDataMap(locationKey, arbitratorData); + FutureRemove removeFuture = removeProtectedDataFromMap(locationKey, arbitratorData); removeFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -135,7 +136,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra public void getArbitrators(Locale languageLocale) { Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT); - FutureGet futureGet = getDataMap(locationKey); + FutureGet futureGet = getMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { diff --git a/core/src/main/java/io/bitsquare/gui/main/trade/takeoffer/TakeOfferViewModel.java b/core/src/main/java/io/bitsquare/gui/main/trade/takeoffer/TakeOfferViewModel.java index ee731bf61b..25a0ff6bd1 100644 --- a/core/src/main/java/io/bitsquare/gui/main/trade/takeoffer/TakeOfferViewModel.java +++ b/core/src/main/java/io/bitsquare/gui/main/trade/takeoffer/TakeOfferViewModel.java @@ -155,7 +155,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel im switch (state) { case UNKNOWN: - log.error("Must not happen."); + log.error("Offer state is UNKNOWN. That must not happen."); break; case AVAILABLE: this.state.set(State.AMOUNT_SCREEN); diff --git a/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java b/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java index 01b4212cbd..93023450c8 100644 --- a/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java +++ b/core/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferBookService.java @@ -23,6 +23,7 @@ import io.bitsquare.offer.Offer; import io.bitsquare.offer.OfferBookService; import io.bitsquare.p2p.tomp2p.TomP2PDHTService; import io.bitsquare.p2p.tomp2p.TomP2PNode; +import io.bitsquare.user.User; import java.io.IOException; @@ -57,8 +58,8 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo @Inject - public TomP2POfferBookService(TomP2PNode tomP2PNode) { - super(tomP2PNode); + public TomP2POfferBookService(TomP2PNode tomP2PNode, User user) { + super(tomP2PNode, user); } @Override @@ -72,7 +73,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo offerData.ttlSeconds(defaultOfferTTL); log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); - FuturePut futurePut = addProtectedData(locationKey, offerData); + FuturePut futurePut = addProtectedDataToMap(locationKey, offerData); futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -115,7 +116,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo final Data offerData = new Data(offer); log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); - FutureRemove futureRemove = removeFromDataMap(locationKey, offerData); + FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData); futureRemove.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -160,7 +161,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo public void getOffers(String currencyCode) { Number160 locationKey = Number160.createHash(currencyCode); log.trace("Get offers from DHT requested for locationKey: " + locationKey); - FutureGet futureGet = getDataMap(locationKey); + FutureGet futureGet = getMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { diff --git a/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java b/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java index afca369b16..999794e77b 100644 --- a/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java +++ b/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java @@ -29,11 +29,12 @@ public class BaseP2PService implements P2PService { BaseP2PService.userThread = userThread; } - protected Executor executor = userThread; + protected Executor executor; protected PeerDHT peerDHT; @Override public void bootstrapCompleted() { + this.executor = BaseP2PService.userThread; } @Override diff --git a/core/src/main/java/io/bitsquare/p2p/DHTService.java b/core/src/main/java/io/bitsquare/p2p/DHTService.java index aecf8b88d8..bb50858dce 100644 --- a/core/src/main/java/io/bitsquare/p2p/DHTService.java +++ b/core/src/main/java/io/bitsquare/p2p/DHTService.java @@ -27,17 +27,17 @@ import net.tomp2p.storage.Data; public interface DHTService extends P2PService { - FuturePut putDomainProtectedData(Number160 locationKey, Data data); - FuturePut putData(Number160 locationKey, Data data); - FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey); - FutureGet getData(Number160 locationKey); - FuturePut addProtectedData(Number160 locationKey, Data data); + FuturePut putDataToMyProtectedDomain(Number160 locationKey, Data data); - FutureRemove removeFromDataMap(Number160 locationKey, Data data); + FutureGet getDataOfProtectedDomain(Number160 locationKey, PublicKey publicKey); - FutureGet getDataMap(Number160 locationKey); + FuturePut addProtectedDataToMap(Number160 locationKey, Data data); + + FutureRemove removeProtectedDataFromMap(Number160 locationKey, Data data); + + FutureGet getMap(Number160 locationKey); } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PAddressService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PAddressService.java index 0984bfb240..241b207103 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PAddressService.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PAddressService.java @@ -64,7 +64,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer @Inject public TomP2PAddressService(TomP2PNode tomP2PNode, User user) { - super(tomP2PNode); + super(tomP2PNode, user); locationKey = Utils.makeSHAHash(user.getMessageKeyPair().getPublic().getEncoded()); } @@ -93,7 +93,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer @Override public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) { final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded()); - FutureGet futureGet = getDomainProtectedData(locationKey, publicKey); + FutureGet futureGet = getDataOfProtectedDomain(locationKey, publicKey); log.trace("findPeerAddress called"); futureGet.addListener(new BaseFutureAdapter() { @Override @@ -144,7 +144,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer // We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates data.ttlSeconds(ADDRESS_TTL); log.debug("storePeerAddress " + peerDHT.peerAddress().toString()); - FuturePut futurePut = putDomainProtectedData(locationKey, data); + FuturePut futurePut = putDataToMyProtectedDomain(locationKey, data); futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { @@ -173,7 +173,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer private void removeAddress() { try { Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); - removeFromDataMap(locationKey, data).awaitUninterruptibly(1000); + removeProtectedDataFromMap(locationKey, data).awaitUninterruptibly(1000); } catch (IOException e) { e.printStackTrace(); log.error("Exception at removeAddress " + e.toString()); diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PDHTService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PDHTService.java index be6ef6aabc..516ae741ef 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PDHTService.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PDHTService.java @@ -18,7 +18,9 @@ package io.bitsquare.p2p.tomp2p; import io.bitsquare.p2p.DHTService; +import io.bitsquare.user.User; +import java.security.KeyPair; import java.security.PublicKey; import javax.inject.Inject; @@ -26,14 +28,18 @@ import javax.inject.Inject; import net.tomp2p.dht.FutureGet; import net.tomp2p.dht.FuturePut; import net.tomp2p.dht.FutureRemove; +import net.tomp2p.dht.StorageLayer; import net.tomp2p.peers.Number160; import net.tomp2p.storage.Data; +import net.tomp2p.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TomP2PDHTService extends TomP2PService implements DHTService { private static final Logger log = LoggerFactory.getLogger(TomP2PDHTService.class); + private final KeyPair keyPair; + private final Number160 pubKeyHashForMyDomain; /////////////////////////////////////////////////////////////////////////////////////////// @@ -41,135 +47,184 @@ public class TomP2PDHTService extends TomP2PService implements DHTService { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public TomP2PDHTService(TomP2PNode tomP2PNode) { + public TomP2PDHTService(TomP2PNode tomP2PNode, User user) { super(tomP2PNode); - } - - - /////////////////////////////////////////////////////////////////////////////////////////// - // DHT methods - /////////////////////////////////////////////////////////////////////////////////////////// - - // TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in - // the logs - @Override - public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { - log.trace("putDomainProtectedData"); - return peerDHT.put(locationKey).data(data).start(); + keyPair = user.getMessageKeyPair(); + pubKeyHashForMyDomain = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); } @Override + public void bootstrapCompleted() { + super.bootstrapCompleted(); + + StorageLayer.ProtectionEnable protectionDomainEnable = StorageLayer.ProtectionEnable.ALL; + StorageLayer.ProtectionMode protectionDomainMode = StorageLayer.ProtectionMode.MASTER_PUBLIC_KEY; + StorageLayer.ProtectionEnable protectionEntryEnable = StorageLayer.ProtectionEnable.ALL; + StorageLayer.ProtectionMode protectionEntryMode = StorageLayer.ProtectionMode.MASTER_PUBLIC_KEY; + + peerDHT.storageLayer().protection(protectionDomainEnable, protectionDomainMode, protectionEntryEnable, protectionEntryMode); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Put/Get: Public access. Used for offerbook invalidation timestamp + /////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Store data to given location key. + * Write access: Anyone with locationKey + * + * @param locationKey + * @param data + * @return + */ public FuturePut putData(Number160 locationKey, Data data) { log.trace("putData"); return peerDHT.put(locationKey).data(data).start(); } + // No protection, everybody can read. - @Override - public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { - log.trace("getDomainProtectedData"); - return peerDHT.get(locationKey).start(); - } - - @Override + /** + * Get data for given locationKey + * Read access: Anyone with locationKey + * + * @param locationKey + * @return + */ public FutureGet getData(Number160 locationKey) { - //log.trace("getData"); + log.trace("getData"); return peerDHT.get(locationKey).start(); } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Put/Get: Domain protected, entry protected. Used for storing address. + /////////////////////////////////////////////////////////////////////////////////////////// - @Override - public FuturePut addProtectedData(Number160 locationKey, Data data) { - log.trace("addProtectedData"); - return peerDHT.add(locationKey).data(data).start(); + /** + * Store data to given location key and my domain. + * Write access: Anybody who has pubKey if domain is not used before. KeyPair owner of pubKey can overwrite and reserve that domain. + * We save early an entry so we have that domain reserved and nobody else can use it. + * Additionally we use entry protection, so domain owner is data owner. + * + * @param locationKey + * @param data + * @return + */ + public FuturePut putDataToMyProtectedDomain(Number160 locationKey, Data data) { + log.trace("putDataToMyProtectedDomain"); + data.protectEntry(keyPair).sign(); + return peerDHT.put(locationKey).data(data).sign().protectDomain().domainKey(pubKeyHashForMyDomain).start(); } - @Override - public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { + /** + * Read data for given location and publicKey of that domain. + * Read access: Anyone who has publicKey + * + * @param locationKey + * @param publicKey + * @return + */ + public FutureGet getDataOfProtectedDomain(Number160 locationKey, PublicKey publicKey) { + log.trace("getDataOfProtectedDomain"); + final Number160 pubKeyHashOfDomainOwner = Utils.makeSHAHash(publicKey.getEncoded()); + return peerDHT.get(locationKey).domainKey(pubKeyHashOfDomainOwner).start(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Add/remove/get from map: Entry protected, no domain protection. Used for offerbook and arbitrators + /////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Add data to a map. For the entry contentKey of data is used (internally). + * Write access: Anyone can add entries. But nobody can overwrite an existing entry as it is protected by data protection. + * + * @param locationKey + * @param data + * @return + */ + public FuturePut addProtectedDataToMap(Number160 locationKey, Data data) { + log.trace("addProtectedDataToMap"); + data.protectEntry(keyPair).sign(); + log.trace("addProtectedDataToMap with contentKey " + data.hash().toString()); + return peerDHT.add(locationKey).data(data).sign().start(); + } + + /** + * Remove entry from map for given locationKey. ContentKey of data is used for removing the entry. + * Access: Only the owner of the data entry can remove it, as it was written with entry protection. + * + * @param locationKey + * @param data + * @return + */ + public FutureRemove removeProtectedDataFromMap(Number160 locationKey, Data data) { + log.trace("removeProtectedDataFromMap"); Number160 contentKey = data.hash(); - log.trace("removeFromDataMap with contentKey " + contentKey.toString()); - return peerDHT.remove(locationKey).contentKey(contentKey).start(); + log.trace("removeProtectedDataFromMap with contentKey " + contentKey.toString()); + return peerDHT.remove(locationKey).contentKey(contentKey).sign().start(); } - @Override - public FutureGet getDataMap(Number160 locationKey) { - log.trace("getDataMap"); + /** + * Get map for given locationKey with all entries. + * Access: Everybody can read. + * + * @param locationKey + * @return + */ + public FutureGet getMap(Number160 locationKey) { + log.trace("getMap"); return peerDHT.get(locationKey).all().start(); } -// -// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) { -// log.trace("putDomainProtectedData"); -// data.protectEntry(keyPair); -// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); -// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start(); -// } -// -// // No protection, everybody can write. -// public FuturePut putData(Number160 locationKey, Data data) { -// log.trace("putData"); -// return peerDHT.put(locationKey).data(data).start(); -// } -// -// // Not public readable. Only users with the public key of the peer who stored the data can read that data -// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) { -// log.trace("getDomainProtectedData"); -// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded()); -// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start(); -// } -// -// // No protection, everybody can read. -// public FutureGet getData(Number160 locationKey) { -// log.trace("getData"); -// return peerDHT.get(locationKey).start(); -// } -// -// // No domain protection, but entry protection -// public FuturePut addProtectedData(Number160 locationKey, Data data) { -// log.trace("addProtectedData"); -// data.protectEntry(keyPair); -// log.trace("addProtectedData with contentKey " + data.hash().toString()); -// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start(); -// } -// -// // No domain protection, but entry protection -// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) { -// log.trace("removeFromDataMap"); -// Number160 contentKey = data.hash(); -// log.trace("removeFromDataMap with contentKey " + contentKey.toString()); -// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start(); -// } -// -// // Public readable -// public FutureGet getDataMap(Number160 locationKey) { -// log.trace("getDataMap"); -// return peerDHT.get(locationKey).all().start(); -// } - // Send signed payLoad to peer -// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) { -// // use 30 seconds as max idle time before connection get closed -// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000); -// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start(); -// futureDirect.addListener(new BaseFutureListener() { -// @Override -// public void operationComplete(BaseFuture future) throws Exception { -// if (futureDirect.isSuccess()) { -// log.debug("sendMessage completed"); -// } -// else { -// log.error("sendData failed with Reason " + futureDirect.failedReason()); -// } -// } -// -// @Override -// public void exceptionCaught(Throwable t) throws Exception { -// log.error("Exception at sendData " + t.toString()); -// } -// }); -// -// return futureDirect; -// } -// + /////////////////////////////////////////////////////////////////////////////////////////// + // Add/remove/get from map: Domain protection, no data protection. Used for mailbox. For getting privacy we use encryption (not part of DHT infrastructure) + /////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Add data to a map. For the entry contentKey of data is used (internally). + * Write access: Anyone can add entries. But nobody expect the domain owner can overwrite/remove an existing entry as it is protected by the domain owner. + * + * @param locationKey + * @param data + * @return + */ + public FuturePut addDataToMapOfProtectedDomain(Number160 locationKey, Data data, PublicKey publicKey) { + log.trace("addDataToMapOfProtectedDomain"); + log.trace("addDataToMapOfProtectedDomain with contentKey " + data.hash().toString()); + final Number160 pubKeyHashOfDomainOwner = Utils.makeSHAHash(publicKey.getEncoded()); + return peerDHT.add(locationKey).data(data).protectDomain().domainKey(pubKeyHashOfDomainOwner).start(); + } + + /** + * Remove entry from map for given locationKey. ContentKey of data is used for removing the entry. + * Access: Only the owner of the data entry can remove it, as it was written with entry protection. + * + * @param locationKey + * @param data + * @return + */ + public FutureRemove removeDataFromMapOfMyProtectedDomain(Number160 locationKey, Data data) { + log.trace("removeDataFromMapOfMyProtectedDomain"); + Number160 contentKey = data.hash(); + log.trace("removeDataFromMapOfMyProtectedDomain with contentKey " + contentKey.toString()); + return peerDHT.remove(locationKey).contentKey(contentKey).protectDomain().sign().domainKey(pubKeyHashForMyDomain).start(); + } + + /** + * Get map for given locationKey with all entries. + * Access: Everybody can read. + * + * @param locationKey + * @return + */ + public FutureGet getDataFromMapOfMyProtectedDomain(Number160 locationKey) { + log.trace("getDataFromMapOfMyProtectedDomain"); + return peerDHT.get(locationKey).all().domainKey(pubKeyHashForMyDomain).start(); + } } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java new file mode 100644 index 0000000000..4bea102f5d --- /dev/null +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java @@ -0,0 +1,256 @@ +/* + * This file is part of Bitsquare. + * + * Bitsquare is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bitsquare is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bitsquare. If not, see . + */ + +package io.bitsquare.p2p.tomp2p; + +import io.bitsquare.common.handlers.FaultHandler; +import io.bitsquare.common.handlers.ResultHandler; +import io.bitsquare.offer.Offer; +import io.bitsquare.offer.OfferBookService; +import io.bitsquare.p2p.AddressService; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.Peer; +import io.bitsquare.p2p.listener.GetPeerAddressListener; +import io.bitsquare.user.User; + +import java.io.IOException; + +import java.security.PublicKey; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; + +import net.tomp2p.dht.FutureGet; +import net.tomp2p.dht.FuturePut; +import net.tomp2p.dht.FutureRemove; +import net.tomp2p.futures.BaseFuture; +import net.tomp2p.futures.BaseFutureAdapter; +import net.tomp2p.futures.BaseFutureListener; +import net.tomp2p.peers.Number160; +import net.tomp2p.peers.Number640; +import net.tomp2p.storage.Data; +import net.tomp2p.utils.Utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TomP2PMailboxService extends TomP2PDHTService implements AddressService { + private static final Logger log = LoggerFactory.getLogger(TomP2PMailboxService.class); + + + private final List offerRepositoryListeners = new ArrayList<>(); + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + @Inject + public TomP2PMailboxService(TomP2PNode tomP2PNode, User user) { + super(tomP2PNode, user); + } + + @Override + public void bootstrapCompleted() { + super.bootstrapCompleted(); + } + + @Override + public void shutDown() { + super.shutDown(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Find peer address by publicKey + /////////////////////////////////////////////////////////////////////////////////////////// + + // public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) { + // final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded()); + + public void saveMessage(PublicKey publicKey, Message message, ResultHandler resultHandler, FaultHandler faultHandler) { + final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded()); + + // Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); + try { + final Data offerData = new Data(message); + + // the offer is default 30 days valid + int defaultOfferTTL = 30 * 24 * 60 * 60; + offerData.ttlSeconds(defaultOfferTTL); + log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + + ", hash: " + offerData.hash().toString() + "]"); + FuturePut futurePut = addProtectedDataToMap(locationKey, offerData); + futurePut.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (future.isSuccess()) { + executor.execute(() -> { + resultHandler.handleResult(); + offerRepositoryListeners.stream().forEach(listener -> { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + log.info("Added offer to DHT with ID: " + offerDataObject); + listener.onOfferAdded((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + log.error("Add offer to DHT failed: " + e.getMessage()); + } + }); + + log.trace("Add offer to DHT was successful. Added data: [locationKey: " + locationKey + + ", value: " + offerData + "]"); + }); + } + } + + @Override + public void exceptionCaught(Throwable ex) throws Exception { + executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex)); + } + }); + } catch (IOException ex) { + executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex)); + } + } + + public void removeOffer(Offer offer, ResultHandler resultHandler, FaultHandler faultHandler) { + Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode()); + try { + final Data offerData = new Data(offer); + log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + + ", hash: " + offerData.hash().toString() + "]"); + FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData); + futureRemove.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + // We don't test futureRemove.isSuccess() as this API does not fit well to that operation, + // it might change in future to something like foundAndRemoved and notFound + // See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840 + log.trace("isRemoved? " + futureRemove.isRemoved()); + executor.execute(() -> { + resultHandler.handleResult(); + offerRepositoryListeners.stream().forEach(listener -> { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + log.trace("Remove offer from DHT was successful. Removed data: [key: " + + locationKey + ", " + + "offer: " + offerDataObject + "]"); + listener.onOfferRemoved((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + log.error("Remove offer from DHT failed. Error: " + e.getMessage()); + faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e); + } + }); + }); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Remove offer from DHT failed. Error: " + t.getMessage()); + faultHandler.handleFault("Remove offer from DHT failed. Error: " + t.getMessage(), t); + } + }); + } catch (IOException e) { + e.printStackTrace(); + log.error("Remove offer from DHT failed. Error: " + e.getMessage()); + faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e); + } + } + + public void getOffers(String currencyCode) { + Number160 locationKey = Number160.createHash(currencyCode); + log.trace("Get offers from DHT requested for locationKey: " + locationKey); + FutureGet futureGet = getMap(locationKey); + futureGet.addListener(new BaseFutureAdapter() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (future.isSuccess()) { + final Map dataMap = futureGet.dataMap(); + final List offers = new ArrayList<>(); + if (dataMap != null) { + for (Data offerData : dataMap.values()) { + try { + Object offerDataObject = offerData.object(); + if (offerDataObject instanceof Offer) { + offers.add((Offer) offerDataObject); + } + } catch (ClassNotFoundException | IOException e) { + e.printStackTrace(); + } + } + + executor.execute(() -> offerRepositoryListeners.stream().forEach(listener -> + listener.onOffersReceived(offers))); + } + + log.trace("Get offers from DHT was successful. Stored data: [key: " + locationKey + + ", values: " + futureGet.dataMap() + "]"); + } + else { + final Map dataMap = futureGet.dataMap(); + if (dataMap == null || dataMap.size() == 0) { + log.trace("Get offers from DHT delivered empty dataMap."); + executor.execute(() -> offerRepositoryListeners.stream().forEach(listener -> + listener.onOffersReceived(new ArrayList<>()))); + } + else { + log.error("Get offers from DHT was not successful with reason:" + future.failedReason()); + } + } + } + }); + } + + @Override + public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) { + final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded()); + FutureGet futureGet = getDataOfProtectedDomain(locationKey, publicKey); + log.trace("findPeerAddress called"); + futureGet.addListener(new BaseFutureAdapter() { + @Override + public void operationComplete(BaseFuture baseFuture) throws Exception { + if (baseFuture.isSuccess() && futureGet.data() != null) { + final Peer peer = (Peer) futureGet.data().object(); + log.trace("Peer found in DHT. Peer = " + peer); + executor.execute(() -> listener.onResult(peer)); + } + else { + log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason()); + executor.execute(listener::onFailed); + } + } + }); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private Number160 getLocationKey(String currencyCode) { + return Number160.createHash(currencyCode + "mailbox"); + } + +} diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PNode.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PNode.java index 98c203324f..c926eb16de 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PNode.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PNode.java @@ -18,6 +18,7 @@ package io.bitsquare.p2p.tomp2p; import io.bitsquare.BitsquareException; +import io.bitsquare.common.handlers.ResultHandler; import io.bitsquare.p2p.BootstrapState; import io.bitsquare.p2p.ClientNode; import io.bitsquare.p2p.ConnectionType; @@ -29,6 +30,9 @@ import com.google.common.util.concurrent.SettableFuture; import java.security.KeyPair; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + import javax.annotation.Nullable; import javax.inject.Inject; @@ -51,6 +55,7 @@ public class TomP2PNode implements ClientNode { private PeerDHT peerDHT; private BootstrappedPeerBuilder bootstrappedPeerBuilder; private final Subject bootstrapStateSubject; + private List resultHandlers = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -89,6 +94,7 @@ public class TomP2PNode implements ClientNode { public void onSuccess(@Nullable PeerDHT peerDHT) { if (peerDHT != null) { TomP2PNode.this.peerDHT = peerDHT; + resultHandlers.stream().forEach(e -> e.handleResult()); bootstrapStateSubject.onCompleted(); } else { @@ -107,10 +113,6 @@ public class TomP2PNode implements ClientNode { return bootstrapStateSubject.asObservable(); } - public Observable getBootstrapStateAsObservable() { - return bootstrapStateSubject.asObservable(); - } - public PeerDHT getPeerDHT() { return peerDHT; } @@ -145,4 +147,12 @@ public class TomP2PNode implements ClientNode { public Node getBootstrapNodeAddress() { return bootstrappedPeerBuilder.getBootstrapNode(); } + + public void addResultHandler(ResultHandler resultHandler) { + resultHandlers.add(resultHandler); + } + + public void removeResultHandler(ResultHandler resultHandler) { + resultHandlers.remove(resultHandler); + } } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PService.java index e627b9cdcb..af3d201055 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PService.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PService.java @@ -17,7 +17,6 @@ package io.bitsquare.p2p.tomp2p; -import io.bitsquare.p2p.BootstrapState; import io.bitsquare.p2p.BaseP2PService; import javax.inject.Inject; @@ -25,9 +24,6 @@ import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Subscriber; - /** * That service delivers direct messaging and DHT functionality from the TomP2P library @@ -39,8 +35,6 @@ import rx.Subscriber; public class TomP2PService extends BaseP2PService { private static final Logger log = LoggerFactory.getLogger(TomP2PService.class); - private final Subscriber subscriber; - /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -48,25 +42,9 @@ public class TomP2PService extends BaseP2PService { @Inject public TomP2PService(TomP2PNode tomP2PNode) { - Observable bootstrapStateAsObservable = tomP2PNode.getBootstrapStateAsObservable(); - subscriber = new Subscriber() { - @Override - public void onCompleted() { - executor.execute(() -> { - peerDHT = tomP2PNode.getPeerDHT(); - subscriber.unsubscribe(); - bootstrapCompleted(); - }); - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onNext(BootstrapState bootstrapState) { - } - }; - bootstrapStateAsObservable.subscribe(subscriber); + tomP2PNode.addResultHandler(() -> { + peerDHT = tomP2PNode.getPeerDHT(); + bootstrapCompleted(); + }); } } diff --git a/core/src/main/resources/logback.xml b/core/src/main/resources/logback.xml index bddbd6e9e4..0921843c1f 100644 --- a/core/src/main/resources/logback.xml +++ b/core/src/main/resources/logback.xml @@ -33,7 +33,7 @@ - + diff --git a/core/src/test/java/io/bitsquare/trade/protocol/placeoffer/PlaceOfferProtocolTest.java b/core/src/test/java/io/bitsquare/trade/protocol/placeoffer/PlaceOfferProtocolTest.java index 4f6ee8ab3a..e6c5cc48c3 100644 --- a/core/src/test/java/io/bitsquare/trade/protocol/placeoffer/PlaceOfferProtocolTest.java +++ b/core/src/test/java/io/bitsquare/trade/protocol/placeoffer/PlaceOfferProtocolTest.java @@ -108,7 +108,7 @@ public class PlaceOfferProtocolTest { () -> { log.trace("message completed"); - offerBookService = new TomP2POfferBookService(tomP2PNode); + offerBookService = new TomP2POfferBookService(tomP2PNode, user); offerBookService.setExecutor(Threading.SAME_THREAD); } );