mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-06-15 18:39:47 -04:00
Re-enable dht data protection
This commit is contained in:
parent
fa900219b2
commit
ec83feabfa
12 changed files with 463 additions and 161 deletions
|
@ -22,6 +22,7 @@ import io.bitsquare.arbitration.ArbitratorService;
|
||||||
import io.bitsquare.arbitration.listeners.ArbitratorListener;
|
import io.bitsquare.arbitration.listeners.ArbitratorListener;
|
||||||
import io.bitsquare.p2p.tomp2p.TomP2PDHTService;
|
import io.bitsquare.p2p.tomp2p.TomP2PDHTService;
|
||||||
import io.bitsquare.p2p.tomp2p.TomP2PNode;
|
import io.bitsquare.p2p.tomp2p.TomP2PNode;
|
||||||
|
import io.bitsquare.user.User;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -56,8 +57,8 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TomP2PArbitratorService(TomP2PNode tomP2PNode) {
|
public TomP2PArbitratorService(TomP2PNode tomP2PNode, User user) {
|
||||||
super(tomP2PNode);
|
super(tomP2PNode, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -70,7 +71,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
|
||||||
try {
|
try {
|
||||||
final Data arbitratorData = new Data(arbitrator);
|
final Data arbitratorData = new Data(arbitrator);
|
||||||
|
|
||||||
FuturePut addFuture = addProtectedData(locationKey, arbitratorData);
|
FuturePut addFuture = addProtectedDataToMap(locationKey, arbitratorData);
|
||||||
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
|
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
@ -104,7 +105,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
|
||||||
public void removeArbitrator(Arbitrator arbitrator) throws IOException {
|
public void removeArbitrator(Arbitrator arbitrator) throws IOException {
|
||||||
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
|
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
|
||||||
final Data arbitratorData = new Data(arbitrator);
|
final Data arbitratorData = new Data(arbitrator);
|
||||||
FutureRemove removeFuture = removeFromDataMap(locationKey, arbitratorData);
|
FutureRemove removeFuture = removeProtectedDataFromMap(locationKey, arbitratorData);
|
||||||
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
|
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
@ -135,7 +136,7 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
|
||||||
|
|
||||||
public void getArbitrators(Locale languageLocale) {
|
public void getArbitrators(Locale languageLocale) {
|
||||||
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
|
Number160 locationKey = Number160.createHash(ARBITRATORS_ROOT);
|
||||||
FutureGet futureGet = getDataMap(locationKey);
|
FutureGet futureGet = getMap(locationKey);
|
||||||
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
|
|
@ -155,7 +155,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
|
||||||
|
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case UNKNOWN:
|
case UNKNOWN:
|
||||||
log.error("Must not happen.");
|
log.error("Offer state is UNKNOWN. That must not happen.");
|
||||||
break;
|
break;
|
||||||
case AVAILABLE:
|
case AVAILABLE:
|
||||||
this.state.set(State.AMOUNT_SCREEN);
|
this.state.set(State.AMOUNT_SCREEN);
|
||||||
|
|
|
@ -23,6 +23,7 @@ import io.bitsquare.offer.Offer;
|
||||||
import io.bitsquare.offer.OfferBookService;
|
import io.bitsquare.offer.OfferBookService;
|
||||||
import io.bitsquare.p2p.tomp2p.TomP2PDHTService;
|
import io.bitsquare.p2p.tomp2p.TomP2PDHTService;
|
||||||
import io.bitsquare.p2p.tomp2p.TomP2PNode;
|
import io.bitsquare.p2p.tomp2p.TomP2PNode;
|
||||||
|
import io.bitsquare.user.User;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -57,8 +58,8 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
|
||||||
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TomP2POfferBookService(TomP2PNode tomP2PNode) {
|
public TomP2POfferBookService(TomP2PNode tomP2PNode, User user) {
|
||||||
super(tomP2PNode);
|
super(tomP2PNode, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -72,7 +73,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
|
||||||
offerData.ttlSeconds(defaultOfferTTL);
|
offerData.ttlSeconds(defaultOfferTTL);
|
||||||
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
|
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
|
||||||
", hash: " + offerData.hash().toString() + "]");
|
", hash: " + offerData.hash().toString() + "]");
|
||||||
FuturePut futurePut = addProtectedData(locationKey, offerData);
|
FuturePut futurePut = addProtectedDataToMap(locationKey, offerData);
|
||||||
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
|
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
@ -115,7 +116,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
|
||||||
final Data offerData = new Data(offer);
|
final Data offerData = new Data(offer);
|
||||||
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
|
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
|
||||||
", hash: " + offerData.hash().toString() + "]");
|
", hash: " + offerData.hash().toString() + "]");
|
||||||
FutureRemove futureRemove = removeFromDataMap(locationKey, offerData);
|
FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData);
|
||||||
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
|
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
@ -160,7 +161,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
|
||||||
public void getOffers(String currencyCode) {
|
public void getOffers(String currencyCode) {
|
||||||
Number160 locationKey = Number160.createHash(currencyCode);
|
Number160 locationKey = Number160.createHash(currencyCode);
|
||||||
log.trace("Get offers from DHT requested for locationKey: " + locationKey);
|
log.trace("Get offers from DHT requested for locationKey: " + locationKey);
|
||||||
FutureGet futureGet = getDataMap(locationKey);
|
FutureGet futureGet = getMap(locationKey);
|
||||||
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
|
|
@ -29,11 +29,12 @@ public class BaseP2PService implements P2PService {
|
||||||
BaseP2PService.userThread = userThread;
|
BaseP2PService.userThread = userThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Executor executor = userThread;
|
protected Executor executor;
|
||||||
protected PeerDHT peerDHT;
|
protected PeerDHT peerDHT;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bootstrapCompleted() {
|
public void bootstrapCompleted() {
|
||||||
|
this.executor = BaseP2PService.userThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,17 +27,17 @@ import net.tomp2p.storage.Data;
|
||||||
|
|
||||||
public interface DHTService extends P2PService {
|
public interface DHTService extends P2PService {
|
||||||
|
|
||||||
FuturePut putDomainProtectedData(Number160 locationKey, Data data);
|
|
||||||
|
|
||||||
FuturePut putData(Number160 locationKey, Data data);
|
FuturePut putData(Number160 locationKey, Data data);
|
||||||
|
|
||||||
FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey);
|
|
||||||
|
|
||||||
FutureGet getData(Number160 locationKey);
|
FutureGet getData(Number160 locationKey);
|
||||||
|
|
||||||
FuturePut addProtectedData(Number160 locationKey, Data data);
|
FuturePut putDataToMyProtectedDomain(Number160 locationKey, Data data);
|
||||||
|
|
||||||
FutureRemove removeFromDataMap(Number160 locationKey, Data data);
|
FutureGet getDataOfProtectedDomain(Number160 locationKey, PublicKey publicKey);
|
||||||
|
|
||||||
FutureGet getDataMap(Number160 locationKey);
|
FuturePut addProtectedDataToMap(Number160 locationKey, Data data);
|
||||||
|
|
||||||
|
FutureRemove removeProtectedDataFromMap(Number160 locationKey, Data data);
|
||||||
|
|
||||||
|
FutureGet getMap(Number160 locationKey);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TomP2PAddressService(TomP2PNode tomP2PNode, User user) {
|
public TomP2PAddressService(TomP2PNode tomP2PNode, User user) {
|
||||||
super(tomP2PNode);
|
super(tomP2PNode, user);
|
||||||
|
|
||||||
locationKey = Utils.makeSHAHash(user.getMessageKeyPair().getPublic().getEncoded());
|
locationKey = Utils.makeSHAHash(user.getMessageKeyPair().getPublic().getEncoded());
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer
|
||||||
@Override
|
@Override
|
||||||
public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
|
public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
|
||||||
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
|
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
FutureGet futureGet = getDomainProtectedData(locationKey, publicKey);
|
FutureGet futureGet = getDataOfProtectedDomain(locationKey, publicKey);
|
||||||
log.trace("findPeerAddress called");
|
log.trace("findPeerAddress called");
|
||||||
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -144,7 +144,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer
|
||||||
// We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates
|
// We set a short time-to-live to make getAddress checks fail fast in case if the offerer is offline and to support cheap offerbook state updates
|
||||||
data.ttlSeconds(ADDRESS_TTL);
|
data.ttlSeconds(ADDRESS_TTL);
|
||||||
log.debug("storePeerAddress " + peerDHT.peerAddress().toString());
|
log.debug("storePeerAddress " + peerDHT.peerAddress().toString());
|
||||||
FuturePut futurePut = putDomainProtectedData(locationKey, data);
|
FuturePut futurePut = putDataToMyProtectedDomain(locationKey, data);
|
||||||
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
|
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(BaseFuture future) throws Exception {
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
@ -173,7 +173,7 @@ public class TomP2PAddressService extends TomP2PDHTService implements AddressSer
|
||||||
private void removeAddress() {
|
private void removeAddress() {
|
||||||
try {
|
try {
|
||||||
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
|
Data data = new Data(new TomP2PPeer(peerDHT.peerAddress()));
|
||||||
removeFromDataMap(locationKey, data).awaitUninterruptibly(1000);
|
removeProtectedDataFromMap(locationKey, data).awaitUninterruptibly(1000);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
log.error("Exception at removeAddress " + e.toString());
|
log.error("Exception at removeAddress " + e.toString());
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
package io.bitsquare.p2p.tomp2p;
|
package io.bitsquare.p2p.tomp2p;
|
||||||
|
|
||||||
import io.bitsquare.p2p.DHTService;
|
import io.bitsquare.p2p.DHTService;
|
||||||
|
import io.bitsquare.user.User;
|
||||||
|
|
||||||
|
import java.security.KeyPair;
|
||||||
import java.security.PublicKey;
|
import java.security.PublicKey;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
@ -26,14 +28,18 @@ import javax.inject.Inject;
|
||||||
import net.tomp2p.dht.FutureGet;
|
import net.tomp2p.dht.FutureGet;
|
||||||
import net.tomp2p.dht.FuturePut;
|
import net.tomp2p.dht.FuturePut;
|
||||||
import net.tomp2p.dht.FutureRemove;
|
import net.tomp2p.dht.FutureRemove;
|
||||||
|
import net.tomp2p.dht.StorageLayer;
|
||||||
import net.tomp2p.peers.Number160;
|
import net.tomp2p.peers.Number160;
|
||||||
import net.tomp2p.storage.Data;
|
import net.tomp2p.storage.Data;
|
||||||
|
import net.tomp2p.utils.Utils;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class TomP2PDHTService extends TomP2PService implements DHTService {
|
public class TomP2PDHTService extends TomP2PService implements DHTService {
|
||||||
private static final Logger log = LoggerFactory.getLogger(TomP2PDHTService.class);
|
private static final Logger log = LoggerFactory.getLogger(TomP2PDHTService.class);
|
||||||
|
private final KeyPair keyPair;
|
||||||
|
private final Number160 pubKeyHashForMyDomain;
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -41,135 +47,184 @@ public class TomP2PDHTService extends TomP2PService implements DHTService {
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TomP2PDHTService(TomP2PNode tomP2PNode) {
|
public TomP2PDHTService(TomP2PNode tomP2PNode, User user) {
|
||||||
super(tomP2PNode);
|
super(tomP2PNode);
|
||||||
}
|
keyPair = user.getMessageKeyPair();
|
||||||
|
pubKeyHashForMyDomain = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
|
||||||
// DHT methods
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
// TODO remove all security features for the moment. There are some problems with a "wrong signature!" msg in
|
|
||||||
// the logs
|
|
||||||
@Override
|
|
||||||
public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
|
|
||||||
log.trace("putDomainProtectedData");
|
|
||||||
return peerDHT.put(locationKey).data(data).start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
public void bootstrapCompleted() {
|
||||||
|
super.bootstrapCompleted();
|
||||||
|
|
||||||
|
StorageLayer.ProtectionEnable protectionDomainEnable = StorageLayer.ProtectionEnable.ALL;
|
||||||
|
StorageLayer.ProtectionMode protectionDomainMode = StorageLayer.ProtectionMode.MASTER_PUBLIC_KEY;
|
||||||
|
StorageLayer.ProtectionEnable protectionEntryEnable = StorageLayer.ProtectionEnable.ALL;
|
||||||
|
StorageLayer.ProtectionMode protectionEntryMode = StorageLayer.ProtectionMode.MASTER_PUBLIC_KEY;
|
||||||
|
|
||||||
|
peerDHT.storageLayer().protection(protectionDomainEnable, protectionDomainMode, protectionEntryEnable, protectionEntryMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Put/Get: Public access. Used for offerbook invalidation timestamp
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store data to given location key.
|
||||||
|
* Write access: Anyone with locationKey
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public FuturePut putData(Number160 locationKey, Data data) {
|
public FuturePut putData(Number160 locationKey, Data data) {
|
||||||
log.trace("putData");
|
log.trace("putData");
|
||||||
return peerDHT.put(locationKey).data(data).start();
|
return peerDHT.put(locationKey).data(data).start();
|
||||||
}
|
}
|
||||||
|
// No protection, everybody can read.
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
|
* Get data for given locationKey
|
||||||
log.trace("getDomainProtectedData");
|
* Read access: Anyone with locationKey
|
||||||
return peerDHT.get(locationKey).start();
|
*
|
||||||
}
|
* @param locationKey
|
||||||
|
* @return
|
||||||
@Override
|
*/
|
||||||
public FutureGet getData(Number160 locationKey) {
|
public FutureGet getData(Number160 locationKey) {
|
||||||
//log.trace("getData");
|
log.trace("getData");
|
||||||
return peerDHT.get(locationKey).start();
|
return peerDHT.get(locationKey).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Put/Get: Domain protected, entry protected. Used for storing address.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public FuturePut addProtectedData(Number160 locationKey, Data data) {
|
* Store data to given location key and my domain.
|
||||||
log.trace("addProtectedData");
|
* Write access: Anybody who has pubKey if domain is not used before. KeyPair owner of pubKey can overwrite and reserve that domain.
|
||||||
return peerDHT.add(locationKey).data(data).start();
|
* We save early an entry so we have that domain reserved and nobody else can use it.
|
||||||
|
* Additionally we use entry protection, so domain owner is data owner.
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FuturePut putDataToMyProtectedDomain(Number160 locationKey, Data data) {
|
||||||
|
log.trace("putDataToMyProtectedDomain");
|
||||||
|
data.protectEntry(keyPair).sign();
|
||||||
|
return peerDHT.put(locationKey).data(data).sign().protectDomain().domainKey(pubKeyHashForMyDomain).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
|
* Read data for given location and publicKey of that domain.
|
||||||
|
* Read access: Anyone who has publicKey
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @param publicKey
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FutureGet getDataOfProtectedDomain(Number160 locationKey, PublicKey publicKey) {
|
||||||
|
log.trace("getDataOfProtectedDomain");
|
||||||
|
final Number160 pubKeyHashOfDomainOwner = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
|
return peerDHT.get(locationKey).domainKey(pubKeyHashOfDomainOwner).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Add/remove/get from map: Entry protected, no domain protection. Used for offerbook and arbitrators
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add data to a map. For the entry contentKey of data is used (internally).
|
||||||
|
* Write access: Anyone can add entries. But nobody can overwrite an existing entry as it is protected by data protection.
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FuturePut addProtectedDataToMap(Number160 locationKey, Data data) {
|
||||||
|
log.trace("addProtectedDataToMap");
|
||||||
|
data.protectEntry(keyPair).sign();
|
||||||
|
log.trace("addProtectedDataToMap with contentKey " + data.hash().toString());
|
||||||
|
return peerDHT.add(locationKey).data(data).sign().start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove entry from map for given locationKey. ContentKey of data is used for removing the entry.
|
||||||
|
* Access: Only the owner of the data entry can remove it, as it was written with entry protection.
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FutureRemove removeProtectedDataFromMap(Number160 locationKey, Data data) {
|
||||||
|
log.trace("removeProtectedDataFromMap");
|
||||||
Number160 contentKey = data.hash();
|
Number160 contentKey = data.hash();
|
||||||
log.trace("removeFromDataMap with contentKey " + contentKey.toString());
|
log.trace("removeProtectedDataFromMap with contentKey " + contentKey.toString());
|
||||||
return peerDHT.remove(locationKey).contentKey(contentKey).start();
|
return peerDHT.remove(locationKey).contentKey(contentKey).sign().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public FutureGet getDataMap(Number160 locationKey) {
|
* Get map for given locationKey with all entries.
|
||||||
log.trace("getDataMap");
|
* Access: Everybody can read.
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FutureGet getMap(Number160 locationKey) {
|
||||||
|
log.trace("getMap");
|
||||||
return peerDHT.get(locationKey).all().start();
|
return peerDHT.get(locationKey).all().start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
// public FuturePut putDomainProtectedData(Number160 locationKey, Data data) {
|
|
||||||
// log.trace("putDomainProtectedData");
|
|
||||||
// data.protectEntry(keyPair);
|
|
||||||
// final Number160 ownerKeyHash = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
|
|
||||||
// return peerDHT.put(locationKey).data(data).keyPair(keyPair).domainKey(ownerKeyHash).protectDomain().start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // No protection, everybody can write.
|
|
||||||
// public FuturePut putData(Number160 locationKey, Data data) {
|
|
||||||
// log.trace("putData");
|
|
||||||
// return peerDHT.put(locationKey).data(data).start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Not public readable. Only users with the public key of the peer who stored the data can read that data
|
|
||||||
// public FutureGet getDomainProtectedData(Number160 locationKey, PublicKey publicKey) {
|
|
||||||
// log.trace("getDomainProtectedData");
|
|
||||||
// final Number160 ownerKeyHash = Utils.makeSHAHash(publicKey.getEncoded());
|
|
||||||
// return peerDHT.get(locationKey).domainKey(ownerKeyHash).start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // No protection, everybody can read.
|
|
||||||
// public FutureGet getData(Number160 locationKey) {
|
|
||||||
// log.trace("getData");
|
|
||||||
// return peerDHT.get(locationKey).start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // No domain protection, but entry protection
|
|
||||||
// public FuturePut addProtectedData(Number160 locationKey, Data data) {
|
|
||||||
// log.trace("addProtectedData");
|
|
||||||
// data.protectEntry(keyPair);
|
|
||||||
// log.trace("addProtectedData with contentKey " + data.hash().toString());
|
|
||||||
// return peerDHT.add(locationKey).data(data).keyPair(keyPair).start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // No domain protection, but entry protection
|
|
||||||
// public FutureRemove removeFromDataMap(Number160 locationKey, Data data) {
|
|
||||||
// log.trace("removeFromDataMap");
|
|
||||||
// Number160 contentKey = data.hash();
|
|
||||||
// log.trace("removeFromDataMap with contentKey " + contentKey.toString());
|
|
||||||
// return peerDHT.remove(locationKey).contentKey(contentKey).keyPair(keyPair).start();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Public readable
|
|
||||||
// public FutureGet getDataMap(Number160 locationKey) {
|
|
||||||
// log.trace("getDataMap");
|
|
||||||
// return peerDHT.get(locationKey).all().start();
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Send signed payLoad to peer
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// public FutureDirect sendData(PeerAddress peerAddress, Object payLoad) {
|
// Add/remove/get from map: Domain protection, no data protection. Used for mailbox. For getting privacy we use encryption (not part of DHT infrastructure)
|
||||||
// // use 30 seconds as max idle time before connection get closed
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// FuturePeerConnection futurePeerConnection = peerDHT.peer().createPeerConnection(peerAddress, 30000);
|
|
||||||
// FutureDirect futureDirect = peerDHT.peer().sendDirect(futurePeerConnection).object(payLoad).sign().start();
|
/**
|
||||||
// futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
|
* Add data to a map. For the entry contentKey of data is used (internally).
|
||||||
// @Override
|
* Write access: Anyone can add entries. But nobody expect the domain owner can overwrite/remove an existing entry as it is protected by the domain owner.
|
||||||
// public void operationComplete(BaseFuture future) throws Exception {
|
*
|
||||||
// if (futureDirect.isSuccess()) {
|
* @param locationKey
|
||||||
// log.debug("sendMessage completed");
|
* @param data
|
||||||
// }
|
* @return
|
||||||
// else {
|
*/
|
||||||
// log.error("sendData failed with Reason " + futureDirect.failedReason());
|
public FuturePut addDataToMapOfProtectedDomain(Number160 locationKey, Data data, PublicKey publicKey) {
|
||||||
// }
|
log.trace("addDataToMapOfProtectedDomain");
|
||||||
// }
|
log.trace("addDataToMapOfProtectedDomain with contentKey " + data.hash().toString());
|
||||||
//
|
final Number160 pubKeyHashOfDomainOwner = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
// @Override
|
return peerDHT.add(locationKey).data(data).protectDomain().domainKey(pubKeyHashOfDomainOwner).start();
|
||||||
// public void exceptionCaught(Throwable t) throws Exception {
|
}
|
||||||
// log.error("Exception at sendData " + t.toString());
|
|
||||||
// }
|
/**
|
||||||
// });
|
* Remove entry from map for given locationKey. ContentKey of data is used for removing the entry.
|
||||||
//
|
* Access: Only the owner of the data entry can remove it, as it was written with entry protection.
|
||||||
// return futureDirect;
|
*
|
||||||
// }
|
* @param locationKey
|
||||||
//
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FutureRemove removeDataFromMapOfMyProtectedDomain(Number160 locationKey, Data data) {
|
||||||
|
log.trace("removeDataFromMapOfMyProtectedDomain");
|
||||||
|
Number160 contentKey = data.hash();
|
||||||
|
log.trace("removeDataFromMapOfMyProtectedDomain with contentKey " + contentKey.toString());
|
||||||
|
return peerDHT.remove(locationKey).contentKey(contentKey).protectDomain().sign().domainKey(pubKeyHashForMyDomain).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get map for given locationKey with all entries.
|
||||||
|
* Access: Everybody can read.
|
||||||
|
*
|
||||||
|
* @param locationKey
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public FutureGet getDataFromMapOfMyProtectedDomain(Number160 locationKey) {
|
||||||
|
log.trace("getDataFromMapOfMyProtectedDomain");
|
||||||
|
return peerDHT.get(locationKey).all().domainKey(pubKeyHashForMyDomain).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,256 @@
|
||||||
|
/*
|
||||||
|
* This file is part of Bitsquare.
|
||||||
|
*
|
||||||
|
* Bitsquare is free software: you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or (at
|
||||||
|
* your option) any later version.
|
||||||
|
*
|
||||||
|
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||||
|
* License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.bitsquare.p2p.tomp2p;
|
||||||
|
|
||||||
|
import io.bitsquare.common.handlers.FaultHandler;
|
||||||
|
import io.bitsquare.common.handlers.ResultHandler;
|
||||||
|
import io.bitsquare.offer.Offer;
|
||||||
|
import io.bitsquare.offer.OfferBookService;
|
||||||
|
import io.bitsquare.p2p.AddressService;
|
||||||
|
import io.bitsquare.p2p.Message;
|
||||||
|
import io.bitsquare.p2p.Peer;
|
||||||
|
import io.bitsquare.p2p.listener.GetPeerAddressListener;
|
||||||
|
import io.bitsquare.user.User;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import java.security.PublicKey;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import net.tomp2p.dht.FutureGet;
|
||||||
|
import net.tomp2p.dht.FuturePut;
|
||||||
|
import net.tomp2p.dht.FutureRemove;
|
||||||
|
import net.tomp2p.futures.BaseFuture;
|
||||||
|
import net.tomp2p.futures.BaseFutureAdapter;
|
||||||
|
import net.tomp2p.futures.BaseFutureListener;
|
||||||
|
import net.tomp2p.peers.Number160;
|
||||||
|
import net.tomp2p.peers.Number640;
|
||||||
|
import net.tomp2p.storage.Data;
|
||||||
|
import net.tomp2p.utils.Utils;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class TomP2PMailboxService extends TomP2PDHTService implements AddressService {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(TomP2PMailboxService.class);
|
||||||
|
|
||||||
|
|
||||||
|
private final List<OfferBookService.Listener> offerRepositoryListeners = new ArrayList<>();
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Constructor
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TomP2PMailboxService(TomP2PNode tomP2PNode, User user) {
|
||||||
|
super(tomP2PNode, user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void bootstrapCompleted() {
|
||||||
|
super.bootstrapCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutDown() {
|
||||||
|
super.shutDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Find peer address by publicKey
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
|
||||||
|
// final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
|
|
||||||
|
public void saveMessage(PublicKey publicKey, Message message, ResultHandler resultHandler, FaultHandler faultHandler) {
|
||||||
|
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
|
|
||||||
|
// Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode());
|
||||||
|
try {
|
||||||
|
final Data offerData = new Data(message);
|
||||||
|
|
||||||
|
// the offer is default 30 days valid
|
||||||
|
int defaultOfferTTL = 30 * 24 * 60 * 60;
|
||||||
|
offerData.ttlSeconds(defaultOfferTTL);
|
||||||
|
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
|
||||||
|
", hash: " + offerData.hash().toString() + "]");
|
||||||
|
FuturePut futurePut = addProtectedDataToMap(locationKey, offerData);
|
||||||
|
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
if (future.isSuccess()) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
resultHandler.handleResult();
|
||||||
|
offerRepositoryListeners.stream().forEach(listener -> {
|
||||||
|
try {
|
||||||
|
Object offerDataObject = offerData.object();
|
||||||
|
if (offerDataObject instanceof Offer) {
|
||||||
|
log.info("Added offer to DHT with ID: " + offerDataObject);
|
||||||
|
listener.onOfferAdded((Offer) offerDataObject);
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException | IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
log.error("Add offer to DHT failed: " + e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log.trace("Add offer to DHT was successful. Added data: [locationKey: " + locationKey +
|
||||||
|
", value: " + offerData + "]");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(Throwable ex) throws Exception {
|
||||||
|
executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (IOException ex) {
|
||||||
|
executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeOffer(Offer offer, ResultHandler resultHandler, FaultHandler faultHandler) {
|
||||||
|
Number160 locationKey = Number160.createHash(offer.getCurrency().getCurrencyCode());
|
||||||
|
try {
|
||||||
|
final Data offerData = new Data(offer);
|
||||||
|
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
|
||||||
|
", hash: " + offerData.hash().toString() + "]");
|
||||||
|
FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData);
|
||||||
|
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
// We don't test futureRemove.isSuccess() as this API does not fit well to that operation,
|
||||||
|
// it might change in future to something like foundAndRemoved and notFound
|
||||||
|
// See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840
|
||||||
|
log.trace("isRemoved? " + futureRemove.isRemoved());
|
||||||
|
executor.execute(() -> {
|
||||||
|
resultHandler.handleResult();
|
||||||
|
offerRepositoryListeners.stream().forEach(listener -> {
|
||||||
|
try {
|
||||||
|
Object offerDataObject = offerData.object();
|
||||||
|
if (offerDataObject instanceof Offer) {
|
||||||
|
log.trace("Remove offer from DHT was successful. Removed data: [key: " +
|
||||||
|
locationKey + ", " +
|
||||||
|
"offer: " + offerDataObject + "]");
|
||||||
|
listener.onOfferRemoved((Offer) offerDataObject);
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException | IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
log.error("Remove offer from DHT failed. Error: " + e.getMessage());
|
||||||
|
faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(Throwable t) throws Exception {
|
||||||
|
log.error("Remove offer from DHT failed. Error: " + t.getMessage());
|
||||||
|
faultHandler.handleFault("Remove offer from DHT failed. Error: " + t.getMessage(), t);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
log.error("Remove offer from DHT failed. Error: " + e.getMessage());
|
||||||
|
faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void getOffers(String currencyCode) {
|
||||||
|
Number160 locationKey = Number160.createHash(currencyCode);
|
||||||
|
log.trace("Get offers from DHT requested for locationKey: " + locationKey);
|
||||||
|
FutureGet futureGet = getMap(locationKey);
|
||||||
|
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(BaseFuture future) throws Exception {
|
||||||
|
if (future.isSuccess()) {
|
||||||
|
final Map<Number640, Data> dataMap = futureGet.dataMap();
|
||||||
|
final List<Offer> offers = new ArrayList<>();
|
||||||
|
if (dataMap != null) {
|
||||||
|
for (Data offerData : dataMap.values()) {
|
||||||
|
try {
|
||||||
|
Object offerDataObject = offerData.object();
|
||||||
|
if (offerDataObject instanceof Offer) {
|
||||||
|
offers.add((Offer) offerDataObject);
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException | IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.execute(() -> offerRepositoryListeners.stream().forEach(listener ->
|
||||||
|
listener.onOffersReceived(offers)));
|
||||||
|
}
|
||||||
|
|
||||||
|
log.trace("Get offers from DHT was successful. Stored data: [key: " + locationKey
|
||||||
|
+ ", values: " + futureGet.dataMap() + "]");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final Map<Number640, Data> dataMap = futureGet.dataMap();
|
||||||
|
if (dataMap == null || dataMap.size() == 0) {
|
||||||
|
log.trace("Get offers from DHT delivered empty dataMap.");
|
||||||
|
executor.execute(() -> offerRepositoryListeners.stream().forEach(listener ->
|
||||||
|
listener.onOffersReceived(new ArrayList<>())));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
log.error("Get offers from DHT was not successful with reason:" + future.failedReason());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void findPeerAddress(PublicKey publicKey, GetPeerAddressListener listener) {
|
||||||
|
final Number160 locationKey = Utils.makeSHAHash(publicKey.getEncoded());
|
||||||
|
FutureGet futureGet = getDataOfProtectedDomain(locationKey, publicKey);
|
||||||
|
log.trace("findPeerAddress called");
|
||||||
|
futureGet.addListener(new BaseFutureAdapter<BaseFuture>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(BaseFuture baseFuture) throws Exception {
|
||||||
|
if (baseFuture.isSuccess() && futureGet.data() != null) {
|
||||||
|
final Peer peer = (Peer) futureGet.data().object();
|
||||||
|
log.trace("Peer found in DHT. Peer = " + peer);
|
||||||
|
executor.execute(() -> listener.onResult(peer));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
log.error("getPeerAddress failed. failedReason = " + baseFuture.failedReason());
|
||||||
|
executor.execute(listener::onFailed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private Number160 getLocationKey(String currencyCode) {
|
||||||
|
return Number160.createHash(currencyCode + "mailbox");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
package io.bitsquare.p2p.tomp2p;
|
package io.bitsquare.p2p.tomp2p;
|
||||||
|
|
||||||
import io.bitsquare.BitsquareException;
|
import io.bitsquare.BitsquareException;
|
||||||
|
import io.bitsquare.common.handlers.ResultHandler;
|
||||||
import io.bitsquare.p2p.BootstrapState;
|
import io.bitsquare.p2p.BootstrapState;
|
||||||
import io.bitsquare.p2p.ClientNode;
|
import io.bitsquare.p2p.ClientNode;
|
||||||
import io.bitsquare.p2p.ConnectionType;
|
import io.bitsquare.p2p.ConnectionType;
|
||||||
|
@ -29,6 +30,9 @@ import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
import java.security.KeyPair;
|
import java.security.KeyPair;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
@ -51,6 +55,7 @@ public class TomP2PNode implements ClientNode {
|
||||||
private PeerDHT peerDHT;
|
private PeerDHT peerDHT;
|
||||||
private BootstrappedPeerBuilder bootstrappedPeerBuilder;
|
private BootstrappedPeerBuilder bootstrappedPeerBuilder;
|
||||||
private final Subject<BootstrapState, BootstrapState> bootstrapStateSubject;
|
private final Subject<BootstrapState, BootstrapState> bootstrapStateSubject;
|
||||||
|
private List<ResultHandler> resultHandlers = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -89,6 +94,7 @@ public class TomP2PNode implements ClientNode {
|
||||||
public void onSuccess(@Nullable PeerDHT peerDHT) {
|
public void onSuccess(@Nullable PeerDHT peerDHT) {
|
||||||
if (peerDHT != null) {
|
if (peerDHT != null) {
|
||||||
TomP2PNode.this.peerDHT = peerDHT;
|
TomP2PNode.this.peerDHT = peerDHT;
|
||||||
|
resultHandlers.stream().forEach(e -> e.handleResult());
|
||||||
bootstrapStateSubject.onCompleted();
|
bootstrapStateSubject.onCompleted();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -107,10 +113,6 @@ public class TomP2PNode implements ClientNode {
|
||||||
return bootstrapStateSubject.asObservable();
|
return bootstrapStateSubject.asObservable();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Observable<BootstrapState> getBootstrapStateAsObservable() {
|
|
||||||
return bootstrapStateSubject.asObservable();
|
|
||||||
}
|
|
||||||
|
|
||||||
public PeerDHT getPeerDHT() {
|
public PeerDHT getPeerDHT() {
|
||||||
return peerDHT;
|
return peerDHT;
|
||||||
}
|
}
|
||||||
|
@ -145,4 +147,12 @@ public class TomP2PNode implements ClientNode {
|
||||||
public Node getBootstrapNodeAddress() {
|
public Node getBootstrapNodeAddress() {
|
||||||
return bootstrappedPeerBuilder.getBootstrapNode();
|
return bootstrappedPeerBuilder.getBootstrapNode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addResultHandler(ResultHandler resultHandler) {
|
||||||
|
resultHandlers.add(resultHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeResultHandler(ResultHandler resultHandler) {
|
||||||
|
resultHandlers.remove(resultHandler);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package io.bitsquare.p2p.tomp2p;
|
package io.bitsquare.p2p.tomp2p;
|
||||||
|
|
||||||
import io.bitsquare.p2p.BootstrapState;
|
|
||||||
import io.bitsquare.p2p.BaseP2PService;
|
import io.bitsquare.p2p.BaseP2PService;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
@ -25,9 +24,6 @@ import javax.inject.Inject;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import rx.Observable;
|
|
||||||
import rx.Subscriber;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* That service delivers direct messaging and DHT functionality from the TomP2P library
|
* That service delivers direct messaging and DHT functionality from the TomP2P library
|
||||||
|
@ -39,8 +35,6 @@ import rx.Subscriber;
|
||||||
public class TomP2PService extends BaseP2PService {
|
public class TomP2PService extends BaseP2PService {
|
||||||
private static final Logger log = LoggerFactory.getLogger(TomP2PService.class);
|
private static final Logger log = LoggerFactory.getLogger(TomP2PService.class);
|
||||||
|
|
||||||
private final Subscriber<BootstrapState> subscriber;
|
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Constructor
|
// Constructor
|
||||||
|
@ -48,25 +42,9 @@ public class TomP2PService extends BaseP2PService {
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TomP2PService(TomP2PNode tomP2PNode) {
|
public TomP2PService(TomP2PNode tomP2PNode) {
|
||||||
Observable<BootstrapState> bootstrapStateAsObservable = tomP2PNode.getBootstrapStateAsObservable();
|
tomP2PNode.addResultHandler(() -> {
|
||||||
subscriber = new Subscriber<BootstrapState>() {
|
peerDHT = tomP2PNode.getPeerDHT();
|
||||||
@Override
|
bootstrapCompleted();
|
||||||
public void onCompleted() {
|
});
|
||||||
executor.execute(() -> {
|
|
||||||
peerDHT = tomP2PNode.getPeerDHT();
|
|
||||||
subscriber.unsubscribe();
|
|
||||||
bootstrapCompleted();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable throwable) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(BootstrapState bootstrapState) {
|
|
||||||
}
|
|
||||||
};
|
|
||||||
bootstrapStateAsObservable.subscribe(subscriber);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
<!-- <logger name="io.bitsquare.persistence.Persistence" level="ERROR"/>-->
|
<!-- <logger name="io.bitsquare.persistence.Persistence" level="ERROR"/>-->
|
||||||
<logger name="io.bitsquare.locale.BSResources" level="ERROR"/>
|
<logger name="io.bitsquare.locale.BSResources" level="ERROR"/>
|
||||||
|
|
||||||
<logger name="org.bitcoinj" level="TRACE"/>
|
<logger name="org.bitcoinj" level="WARN"/>
|
||||||
<logger name="net.tomp2p" level="INFO"/>
|
<logger name="net.tomp2p" level="INFO"/>
|
||||||
|
|
||||||
<logger name="org.bitcoinj.core.BitcoinSerializer" level="WARN"/>
|
<logger name="org.bitcoinj.core.BitcoinSerializer" level="WARN"/>
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class PlaceOfferProtocolTest {
|
||||||
() -> {
|
() -> {
|
||||||
log.trace("message completed");
|
log.trace("message completed");
|
||||||
|
|
||||||
offerBookService = new TomP2POfferBookService(tomP2PNode);
|
offerBookService = new TomP2POfferBookService(tomP2PNode, user);
|
||||||
offerBookService.setExecutor(Threading.SAME_THREAD);
|
offerBookService.setExecutor(Threading.SAME_THREAD);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue