use tomp2p master branch

This commit is contained in:
Manfred Karrer 2014-07-31 19:43:46 +02:00
parent 79a52a6e38
commit d943ee0918
14 changed files with 40 additions and 952 deletions

View file

@ -110,7 +110,7 @@ public class P2PNode
useDiscStorage(useDiskStorage);
setupTimerForIPCheck();
FutureCallback<PeerDHT> localCallback = new FutureCallback<PeerDHT>()
/* FutureCallback<PeerDHT> localCallback = new FutureCallback<PeerDHT>()
{
@Override
public void onSuccess(@Nullable PeerDHT result)
@ -126,13 +126,18 @@ public class P2PNode
log.error(t.toString());
callback.onFailure(t);
}
};
}; */
bootstrapToLocalhostThread = runBootstrapThread(localCallback, new SeedNodeAddress(defaultStaticSeedNodeAddresses));
bootstrapToServerThread = runBootstrapThread(localCallback, new SeedNodeAddress(SeedNodeAddress.StaticSeedNodeAddresses.DIGITAL_OCEAN));
ListenableFuture<PeerDHT> bootstrapComplete = bootstrap(new SeedNodeAddress(defaultStaticSeedNodeAddresses));
Futures.addCallback(bootstrapComplete, callback);
// bootstrapToLocalhostThread = runBootstrapThread(localCallback, new SeedNodeAddress(defaultStaticSeedNodeAddresses));
// bootstrapToServerThread = runBootstrapThread(localCallback, new SeedNodeAddress(SeedNodeAddress.StaticSeedNodeAddresses.DIGITAL_OCEAN));
}
public void bootstrapThreadCompleted()
// TODO: start multiple threads for bootstrapping, so we can get it done faster.
/* public void bootstrapThreadCompleted()
{
if (bootstrapToLocalhostThread != null)
bootstrapToLocalhostThread.interrupt();
@ -155,7 +160,7 @@ public class P2PNode
});
bootstrapThread.start();
return bootstrapThread;
}
} */
public void shutDown()
{

View file

@ -1,139 +0,0 @@
package lighthouse.protocol;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LHUtils
{
private static final Logger log = LoggerFactory.getLogger(LHUtils.class);
public static List<Path> listDir(Path dir) throws IOException
{
List<Path> contents = new LinkedList<>();
try (Stream<Path> list = Files.list(dir))
{
list.forEach(contents::add);
}
return contents;
}
//region Generic Java 8 enhancements
public interface UncheckedRun<T>
{
public T run() throws Throwable;
}
public interface UncheckedRunnable
{
public void run() throws Throwable;
}
public static <T> T unchecked(UncheckedRun<T> run)
{
try
{
return run.run();
} catch (Throwable throwable)
{
throw new RuntimeException(throwable);
}
}
public static void uncheck(UncheckedRunnable run)
{
try
{
run.run();
} catch (Throwable throwable)
{
throw new RuntimeException(throwable);
}
}
public static void ignoreAndLog(UncheckedRunnable runnable)
{
try
{
runnable.run();
} catch (Throwable t)
{
log.error("Ignoring error", t);
}
}
public static <T> T ignoredAndLogged(UncheckedRun<T> runnable)
{
try
{
return runnable.run();
} catch (Throwable t)
{
log.error("Ignoring error", t);
return null;
}
}
@SuppressWarnings("unchecked")
public static <T, E extends Throwable> T checkedGet(Future<T> future) throws E
{
try
{
return future.get();
} catch (InterruptedException e)
{
throw new RuntimeException(e);
} catch (ExecutionException e)
{
throw (E) e.getCause();
}
}
public static boolean didThrow(UncheckedRun run)
{
try
{
run.run();
return false;
} catch (Throwable throwable)
{
return true;
}
}
public static boolean didThrow(UncheckedRunnable run)
{
try
{
run.run();
return false;
} catch (Throwable throwable)
{
return true;
}
}
public static <T> T stopwatched(String description, UncheckedRun<T> run)
{
long now = System.currentTimeMillis();
T result = unchecked(run::run);
log.info("{}: {}ms", description, System.currentTimeMillis() - now);
return result;
}
public static void stopwatch(String description, UncheckedRunnable run)
{
long now = System.currentTimeMillis();
uncheck(run::run);
log.info("{}: {}ms", description, System.currentTimeMillis() - now);
}
//endregion
}

View file

@ -1,171 +0,0 @@
package lighthouse.threading;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javafx.application.Platform;
import lighthouse.protocol.LHUtils;
import static com.google.common.base.Preconditions.checkState;
/**
* An extended executor interface that supports thread affinity assertions and short circuiting.
*/
public interface AffinityExecutor extends Executor
{
/**
* Returns true if the current thread is equal to the thread this executor is backed by.
*/
public boolean isOnThread();
/**
* Throws an IllegalStateException if the current thread is equal to the thread this executor is backed by.
*/
public void checkOnThread();
/**
* If isOnThread() then runnable is invoked immediately, otherwise the closure is queued onto the backing thread.
*/
public void executeASAP(LHUtils.UncheckedRunnable runnable);
public abstract static class BaseAffinityExecutor implements AffinityExecutor
{
protected final Thread.UncaughtExceptionHandler exceptionHandler;
protected BaseAffinityExecutor()
{
exceptionHandler = Thread.currentThread().getUncaughtExceptionHandler();
}
@Override
public abstract boolean isOnThread();
@Override
public void checkOnThread()
{
checkState(isOnThread(), "On wrong thread: %s", Thread.currentThread());
}
@Override
public void executeASAP(LHUtils.UncheckedRunnable runnable)
{
final Runnable command = () -> {
try
{
runnable.run();
} catch (Throwable throwable)
{
exceptionHandler.uncaughtException(Thread.currentThread(), throwable);
}
};
if (isOnThread())
command.run();
else
{
execute(command);
}
}
// Must comply with the Executor definition w.r.t. exceptions here.
@Override
public abstract void execute(Runnable command);
}
public static AffinityExecutor UI_THREAD = new BaseAffinityExecutor()
{
@Override
public boolean isOnThread()
{
return Platform.isFxApplicationThread();
}
@Override
public void execute(Runnable command)
{
Platform.runLater(command);
}
};
public static AffinityExecutor SAME_THREAD = new BaseAffinityExecutor()
{
@Override
public boolean isOnThread()
{
return true;
}
@Override
public void execute(Runnable command)
{
command.run();
}
};
public static class ServiceAffinityExecutor extends BaseAffinityExecutor
{
protected AtomicReference<Thread> whichThread = new AtomicReference<>(null);
private final Thread.UncaughtExceptionHandler handler = Thread.currentThread().getUncaughtExceptionHandler();
public final ScheduledExecutorService service;
public ServiceAffinityExecutor(String threadName)
{
service = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName(threadName);
thread.setUncaughtExceptionHandler(handler);
whichThread.set(thread);
return thread;
});
}
@Override
public boolean isOnThread()
{
return Thread.currentThread() == whichThread.get();
}
@Override
public void execute(Runnable command)
{
service.execute(command);
}
}
/**
* An executor useful for unit tests: allows the current thread to block until a command arrives from another
* thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping.
*/
public static class Gate extends BaseAffinityExecutor
{
private final Thread thisThread = Thread.currentThread();
private final LinkedBlockingQueue<Runnable> commandQ = new LinkedBlockingQueue<>();
@Override
public boolean isOnThread()
{
return Thread.currentThread() == thisThread;
}
@Override
public void execute(Runnable command)
{
Uninterruptibles.putUninterruptibly(commandQ, command);
}
public void waitAndRun()
{
final Runnable runnable = Uninterruptibles.takeUninterruptibly(commandQ);
System.err.println("Gate running " + runnable);
runnable.run();
}
public int getTaskQueueSize()
{
return commandQ.size();
}
}
}

