Basic Network Text UI implemented. Announce stream and peer settings implemented.

This commit is contained in:
Mark Qvist 2021-05-15 21:08:30 +02:00
parent 420cfcbbe3
commit a53bd70dc9
8 changed files with 668 additions and 25 deletions

View File

@ -9,6 +9,29 @@ class Conversation:
cached_conversations = {}
created_callback = None
aspect_filter = "lxmf.delivery"
@staticmethod
def received_announce(destination_hash, announced_identity, app_data):
app = nomadnet.NomadNetworkApp.get_shared_instance()
destination_hash_text = RNS.hexrep(destination_hash, delimit=False)
# Check if the announced destination is in
# our list of conversations
if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]:
RNS.log("Announced LXMF destination is in our conversation list")
RNS.log("app_data = "+str(app_data))
if app.directory.find(destination_hash):
RNS.log("It is also in the directory")
if Conversation.created_callback != None:
Conversation.created_callback()
else:
RNS.log("But it is not in the directory")
if Conversation.created_callback != None:
Conversation.created_callback()
# Add the announce to the directory announce
# stream logger
app.directory.announce_received(destination_hash, app_data)
@staticmethod
def query_for_peer(source_hash):
try:

View File

@ -1,11 +1,15 @@
import os
import RNS
import LXMF
import time
import RNS.vendor.umsgpack as msgpack
class Directory:
ANNOUNCE_STREAM_MAXLENGTH = 256
def __init__(self, app):
self.directory_entries = {}
self.announce_stream = []
self.app = app
self.load_from_disk()
@ -14,10 +18,15 @@ class Directory:
packed_list = []
for source_hash in self.directory_entries:
e = self.directory_entries[source_hash]
packed_list.append((e.source_hash, e.display_name, e.trust_level))
packed_list.append((e.source_hash, e.display_name, e.trust_level, e.hosts_node))
directory = {
"entry_list": packed_list,
"announce_stream": self.announce_stream
}
file = open(self.app.directorypath, "wb")
file.write(msgpack.packb(packed_list))
file.write(msgpack.packb(directory))
file.close()
except Exception as e:
RNS.log("Could not write directory to disk. Then contained exception was: "+str(e), RNS.LOG_ERROR)
@ -26,18 +35,31 @@ class Directory:
if os.path.isfile(self.app.directorypath):
try:
file = open(self.app.directorypath, "rb")
unpacked_list = msgpack.unpackb(file.read())
unpacked_directory = msgpack.unpackb(file.read())
unpacked_list = unpacked_directory["entry_list"]
file.close()
entries = {}
for e in unpacked_list:
entries[e[0]] = DirectoryEntry(e[0], e[1], e[2])
if len(e) > 3:
hosts_node = e[3]
else:
hosts_node = False
entries[e[0]] = DirectoryEntry(e[0], e[1], e[2], hosts_node)
self.directory_entries = entries
self.announce_stream = unpacked_directory["announce_stream"]
except Exception as e:
RNS.log("Could not load directory from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
def announce_received(self, source_hash, app_data):
timestamp = time.time()
self.announce_stream.insert(0, (timestamp, source_hash, app_data))
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
self.announce_stream.pop()
def display_name(self, source_hash):
if source_hash in self.directory_entries:
return self.directory_entries[source_hash].display_name
@ -94,6 +116,27 @@ class Directory:
except Exception as e:
return False
def known_nodes(self):
node_list = []
for eh in self.directory_entries:
e = self.directory_entries[eh]
if e.hosts_node:
node_list.append(e)
return node_list
def number_of_known_nodes(self):
return len(self.known_nodes())
def number_of_known_peers(self, lookback_seconds=None):
unique_hashes = []
cutoff_time = time.time()-lookback_seconds
for entry in self.announce_stream:
if not entry[1] in unique_hashes:
if lookback_seconds == None or entry[0] > cutoff_time:
unique_hashes.append(entry[1])
return len(unique_hashes)
class DirectoryEntry:
WARNING = 0x00
@ -101,7 +144,7 @@ class DirectoryEntry:
UNKNOWN = 0x02
TRUSTED = 0xFF
def __init__(self, source_hash, display_name=None, trust_level=UNKNOWN):
def __init__(self, source_hash, display_name=None, trust_level=UNKNOWN, hosts_node=False):
if len(source_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
self.source_hash = source_hash
self.display_name = display_name
@ -109,5 +152,6 @@ class DirectoryEntry:
display_name = source_hash
self.trust_level = trust_level
self.hosts_node = hosts_node
else:
raise TypeError("Attempt to add invalid source hash to directory")

View File

@ -6,6 +6,8 @@ import RNS
import LXMF
import nomadnet
import RNS.vendor.umsgpack as msgpack
from ._version import __version__
from .vendor.configobj import ConfigObj
@ -47,6 +49,7 @@ class NomadNetworkApp:
self.resourcepath = self.configdir+"/storage/resources"
self.conversationpath = self.configdir+"/storage/conversations"
self.directorypath = self.configdir+"/storage/directory"
self.peersettingspath = self.configdir+"/storage/peersettings"
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
@ -102,13 +105,37 @@ class NomadNetworkApp:
RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR)
nomadnet.panic()
if os.path.isfile(self.peersettingspath):
try:
file = open(self.peersettingspath, "rb")
self.peer_settings = msgpack.unpackb(file.read())
file.close()
except Exception as e:
RNS.log("Could not load local peer settings from "+self.peersettingspath, RNS.LOG_ERROR)
RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR)
nomadnet.panic()
else:
try:
RNS.log("No peer settings file found, creating new...")
self.peer_settings = {
"display_name": "",
"announce_interval": None,
"last_announce": None,
}
self.save_peer_settings()
RNS.log("Created new peer settings file")
except Exception as e:
RNS.log("Could not create and save a new peer settings file", RNS.LOG_ERROR)
RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR)
nomadnet.panic()
atexit.register(self.exit_handler)
self.message_router = LXMF.LXMRouter()
self.message_router.register_delivery_callback(self.lxmf_delivery)
self.lxmf_destination = self.message_router.register_delivery_identity(self.identity)
self.lxmf_destination = self.message_router.register_delivery_identity(self.identity, display_name=self.peer_settings["display_name"])
RNS.Identity.remember(
packet_hash=None,
@ -117,12 +144,32 @@ class NomadNetworkApp:
app_data=None
)
RNS.Transport.register_announce_handler(nomadnet.Conversation)
RNS.log("LXMF Router ready to receive on: "+RNS.prettyhexrep(self.lxmf_destination.hash))
self.directory = nomadnet.Directory.Directory(self)
nomadnet.ui.spawn(self.uimode)
def set_display_name(self, display_name):
self.peer_settings["display_name"] = display_name
self.lxmf_destination.display_name = display_name
self.lxmf_destination.set_default_app_data(display_name.encode("utf-8"))
self.save_peer_settings()
def get_display_name(self):
return self.peer_settings["display_name"]
def announce_now(self):
self.lxmf_destination.announce()
self.peer_settings["last_announce"] = time.time()
self.save_peer_settings()
def save_peer_settings(self):
file = open(self.peersettingspath, "wb")
file.write(msgpack.packb(self.peer_settings))
file.close()
def lxmf_delivery(self, message):
time_string = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.timestamp))
@ -191,6 +238,11 @@ class NomadNetworkApp:
else:
self.config["textui"]["intro_time"] = self.config["textui"].as_int("intro_time")
if not "animation_interval" in self.config["textui"]:
self.config["textui"]["animation_interval"] = 1
else:
self.config["textui"]["animation_interval"] = self.config["textui"].as_int("animation_interval")
if not "colormode" in self.config["textui"]:
self.config["textui"]["colormode"] = nomadnet.ui.COLORMODE_16
else:

