Add handling for open network requests at shutdown

This commit is contained in:
Manfred Karrer 2015-04-20 20:35:39 +02:00
parent 63c9b4670f
commit 919e31f0d5
10 changed files with 81 additions and 55 deletions

View File

@ -27,9 +27,9 @@ public class Version {
public static final int MINOR_VERSION = 2;
public static final int PATCH_VERSION = 1;
public static final long NETWORK_PROTOCOL_VERSION = 1;
public static final long LOCAL_DB_VERSION = 1;
public static final String VERSION = MAJOR_VERSION + "." + MINOR_VERSION + "." + PATCH_VERSION;
// If objects are used for both network and database the network version is applied.
public static final long NETWORK_PROTOCOL_VERSION = 1;
public static final long LOCAL_DB_VERSION = 1;
}

View File

@ -61,10 +61,12 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
try {
final Data arbitratorData = new Data(arbitrator);
openRequestsUp();
FuturePut addFuture = addProtectedDataToMap(LOCATION_KEY, arbitratorData);
addFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
if (future.isSuccess()) {
log.trace("Add arbitrator to DHT was successful. Stored data: [key: " + LOCATION_KEY + ", " +
"values: " + arbitratorData + "]");
@ -84,16 +86,19 @@ public class TomP2PArbitratorService extends TomP2PDHTService implements Arbitra
}
});
} catch (IOException e) {
openRequestsDown();
e.printStackTrace();
}
}
public void removeArbitrator(Arbitrator arbitrator) throws IOException {
final Data arbitratorData = new Data(arbitrator);
openRequestsUp();
FutureRemove removeFuture = removeProtectedDataFromMap(LOCATION_KEY, arbitratorData);
removeFuture.addListener(new BaseFutureAdapter<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
for (Data arbitratorData : removeFuture.dataMap().values()) {
try {
Object arbitratorDataObject = arbitratorData.object();

View File

@ -1,38 +0,0 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.crypto;
import io.bitsquare.app.Version;
import java.io.Serializable;
import javax.annotation.concurrent.Immutable;
@Immutable
public class Bucket implements Serializable {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final byte[] encryptedKey;
public final byte[] encryptedPayload;
public Bucket(byte[] encryptedKey, byte[] encryptedPayload) {
this.encryptedKey = encryptedKey;
this.encryptedPayload = encryptedPayload;
}
}

View File

@ -35,6 +35,7 @@ public class BaseP2PService implements P2PService {
protected Executor executor;
protected PeerDHT peerDHT;
private int openRequests = 0;
@Override
public void bootstrapCompleted() {
@ -46,7 +47,25 @@ public class BaseP2PService implements P2PService {
this.executor = executor;
}
protected void openRequestsUp() {
executor.execute(() -> openRequests++);
}
protected void openRequestsDown() {
executor.execute(() -> openRequests--);
}
@Override
public void shutDown() {
long ts = System.currentTimeMillis();
// wait max. 10 sec. for open calls to complete
while (openRequests > 0 && (System.currentTimeMillis() - ts) < 10000) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -29,4 +29,5 @@ public interface MailboxService {
void removeAllMessages(ResultHandler resultHandler, FaultHandler faultHandler);
void shutDown();
}

View File

@ -83,11 +83,13 @@ public class TomP2PMailboxService extends TomP2PDHTService implements MailboxSer
log.trace("Add message to DHT requested. Added data: [locationKey: " + locationKey +
", hash: " + data.hash().toString() + "]");
openRequestsUp();
FuturePut futurePut = addDataToMapOfProtectedDomain(locationKey,
data, pubKeyRing.getDhtSignaturePubKey());
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
if (future.isSuccess()) {
executor.execute(() -> {
log.trace("Add message to mailbox was successful. Added data: [locationKey: " + locationKey + ", value: " + data + "]");
@ -106,10 +108,12 @@ public class TomP2PMailboxService extends TomP2PDHTService implements MailboxSer
@Override
public void exceptionCaught(Throwable ex) throws Exception {
openRequestsDown();
executor.execute(() -> faultHandler.handleFault("Add message to mailbox failed.", ex));
}
});
} catch (IOException ex) {
openRequestsDown();
executor.execute(() -> faultHandler.handleFault("Add message to mailbox failed.", ex));
}
}

View File

@ -52,11 +52,6 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
private final MailboxService mailboxService;
private final CryptoService<MailboxMessage> cryptoService;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TomP2PMessageService(TomP2PNode tomP2PNode, MailboxService mailboxService, CryptoService<MailboxMessage> cryptoService) {
super(tomP2PNode);
@ -71,11 +66,10 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
setupReplyHandler();
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageService implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void shutDown() {
super.shutDown();
}
@Override
public void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, SendMessageListener listener) {
@ -90,11 +84,13 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
try {
final Message encryptedMessage = cryptoService.encryptAndSignMessage(pubKeyRing, message);
openRequestsUp();
FutureDirect futureDirect = peerDHT.peer().sendDirect(((TomP2PPeer) peer).getPeerAddress()).object(encryptedMessage).start();
futureDirect.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
openRequestsDown();
log.debug("sendMessage completed");
executor.execute(listener::handleResult);
}
@ -116,6 +112,7 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
}
);
} catch (Throwable t) {
openRequestsDown();
t.printStackTrace();
log.error(t.getMessage());
executor.execute(listener::handleFault);
@ -129,10 +126,12 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
pubKeyRing,
message,
() -> {
openRequestsDown();
log.debug("Message successfully added to peers mailbox.");
executor.execute(listener::handleResult);
},
(errorMessage, throwable) -> {
openRequestsDown();
log.error("Message failed to add to peers mailbox.");
executor.execute(listener::handleFault);
}
@ -211,6 +210,6 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
}
private MessageWithPubKey getDecryptedMessageWithPubKey(SealedAndSignedMessage message) throws CryptoException {
return cryptoService.decryptAndVerifyMessage((SealedAndSignedMessage) message);
return cryptoService.decryptAndVerifyMessage(message);
}
}

View File

@ -79,5 +79,8 @@ public class TomP2PModule extends P2PModule {
// First shut down AddressService to remove address from DHT
injector.getInstance(AddressService.class).shutDown();
injector.getInstance(BootstrappedPeerBuilder.class).shutDown();
injector.getInstance(MailboxService.class).shutDown();
injector.getInstance(MessageService.class).shutDown();
}
}

View File

@ -22,6 +22,7 @@ import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.InvalidObjectException;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
@ -130,7 +131,7 @@ public class Storage<T extends Serializable> {
log.info("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
return persistedObject;
} catch (InvalidClassException | ClassCastException | ClassNotFoundException e) {
} catch (InvalidClassException | InvalidObjectException | ClassCastException | ClassNotFoundException e) {
e.printStackTrace();
log.error("Version of persisted class has changed. We cannot read the persisted data anymore. We make a backup and remove the inconsistent " +
"file.");

View File

@ -56,12 +56,16 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
private final List<Listener> offerRepositoryListeners = new ArrayList<>();
private final LongProperty invalidationTimestamp = new SimpleLongProperty(0);
@Inject
public TomP2POfferBookService(TomP2PNode tomP2PNode, KeyRing keyRing) {
super(tomP2PNode, keyRing);
}
@Override
public void shutDown() {
super.shutDown();
}
@Override
public void addOffer(Offer offer, ResultHandler resultHandler, FaultHandler faultHandler) {
log.debug("addOffer " + offer);
@ -71,10 +75,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
offerData.ttlSeconds(TTL);
log.trace("Add offer to DHT requested. Added data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
openRequestsUp();
FuturePut futurePut = addProtectedDataToMap(locationKey, offerData);
futurePut.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
if (future.isSuccess()) {
executor.execute(() -> {
resultHandler.handleResult();
@ -100,10 +106,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
@Override
public void exceptionCaught(Throwable ex) throws Exception {
openRequestsDown();
executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex));
}
});
} catch (IOException ex) {
openRequestsDown();
executor.execute(() -> faultHandler.handleFault("Failed to add offer to DHT", ex));
}
}
@ -115,10 +123,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
final Data offerData = new Data(offer);
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
openRequestsUp();
FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData);
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
// We don't test futureRemove.isSuccess() as this API does not fit well to that operation,
// it might change in future to something like foundAndRemoved and notFound
// See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840
@ -146,11 +156,13 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
@Override
public void exceptionCaught(Throwable t) throws Exception {
openRequestsDown();
log.error("Remove offer from DHT failed. Error: " + t.getMessage());
faultHandler.handleFault("Remove offer from DHT failed. Error: " + t.getMessage(), t);
}
});
} catch (IOException e) {
openRequestsDown();
e.printStackTrace();
log.error("Remove offer from DHT failed. Error: " + e.getMessage());
faultHandler.handleFault("Remove offer from DHT failed. Error: " + e.getMessage(), e);
@ -164,11 +176,26 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
final Data offerData = new Data(offer);
log.trace("Remove offer from DHT requested. Removed data: [locationKey: " + locationKey +
", hash: " + offerData.hash().toString() + "]");
openRequestsUp();
FutureRemove futureRemove = removeProtectedDataFromMap(locationKey, offerData);
writeInvalidationTimestampToDHT(offer.getCurrencyCode());
futureRemove.awaitUninterruptibly(1000);
futureRemove.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
log.trace("isRemoved? " + futureRemove.isRemoved());
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
openRequestsDown();
log.error("Remove offer from DHT failed. Error: " + t.getMessage());
}
});
log.trace("isRemoved? " + futureRemove.isRemoved());
} catch (IOException e) {
openRequestsDown();
e.printStackTrace();
log.error("Remove offer from DHT failed. Error: " + e.getMessage());
}
@ -195,6 +222,7 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
}
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
log.warn(e.getMessage());
}
}
log.trace("Get offers with offers.size(): " + offers.size());
@ -239,11 +267,13 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
private void writeInvalidationTimestampToDHT(String currencyCode) {
invalidationTimestamp.set(System.currentTimeMillis());
try {
openRequestsUp();
FuturePut putFuture = putData(getInvalidatedLocationKey(currencyCode),
new Data(invalidationTimestamp.get()));
putFuture.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
openRequestsDown();
if (future.isSuccess())
log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" +
invalidationTimestamp.get());
@ -253,10 +283,12 @@ public class TomP2POfferBookService extends TomP2PDHTService implements OfferBoo
@Override
public void exceptionCaught(Throwable t) throws Exception {
openRequestsDown();
log.error("Update invalidationTimestamp to DHT failed with exception:" + t.getMessage());
}
});
} catch (IOException e) {
openRequestsDown();
log.error("Update invalidationTimestamp to DHT failed with exception:" + e.getMessage());
}
}