From 919e31f0d5df102b6eca87ddec2c5abafad52f18 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 20 Apr 2015 20:35:39 +0200 Subject: [PATCH] Add handling for open network requests at shutdown --- .../main/java/io/bitsquare/app/Version.java | 6 +-- .../tomp2p/TomP2PArbitratorService.java | 5 +++ .../main/java/io/bitsquare/crypto/Bucket.java | 38 ------------------- .../java/io/bitsquare/p2p/BaseP2PService.java | 19 ++++++++++ .../java/io/bitsquare/p2p/MailboxService.java | 1 + .../p2p/tomp2p/TomP2PMailboxService.java | 4 ++ .../p2p/tomp2p/TomP2PMessageService.java | 21 +++++----- .../io/bitsquare/p2p/tomp2p/TomP2PModule.java | 3 ++ .../java/io/bitsquare/storage/Storage.java | 3 +- .../offer/tomp2p/TomP2POfferBookService.java | 36 +++++++++++++++++- 10 files changed, 81 insertions(+), 55 deletions(-) delete mode 100644 core/src/main/java/io/bitsquare/crypto/Bucket.java diff --git a/core/src/main/java/io/bitsquare/app/Version.java b/core/src/main/java/io/bitsquare/app/Version.java index c400fd904b..3482ee0ae5 100644 --- a/core/src/main/java/io/bitsquare/app/Version.java +++ b/core/src/main/java/io/bitsquare/app/Version.java @@ -27,9 +27,9 @@ public class Version { public static final int MINOR_VERSION = 2; public static final int PATCH_VERSION = 1; - public static final long NETWORK_PROTOCOL_VERSION = 1; - public static final long LOCAL_DB_VERSION = 1; - public static final String VERSION = MAJOR_VERSION + "." + MINOR_VERSION + "." + PATCH_VERSION; + // If objects are used for both network and database the network version is applied. + public static final long NETWORK_PROTOCOL_VERSION = 1; + public static final long LOCAL_DB_VERSION = 1; } diff --git a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java index 6096ccfbbc..a8308f6a1d 100644 --- a/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java +++ b/core/src/main/java/io/bitsquare/arbitration/tomp2p/TomP2PArbitratorService.java @@ -61,10 +61,12 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra try { final Data arbitratorData = new Data(arbitrator); + openRequestsUp(); FuturePut addFuture = addProtectedDataToMap(LOCATION_KEY, arbitratorData); addFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); if (future.isSuccess()) { log.trace("Add arbitrator to DHT was successful. Stored data: [key: " + LOCATION_KEY + ", " + "values: " + arbitratorData + "]"); @@ -84,16 +86,19 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra } }); } catch (IOException e) { + openRequestsDown(); e.printStackTrace(); } } public void removeArbitrator(Arbitrator arbitrator) throws IOException { final Data arbitratorData = new Data(arbitrator); + openRequestsUp(); FutureRemove removeFuture = removeProtectedDataFromMap(LOCATION_KEY, arbitratorData); removeFuture.addListener(new BaseFutureAdapter() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); for (Data arbitratorData : removeFuture.dataMap().values()) { try { Object arbitratorDataObject = arbitratorData.object(); diff --git a/core/src/main/java/io/bitsquare/crypto/Bucket.java b/core/src/main/java/io/bitsquare/crypto/Bucket.java deleted file mode 100644 index 0db05869aa..0000000000 --- a/core/src/main/java/io/bitsquare/crypto/Bucket.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * This file is part of Bitsquare. - * - * Bitsquare is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bitsquare is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bitsquare. If not, see . - */ - -package io.bitsquare.crypto; - -import io.bitsquare.app.Version; - -import java.io.Serializable; - -import javax.annotation.concurrent.Immutable; - -@Immutable -public class Bucket implements Serializable { - // That object is sent over the wire, so we need to take care of version compatibility. - private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - - public final byte[] encryptedKey; - public final byte[] encryptedPayload; - - public Bucket(byte[] encryptedKey, byte[] encryptedPayload) { - this.encryptedKey = encryptedKey; - this.encryptedPayload = encryptedPayload; - } -} diff --git a/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java b/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java index 3402a94ba9..fbfb224e33 100644 --- a/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java +++ b/core/src/main/java/io/bitsquare/p2p/BaseP2PService.java @@ -35,6 +35,7 @@ public class BaseP2PService implements P2PService { protected Executor executor; protected PeerDHT peerDHT; + private int openRequests = 0; @Override public void bootstrapCompleted() { @@ -46,7 +47,25 @@ public class BaseP2PService implements P2PService { this.executor = executor; } + + protected void openRequestsUp() { + executor.execute(() -> openRequests++); + } + + protected void openRequestsDown() { + executor.execute(() -> openRequests--); + } + @Override public void shutDown() { + long ts = System.currentTimeMillis(); + // wait max. 10 sec. for open calls to complete + while (openRequests > 0 && (System.currentTimeMillis() - ts) < 10000) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } } diff --git a/core/src/main/java/io/bitsquare/p2p/MailboxService.java b/core/src/main/java/io/bitsquare/p2p/MailboxService.java index 5e99590e80..ba98f056a1 100644 --- a/core/src/main/java/io/bitsquare/p2p/MailboxService.java +++ b/core/src/main/java/io/bitsquare/p2p/MailboxService.java @@ -29,4 +29,5 @@ public interface MailboxService { void removeAllMessages(ResultHandler resultHandler, FaultHandler faultHandler); + void shutDown(); } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java index 026d7679ea..5750a19e5a 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMailboxService.java @@ -83,11 +83,13 @@ public class TomP2PMailboxService extends TomP2PDHTService implements MailboxSer log.trace("Add message to DHT requested. Added data: [locationKey: " + locationKey + ", hash: " + data.hash().toString() + "]"); + openRequestsUp(); FuturePut futurePut = addDataToMapOfProtectedDomain(locationKey, data, pubKeyRing.getDhtSignaturePubKey()); futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); if (future.isSuccess()) { executor.execute(() -> { log.trace("Add message to mailbox was successful. Added data: [locationKey: " + locationKey + ", value: " + data + "]"); @@ -106,10 +108,12 @@ public class TomP2PMailboxService extends TomP2PDHTService implements MailboxSer @Override public void exceptionCaught(Throwable ex) throws Exception { + openRequestsDown(); executor.execute(() -> faultHandler.handleFault("Add message to mailbox failed.", ex)); } }); } catch (IOException ex) { + openRequestsDown(); executor.execute(() -> faultHandler.handleFault("Add message to mailbox failed.", ex)); } } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMessageService.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMessageService.java index 607515fb67..4c973c7e34 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMessageService.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PMessageService.java @@ -52,11 +52,6 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic private final MailboxService mailboxService; private final CryptoService cryptoService; - - /////////////////////////////////////////////////////////////////////////////////////////// - // Constructor - /////////////////////////////////////////////////////////////////////////////////////////// - @Inject public TomP2PMessageService(TomP2PNode tomP2PNode, MailboxService mailboxService, CryptoService cryptoService) { super(tomP2PNode); @@ -71,11 +66,10 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic setupReplyHandler(); } - - /////////////////////////////////////////////////////////////////////////////////////////// - // MessageService implementation - /////////////////////////////////////////////////////////////////////////////////////////// - + @Override + public void shutDown() { + super.shutDown(); + } @Override public void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, SendMessageListener listener) { @@ -90,11 +84,13 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic try { final Message encryptedMessage = cryptoService.encryptAndSignMessage(pubKeyRing, message); + openRequestsUp(); FutureDirect futureDirect = peerDHT.peer().sendDirect(((TomP2PPeer) peer).getPeerAddress()).object(encryptedMessage).start(); futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { if (future.isSuccess()) { + openRequestsDown(); log.debug("sendMessage completed"); executor.execute(listener::handleResult); } @@ -116,6 +112,7 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic } ); } catch (Throwable t) { + openRequestsDown(); t.printStackTrace(); log.error(t.getMessage()); executor.execute(listener::handleFault); @@ -129,10 +126,12 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic pubKeyRing, message, () -> { + openRequestsDown(); log.debug("Message successfully added to peers mailbox."); executor.execute(listener::handleResult); }, (errorMessage, throwable) -> { + openRequestsDown(); log.error("Message failed to add to peers mailbox."); executor.execute(listener::handleFault); } @@ -211,6 +210,6 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic } private MessageWithPubKey getDecryptedMessageWithPubKey(SealedAndSignedMessage message) throws CryptoException { - return cryptoService.decryptAndVerifyMessage((SealedAndSignedMessage) message); + return cryptoService.decryptAndVerifyMessage(message); } } diff --git a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PModule.java b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PModule.java index 6d044d0bda..6d0fbfd219 100644 --- a/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PModule.java +++ b/core/src/main/java/io/bitsquare/p2p/tomp2p/TomP2PModule.java @@ -79,5 +79,8 @@ public class TomP2PModule extends P2PModule { // First shut down AddressService to remove address from DHT injector.getInstance(AddressService.class).shutDown(); injector.getInstance(BootstrappedPeerBuilder.class).shutDown(); + injector.getInstance(MailboxService.class).shutDown(); + injector.getInstance(MessageService.class).shutDown(); + } } \ No newline at end of file diff --git a/core/src/main/java/io/bitsquare/storage/Storage.java b/core/src/main/java/io/bitsquare/storage/Storage.java index db2257c496..2773189ddb 100644 --- a/core/src/main/java/io/bitsquare/storage/Storage.java +++ b/core/src/main/java/io/bitsquare/storage/Storage.java @@ -22,6 +22,7 @@ import com.google.common.base.Throwables; import java.io.File; import java.io.IOException; import java.io.InvalidClassException; +import java.io.InvalidObjectException; import java.io.Serializable; import java.util.concurrent.TimeUnit; @@ -130,7 +131,7 @@ public class Storage { log.info("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now); return persistedObject; - } catch (InvalidClassException | ClassCastException | ClassNotFoundException e) { + } catch (InvalidClassException | InvalidObjectException | ClassCastException | ClassNotFoundException e) { e.printStackTrace(); log.error("Version of persisted class has changed. We cannot read the persisted data anymore. We make a backup and remove the inconsistent " + "file."); diff --git a/core/src/main/java/io/bitsquare/trade/offer/tomp2p/TomP2POfferBookService.java b/core/src/main/java/io/bitsquare/trade/offer/tomp2p/TomP2POfferBookService.java index 8383c1d217..e7d866fd9b 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/tomp2p/TomP2POfferBookService.java +++ b/core/src/main/java/io/bitsquare/trade/offer/tomp2p/TomP2POfferBookService.java @@ -56,12 +56,16 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo private final List offerRepositoryListeners = new ArrayList<>(); private final LongProperty invalidationTimestamp = new SimpleLongProperty(0); - @Inject public TomP2POfferBookService(TomP2PNode tomP2PNode, KeyRing keyRing) { super(tomP2PNode, keyRing); } + @Override + public void shutDown() { + super.shutDown(); + } + @Override public void addOffer(Offer offer, ResultHandler resultHandler, FaultHandler faultHandler) { log.debug("addOffer " + offer); @@ -71,10 +75,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo offerData.ttlSeconds(TTL); log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); + openRequestsUp(); FuturePut futurePut = addProtectedDataToMap(locationKey, offerData); futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); if (future.isSuccess()) { executor.execute(() -> { resultHandler.handleResult(); @@ -100,10 +106,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo @Override public void exceptionCaught(Throwable ex) throws Exception { + openRequestsDown(); executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex)); } }); } catch (IOException ex) { + openRequestsDown(); executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex)); } } @@ -115,10 +123,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo final Data offerData = new Data(offer); log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); + openRequestsUp(); FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData); futureRemove.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); // We don't test futureRemove.isSuccess() as this API does not fit well to that operation, // it might change in future to something like foundAndRemoved and notFound // See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840 @@ -146,11 +156,13 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo @Override public void exceptionCaught(Throwable t) throws Exception { + openRequestsDown(); log.error("Remove offer from DHT failed. Error: " + t.getMessage()); faultHandler.handleFault("Remove offer from DHT failed. Error: " + t.getMessage(), t); } }); } catch (IOException e) { + openRequestsDown(); e.printStackTrace(); log.error("Remove offer from DHT failed. Error: " + e.getMessage()); faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e); @@ -164,11 +176,26 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo final Data offerData = new Data(offer); log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey + ", hash: " + offerData.hash().toString() + "]"); + openRequestsUp(); FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData); writeInvalidationTimestampToDHT(offer.getCurrencyCode()); - futureRemove.awaitUninterruptibly(1000); + futureRemove.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); + log.trace("isRemoved? " + futureRemove.isRemoved()); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + openRequestsDown(); + log.error("Remove offer from DHT failed. Error: " + t.getMessage()); + } + }); + log.trace("isRemoved? " + futureRemove.isRemoved()); } catch (IOException e) { + openRequestsDown(); e.printStackTrace(); log.error("Remove offer from DHT failed. Error: " + e.getMessage()); } @@ -195,6 +222,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo } } catch (ClassNotFoundException | IOException e) { e.printStackTrace(); + log.warn(e.getMessage()); } } log.trace("Get offers with offers.size(): " + offers.size()); @@ -239,11 +267,13 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo private void writeInvalidationTimestampToDHT(String currencyCode) { invalidationTimestamp.set(System.currentTimeMillis()); try { + openRequestsUp(); FuturePut putFuture = putData(getInvalidatedLocationKey(currencyCode), new Data(invalidationTimestamp.get())); putFuture.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { + openRequestsDown(); if (future.isSuccess()) log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" + invalidationTimestamp.get()); @@ -253,10 +283,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo @Override public void exceptionCaught(Throwable t) throws Exception { + openRequestsDown(); log.error("Update invalidationTimestamp to DHT failed with exception:" + t.getMessage()); } }); } catch (IOException e) { + openRequestsDown(); log.error("Update invalidationTimestamp to DHT failed with exception:" + e.getMessage()); } }