View File

@ -22,6 +22,8 @@ THEMES = {
('shortcutbar', 'black', 'light gray', 'standout', '#111', '#bbb'),
('body_text', 'white', 'default', 'default', '#0a0', 'default'),
('error_text', 'dark red', 'default', 'default', 'dark red', 'default'),
('warning_text', 'yellow', 'default', 'default', '#ba4', 'default'),
('inactive_text', 'dark gray', 'default', 'default', 'dark gray', 'default'),
('buttons', 'light green,bold', 'default', 'default', '#00a533', 'default'),
('msg_editor', 'black', 'light cyan', 'standout', '#111', '#0bb'),
("msg_header_ok", 'black', 'light green', 'standout', '#111', '#6b2'),

View File

@ -13,7 +13,7 @@ class ConversationListDisplayShortcuts():
def __init__(self, app):
self.app = app
self.widget = urwid.AttrMap(urwid.Text("[Enter] Open [C-e] Edit Peer [C-x] Delete [C-n] New"), "shortcutbar")
self.widget = urwid.AttrMap(urwid.Text("[Enter] Open [C-e] Peer Info [C-x] Delete [C-n] New"), "shortcutbar")
class ConversationDisplayShortcuts():
def __init__(self, app):
@ -50,6 +50,7 @@ class ConversationsDisplay():
def __init__(self, app):
self.app = app
self.dialog_open = False
self.currently_displayed_conversation = None
def disp_list_shortcuts(sender, arg1, arg2):
self.shortcuts_display = self.list_shortcuts
@ -57,7 +58,13 @@ class ConversationsDisplay():
self.update_listbox()
self.columns_widget = urwid.Columns([("weight", ConversationsDisplay.list_width, self.listbox), ("weight", 1-ConversationsDisplay.list_width, self.make_conversation_widget(None))], dividechars=0, focus_column=0, box_columns=[0])
self.columns_widget = urwid.Columns(
[
("weight", ConversationsDisplay.list_width, self.listbox),
("weight", 1-ConversationsDisplay.list_width, self.make_conversation_widget(None))
],
dividechars=0, focus_column=0, box_columns=[0]
)
self.list_shortcuts = ConversationListDisplayShortcuts(self.app)
self.editor_shortcuts = ConversationDisplayShortcuts(self.app)
@ -126,8 +133,8 @@ class ConversationsDisplay():
if display_name == None:
display_name = ""
e_id = urwid.Edit(caption="ID : ",edit_text=source_hash_text)
t_id = urwid.Text("ID : "+source_hash_text)
e_id = urwid.Edit(caption="Addr : ",edit_text=source_hash_text)
t_id = urwid.Text("Addr : "+source_hash_text)
e_name = urwid.Edit(caption="Name : ",edit_text=display_name)
selected_id_widget = t_id
@ -177,6 +184,7 @@ class ConversationsDisplay():
self.app.directory.remember(entry)
self.update_conversation_list()
self.dialog_open = False
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
except Exception as e:
RNS.log("Could not save directory entry. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
if not dialog_pile.error_display:
@ -190,6 +198,7 @@ class ConversationsDisplay():
known_section = urwid.Divider("\u2504")
else:
def query_action(sender, user_data):
self.close_conversation_by_hash(user_data)
nomadnet.Conversation.query_for_peer(user_data)
options = dialog_pile.options(height_type="pack")
dialog_pile.contents = [
@ -211,7 +220,7 @@ class ConversationsDisplay():
])
dialog_pile.error_display = False
dialog = DialogLineBox(dialog_pile, title="Edit Peer")
dialog = DialogLineBox(dialog_pile, title="Peer Info")
dialog.delegate = self
bottom = self.listbox
@ -225,7 +234,7 @@ class ConversationsDisplay():
source_hash = ""
display_name = ""
e_id = urwid.Edit(caption="ID : ",edit_text=source_hash)
e_id = urwid.Edit(caption="Addr : ",edit_text=source_hash)
e_name = urwid.Edit(caption="Name : ",edit_text=display_name)
trust_button_group = []
@ -276,7 +285,7 @@ class ConversationsDisplay():
r_unknown,
r_trusted,
urwid.Text(""),
urwid.Columns([("weight", 0.45, urwid.Button("Start", on_press=confirmed)), ("weight", 0.1, urwid.Text("")), ("weight", 0.45, urwid.Button("Back", on_press=dismiss_dialog))])
urwid.Columns([("weight", 0.45, urwid.Button("Create", on_press=confirmed)), ("weight", 0.1, urwid.Text("")), ("weight", 0.45, urwid.Button("Back", on_press=dismiss_dialog))])
])
dialog_pile.error_display = False
@ -331,9 +340,19 @@ class ConversationsDisplay():
widget.check_editor_allowed()
return widget
def close_conversation_by_hash(self, conversation_hash):
if conversation_hash in ConversationsDisplay.cached_conversation_widgets:
ConversationsDisplay.cached_conversation_widgets.pop(conversation_hash)
if self.currently_displayed_conversation == conversation_hash:
self.display_conversation(sender=None, source_hash=None)
def close_conversation(self, conversation):
ConversationsDisplay.cached_conversation_widgets.pop(conversation.source_hash)
self.display_conversation(sender=None, source_hash=None)
if conversation.source_hash in ConversationsDisplay.cached_conversation_widgets:
ConversationsDisplay.cached_conversation_widgets.pop(conversation.source_hash)
if self.currently_displayed_conversation == conversation.source_hash:
self.display_conversation(sender=None, source_hash=None)
def conversation_list_widget(self, conversation):
@ -450,7 +469,7 @@ class ConversationWidget(urwid.WidgetWrap):
def __init__(self, source_hash):
if source_hash == None:
self.frame = None
display_widget = urwid.LineBox(urwid.Filler(urwid.Text("No conversation selected"), "top"))
display_widget = urwid.LineBox(urwid.Filler(urwid.Text("\n No conversation selected"), "top"))
urwid.WidgetWrap.__init__(self, display_widget)
else:
if source_hash in ConversationsDisplay.cached_conversation_widgets:
@ -505,7 +524,8 @@ class ConversationWidget(urwid.WidgetWrap):
self.frame = ConversationFrame(
self.messagelist,
header=header,
footer=self.minimal_editor
footer=self.minimal_editor,
focus_part="footer"
)
self.frame.delegate = self

