# -*- coding: utf-8 -*- """ OnionShare | https://onionshare.org/ Copyright (C) 2014-2022 Micah Lee, et al. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import base64 import hashlib import importlib.resources as importlib_resources import os import platform import random import requests import socket import sys import threading import time import shutil import re import colorama from colorama import Fore, Back, Style from .settings import Settings class CannotFindTor(Exception): """ OnionShare can't find a tor binary """ class Common: """ The Common object is shared amongst all parts of OnionShare. """ def __init__(self, verbose=False): self.verbose = verbose colorama.init(autoreset=True) # The platform OnionShare is running on self.platform = platform.system() if self.platform.endswith("BSD") or self.platform == "DragonFly": self.platform = "BSD" # The current version of OnionShare with open(self.get_resource_path("version.txt")) as f: self.version = f.read().strip() def display_banner(self): """ Raw ASCII art example: ╭───────────────────────────────────────────╮ │ * ▄▄█████▄▄ * │ │ ▄████▀▀▀████▄ * │ │ ▀▀█▀ ▀██▄ │ │ * ▄█▄ ▀██▄ │ │ ▄█████▄ ███ -+- │ │ ███ ▀█████▀ │ │ ▀██▄ ▀█▀ │ │ * ▀██▄ ▄█▄▄ * │ │ * ▀████▄▄▄████▀ │ │ ▀▀█████▀▀ │ │ -+- * │ │ ▄▀▄ ▄▀▀ █ │ │ █ █ ▀ ▀▄ █ │ │ █ █ █▀▄ █ ▄▀▄ █▀▄ ▀▄ █▀▄ ▄▀▄ █▄▀ ▄█▄ │ │ ▀▄▀ █ █ █ ▀▄▀ █ █ ▄▄▀ █ █ ▀▄█ █ ▀▄▄ │ │ │ │ v2.3.1 │ │ │ │ https://onionshare.org/ │ ╰───────────────────────────────────────────╯ """ try: print( Back.MAGENTA + Fore.WHITE + "╭───────────────────────────────────────────╮" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "▄▄█████▄▄" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▄████▀▀▀████▄" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▀▀█▀ ▀██▄ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "▄█▄ ▀██▄ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▄█████▄ ███" + Fore.LIGHTMAGENTA_EX + " -+- " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ███ ▀█████▀ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▀██▄ ▀█▀ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "▀██▄ ▄█▄▄" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.LIGHTMAGENTA_EX + " * " + Fore.WHITE + "▀████▄▄▄████▀ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▀▀█████▀▀ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.LIGHTMAGENTA_EX + " -+- * " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▄▀▄ ▄▀▀ █ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " █ █ ▀ ▀▄ █ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " █ █ █▀▄ █ ▄▀▄ █▀▄ ▀▄ █▀▄ ▄▀▄ █▄▀ ▄█▄ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " ▀▄▀ █ █ █ ▀▄▀ █ █ ▄▄▀ █ █ ▀▄█ █ ▀▄▄ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│ │" ) left_spaces = (43 - len(self.version) - 1) // 2 right_spaces = left_spaces if left_spaces + len(self.version) + 1 + right_spaces < 43: right_spaces += 1 print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + f"{' '*left_spaces}v{self.version}{' '*right_spaces}" + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "│ │" ) print( Back.MAGENTA + Fore.WHITE + "│" + Fore.WHITE + " https://onionshare.org/ " + Fore.WHITE + "│" ) print( Back.MAGENTA + Fore.WHITE + "╰───────────────────────────────────────────╯" ) print() except: # If anything fails, print a boring banner print(f"OnionShare v{self.version}") print("https://onionshare.org/") print() def load_settings(self, config=None): """ Loading settings, optionally from a custom config json file. """ self.settings = Settings(self, config) self.settings.load() def log(self, module, func, msg=None): """ If verbose mode is on, log error messages to stdout """ if self.verbose: timestamp = time.strftime("%b %d %Y %X") final_msg = f"{Fore.LIGHTBLACK_EX + Style.DIM}[{timestamp}]{Style.RESET_ALL} {Fore.WHITE + Style.DIM}{module}.{func}{Style.RESET_ALL}" if msg: final_msg = ( f"{final_msg}{Fore.WHITE + Style.DIM}: {msg}{Style.RESET_ALL}" ) print(final_msg) def get_resource_path(self, filename): """ Returns the absolute path of a resource """ ref = importlib_resources.files("onionshare_cli.resources") / filename with importlib_resources.as_file(ref) as path: self.log("Common", "get_resource_path", f"filename={filename}, path={str(path)}") return str(path) def get_tor_paths(self): if self.platform == "Linux": tor_path = shutil.which("tor") if not tor_path: raise CannotFindTor() obfs4proxy_file_path = shutil.which("obfs4proxy") snowflake_file_path = shutil.which("snowflake-client") meek_client_file_path = shutil.which("meek-client") prefix = os.path.dirname(os.path.dirname(tor_path)) tor_geo_ip_file_path = os.path.join(prefix, "share/tor/geoip") tor_geo_ipv6_file_path = os.path.join(prefix, "share/tor/geoip6") elif self.platform == "Windows": # In Windows, the Tor binaries are in the onionshare package, not the onionshare_cli package base_path = self.get_resource_path("tor") base_path = base_path.replace("onionshare_cli", "onionshare") tor_path = os.path.join(base_path, "tor.exe") # If tor.exe isn't there, mayber we're running from the source tree if not os.path.exists(tor_path): self.log( "Common", "get_tor_paths", f"Cannot find tor.exe at {tor_path}" ) base_path = os.path.join(os.getcwd(), "onionshare", "resources", "tor") tor_path = os.path.join(base_path, "tor.exe") if not os.path.exists(tor_path): self.log( "Common", "get_tor_paths", f"Cannot find tor.exe at {tor_path}" ) raise CannotFindTor() obfs4proxy_file_path = os.path.join(base_path, "obfs4proxy.exe") snowflake_file_path = os.path.join(base_path, "snowflake-client.exe") meek_client_file_path = os.path.join(base_path, "meek-client.exe") tor_geo_ip_file_path = os.path.join(base_path, "geoip") tor_geo_ipv6_file_path = os.path.join(base_path, "geoip6") elif self.platform == "Darwin": # Let's see if we have tor binaries in the onionshare GUI package base_path = self.get_resource_path("tor") base_path = base_path.replace("onionshare_cli", "onionshare") tor_path = os.path.join(base_path, "tor") if os.path.exists(tor_path): obfs4proxy_file_path = os.path.join(base_path, "obfs4proxy") snowflake_file_path = os.path.join(base_path, "snowflake-client") meek_client_file_path = os.path.join(base_path, "meek-client") tor_geo_ip_file_path = os.path.join(base_path, "geoip") tor_geo_ipv6_file_path = os.path.join(base_path, "geoip6") else: # Fallback to looking in the path tor_path = shutil.which("tor") if not os.path.exists(tor_path): raise CannotFindTor() obfs4proxy_file_path = shutil.which("obfs4proxy") snowflake_file_path = shutil.which("snowflake-client") meek_client_file_path = shutil.which("meek-client") prefix = os.path.dirname(os.path.dirname(tor_path)) tor_geo_ip_file_path = os.path.join(prefix, "share/tor/geoip") tor_geo_ipv6_file_path = os.path.join(prefix, "share/tor/geoip6") elif self.platform == "BSD": tor_path = "/usr/local/bin/tor" tor_geo_ip_file_path = "/usr/local/share/tor/geoip" tor_geo_ipv6_file_path = "/usr/local/share/tor/geoip6" obfs4proxy_file_path = "/usr/local/bin/obfs4proxy" snowflake_file_path = "/usr/local/bin/snowflake-client" meek_client_file_path = "/usr/local/bin/meek-client" return ( tor_path, tor_geo_ip_file_path, tor_geo_ipv6_file_path, obfs4proxy_file_path, snowflake_file_path, meek_client_file_path, ) def build_data_dir(self): """ Returns the path of the OnionShare data directory. """ if self.platform == "Windows": try: appdata = os.environ["APPDATA"] onionshare_data_dir = f"{appdata}\\OnionShare" except Exception: # If for some reason we don't have the 'APPDATA' environment variable # (like running tests in Linux while pretending to be in Windows) try: xdg_config_home = os.environ["XDG_CONFIG_HOME"] onionshare_data_dir = f"{xdg_config_home}/onionshare" except Exception: onionshare_data_dir = os.path.expanduser("~/.config/onionshare") elif self.platform == "Darwin": onionshare_data_dir = os.path.expanduser( "~/Library/Application Support/OnionShare" ) else: try: xdg_config_home = os.environ["XDG_CONFIG_HOME"] onionshare_data_dir = f"{xdg_config_home}/onionshare" except Exception: onionshare_data_dir = os.path.expanduser("~/.config/onionshare") # Modify the data dir if running tests if getattr(sys, "onionshare_test_mode", False): onionshare_data_dir += "-testdata" os.makedirs(onionshare_data_dir, 0o700, True) return onionshare_data_dir def build_tmp_dir(self): """ Returns path to a folder that can hold temporary files """ tmp_dir = os.path.join(self.build_data_dir(), "tmp") os.makedirs(tmp_dir, 0o700, True) return tmp_dir def build_persistent_dir(self): """ Returns the path to the folder that holds persistent files """ persistent_dir = os.path.join(self.build_data_dir(), "persistent") os.makedirs(persistent_dir, 0o700, True) return persistent_dir def build_tor_dir(self): """ Returns path to the tor data directory """ tor_dir = os.path.join(self.build_data_dir(), "tor_data") os.makedirs(tor_dir, 0o700, True) return tor_dir def build_password(self, word_count=2): """ Returns a random string made of words from the wordlist, such as "deter-trig". """ with open(self.get_resource_path("wordlist.txt")) as f: wordlist = f.read().split() r = random.SystemRandom() return "-".join(r.choice(wordlist) for _ in range(word_count)) def build_username(self, word_count=2): """ Returns a random string made of words from the wordlist, such as "deter-trig". """ with open(self.get_resource_path("wordlist.txt")) as f: wordlist = f.read().split() r = random.SystemRandom() return "-".join(r.choice(wordlist) for _ in range(word_count)) def check_bridges_valid(self, bridges): """ Does a regex check against a supplied list of bridges, to make sure they are valid strings depending on the bridge type. """ valid_bridges = [] self.log("Common", "check_bridges_valid", "Checking bridge syntax") for bridge in bridges: if bridge != "": # Check the syntax of the custom bridge to make sure it looks legitimate ipv4_pattern = re.compile( r"(obfs4\s+)?(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]):([0-9]+)(\s+)([A-Z0-9]+)(.+)$" ) ipv6_pattern = re.compile( r"(obfs4\s+)?\[(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\]:[0-9]+\s+[A-Z0-9]+(.+)$" ) meek_lite_pattern = re.compile( r"(meek_lite)(\s)+([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)(\s)+([0-9A-Z]+)(\s)+url=(.+)(\s)+front=(.+)" ) snowflake_pattern = re.compile( r"(snowflake)(\s)+([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)(\s)+([0-9A-Z]+)" ) if ( ipv4_pattern.match(bridge) or ipv6_pattern.match(bridge) or meek_lite_pattern.match(bridge) or snowflake_pattern.match(bridge) ): valid_bridges.append(bridge) if valid_bridges: return valid_bridges else: return False def is_flatpak(self): """ Returns True if OnionShare is running in a Flatpak sandbox """ return os.environ.get("FLATPAK_ID") == "org.onionshare.OnionShare" def is_snapcraft(self): """ Returns True if OnionShare is running in a Snapcraft sandbox """ return os.environ.get("SNAP_INSTANCE_NAME") == "onionshare" @staticmethod def random_string(num_bytes, output_len=None): """ Returns a random string with a specified number of bytes. """ b = os.urandom(num_bytes) h = hashlib.sha256(b).digest()[:16] s = base64.b32encode(h).lower().replace(b"=", b"").decode("utf-8") if not output_len: return s return s[:output_len] @staticmethod def human_readable_filesize(b): """ Returns filesize in a human readable format. """ thresh = 1024.0 if b < thresh: return "{:.1f} B".format(b) units = ("KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB") u = 0 b /= thresh while b >= thresh: b /= thresh u += 1 return "{:.1f} {}".format(b, units[u]) @staticmethod def format_seconds(seconds): """Return a human-readable string of the format 1d2h3m4s""" days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) human_readable = [] if days: human_readable.append("{:.0f}d".format(days)) if hours: human_readable.append("{:.0f}h".format(hours)) if minutes: human_readable.append("{:.0f}m".format(minutes)) if seconds or not human_readable: human_readable.append("{:.0f}s".format(seconds)) return "".join(human_readable) @staticmethod def estimated_time_remaining(bytes_downloaded, total_bytes, started): now = time.time() time_elapsed = now - started # in seconds download_rate = bytes_downloaded / time_elapsed remaining_bytes = total_bytes - bytes_downloaded eta = remaining_bytes / download_rate return Common.format_seconds(eta) @staticmethod def get_available_port(min_port, max_port): """ Find a random available port within the given range. """ with socket.socket() as tmpsock: while True: try: tmpsock.bind(("127.0.0.1", random.randint(min_port, max_port))) break except OSError: pass _, port = tmpsock.getsockname() return port @staticmethod def dir_size(start_path): """ Calculates the total size, in bytes, of all of the files in a directory. """ total_size = 0 for dirpath, dirnames, filenames in os.walk(start_path): for f in filenames: fp = os.path.join(dirpath, f) if not os.path.islink(fp): total_size += os.path.getsize(fp) return total_size class AutoStopTimer(threading.Thread): """ Background thread sleeps t hours and returns. """ def __init__(self, common, time): threading.Thread.__init__(self) self.common = common self.setDaemon(True) self.time = time def run(self): self.common.log( "AutoStopTimer", f"Server will shut down after {self.time} seconds" ) time.sleep(self.time) return 1