View file

@ -1,101 +0,0 @@
package lighthouse.threading;
import java.util.ArrayList;
import java.util.List;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.collections.ObservableListBase;
import javafx.collections.WeakListChangeListener;
/**
* This list is created by dynamically concatenating all the source lists together.
*/
public class ConcatenatingList<T> extends ObservableListBase<T> implements ObservableList<T>
{
private List<ObservableList<T>> sources = new ArrayList<>();
private ListChangeListener<T> listener = this::sourceChanged;
@SafeVarargs
public ConcatenatingList(ObservableList<T>... source)
{
super();
for (ObservableList<T> s : source)
{
sources.add(s);
s.addListener(new WeakListChangeListener<T>(listener));
}
if (sources.isEmpty())
throw new IllegalArgumentException();
}
private int calculateOffset(ObservableList<? extends T> source)
{
int cursor = 0;
for (ObservableList<T> ts : sources)
{
if (ts == source) return cursor;
cursor += ts.size();
}
return cursor;
}
private void sourceChanged(ListChangeListener.Change<? extends T> c)
{
ObservableList<? extends T> source = c.getList();
int offset = calculateOffset(source);
beginChange();
while (c.next())
{
if (c.wasPermutated())
{
int[] perm = new int[c.getTo() - c.getFrom()];
for (int i = c.getFrom(); i < c.getTo(); i++)
perm[i - c.getFrom()] = c.getPermutation(i) + offset;
nextPermutation(c.getFrom() + offset, c.getTo() + offset, perm);
}
else if (c.wasUpdated())
{
for (int i = c.getFrom(); i < c.getTo(); i++)
{
nextUpdate(i + offset);
}
}
else
{
if (c.wasRemoved())
{
// Removed should come first to properly handle replacements, then add.
nextRemove(c.getFrom() + offset, c.getRemoved());
}
if (c.wasAdded())
{
nextAdd(c.getFrom() + offset, c.getTo() + offset);
}
}
}
endChange();
}
@Override
public T get(int index)
{
for (ObservableList<T> source : sources)
{
if (index < source.size())
{
return source.get(index);
}
else
{
index -= source.size();
}
}
throw new IndexOutOfBoundsException();
}
@Override
public int size()
{
return sources.stream().mapToInt(List::size).sum();
}
}

