Run reformat and organize imports

This commit is contained in:
Manfred Karrer 2016-01-21 02:48:44 +01:00
parent f723bf5737
commit cb685d3b5c
73 changed files with 802 additions and 829 deletions

View file

@ -25,7 +25,8 @@ public class FileUtilities {
/**
* Closes both input and output streams when done.
* @param in Stream to read from
*
* @param in Stream to read from
* @param out Stream to write to
* @throws java.io.IOException - If close on input or output fails
*/
@ -39,7 +40,8 @@ public class FileUtilities {
/**
* Won't close the input stream when it's done, needed to handle ZipInputStreams
* @param in Won't be closed
*
* @param in Won't be closed
* @param out Will be closed
* @throws java.io.IOException - If close on output fails
*/
@ -84,7 +86,8 @@ public class FileUtilities {
/**
* Reads the input stream, deletes fileToWriteTo if it exists and over writes it with the stream.
* @param readFrom Stream to read from
*
* @param readFrom Stream to read from
* @param fileToWriteTo File to write to
* @throws java.io.IOException - If any of the file operations fail
*/
@ -110,8 +113,9 @@ public class FileUtilities {
/**
* This has to exist somewhere! Why isn't it a part of the standard Java library?
*
* @param destinationDirectory Directory files are to be extracted to
* @param zipFileInputStream Stream to unzip
* @param zipFileInputStream Stream to unzip
* @throws java.io.IOException - If there are any file errors
*/
public static void extractContentFromZip(File destinationDirectory, InputStream zipFileInputStream)

View file

@ -80,8 +80,7 @@ abstract public class OnionProxyContext {
/**
* Sets environment variables and working directory needed for Tor
*
* @param processBuilder
* we will call start on this to run Tor
* @param processBuilder we will call start on this to run Tor
*/
void setEnvironmentArgsAndWorkingDirectoryForStart(ProcessBuilder processBuilder) {
processBuilder.directory(getWorkingDirectory());
@ -164,7 +163,7 @@ abstract public class OnionProxyContext {
* Files we pull out of the AAR or JAR are typically at the root but for
* executables outside of Android the executable for a particular platform
* is in a specific sub-directory.
*
*
* @return Path to executable in JAR Resources
*/
protected abstract String getPathToTorExecutable();

View file

@ -50,11 +50,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
* Note that you will most likely need to actually call into the
* AndroidOnionProxyManager or JavaOnionProxyManager in order to create the
* right bindings for your environment.
* <p/>
* <p>
* This class is thread safe but that's mostly because we hit everything over
* the head with 'synchronized'. Given the way this class is used there
* shouldn't be any performance implications of this.
* <p/>
* <p>
* This class began life as TorPlugin from the Briar Project
*/
public abstract class OnionProxyManager {
@ -98,17 +98,13 @@ public abstract class OnionProxyManager {
* given time for bootstrap to finish and if it doesn't then will restart
* the bootstrap process the given number of repeats.
*
* @param secondsBeforeTimeOut
* Seconds to wait for boot strapping to finish
* @param numberOfRetries
* Number of times to try recycling the Tor OP before giving up
* on bootstrapping working
* @param secondsBeforeTimeOut Seconds to wait for boot strapping to finish
* @param numberOfRetries Number of times to try recycling the Tor OP before giving up
* on bootstrapping working
* @return True if bootstrap succeeded, false if there is a problem or the
* bootstrap couldn't complete in the given time.
* @throws java.lang.InterruptedException
* - You know, if we are interrupted
* @throws java.io.IOException
* - IO Exceptions
* bootstrap couldn't complete in the given time.
* @throws java.lang.InterruptedException - You know, if we are interrupted
* @throws java.io.IOException - IO Exceptions
*/
public synchronized boolean startWithRepeat(int secondsBeforeTimeOut, int numberOfRetries)
throws InterruptedException, IOException {
@ -165,8 +161,7 @@ public abstract class OnionProxyManager {
* listening on
*
* @return Discovered socks port
* @throws java.io.IOException
* - File errors
* @throws java.io.IOException - File errors
*/
public synchronized int getIPv4LocalHostSocksPort() throws IOException {
if (isRunning() == false) {
@ -191,14 +186,11 @@ public abstract class OnionProxyManager {
/**
* Publishes a hidden service
*
* @param hiddenServicePort
* The port that the hidden service will accept connections on
* @param localPort
* The local port that the hidden service will relay connections
* to
* @param hiddenServicePort The port that the hidden service will accept connections on
* @param localPort The local port that the hidden service will relay connections
* to
* @return The hidden service's onion address in the form X.onion.
* @throws java.io.IOException
* - File errors
* @throws java.io.IOException - File errors
*/
public synchronized String publishHiddenService(int hiddenServicePort, int localPort) throws IOException {
if (controlConnection == null) {
@ -283,8 +275,7 @@ public abstract class OnionProxyManager {
* going to work until you either call startWithRepeat or
* installAndStartTorOp
*
* @throws java.io.IOException
* - File errors
* @throws java.io.IOException - File errors
*/
public synchronized void stop() throws IOException {
try {
@ -308,8 +299,7 @@ public abstract class OnionProxyManager {
* to network connections.
*
* @return True if running
* @throws java.io.IOException
* - IO exceptions
* @throws java.io.IOException - IO exceptions
*/
public synchronized boolean isRunning() throws IOException {
return isBootstrapped() && isNetworkEnabled();
@ -318,11 +308,9 @@ public abstract class OnionProxyManager {
/**
* Tells the Tor OP if it should accept network connections
*
* @param enable
* If true then the Tor OP will accept SOCKS connections,
* otherwise not.
* @throws java.io.IOException
* - IO exceptions
* @param enable If true then the Tor OP will accept SOCKS connections,
* otherwise not.
* @throws java.io.IOException - IO exceptions
*/
public synchronized void enableNetwork(boolean enable) throws IOException {
if (controlConnection == null) {
@ -336,9 +324,8 @@ public abstract class OnionProxyManager {
* Specifies if Tor OP is accepting network connections
*
* @return True if network is enabled (that doesn't mean that the device is
* online, only that the Tor OP is trying to connect to the network)
* @throws java.io.IOException
* - IO exceptions
* online, only that the Tor OP is trying to connect to the network)
* @throws java.io.IOException - IO exceptions
*/
public synchronized boolean isNetworkEnabled() throws IOException {
if (controlConnection == null) {
@ -391,10 +378,8 @@ public abstract class OnionProxyManager {
* to actually connect it to the network.
*
* @return True if all files installed and Tor OP successfully started
* @throws java.io.IOException
* - IO Exceptions
* @throws java.lang.InterruptedException
* - If we are, well, interrupted
* @throws java.io.IOException - IO Exceptions
* @throws java.lang.InterruptedException - If we are, well, interrupted
*/
public synchronized boolean installAndStartTorOp() throws IOException, InterruptedException {
// The Tor OP will die if it looses the connection to its socket so if
@ -598,8 +583,7 @@ public abstract class OnionProxyManager {
/**
* Alas old versions of Android do not support setExecutable.
*
* @param f
* File to make executable
* @param f File to make executable
* @return True if it worked, otherwise false.
*/
protected abstract boolean setExecutable(File f);

View file

@ -34,6 +34,7 @@ import java.util.Scanner;
public class OsData {
public enum OsType {Windows, Linux32, Linux64, Mac, Android}
private static OsType detectedType = null;
public static OsType getOsType() {
@ -50,7 +51,7 @@ public class OsData {
protected static OsType actualGetOsType() {
//This also works for ART
if (System.getProperty("java.vm.name").contains("Dalvik")) {
if (System.getProperty("java.vm.name").contains("Dalvik")) {
return OsType.Android;
}

View file

@ -22,8 +22,9 @@ import java.util.concurrent.TimeUnit;
public interface WriteObserver {
/**
* Waits timeout of unit to see if file is modified
*
* @param timeout How long to wait before returning
* @param unit Unit to wait in
* @param unit Unit to wait in
* @return True if file was modified, false if it was not
*/
boolean poll(long timeout, TimeUnit unit);

View file

@ -18,212 +18,212 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class Connection implements Closeable {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private final Socket socket;
private final ObjectOutputStream out;
private final ObjectInputStream in;
private final LinkedList<ConnectionListener> connectionListeners;
private final String peer;
private boolean running;
private final AtomicBoolean available;
private final AtomicBoolean listening;
private final Socket socket;
private final ObjectOutputStream out;
private final ObjectInputStream in;
private final LinkedList<ConnectionListener> connectionListeners;
private final String peer;
private boolean running;
private final AtomicBoolean available;
private final AtomicBoolean listening;
private final ExecutorService executorService;
private final InputStreamListener inputStreamListener;
private final ExecutorService executorService;
private final InputStreamListener inputStreamListener;
private final AtomicBoolean heartBeating;
private final AtomicBoolean heartBeating;
public Connection(String peer, Socket socket) throws IOException {
this(peer, socket, Node.prepareOOSForSocket(socket), new ObjectInputStream(socket.getInputStream()));
}
Connection(String peer, Socket socket, ObjectOutputStream out, ObjectInputStream in) {
log.debug("Initiating new connection");
this.available = new AtomicBoolean(false);
this.peer = peer;
this.socket = socket;
this.in = in;
this.out = out;
running = true;
listening = new AtomicBoolean(false);
heartBeating = new AtomicBoolean(false);
this.connectionListeners = new LinkedList<>();
this.inputStreamListener = new InputStreamListener();
executorService = Executors.newCachedThreadPool();
}
public abstract boolean isIncoming();
public void addMessageListener(ConnectionListener listener) {
synchronized (connectionListeners) {
connectionListeners.add(listener);
public Connection(String peer, Socket socket) throws IOException {
this(peer, socket, Node.prepareOOSForSocket(socket), new ObjectInputStream(socket.getInputStream()));
}
}
protected void setConnectionListeners(Collection<ConnectionListener> listeners) {
synchronized (listeners) {
this.connectionListeners.clear();
this.connectionListeners.addAll(listeners);
Connection(String peer, Socket socket, ObjectOutputStream out, ObjectInputStream in) {
log.debug("Initiating new connection");
this.available = new AtomicBoolean(false);
this.peer = peer;
this.socket = socket;
this.in = in;
this.out = out;
running = true;
listening = new AtomicBoolean(false);
heartBeating = new AtomicBoolean(false);
this.connectionListeners = new LinkedList<>();
this.inputStreamListener = new InputStreamListener();
executorService = Executors.newCachedThreadPool();
}
}
public void removeMessageListener(ConnectionListener listener) {
synchronized (connectionListeners) {
connectionListeners.remove(listener);
}
}
public abstract boolean isIncoming();
void sendMsg(Message msg) throws IOException {
out.writeObject(msg);
out.flush();
}
public void sendMessage(ContainerMessage msg) throws IOException {
if (!available.get())
throw new IOException("Connection is not yet available!");
sendMsg(msg);
}
protected void onMessage(Message msg) throws IOException {
log.debug("RXD: " + msg.toString());
if (msg instanceof ContainerMessage) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners)
l.onMessage(this, (ContainerMessage) msg);
}
} else {
if (msg instanceof ControlMessage) {
switch ((ControlMessage) msg) {
case DISCONNECT:
close(false, PredefinedDisconnectReason.createReason(PredefinedDisconnectReason.CONNECTION_CLOSED, true));
break;
case AVAILABLE:
startHeartbeat();
onReady();
break;
default:
break;
public void addMessageListener(ConnectionListener listener) {
synchronized (connectionListeners) {
connectionListeners.add(listener);
}
}
}
}
protected void onReady() {
if (!available.getAndSet(true)) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onReady(this);
protected void setConnectionListeners(Collection<ConnectionListener> listeners) {
synchronized (listeners) {
this.connectionListeners.clear();
this.connectionListeners.addAll(listeners);
}
}
}
}
protected abstract void onDisconnect();
private void onDisconn(DisconnectReason reason) {
onDisconnect();
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onDisconnect(this, reason);
}
public void removeMessageListener(ConnectionListener listener) {
synchronized (connectionListeners) {
connectionListeners.remove(listener);
}
}
}
private void onTimeout() {
try {
close(false, PredefinedDisconnectReason.TIMEOUT);
} catch (IOException e1) {
void sendMsg(Message msg) throws IOException {
out.writeObject(msg);
out.flush();
}
}
protected void onError(Exception e) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onError(this, new ConnectionException(e));
}
public void sendMessage(ContainerMessage msg) throws IOException {
if (!available.get())
throw new IOException("Connection is not yet available!");
sendMsg(msg);
}
}
public void close() throws IOException {
close(true, PredefinedDisconnectReason.createReason(PredefinedDisconnectReason.CONNECTION_CLOSED, false));
}
private void close(boolean graceful, DisconnectReason reason) throws IOException {
running = false;
onDisconn(reason);
if (graceful) {
try {
sendMsg(ControlMessage.DISCONNECT);
} catch (Exception e) {
onError(e);
}
}
out.close();
in.close();
socket.close();
}
public String getPeer() {
return peer;
}
void startHeartbeat() {
if (!heartBeating.getAndSet(true)) {
log.debug("Starting Heartbeat");
executorService.submit(new Runnable() {
public void run() {
try {
Thread.sleep(30000);
while (running) {
try {
log.debug("TX Heartbeat");
sendMsg(ControlMessage.HEARTBEAT);
Thread.sleep(30000);
} catch (IOException e) {
e.printStackTrace();
}
protected void onMessage(Message msg) throws IOException {
log.debug("RXD: " + msg.toString());
if (msg instanceof ContainerMessage) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners)
l.onMessage(this, (ContainerMessage) msg);
}
} catch (InterruptedException e) {
}
}
});
}
}
public void listen() throws ConnectionException {
if (listening.getAndSet(true))
throw new ConnectionException("Already Listening!");
executorService.submit(inputStreamListener);
}
private class InputStreamListener implements Runnable {
@Override
public void run() {
while (running) {
try {
Message msg = (Message) in.readObject();
onMessage(msg);
} catch (ClassNotFoundException | IOException e) {
if (e instanceof SocketTimeoutException) {
onTimeout();
} else {
if (running) {
onError(new ConnectionException(e));
// TODO: Fault Tolerance?
if (e instanceof EOFException) {
try {
close(false, PredefinedDisconnectReason.RESET);
} catch (IOException e1) {
e1.printStackTrace();
} else {
if (msg instanceof ControlMessage) {
switch ((ControlMessage) msg) {
case DISCONNECT:
close(false, PredefinedDisconnectReason.createReason(PredefinedDisconnectReason.CONNECTION_CLOSED, true));
break;
case AVAILABLE:
startHeartbeat();
onReady();
break;
default:
break;
}
}
}
}
protected void onReady() {
if (!available.getAndSet(true)) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onReady(this);
}
}
}
}
protected abstract void onDisconnect();
private void onDisconn(DisconnectReason reason) {
onDisconnect();
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onDisconnect(this, reason);
}
}
}
private void onTimeout() {
try {
close(false, PredefinedDisconnectReason.TIMEOUT);
} catch (IOException e1) {
}
}
protected void onError(Exception e) {
synchronized (connectionListeners) {
for (ConnectionListener l : connectionListeners) {
l.onError(this, new ConnectionException(e));
}
}
}
public void close() throws IOException {
close(true, PredefinedDisconnectReason.createReason(PredefinedDisconnectReason.CONNECTION_CLOSED, false));
}
private void close(boolean graceful, DisconnectReason reason) throws IOException {
running = false;
onDisconn(reason);
if (graceful) {
try {
sendMsg(ControlMessage.DISCONNECT);
} catch (Exception e) {
onError(e);
}
}
out.close();
in.close();
socket.close();
}
public String getPeer() {
return peer;
}
void startHeartbeat() {
if (!heartBeating.getAndSet(true)) {
log.debug("Starting Heartbeat");
executorService.submit(new Runnable() {
public void run() {
try {
Thread.sleep(30000);
while (running) {
try {
log.debug("TX Heartbeat");
sendMsg(ControlMessage.HEARTBEAT);
Thread.sleep(30000);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
}
}
});
}
}
public void listen() throws ConnectionException {
if (listening.getAndSet(true))
throw new ConnectionException("Already Listening!");
executorService.submit(inputStreamListener);
}
private class InputStreamListener implements Runnable {
@Override
public void run() {
while (running) {
try {
Message msg = (Message) in.readObject();
onMessage(msg);
} catch (ClassNotFoundException | IOException e) {
if (e instanceof SocketTimeoutException) {
onTimeout();
} else {
if (running) {
onError(new ConnectionException(e));
// TODO: Fault Tolerance?
if (e instanceof EOFException) {
try {
close(false, PredefinedDisconnectReason.RESET);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
}
}
}
}
}
}
}
}

View file

@ -5,12 +5,12 @@ import io.nucleo.net.proto.exceptions.ConnectionException;
public interface ConnectionListener {
public abstract void onMessage(Connection con, ContainerMessage msg);
public abstract void onMessage(Connection con, ContainerMessage msg);
public void onDisconnect(Connection con, DisconnectReason reason);
public void onDisconnect(Connection con, DisconnectReason reason);
public void onError(Connection con, ConnectionException e);
public void onError(Connection con, ConnectionException e);
public void onReady(Connection con);
public void onReady(Connection con);
}

View file

@ -2,10 +2,10 @@ package io.nucleo.net;
public interface DisconnectReason {
public abstract String toString();
public abstract String toString();
public boolean isGraceful();
public boolean isGraceful();
public boolean isRemote();
public boolean isRemote();
}

View file

@ -29,363 +29,363 @@ import java.util.regex.Pattern;
public class Node {
/**
* Use this whenever to flush the socket header over the socket!
*
* @param socket the socket to construct an objectOutputStream from
* @return the outputstream from the socket
* @throws IOException in case something goes wrong, duh!
*/
static ObjectOutputStream prepareOOSForSocket(Socket socket) throws IOException {
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
/**
* Use this whenever to flush the socket header over the socket!
*
* @param socket the socket to construct an objectOutputStream from
* @return the outputstream from the socket
* @throws IOException in case something goes wrong, duh!
*/
static ObjectOutputStream prepareOOSForSocket(Socket socket) throws IOException {
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
out.flush();
return out;
}
private static final Logger log = LoggerFactory.getLogger(Node.class);
private final ServiceDescriptor descriptor;
private final HashMap<String, Connection> connections;
@SuppressWarnings("rawtypes")
private final TorNode tor;
private final AtomicBoolean serverRunning;
public Node(TCPServiceDescriptor descriptor) {
this(null, descriptor);
}
public Node(HiddenServiceDescriptor descriptor, TorNode<?, ?> tor) {
this(tor, descriptor);
}
private Node(TorNode<?, ?> tor, ServiceDescriptor descriptor) {
this.connections = new HashMap<>();
this.descriptor = descriptor;
this.tor = tor;
this.serverRunning = new AtomicBoolean(false);
}
public String getLocalName() {
return descriptor.getFullAddress();
}
public Connection connect(String peer, Collection<ConnectionListener> listeners)
throws NumberFormatException, IOException {
if (!serverRunning.get()) {
throw new IOException("This node has not been started yet!");
}
if (peer.equals(descriptor.getFullAddress()))
throw new IOException("If you find yourself talking to yourself too often, you should really seek help!");
synchronized (connections) {
if (connections.containsKey(peer))
throw new IOException("Already connected to " + peer);
out.flush();
return out;
}
final Socket sock = connectToService(peer);
return new OutgoingConnection(peer, sock, listeners);
}
private static final Logger log = LoggerFactory.getLogger(Node.class);
private Socket connectToService(String hostname, int port) throws IOException, UnknownHostException, SocketException {
final Socket sock;
if (tor != null)
sock = tor.connectToHiddenService(hostname, port);
else
sock = new Socket(hostname, port);
sock.setSoTimeout(60000);
return sock;
}
private final ServiceDescriptor descriptor;
private Socket connectToService(String peer) throws IOException, UnknownHostException, SocketException {
final String[] split = peer.split(Pattern.quote(":"));
return connectToService(split[0], Integer.parseInt(split[1]));
private final HashMap<String, Connection> connections;
}
@SuppressWarnings("rawtypes")
private final TorNode tor;
public synchronized Server startListening(ServerConnectListener listener) throws IOException {
if (serverRunning.getAndSet(true))
throw new IOException("This node is already listening!");
final Server server = new Server(descriptor.getServerSocket(), listener);
server.start();
return server;
}
private final AtomicBoolean serverRunning;
public Connection getConnection(String peerAddress) {
synchronized (connections) {
return connections.get(peerAddress);
}
}
public Set<Connection> getConnections() {
synchronized (connections) {
return new HashSet<Connection>(connections.values());
}
}
public class Server extends Thread {
private boolean running;
private final ServerSocket serverSocket;
private final ExecutorService executorService;
private final ServerConnectListener serverConnectListener;
private Server(ServerSocket serverSocket, ServerConnectListener listener) {
super("Server");
this.serverSocket = descriptor.getServerSocket();
this.serverConnectListener = listener;
running = true;
executorService = Executors.newCachedThreadPool();
public Node(TCPServiceDescriptor descriptor) {
this(null, descriptor);
}
public void shutdown() throws IOException {
running = false;
synchronized (connections) {
final Set<Connection> conns = new HashSet<Connection>(connections.values());
for (Connection con : conns) {
con.close();
public Node(HiddenServiceDescriptor descriptor, TorNode<?, ?> tor) {
this(tor, descriptor);
}
private Node(TorNode<?, ?> tor, ServiceDescriptor descriptor) {
this.connections = new HashMap<>();
this.descriptor = descriptor;
this.tor = tor;
this.serverRunning = new AtomicBoolean(false);
}
public String getLocalName() {
return descriptor.getFullAddress();
}
public Connection connect(String peer, Collection<ConnectionListener> listeners)
throws NumberFormatException, IOException {
if (!serverRunning.get()) {
throw new IOException("This node has not been started yet!");
}
}
serverSocket.close();
try {
executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
Node.this.serverRunning.set(false);
log.debug("Server successfully shutdown");
}
@Override
public void run() {
try {
while (running) {
final Socket socket = serverSocket.accept();
log.info("Accepting Client on port " + socket.getLocalPort());
executorService.submit(new Acceptor(socket));
if (peer.equals(descriptor.getFullAddress()))
throw new IOException("If you find yourself talking to yourself too often, you should really seek help!");
synchronized (connections) {
if (connections.containsKey(peer))
throw new IOException("Already connected to " + peer);
}
} catch (IOException e) {
if (running)
e.printStackTrace();
}
final Socket sock = connectToService(peer);
return new OutgoingConnection(peer, sock, listeners);
}
private boolean verifyIdentity(HELOMessage helo, ObjectInputStream in) throws IOException {
log.debug("Verifying HELO msg");
final Socket sock = connectToService(helo.getHostname(), helo.getPort());
log.debug("Connected to advertised client " + helo.getPeer());
ObjectOutputStream out = prepareOOSForSocket(sock);
final IDMessage challenge = new IDMessage(descriptor);
out.writeObject(challenge);
log.debug("Sent IDMessage to");
out.flush();
// wait for other side to close
try {
while (sock.getInputStream().read() != -1)
;
} catch (IOException e) {
// no matter
}
out.close();
sock.close();
log.debug("Closed socket after sending IDMessage");
try {
log.debug("Waiting for response of challenge");
IDMessage response = (IDMessage) in.readObject();
log.debug("Got response for challenge");
final boolean verified = challenge.verify(response);
log.debug("Response verified correctly!");
return verified;
} catch (ClassNotFoundException e) {
new ProtocolViolationException(e).printStackTrace();
}
return false;
private Socket connectToService(String hostname, int port) throws IOException, UnknownHostException, SocketException {
final Socket sock;
if (tor != null)
sock = tor.connectToHiddenService(hostname, port);
else
sock = new Socket(hostname, port);
sock.setSoTimeout(60000);
return sock;
}
private class Acceptor implements Runnable {
private Socket connectToService(String peer) throws IOException, UnknownHostException, SocketException {
final String[] split = peer.split(Pattern.quote(":"));
return connectToService(split[0], Integer.parseInt(split[1]));
private final Socket socket;
}
private Acceptor(Socket socket) {
this.socket = socket;
}
public synchronized Server startListening(ServerConnectListener listener) throws IOException {
if (serverRunning.getAndSet(true))
throw new IOException("This node is already listening!");
final Server server = new Server(descriptor.getServerSocket(), listener);
server.start();
return server;
}
@Override
public void run() {
{
try {
socket.setSoTimeout(60 * 1000);
} catch (SocketException e2) {
e2.printStackTrace();
public Connection getConnection(String peerAddress) {
synchronized (connections) {
return connections.get(peerAddress);
}
}
public Set<Connection> getConnections() {
synchronized (connections) {
return new HashSet<Connection>(connections.values());
}
}
public class Server extends Thread {
private boolean running;
private final ServerSocket serverSocket;
private final ExecutorService executorService;
private final ServerConnectListener serverConnectListener;
private Server(ServerSocket serverSocket, ServerConnectListener listener) {
super("Server");
this.serverSocket = descriptor.getServerSocket();
this.serverConnectListener = listener;
running = true;
executorService = Executors.newCachedThreadPool();
}
public void shutdown() throws IOException {
running = false;
synchronized (connections) {
final Set<Connection> conns = new HashSet<Connection>(connections.values());
for (Connection con : conns) {
con.close();
}
}
serverSocket.close();
try {
socket.close();
executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
Node.this.serverRunning.set(false);
log.debug("Server successfully shutdown");
}
@Override
public void run() {
try {
while (running) {
final Socket socket = serverSocket.accept();
log.info("Accepting Client on port " + socket.getLocalPort());
executorService.submit(new Acceptor(socket));
}
} catch (IOException e) {
if (running)
e.printStackTrace();
}
return;
}
}
ObjectInputStream objectInputStream = null;
ObjectOutputStream out = null;
private boolean verifyIdentity(HELOMessage helo, ObjectInputStream in) throws IOException {
log.debug("Verifying HELO msg");
final Socket sock = connectToService(helo.getHostname(), helo.getPort());
// get incoming data
try {
out = prepareOOSForSocket(socket);
objectInputStream = new ObjectInputStream(socket.getInputStream());
} catch (EOFException e) {
log.info("Got bogus incoming connection");
} catch (IOException e) {
e.printStackTrace();
log.debug("Connected to advertised client " + helo.getPeer());
ObjectOutputStream out = prepareOOSForSocket(sock);
final IDMessage challenge = new IDMessage(descriptor);
out.writeObject(challenge);
log.debug("Sent IDMessage to");
out.flush();
// wait for other side to close
try {
socket.close();
} catch (IOException e1) {
while (sock.getInputStream().read() != -1)
;
} catch (IOException e) {
// no matter
}
return;
}
out.close();
sock.close();
log.debug("Closed socket after sending IDMessage");
try {
log.debug("Waiting for response of challenge");
IDMessage response = (IDMessage) in.readObject();
log.debug("Got response for challenge");
final boolean verified = challenge.verify(response);
log.debug("Response verified correctly!");
return verified;
} catch (ClassNotFoundException e) {
new ProtocolViolationException(e).printStackTrace();
}
return false;
}
String peer = null;
try {
log.debug("Waiting for HELO or Identification");
final Message helo = (Message) objectInputStream.readObject();
if (helo instanceof HELOMessage) {
peer = ((HELOMessage) helo).getPeer();
log.debug("Got HELO from " + peer);
boolean alreadyConnected;
synchronized (connections) {
alreadyConnected = connections.containsKey(peer);
}
if (alreadyConnected || !verifyIdentity((HELOMessage) helo, objectInputStream)) {
log.debug(alreadyConnected ? ("already connected to " + peer) : "verification failed");
out.writeObject(alreadyConnected ? ControlMessage.ALREADY_CONNECTED : ControlMessage.HANDSHAKE_FAILED);
out.writeObject(ControlMessage.DISCONNECT);
out.flush();
out.close();
objectInputStream.close();
socket.close();
return;
}
log.debug("Verification of " + peer + " successful");
} else if (helo instanceof IDMessage) {
peer = ((IDMessage) helo).getPeer();
log.debug("got IDMessage from " + peer);
final Connection client = connections.get(peer);
if (client != null) {
log.debug("Got preexisting connection for " + peer);
client.sendMsg(((IDMessage) helo).reply());
log.debug("Sent response for challenge");
} else {
log.debug("Got IDMessage for unknown connection to " + peer);
}
out.flush();
out.close();
objectInputStream.close();
socket.close();
log.debug("Closed socket for identification");
return;
private class Acceptor implements Runnable {
private final Socket socket;
private Acceptor(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
{
try {
socket.setSoTimeout(60 * 1000);
} catch (SocketException e2) {
e2.printStackTrace();
try {
socket.close();
} catch (IOException e) {
}
return;
}
ObjectInputStream objectInputStream = null;
ObjectOutputStream out = null;
// get incoming data
try {
out = prepareOOSForSocket(socket);
objectInputStream = new ObjectInputStream(socket.getInputStream());
} catch (EOFException e) {
log.info("Got bogus incoming connection");
} catch (IOException e) {
e.printStackTrace();
try {
socket.close();
} catch (IOException e1) {
}
return;
}
String peer = null;
try {
log.debug("Waiting for HELO or Identification");
final Message helo = (Message) objectInputStream.readObject();
if (helo instanceof HELOMessage) {
peer = ((HELOMessage) helo).getPeer();
log.debug("Got HELO from " + peer);
boolean alreadyConnected;
synchronized (connections) {
alreadyConnected = connections.containsKey(peer);
}
if (alreadyConnected || !verifyIdentity((HELOMessage) helo, objectInputStream)) {
log.debug(alreadyConnected ? ("already connected to " + peer) : "verification failed");
out.writeObject(alreadyConnected ? ControlMessage.ALREADY_CONNECTED : ControlMessage.HANDSHAKE_FAILED);
out.writeObject(ControlMessage.DISCONNECT);
out.flush();
out.close();
objectInputStream.close();
socket.close();
return;
}
log.debug("Verification of " + peer + " successful");
} else if (helo instanceof IDMessage) {
peer = ((IDMessage) helo).getPeer();
log.debug("got IDMessage from " + peer);
final Connection client = connections.get(peer);
if (client != null) {
log.debug("Got preexisting connection for " + peer);
client.sendMsg(((IDMessage) helo).reply());
log.debug("Sent response for challenge");
} else {
log.debug("Got IDMessage for unknown connection to " + peer);
}
out.flush();
out.close();
objectInputStream.close();
socket.close();
log.debug("Closed socket for identification");
return;
} else
throw new ClassNotFoundException("First Message was neither HELO, nor ID");
} catch (ClassNotFoundException e) {
new ProtocolViolationException(e);
} catch (IOException e) {
try {
objectInputStream.close();
out.close();
socket.close();
} catch (IOException e1) {
}
return;
}
// Here we go
log.debug("Incoming Connection ready!");
try {
// TODO: listeners are only added afterwards, so messages can be lost!
IncomingConnection incomingConnection = new IncomingConnection(peer, socket, out, objectInputStream);
serverConnectListener.onConnect(incomingConnection);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private class IncomingConnection extends Connection {
private IncomingConnection(String peer, Socket socket, ObjectOutputStream out, ObjectInputStream in)
throws IOException {
super(peer, socket, out, in);
synchronized (connections) {
connections.put(peer, this);
}
sendMsg(ControlMessage.AVAILABLE);
}
@Override
public void listen() throws ConnectionException {
super.listen();
onReady();
}
@Override
protected void onMessage(Message msg) throws IOException {
if ((msg instanceof ControlMessage) && (ControlMessage.HEARTBEAT == msg)) {
log.debug("RX+REPLY HEARTBEAT");
try {
sendMsg(ControlMessage.HEARTBEAT);
} catch (IOException e) {
onError(e);
}
} else
throw new ClassNotFoundException("First Message was neither HELO, nor ID");
} catch (ClassNotFoundException e) {
new ProtocolViolationException(e);
} catch (IOException e) {
try {
objectInputStream.close();
out.close();
socket.close();
} catch (IOException e1) {
super.onMessage(msg);
}
@Override
public void onDisconnect() {
synchronized (connections) {
connections.remove(getPeer());
}
return;
}
// Here we go
log.debug("Incoming Connection ready!");
try {
// TODO: listeners are only added afterwards, so messages can be lost!
IncomingConnection incomingConnection = new IncomingConnection(peer, socket, out, objectInputStream);
serverConnectListener.onConnect(incomingConnection);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private class IncomingConnection extends Connection {
private IncomingConnection(String peer, Socket socket, ObjectOutputStream out, ObjectInputStream in)
throws IOException {
super(peer, socket, out, in);
synchronized (connections) {
connections.put(peer, this);
}
sendMsg(ControlMessage.AVAILABLE);
}
@Override
public void listen() throws ConnectionException {
super.listen();
onReady();
}
@Override
protected void onMessage(Message msg) throws IOException {
if ((msg instanceof ControlMessage) && (ControlMessage.HEARTBEAT == msg)) {
log.debug("RX+REPLY HEARTBEAT");
try {
sendMsg(ControlMessage.HEARTBEAT);
} catch (IOException e) {
onError(e);
@Override
public boolean isIncoming() {
return true;
}
} else
super.onMessage(msg);
}
@Override
public void onDisconnect() {
synchronized (connections) {
connections.remove(getPeer());
}
private class OutgoingConnection extends Connection {
private OutgoingConnection(String peer, Socket socket, Collection<ConnectionListener> listeners)
throws IOException {
super(peer, socket);
synchronized (connections) {
connections.put(peer, this);
}
setConnectionListeners(listeners);
try {
listen();
} catch (ConnectionException e) {
// Never happens
}
log.debug("Sending HELO");
sendMsg(new HELOMessage(descriptor));
log.debug("Sent HELO");
}
@Override
public void onDisconnect() {
synchronized (connections) {
connections.remove(getPeer());
}
}
@Override
public boolean isIncoming() {
return false;
}
}
@Override
public boolean isIncoming() {
return true;
}
}
private class OutgoingConnection extends Connection {
private OutgoingConnection(String peer, Socket socket, Collection<ConnectionListener> listeners)
throws IOException {
super(peer, socket);
synchronized (connections) {
connections.put(peer, this);
}
setConnectionListeners(listeners);
try {
listen();
} catch (ConnectionException e) {
// Never happens
}
log.debug("Sending HELO");
sendMsg(new HELOMessage(descriptor));
log.debug("Sent HELO");
}
@Override
public void onDisconnect() {
synchronized (connections) {
connections.remove(getPeer());
}
}
@Override
public boolean isIncoming() {
return false;
}
}
}

View file

@ -1,52 +1,52 @@
package io.nucleo.net;
public enum PredefinedDisconnectReason implements DisconnectReason {
TIMEOUT("due to timed out", false, true),
CONNECTION_CLOSED("as ordered", true),
RESET("due to remote reset (EOF)", false, true),
UNKNOWN("for unknown reasons", false);
TIMEOUT("due to timed out", false, true),
CONNECTION_CLOSED("as ordered", true),
RESET("due to remote reset (EOF)", false, true),
UNKNOWN("for unknown reasons", false);
private Boolean remote;
private final boolean graceful;
private final String description;
private Boolean remote;
private final boolean graceful;
private final String description;
private PredefinedDisconnectReason(String description, boolean graceful) {
this.description = description;
this.graceful = graceful;
}
private PredefinedDisconnectReason(String description, boolean graceful) {
this.description = description;
this.graceful = graceful;
}
private PredefinedDisconnectReason(String description, boolean graceful, boolean remote) {
this.description = description;
this.graceful = graceful;
this.remote = remote;
}
private PredefinedDisconnectReason(String description, boolean graceful, boolean remote) {
this.description = description;
this.graceful = graceful;
this.remote = remote;
}
public static PredefinedDisconnectReason createReason(PredefinedDisconnectReason reason, boolean remote) {
reason.remote = remote;
return reason;
}
public static PredefinedDisconnectReason createReason(PredefinedDisconnectReason reason, boolean remote) {
reason.remote = remote;
return reason;
}
@Override
public boolean isGraceful() {
return graceful;
}
@Override
public boolean isGraceful() {
return graceful;
}
@Override
public boolean isRemote() {
if (remote == null)
return false;
return remote;
@Override
public boolean isRemote() {
if (remote == null)
return false;
return remote;
}
}
public String toString() {
StringBuilder bld = new StringBuilder("Connection closed ");
if (remote != null)
bld.append(remote ? "remotely " : "locally ");
bld.append(description).append(" (");
bld.append(graceful ? "graceful" : "irregular").append(" disconnect)");
return bld.toString();
public String toString() {
StringBuilder bld = new StringBuilder("Connection closed ");
if (remote != null)
bld.append(remote ? "remotely " : "locally ");
bld.append(description).append(" (");
bld.append(graceful ? "graceful" : "irregular").append(" disconnect)");
return bld.toString();
}
}
}

View file

@ -2,11 +2,11 @@ package io.nucleo.net;
public interface ServerConnectListener {
/**
* Called whenever an incoming connection was set up properly.
* Connection.listen() needs to be called ASAP for the connection to become available
*
* @param con the newly established connection
*/
public void onConnect(Connection con);
/**
* Called whenever an incoming connection was set up properly.
* Connection.listen() needs to be called ASAP for the connection to become available
*
* @param con the newly established connection
*/
public void onConnect(Connection con);
}

View file

@ -5,14 +5,14 @@ import java.io.Serializable;
public class ContainerMessage implements Message {
private static final long serialVersionUID = 9219884444024922023L;
private final Serializable payload;
private static final long serialVersionUID = 9219884444024922023L;
private final Serializable payload;
public ContainerMessage(Serializable payload) {
this.payload = payload;
}
public ContainerMessage(Serializable payload) {
this.payload = payload;
}
public Serializable getPayload() {
return payload;
}
public Serializable getPayload() {
return payload;
}
}

View file

@ -1,9 +1,9 @@
package io.nucleo.net.proto;
public enum ControlMessage implements Message {
HEARTBEAT,
AVAILABLE,
HANDSHAKE_FAILED,
ALREADY_CONNECTED,
DISCONNECT;
HEARTBEAT,
AVAILABLE,
HANDSHAKE_FAILED,
ALREADY_CONNECTED,
DISCONNECT;
}

View file

@ -6,30 +6,30 @@ import java.util.regex.Pattern;
public class HELOMessage implements Message {
private static final long serialVersionUID = -4582946298578924930L;
private final String peer;
private static final long serialVersionUID = -4582946298578924930L;
private final String peer;
public HELOMessage(ServiceDescriptor descriptor) {
this(descriptor.getFullAddress());
}
public HELOMessage(ServiceDescriptor descriptor) {
this(descriptor.getFullAddress());
}
private HELOMessage(String peer) {
this.peer = peer;
}
private HELOMessage(String peer) {
this.peer = peer;
}
public String getPeer() {
return peer;
}
public String getPeer() {
return peer;
}
public String getHostname() {
return peer.split(Pattern.quote(":"))[0];
}
public String getHostname() {
return peer.split(Pattern.quote(":"))[0];
}
public int getPort() {
return Integer.parseInt(peer.split(Pattern.quote(":"))[1]);
}
public int getPort() {
return Integer.parseInt(peer.split(Pattern.quote(":"))[1]);
}
public String toString() {
return "HELO " + peer;
}
public String toString() {
return "HELO " + peer;
}
}

View file

@ -7,42 +7,42 @@ import java.security.SecureRandom;
public class IDMessage implements Message {
private static final long serialVersionUID = -2214485311644580948L;
private static SecureRandom rnd;
private static final long serialVersionUID = -2214485311644580948L;
private static SecureRandom rnd;
static {
try {
rnd = SecureRandom.getInstance("SHA1PRNG");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
static {
try {
rnd = SecureRandom.getInstance("SHA1PRNG");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
}
private final String id;
private final long nonce;
private final String id;
private final long nonce;
public IDMessage(ServiceDescriptor descriptor) {
this(descriptor.getFullAddress(), rnd.nextLong());
}
public IDMessage(ServiceDescriptor descriptor) {
this(descriptor.getFullAddress(), rnd.nextLong());
}
private IDMessage(String id, long nonce) {
this.id = id;
this.nonce = nonce;
}
private IDMessage(String id, long nonce) {
this.id = id;
this.nonce = nonce;
}
public String getPeer() {
return id;
}
public String getPeer() {
return id;
}
public IDMessage reply() {
return new IDMessage(id, nonce);
}
public IDMessage reply() {
return new IDMessage(id, nonce);
}
public boolean verify(IDMessage msg) {
return id.equals(msg.id) && (nonce == msg.nonce);
}
public boolean verify(IDMessage msg) {
return id.equals(msg.id) && (nonce == msg.nonce);
}
public String toString() {
return "ID " + id;
}
public String toString() {
return "ID " + id;
}
}

View file

@ -1,16 +1,16 @@
package io.nucleo.net.proto.exceptions;
public class ConnectionException extends Exception {
public ConnectionException(String msg) {
super(msg);
}
public ConnectionException(String msg) {
super(msg);
}
public ConnectionException(Throwable cause) {
super(cause);
}
public ConnectionException(Throwable cause) {
super(cause);
}
public ConnectionException(String msg, Throwable cause) {
super(msg, cause);
}
public ConnectionException(String msg, Throwable cause) {
super(msg, cause);
}
}

View file

@ -2,18 +2,18 @@ package io.nucleo.net.proto.exceptions;
public class ProtocolViolationException extends Exception {
public ProtocolViolationException() {
}
public ProtocolViolationException() {
}
public ProtocolViolationException(Throwable cause) {
super(cause);
}
public ProtocolViolationException(Throwable cause) {
super(cause);
}
public ProtocolViolationException(String msg) {
super(msg);
}
public ProtocolViolationException(String msg) {
super(msg);
}
public ProtocolViolationException(String msg, Throwable cause) {
super(msg, cause);
}
public ProtocolViolationException(String msg, Throwable cause) {
super(msg, cause);
}
}