View File

@ -87,6 +87,7 @@ class MainDisplay():
def show_network(self, user_data):
self.sub_displays.active_display = self.sub_displays.network_display
self.update_active_sub_display()
self.sub_displays.network_display.start()
def show_conversations(self, user_data):
self.sub_displays.active_display = self.sub_displays.conversations_display

View File

@ -1,21 +1,519 @@
import RNS
import urwid
import nomadnet
from datetime import datetime
from nomadnet.Directory import DirectoryEntry
from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox, MODIFIER_KEY
class NetworkDisplayShortcuts():
def __init__(self, app):
import urwid
self.app = app
self.widget = urwid.AttrMap(urwid.Text("Network Display Shortcuts"), "shortcutbar")
class NetworkDisplay():
def __init__(self, app):
import urwid
self.app = app
class DialogLineBox(urwid.LineBox):
def keypress(self, size, key):
if key == "esc":
self.delegate.update_conversation_list()
else:
return super(DialogLineBox, self).keypress(size, key)
class ListEntry(urwid.Text):
_selectable = True
signals = ["click"]
def keypress(self, size, key):
"""
Send 'click' signal on 'activate' command.
"""
if self._command_map[key] != urwid.ACTIVATE:
return key
self._emit('click')
def mouse_event(self, size, event, button, x, y, focus):
"""
Send 'click' signal on button 1 press.
"""
if button != 1 or not urwid.util.is_mouse_press(event):
return False
self._emit('click')
return True
class AnnounceStreamEntry(urwid.WidgetWrap):
def __init__(self, app, timestamp, source_hash):
self.app = app
self.timestamp = timestamp
time_format = app.time_format
dt = datetime.fromtimestamp(self.timestamp)
ts_string = dt.strftime(time_format)
trust_level = self.app.directory.trust_level(source_hash)
display_str = self.app.directory.simplest_display_str(source_hash)
if trust_level == DirectoryEntry.UNTRUSTED:
symbol = "\u2715"
style = "list_untrusted"
focus_style = "list_focus_untrusted"
elif trust_level == DirectoryEntry.UNKNOWN:
symbol = "?"
style = "list_unknown"
focus_style = "list_focus"
elif trust_level == DirectoryEntry.TRUSTED:
symbol = "\u2713"
style = "list_trusted"
focus_style = "list_focus_trusted"
elif trust_level == DirectoryEntry.WARNING:
symbol = "\u26A0"
style = "list_warning"
focus_style = "list_focus"
else:
symbol = "\u26A0"
style = "list_untrusted"
focus_style = "list_focus_untrusted"
widget = ListEntry(ts_string+" "+display_str)
self.display_widget = urwid.AttrMap(widget, style, focus_style)
urwid.WidgetWrap.__init__(self, self.display_widget)
class AnnounceStream(urwid.WidgetWrap):
def __init__(self, app, parent):
self.app = app
self.parent = parent
self.started = False
self.timeout = self.app.config["textui"]["animation_interval"]*2
self.ilb = None
self.added_entries = []
self.widget_list = []
self.update_widget_list()
wlt = [AnnounceStreamEntry(self.app, e[0], e[1]) for e in self.app.directory.announce_stream]
self.ilb = IndicativeListBox(
self.widget_list,
#wlt,
on_selection_change=self.list_selection,
initialization_is_selection_change=False,
modifier_key=MODIFIER_KEY.CTRL,
#highlight_offFocus="list_off_focus"
)
self.display_widget = self.ilb
urwid.WidgetWrap.__init__(self, urwid.LineBox(self.display_widget, title="Announce Stream"))
def rebuild_widget_list(self):
self.added_entries = []
self.widget_list = []
self.update_widget_list()
def update_widget_list(self):
new_entries = []
for e in self.app.directory.announce_stream:
if not e[0] in self.added_entries:
self.added_entries.insert(0, e[0])
new_entries.insert(0, e)
new_widgets = [AnnounceStreamEntry(self.app, e[0], e[1]) for e in new_entries]
for nw in new_widgets:
self.widget_list.insert(0, nw)
if len(new_widgets) > 0:
RNS.log("Inserted "+str(len(new_widgets))+" widgets")
if self.ilb != None:
self.ilb.set_body(self.widget_list)
def list_selection(self, arg1, arg2):
pass
def update(self):
self.update_widget_list()
def update_callback(self, loop=None, user_data=None):
self.update()
if self.started:
self.app.ui.loop.set_alarm_in(self.timeout, self.update_callback)
def start(self):
was_started = self.started
self.started = True
if not was_started:
self.update_callback()
def stop(self):
self.started = False
class SelectText(urwid.Text):
_selectable = True
signals = ["click"]
def keypress(self, size, key):
"""
Send 'click' signal on 'activate' command.
"""
if self._command_map[key] != urwid.ACTIVATE:
return key
self._emit('click')
def mouse_event(self, size, event, button, x, y, focus):
"""
Send 'click' signal on button 1 press.
"""
if button != 1 or not urwid.util.is_mouse_press(event):
return False
self._emit('click')
return True
class KnownNodes(urwid.WidgetWrap):
def __init__(self, app):
self.app = app
self.node_list = app.directory.known_nodes()
self.ilb = IndicativeListBox(
self.make_node_widgets(),
on_selection_change=self.node_list_selection,
initialization_is_selection_change=False,
highlight_offFocus="list_off_focus"
)
if len(self.node_list) > 0:
self.display_widget = self.ilb
widget_style = None
self.no_content = False
else:
self.no_content = True
widget_style = "inactive_text"
self.display_widget = urwid.Pile([urwid.Text(("warning_text", "- \u2139 -\n"), align="center"), SelectText(("warning_text", "Currently, no nodes are known\n\n"), align="center")])
urwid.WidgetWrap.__init__(self, urwid.AttrMap(urwid.LineBox(self.display_widget, title="Known Nodes"), widget_style))
def keypress(self, size, key):
if key == "up" and (self.no_content or self.ilb.top_is_visible):
nomadnet.NomadNetworkApp.get_shared_instance().ui.main_display.frame.set_focus("header")
return super(KnownNodes, self).keypress(size, key)
def node_list_selection(self, arg1, arg2):
pass
def make_node_widgets(self):
widget_list = []
for node_entry in self.node_list:
# TODO: Implement this
widget_list.append(ListEntry("Node "+RNS.prettyhexrep(node_entry.source_hash)))
# TODO: Sort list
return widget_list
class AnnounceTime(urwid.WidgetWrap):
def __init__(self, app):
self.started = False
self.app = app
self.timeout = self.app.config["textui"]["animation_interval"]
self.display_widget = urwid.Text("")
self.update_time()
urwid.WidgetWrap.__init__(self, self.display_widget)
def update_time(self):
self.last_announce_string = "Never"
if self.app.peer_settings["last_announce"] != None:
self.last_announce_string = pretty_date(int(self.app.peer_settings["last_announce"]))
self.display_widget.set_text("Last Announce : "+self.last_announce_string)
def update_time_callback(self, loop=None, user_data=None):
self.update_time()
if self.started:
self.app.ui.loop.set_alarm_in(self.timeout, self.update_time_callback)
def start(self):
was_started = self.started
self.started = True
if not was_started:
self.update_time_callback()
def stop(self):
self.started = False
class LocalPeer(urwid.WidgetWrap):
announce_timer = None
def __init__(self, app, parent):
self.app = app
self.parent = parent
self.dialog_open = False
display_name = self.app.lxmf_destination.display_name
if display_name == None:
display_name = ""
t_id = urwid.Text("Addr : "+RNS.hexrep(self.app.lxmf_destination.hash, delimit=False))
e_name = urwid.Edit(caption="Name : ", edit_text=display_name)
def save_query(sender):
def dismiss_dialog(sender):
self.dialog_open = False
self.parent.left_pile.contents[2] = (LocalPeer(self.app, self.parent), options)
self.app.set_display_name(e_name.get_edit_text())
dialog = DialogLineBox(
urwid.Pile([
urwid.Text("\nSaved", align="center"),
urwid.Button("OK", on_press=dismiss_dialog)
]), title="\u2139"
)
dialog.delegate = self
bottom = self
overlay = urwid.Overlay(dialog, bottom, align="center", width=("relative", 100), valign="middle", height="pack", left=4, right=4)
options = self.parent.left_pile.options()
self.dialog_open = True
self.parent.left_pile.contents[2] = (overlay, options)
def announce_query(sender):
def dismiss_dialog(sender):
self.dialog_open = False
options = self.parent.left_pile.options(height_type="pack", height_amount=None)
self.parent.left_pile.contents[3] = (LocalPeer(self.app, self.parent), options)
self.app.announce_now()
dialog = DialogLineBox(
urwid.Pile([
urwid.Text("\n\n\nAnnounce Sent\n\n", align="center"),
urwid.Button("OK", on_press=dismiss_dialog)
]), title="\u2139"
)
dialog.delegate = self
bottom = self
#overlay = urwid.Overlay(dialog, bottom, align="center", width=("relative", 100), valign="middle", height="pack", left=4, right=4)
overlay = dialog
self.dialog_open = True
options = self.parent.left_pile.options(height_type="pack", height_amount=None)
self.parent.left_pile.contents[3] = (overlay, options)
def node_settings_query(sender):
options = self.parent.left_pile.options(height_type="pack", height_amount=None)
self.parent.left_pile.contents[3] = (self.parent.node_settings_display, options)
if LocalPeer.announce_timer == None:
self.t_last_announce = AnnounceTime(self.app)
LocalPeer.announce_timer = self.t_last_announce
else:
self.t_last_announce = LocalPeer.announce_timer
self.t_last_announce.update_time()
announce_button = urwid.Button("Announce Now", on_press=announce_query)
self.display_widget = urwid.Pile(
[
t_id,
e_name,
urwid.Divider("\u2504"),
self.t_last_announce,
announce_button,
urwid.Divider("\u2504"),
urwid.Columns([("weight", 0.45, urwid.Button("Save", on_press=save_query)), ("weight", 0.1, urwid.Text("")), ("weight", 0.45, urwid.Button("Node Settings", on_press=node_settings_query))])
]
)
urwid.WidgetWrap.__init__(self, urwid.LineBox(self.display_widget, title="Local Peer Info"))
def start(self):
self.t_last_announce.start()
class NodeSettings(urwid.WidgetWrap):
def __init__(self, app, parent):
self.app = app
self.parent = parent
def show_peer_info(sender):
options = self.parent.left_pile.options(height_type="pack", height_amount=None)
self.parent.left_pile.contents[3] = (LocalPeer(self.app, self.parent), options)
widget_style = "inactive_text"
pile = urwid.Pile([
urwid.Text(("body_text", "Network Display \U0001F332")),
urwid.Text("- \u2139 -\n", align="center"),
urwid.Text("\nNode Hosting currently unavailable\n\n", align="center"),
urwid.Padding(urwid.Button("Back", on_press=show_peer_info), "center", "pack")
])
self.display_widget = pile
urwid.WidgetWrap.__init__(self, urwid.AttrMap(urwid.LineBox(self.display_widget, title="Node Settings"), widget_style))
def node_list_selection(self, arg1, arg2):
pass
def make_node_widgets(self):
widget_list = []
for node_entry in self.node_list:
# TODO: Implement this
widget_list.append(ListEntry("Node "+RNS.prettyhexrep(node_entry.source_hash)))
# TODO: Sort list
return widget_list
class UpdatingText(urwid.WidgetWrap):
def __init__(self, app, title, value_method, append_text=""):
self.started = False
self.app = app
self.timeout = self.app.config["textui"]["animation_interval"]*5
self.display_widget = urwid.Text("")
self.value = None
self.value_method = value_method
self.title = title
self.append_text = append_text
self.update()
urwid.WidgetWrap.__init__(self, self.display_widget)
def update(self):
self.value = self.value_method()
self.display_widget.set_text(self.title+str(self.value)+str(self.append_text))
def update_callback(self, loop=None, user_data=None):
self.update()
if self.started:
self.app.ui.loop.set_alarm_in(self.timeout, self.update_callback)
def start(self):
was_started = self.started
self.started = True
if not was_started:
self.update_callback()
def stop(self):
self.started = False
class NetworkStats(urwid.WidgetWrap):
def __init__(self, app, parent):
self.app = app
self.parent = parent
def get_num_peers():
return self.app.directory.number_of_known_peers(lookback_seconds=30*60)
def get_num_nodes():
return self.app.directory.number_of_known_nodes()
self.w_heard_peers = UpdatingText(self.app, "Heard Peers: ", get_num_peers, append_text=" (last 30m)")
self.w_known_nodes = UpdatingText(self.app, "Known Nodes: ", get_num_nodes)
pile = urwid.Pile([
self.w_heard_peers,
self.w_known_nodes,
])
self.display_widget = urwid.LineBox(pile, title="Network Stats")
urwid.WidgetWrap.__init__(self, self.display_widget)
def start(self):
self.w_heard_peers.start()
self.w_known_nodes.start()
class NetworkDisplay():
list_width = 0.33
def __init__(self, app):
self.app = app
self.known_nodes_display = KnownNodes(self.app)
self.network_stats_display = NetworkStats(self.app, self)
self.announce_stream_display = AnnounceStream(self.app, self)
self.local_peer_display = LocalPeer(self.app, self)
self.node_settings_display = NodeSettings(self.app, self)
self.left_pile = urwid.Pile([
("pack", self.known_nodes_display),
("weight", 1, self.announce_stream_display),
("pack", self.network_stats_display),
("pack", self.local_peer_display),
])
self.left_area = self.left_pile
self.right_area = urwid.AttrMap(urwid.LineBox(urwid.Filler(urwid.Text("Disconnected\n\u2190 \u2192", align="center"), "middle"), title="Remote Node"), "inactive_text")
self.columns = urwid.Columns(
[
("weight", NetworkDisplay.list_width, self.left_area),
("weight", 1-NetworkDisplay.list_width, self.right_area)
],
dividechars=0, focus_column=0
)
self.shortcuts_display = NetworkDisplayShortcuts(self.app)
self.widget = urwid.Filler(pile, 'top')
self.widget = self.columns
def start(self):
self.local_peer_display.start()
self.network_stats_display.start()
self.announce_stream_display.start()
def shortcuts(self):
return self.shortcuts_display
def directory_change_callback(self):
self.announce_stream_display.rebuild_widget_list()
def pretty_date(time=False):
"""
Get a datetime object or a int() Epoch timestamp and return a
pretty string like 'an hour ago', 'Yesterday', '3 months ago',
'just now', etc
"""
from datetime import datetime
now = datetime.now()
if type(time) is int:
diff = now - datetime.fromtimestamp(time)
elif isinstance(time,datetime):
diff = now - time
elif not time:
diff = now - now
second_diff = diff.seconds
day_diff = diff.days
if day_diff < 0:
return ''
if day_diff == 0:
if second_diff < 10:
return "just now"
if second_diff < 60:
return str(second_diff) + " seconds ago"
if second_diff < 120:
return "a minute ago"
if second_diff < 3600:
return str(int(second_diff / 60)) + " minutes ago"
if second_diff < 7200:
return "an hour ago"
if second_diff < 86400:
return str(int(second_diff / 3600)) + " hours ago"
if day_diff == 1:
return "Yesterday"
if day_diff < 7:
return str(day_diff) + " days ago"
if day_diff < 31:
return str(int(day_diff / 7)) + " weeks ago"
if day_diff < 365:
return str(int(day_diff / 30)) + " months ago"
return str(int(day_diff / 365)) + " years ago"

View File

@ -1 +1,4 @@
quotes = [("I want the wisdom that wise men revere. I want more.", "Faithless")]
quotes = [
("I want the wisdom that wise men revere. I want more.", "Faithless"),
("That's enough entropy for you my friend", "Unknown")
]