View file

@ -1,113 +0,0 @@
package lighthouse.threading;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.collections.transformation.TransformationList;
/**
* Maps elements of type F to E with change listeners working as expected.
*/
public class MappedList<E, F> extends TransformationList<E, F>
{
private final Function<F, E> mapper;
private final ArrayList<E> mapped;
/**
* Creates a new MappedList list wrapped around the source list.
* Each element will have the given function applied to it, such that the list is cast through the mapper.
*/
public MappedList(ObservableList<? extends F> source, Function<F, E> mapper)
{
super(source);
this.mapper = mapper;
this.mapped = new ArrayList<>(source.size());
mapAll();
}
private void mapAll()
{
mapped.clear();
for (F val : getSource())
mapped.add(mapper.apply(val));
}
@Override
protected void sourceChanged(ListChangeListener.Change<? extends F> c)
{
// Is all this stuff right for every case? Probably it doesn't matter for this app.
beginChange();
while (c.next())
{
if (c.wasPermutated())
{
int[] perm = new int[c.getTo() - c.getFrom()];
for (int i = c.getFrom(); i < c.getTo(); i++)
perm[i - c.getFrom()] = c.getPermutation(i);
nextPermutation(c.getFrom(), c.getTo(), perm);
}
else if (c.wasUpdated())
{
for (int i = c.getFrom(); i < c.getTo(); i++)
{
remapIndex(i);
nextUpdate(i);
}
}
else
{
if (c.wasRemoved())
{
// Removed should come first to properly handle replacements, then add.
List<E> removed = mapped.subList(c.getFrom(), c.getFrom() + c.getRemovedSize());
ArrayList<E> duped = new ArrayList<>(removed);
removed.clear();
nextRemove(c.getFrom(), duped);
}
if (c.wasAdded())
{
for (int i = c.getFrom(); i < c.getTo(); i++)
{
mapped.addAll(c.getFrom(), c.getAddedSubList().stream().map(mapper).collect(Collectors.toList()));
remapIndex(i);
}
nextAdd(c.getFrom(), c.getTo());
}
}
}
endChange();
}
private void remapIndex(int i)
{
if (i >= mapped.size())
{
for (int j = mapped.size(); j <= i; j++)
{
mapped.add(mapper.apply(getSource().get(j)));
}
}
mapped.set(i, mapper.apply(getSource().get(i)));
}
@Override
public int getSourceIndex(int index)
{
return index;
}
@Override
public E get(int index)
{
return mapped.get(index);
}
@Override
public int size()
{
return mapped.size();
}
}

View file

@ -1,43 +0,0 @@
package lighthouse.threading;
import java.util.concurrent.Executor;
import javafx.beans.InvalidationListener;
import javafx.beans.Observable;
import javafx.collections.*;
/**
* An attempt to make multi-threading and observable/reactive UI programming work together inside JavaFX without too
* many headaches. This class allows you to register change listeners on the target Observable which will be
* run with the given {@link java.util.concurrent.Executor}. In this way an observable collection which is updated by
* one thread can be observed from another thread without needing to use explicit locks or explicit marshalling.
*/
public class MarshallingObservers
{
public static InvalidationListener addListener(Observable observable, InvalidationListener listener, Executor executor)
{
InvalidationListener l = x -> executor.execute(() -> listener.invalidated(x));
observable.addListener(l);
return l;
}
public static <T> ListChangeListener<T> addListener(ObservableList<T> observable, ListChangeListener<T> listener, Executor executor)
{
ListChangeListener<T> l = (ListChangeListener.Change<? extends T> c) -> executor.execute(() -> listener.onChanged(c));
observable.addListener(l);
return l;
}
public static <T> SetChangeListener<T> addListener(ObservableSet<T> observable, SetChangeListener<T> listener, Executor executor)
{
SetChangeListener<T> l = (SetChangeListener.Change<? extends T> c) -> executor.execute(() -> listener.onChanged(c));
observable.addListener(l);
return l;
}
public static <K, V> MapChangeListener<K, V> addListener(ObservableMap<K, V> observable, MapChangeListener<K, V> listener, Executor executor)
{
MapChangeListener<K, V> l = (MapChangeListener.Change<? extends K, ? extends V> c) -> executor.execute(() -> listener.onChanged(c));
observable.addListener(l);
return l;
}
}

View file

@ -1,195 +0,0 @@
package lighthouse.threading;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Set;
import javafx.beans.WeakListener;
import javafx.collections.*;
/**
* Utility functions that mirror changes from one list into another list. JavaFX already provides this functionality
* of course under the name "content binding"; a mirror is a content binding that relays changes into other threads
* first. Thus you can have an ObservableList which is updated in one thread, but still bound to directly in the UI
* thread, without needing to worry about cross-thread interference.
*/
public class ObservableMirrors
{
/**
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents
* will be read.
*/
public static <T> ObservableList<T> mirrorList(ObservableList<T> mirrored, AffinityExecutor runChangesIn)
{
ObservableList<T> result = FXCollections.observableArrayList();
result.setAll(mirrored);
mirrored.addListener(new ListMirror<T>(result, runChangesIn));
return FXCollections.unmodifiableObservableList(result);
}
private static class ListMirror<E> implements ListChangeListener<E>, WeakListener
{
private final WeakReference<ObservableList<E>> targetList;
private final AffinityExecutor runChangesIn;
public ListMirror(ObservableList<E> list, AffinityExecutor runChangesIn)
{
this.targetList = new WeakReference<>(list);
this.runChangesIn = runChangesIn;
}
@Override
public void onChanged(Change<? extends E> change)
{
final List<E> list = targetList.get();
if (list == null)
{
change.getList().removeListener(this);
}
else
{
// If we're already in the right thread this will just run the change immediately, as per normal.
runChangesIn.executeASAP(() -> {
while (change.next())
{
if (change.wasPermutated())
{
list.subList(change.getFrom(), change.getTo()).clear();
list.addAll(change.getFrom(), change.getList().subList(change.getFrom(), change.getTo()));
}
else
{
if (change.wasRemoved())
{
list.subList(change.getFrom(), change.getFrom() + change.getRemovedSize()).clear();
}
if (change.wasAdded())
{
list.addAll(change.getFrom(), change.getAddedSubList());
}
}
}
});
}
}
@Override
public boolean wasGarbageCollected()
{
return targetList.get() == null;
}
// Do we really need these?
@Override
public int hashCode()
{
final List<E> list = targetList.get();
return (list == null) ? 0 : list.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
final List<E> list1 = targetList.get();
if (list1 == null)
{
return false;
}
if (obj instanceof ListMirror)
{
final ListMirror<?> other = (ListMirror<?>) obj;
final List<?> list2 = other.targetList.get();
return list1 == list2;
}
return false;
}
}
/**
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents
* will be read.
*/
public static <T> ObservableSet<T> mirrorSet(ObservableSet<T> mirrored, AffinityExecutor runChangesIn)
{
@SuppressWarnings("unchecked") ObservableSet<T> result = FXCollections.observableSet();
result.addAll(mirrored);
mirrored.addListener(new SetMirror<T>(result, runChangesIn));
return FXCollections.unmodifiableObservableSet(result);
}
private static class SetMirror<E> implements SetChangeListener<E>, WeakListener
{
private final WeakReference<ObservableSet<E>> targetSet;
private final AffinityExecutor runChangesIn;
public SetMirror(ObservableSet<E> set, AffinityExecutor runChangesIn)
{
this.targetSet = new WeakReference<>(set);
this.runChangesIn = runChangesIn;
}
@Override
public void onChanged(final Change<? extends E> change)
{
final ObservableSet<E> set = targetSet.get();
if (set == null)
{
change.getSet().removeListener(this);
}
else
{
// If we're already in the right thread this will just run the change immediately, as per normal.
runChangesIn.executeASAP(() -> {
if (change.wasAdded())
set.add(change.getElementAdded());
if (change.wasRemoved())
set.remove(change.getElementRemoved());
});
}
}
@Override
public boolean wasGarbageCollected()
{
return targetSet.get() == null;
}
@Override
public int hashCode()
{
final ObservableSet<E> set = targetSet.get();
return (set == null) ? 0 : set.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
final Set<E> set1 = targetSet.get();
if (set1 == null)
{
return false;
}
if (obj instanceof SetMirror)
{
final SetMirror<?> other = (SetMirror<?>) obj;
final Set<?> list2 = other.targetSet.get();
return set1 == list2;
}
return false;
}
}
}