Create discourse2github.py

c0mmando 2025-03-05 00:45:37 +00:00 committed by GitHub
parent 7b236eeac3
commit 8c89c6309a

discourse2github.py (new file)

@@ -0,0 +1,616 @@
#!/usr/bin/env python3
"""
Archive Discourse posts and render topics to Markdown from multiple sites.
This script downloads posts from one or more Discourse servers via their APIs.
It archives new posts as JSON files (skipping any that are already archived), renders
the topics in each batch to Markdown concurrently (downloading referenced images and
rewriting image and link URLs as relative paths), updates a per-site metadata file
after each batch is archived, and updates each site's README.md with a table of
contents linking to every archived topic.
Usage:
./discourse2github.py --urls https://forum.hackliberty.org,https://forum.qubes-os.org --target-dir ./archive
"""
import argparse
import concurrent.futures
import functools
import json
import logging
import os
import re
import sys
import time
import urllib.request
import datetime
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse
import html2text # pip install html2text
from bs4 import BeautifulSoup # pip install beautifulsoup4
# Set up logging. If the 'rich' module is available, it will be used.
loglevel = 'DEBUG' if os.environ.get('DEBUG') else 'INFO'
try:
from rich.logging import RichHandler
logging.basicConfig(level=loglevel, datefmt="[%X]", handlers=[RichHandler()])
except ImportError:
logging.basicConfig(level=loglevel)
log = logging.getLogger('archive')
# Argument parser
parser = argparse.ArgumentParser(
description='Archive topics from one or more Discourse installations and render to Markdown')
parser.add_argument(
'--urls',
help='Comma-separated URLs of Discourse servers (for example: "https://forum.hackliberty.org,https://forum.qubes-os.org")',
default=os.environ.get('DISCOURSE_URLS', 'https://forum.hackliberty.org'))
parser.add_argument(
'--debug', action='store_true', default=os.environ.get('DEBUG', False))
parser.add_argument(
'-t', '--target-dir', help='Target base directory for the archives',
default=Path(os.environ.get('TARGET_DIR', './archive')))
@functools.cache
def args():
return parser.parse_args()
def parse_sites(urls_string: str) -> list:
"""Return a list of cleaned-up site URLs."""
return [url.strip().rstrip('/') for url in urls_string.split(',') if url.strip()]
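# For example (illustrative values): parse_sites("https://a.example/, https://b.example")
# returns ["https://a.example", "https://b.example"].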
def http_get(site_url: str, path: str, timeout: int = 15) -> str:
"""Simple HTTP GET with exponential backoff and a timeout."""
full_url = f"{site_url}{path}"
log.debug("HTTP GET %s", full_url)
backoff = 3
while True:
try:
with urllib.request.urlopen(full_url, timeout=timeout) as response:
return response.read().decode()
except Exception as e:
log.debug("Error fetching %s: %s -- Retrying in %d seconds", full_url, e, backoff)
time.sleep(backoff)
backoff *= 2
if backoff >= 256:
log.exception("Rate limit or unrecoverable error for %s", full_url)
sys.exit(1)
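# Note: http_get() retries after 3, 6, 12, ... seconds and calls sys.exit(1) once the
# backoff has doubled past 256 seconds (i.e. after the 192-second retry also fails).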
def http_get_json(site_url: str, path: str, timeout: int = 15) -> dict:
"""Fetch URL contents from a specific site and decode JSON."""
try:
return json.loads(http_get(site_url, path, timeout=timeout))
except json.JSONDecodeError:
log.warning("Unable to decode JSON response from %r", path)
raise
# ----- Helper: Truncate Filename -----
def truncate_filename(filename: str, max_length: int = 255) -> str:
"""
Truncates the file name to a maximum length (default 255 characters).
It preserves the file extension.
"""
if len(filename) <= max_length:
return filename
    p = Path(filename)
    suffix = "".join(p.suffixes)
    # Strip *all* suffixes from the name so they are not duplicated when re-appended.
    stem = p.name[: len(p.name) - len(suffix)] if suffix else p.name
    max_stem_length = max_length - len(suffix)
if max_stem_length <= 0:
return filename[:max_length]
truncated_stem = stem[:max_stem_length]
return truncated_stem + suffix
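# For example, a 300-character name ending in ".json" is cut to a 250-character stem
# plus the ".json" suffix, keeping the total at the 255-character default limit.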
# ----- Image / Link Download Helpers -----
def fix_url(original_url: str) -> str:
"""Fix scheme-relative URLs by prepending https: if necessary."""
if original_url.startswith("//"):
fixed = "https:" + original_url
log.debug("Converted scheme-relative URL: %s -> %s", original_url, fixed)
return fixed
return original_url
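# For example, fix_url("//example.com/images/a.png") returns
# "https://example.com/images/a.png"; URLs that already have a scheme are returned unchanged.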
def download_image(image_url: str, dest_path: Path, timeout: int = 15):
"""
Download an image from image_url and save it to dest_path.
If the file already exists, skip downloading.
A timeout is specified to avoid hanging indefinitely.
"""
if dest_path.exists():
log.debug("Image already downloaded: %s", dest_path)
return
try:
log.info("Downloading image: %s", image_url)
with urllib.request.urlopen(fix_url(image_url), timeout=timeout) as response:
image_data = response.read()
dest_path.parent.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(image_data)
log.info("Saved image to %s", dest_path)
except Exception as e:
log.error("Failed to download image %s: %s", image_url, e)
def process_srcset(srcset_value: str, topic_dir: Path, topic_relative_path: str) -> str:
"""
Process a srcset attribute value, downloading images and returning a rewritten value.
Downloads every image referenced regardless of URL content.
"""
entries = srcset_value.split(",")
fixed_entries = []
for entry in entries:
parts = entry.strip().split()
if not parts:
continue
orig_url = parts[0]
fixed_url = fix_url(orig_url)
parsed = urlparse(fixed_url)
image_filename = os.path.basename(parsed.path)
if not image_filename:
log.warning("Skipping srcset URL with empty filename: %s", fixed_url)
continue
dest_path = topic_dir / image_filename
download_image(fixed_url, dest_path)
full_path = os.path.join(topic_relative_path, image_filename).replace(os.sep, '/')
if len(parts) > 1:
fixed_entries.append(f"{full_path} {parts[1]}")
else:
fixed_entries.append(f"{full_path}")
return ", ".join(fixed_entries)
def is_image_link(url: str) -> bool:
"""Determine if the URL points to an image by its extension."""
image_extensions = (".png", ".jpg", ".jpeg", ".gif", ".webp")
parsed = urlparse(url)
filename = os.path.basename(parsed.path).lower()
return filename.endswith(image_extensions)
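# For example, is_image_link("https://example.com/pic.JPG?download=1") is True:
# the query string is ignored and the extension check is effectively case-insensitive.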
def process_html(html_content: str, topic_dir: Path, topic_relative_path: str) -> str:
"""
Process the given HTML: download referenced images and rewrite links.
Processes both <img> (src, srcset) and <a> tags pointing to images.
Downloads every image referenced in the HTML.
Returns the modified HTML.
"""
soup = BeautifulSoup(html_content, "html.parser")
# Process <img> tags.
for img in soup.find_all("img"):
src = img.get("src")
if src:
src = fix_url(src)
parsed = urlparse(src)
image_filename = os.path.basename(parsed.path)
if image_filename:
dest_path = topic_dir / image_filename
download_image(src, dest_path)
full_src = os.path.join(topic_relative_path, image_filename).replace(os.sep, '/')
img["src"] = full_src
else:
log.warning("Skipping image with empty filename from src: %s", src)
srcset = img.get("srcset")
if srcset:
new_srcset = process_srcset(srcset, topic_dir, topic_relative_path)
img["srcset"] = new_srcset
# Process <a> tags whose href points to images.
for a in soup.find_all("a"):
href = a.get("href")
if href:
fixed_href = fix_url(href)
if is_image_link(fixed_href):
parsed = urlparse(fixed_href)
image_filename = os.path.basename(parsed.path)
if image_filename:
dest_path = topic_dir / image_filename
download_image(fixed_href, dest_path)
new_href = os.path.join(topic_relative_path, image_filename).replace(os.sep, '/')
a["href"] = new_href
else:
log.warning("Skipping link with empty filename from href: %s", fixed_href)
return str(soup)
def slugify(value: str) -> str:
"""
Normalizes string, removes non-alphanumeric characters, and converts whitespace to hyphens.
Useful for constructing filenames.
"""
value = str(value)
value = value.strip().lower()
value = re.sub(r'[^a-z0-9\s-]', '', value)
value = re.sub(r'[\s-]+', '-', value)
return value or "untitled"
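# For example, slugify("Hello, World!") returns "hello-world", and slugify("???")
# falls back to "untitled".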
# ----- Data Models -----
@dataclass(frozen=True)
class PostTopic:
id: int
slug: str
title: str
@dataclass(frozen=True)
class Post:
id: int
slug: str
raw: dict
def get_created_at(self) -> datetime.datetime:
return datetime.datetime.fromisoformat(self.raw['created_at'].replace("Z", "+00:00"))
def save(self, dir: Path):
"""Save the raw JSON post to disk if not already archived."""
idstr = str(self.id).zfill(10)
filename = f"{idstr}-{self.raw.get('username', 'anonymous')}-{self.raw.get('topic_slug', 'unknown')}.json"
filename = truncate_filename(filename)
folder_name = self.get_created_at().strftime('%Y-%m-%B')
full_path = dir / folder_name / filename
if full_path.exists():
log.debug("Post %s already saved, skipping", self.id)
return
full_path.parent.mkdir(parents=True, exist_ok=True)
log.info("Saving post %s to %s", self.id, full_path)
full_path.write_text(json.dumps(self.raw, indent=2), encoding='utf-8')
def get_topic(self) -> PostTopic:
return PostTopic(
id=self.raw.get('topic_id', self.id),
slug=self.raw.get('topic_slug', self.slug),
title=self.raw.get('topic_title', self.raw.get('title', 'No Title')),
)
@classmethod
def from_json(cls, j: dict) -> 'Post':
return cls(
id=j['id'],
slug=j.get('topic_slug', 'unknown'),
raw=j,
)
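# Illustrative archive path produced by Post.save() (relative to the posts directory):
#   2025-03-March/0000012345-alice-example-topic.json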
@dataclass(frozen=True)
class Topic:
id: int
slug: str
raw: dict
markdown: str
def get_created_at(self) -> datetime.datetime:
return datetime.datetime.fromisoformat(self.raw['created_at'].replace("Z", "+00:00"))
def save_rendered(self, dir: Path):
"""
Save the rendered Markdown topic to disk.
Filename built from creation date, slug, and id.
Truncate the filename if needed.
"""
date_str = str(self.get_created_at().date())
filename = f"{date_str}-{self.slug}-id{self.id}.md"
filename = truncate_filename(filename)
folder_name = self.get_created_at().strftime('%Y-%m-%B')
full_path = dir / folder_name / filename
full_path.parent.mkdir(parents=True, exist_ok=True)
log.info("Saving rendered topic %s to %s", self.id, full_path)
rendered_markdown = f"# {self.raw.get('title', 'No Title')}\n\n{self.markdown}"
full_path.write_text(rendered_markdown, encoding='utf-8')
# Return the relative path from the repository root.
return full_path.relative_to(dir.parent)
@classmethod
def from_json(cls, t: dict, markdown: str) -> 'Topic':
slug = t.get('slug') or t.get('topic_slug') or "unknown"
return cls(
id=t.get('id', 0),
slug=slug,
raw=t,
markdown=markdown,
)
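# Illustrative path produced by Topic.save_rendered() (relative to the rendered-topics
# directory): 2025-03-March/2025-03-05-example-topic-id678.md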
# ----- New Helper for Rendering Topics with Image Downloading -----
def render_topic(site_url: str, topic: PostTopic, topics_dir: Path):
"""
Render a single topic to Markdown by:
1. Fetching the topic JSON.
2. Downloading its associated images and rewriting their URLs.
3. Converting processed HTML to Markdown (using html2text).
4. Saving the rendered Markdown document.
Images are saved to an assets directory relative to the site target directory.
Returns a dictionary with topic info for README updating.
"""
try:
log.info("Fetching topic %s JSON from %s", topic.id, site_url)
topic_data = http_get_json(site_url, f"/t/{topic.id}.json")
except Exception as e:
log.warning("Failed to fetch topic JSON for topic %s: %s", topic.id, e)
return None
# Define the assets directory in the repository root.
assets_dir = topics_dir.parent / "assets" / "images" / f"{topic.id}"
assets_dir.mkdir(parents=True, exist_ok=True)
# Determine the directory where the rendered markdown file will be saved.
try:
created_at = datetime.datetime.fromisoformat(topic_data['created_at'].replace("Z", "+00:00"))
except Exception as e:
log.error("Could not parse created_at for topic %s: %s", topic.id, e)
created_at = datetime.datetime.now()
folder_name = created_at.strftime('%Y-%m-%B')
rendered_md_dir = topics_dir / folder_name
# Compute the relative path from the markdown file's directory to the assets directory.
topic_relative_path = os.path.relpath(assets_dir, rendered_md_dir)
posts = topic_data.get("post_stream", {}).get("posts", [])
if not posts:
log.error("No posts found for topic %s", topic.id)
return None
converter = html2text.HTML2Text()
converter.body_width = 0
md_sections = []
for post in posts:
created = post.get("created_at", "unknown")
updated = post.get("updated_at", "unknown")
post_number = post.get("post_number", 0)
cooked_html = post.get("cooked", "")
# Pass the corrected topic_relative_path into process_html()
processed_html = process_html(cooked_html, assets_dir, topic_relative_path)
post_md = converter.handle(processed_html)
header_lines = [
f"**ID:** {topic.id}",
f"**USERNAME:** {post.get('username', 'unknown')}",
f"**POST NUMBER:** {post_number}",
f"**CREATED AT:** {created}",
f"**UPDATED AT:** {updated}",
]
# Join header lines with two newlines so each appears on its own line in GitHub Markdown.
header = "\n\n".join(header_lines)
section = f"## Post {post_number}\n\n{header}\n\n---\n\n{post_md}"
md_sections.append(section)
full_md = "\n\n".join(md_sections)
topic_title = topic_data.get("title", "No Title")
full_md = f"# {topic_title}\n\n" + full_md
topic_obj = Topic.from_json(topic_data, full_md)
saved_relative_path = topic_obj.save_rendered(topics_dir)
log.info("Saved rendered topic %s (%s)", topic_obj.id, topic_obj.slug)
# Return topic info for README.
return {
"id": topic_obj.id,
"slug": topic_obj.slug,
"title": topic_title,
"relative_path": str(saved_relative_path)
}
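# Illustrative return value from render_topic():
#   {"id": 678, "slug": "example-topic", "title": "Example Topic",
#    "relative_path": "rendered-topics/2025-03-March/2025-03-05-example-topic-id678.md"}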
# ----- Concurrent Rendering Helper -----
def render_topics_concurrently(site_url: str, topics: dict, topics_dir: Path, max_workers: int = 8):
"""
Render multiple topics concurrently.
Returns a list of rendered topic information dictionaries.
"""
rendered_topics_info = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(render_topic, site_url, topic, topics_dir): topic for topic in topics.values()}
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
if result:
rendered_topics_info.append(result)
# Update the README incrementally after each topic is rendered.
update_readme_incrementally(topics_dir.parent, result)
except Exception as exc:
log.error("A topic generated an exception: %s", exc)
return rendered_topics_info
def update_metadata(metadata_file: Path, metadata: dict):
"""Writes the metadata as a JSON file to disk."""
log.debug("Updating metadata: %s", metadata)
metadata_file.write_text(json.dumps(metadata, indent=2), encoding='utf-8')
# A helper pattern to match a TOC line (i.e. a line with the topic entry and its id)
TOC_LINE_PATTERN = re.compile(
    r"- \[(?P<title>.+?)\]\((?P<relative_path>.+?)\)\s*<!--\s*id:\s*(?P<id>\d+)\s*-->")
# ----- README Update Helpers -----
def read_existing_readme(repo_root: Path):
"""
Read the existing README.md from repo_root and return a dictionary of topics.
    The keys are the topic IDs (as integers) and the values are the parsed topic dicts.
If the file doesn't exist, return an empty dict.
"""
readme_path = repo_root / "README.md"
existing_topics = {}
if readme_path.exists():
try:
content = readme_path.read_text(encoding='utf-8')
# Expecting lines like: - [Topic Title](relative_path) <!-- id: topic_id -->
pattern = TOC_LINE_PATTERN
for line in content.splitlines():
match = pattern.match(line)
if match:
topic_id = int(match.group("id"))
existing_topics[topic_id] = {
"id": topic_id,
"title": match.group("title"),
"relative_path": match.group("relative_path")
}
except Exception as e:
log.error("Failed to parse existing README.md: %s", e)
return existing_topics
def update_readme_incrementally(repo_root: Path, new_topic: dict):
"""
Update or create README.md in repo_root by merging the new topic into the existing list.
    If the topic is already listed, leave the README unchanged; otherwise, append the new topic to the TOC.
"""
topic_id = new_topic["id"]
existing_topics = read_existing_readme(repo_root)
if topic_id in existing_topics:
log.debug("Topic with id %s already exists in README.md", topic_id)
return
existing_topics[topic_id] = new_topic
append_to_readme(repo_root, new_topic)
def append_to_readme(repo_root: Path, new_topic: dict):
"""
Append a new topic to the existing README.md table-of-contents (TOC).
If README.md doesn't exist, create it with a header and the new topic.
"""
readme_path = repo_root / "README.md"
toc_header = ["# Archived Discourse Topics", "", "## Table of Contents", ""]
new_toc_line = f"- [{new_topic['title']}]({new_topic['relative_path']}) <!-- id: {new_topic['id']} -->"
if readme_path.exists():
try:
# Read the existing content
content = readme_path.read_text(encoding="utf-8")
lines = content.splitlines()
# Check if the file already has a TOC header by looking for the header marker.
try:
toc_start = lines.index("## Table of Contents")
                # Start scanning just after the "## Table of Contents" header line.
insertion_index = toc_start + 1
# Advance until we find the first non-TOC line or reach the end.
while (
insertion_index < len(lines)
and TOC_LINE_PATTERN.match(lines[insertion_index].strip())
):
insertion_index += 1
# Now, insert our new entry just before the first non-TOC line.
lines.insert(insertion_index, new_toc_line)
new_content = "\n".join(lines)
except ValueError:
# "## Table of Contents" not found, so we create a new TOC block at the top
new_content = "\n".join(toc_header + [new_toc_line] + [""] + lines)
except Exception as e:
log.error("Failed to read existing README.md: %s", e)
# In case of error, default to creating a new README.md with header and new topic
new_content = "\n".join(toc_header + [new_toc_line])
else:
# README.md doesn't exist, create a new one with a standard header and the new TOC entry
new_content = "\n".join(toc_header + [new_toc_line])
try:
readme_path.write_text(new_content, encoding="utf-8")
log.info("Updated README.md at %s", readme_path)
except Exception as e:
log.error("Failed to write README.md: %s", e)
def write_readme(site_target_dir: Path, topics: dict):
"""
Given a dictionary of topics, write out the full README.md at the site target directory.
"""
readme_path = site_target_dir / "README.md"
lines = ["# Archived Discourse Topics", "", "## Table of Contents", ""]
sorted_topics = sorted(topics.values(), key=lambda t: t["id"])
for topic in sorted_topics:
line = f"- [{topic['title']}]({topic['relative_path']}) <!-- id: {topic['id']} -->"
lines.append(line)
content = "\n".join(lines)
try:
readme_path.write_text(content, encoding="utf-8")
log.info("Finalized README.md updated at %s", readme_path)
except Exception as e:
log.error("Failed to write final README.md: %s", e)
# ----- Site Processing Function -----
def process_site(site_url: str, base_target_dir: Path):
"""
Archive posts and render topics for a single site.
Each site gets its own subdirectory (named for its hostname) inside the base target directory,
and its own metadata file.
The README.md is updated incrementally after each topic is rendered.
"""
parsed = urlparse(site_url)
site_name = parsed.hostname or site_url.replace("https://", "").replace("http://", "").split('/')[0]
log.info("Processing site: %s", site_url)
site_target_dir = base_target_dir / site_name
posts_dir = site_target_dir / 'posts'
topics_dir = site_target_dir / 'rendered-topics'
posts_dir.mkdir(parents=True, exist_ok=True)
topics_dir.mkdir(parents=True, exist_ok=True)
metadata_file = site_target_dir / '.metadata.json'
# Load stored metadata if exists.
metadata = {}
archived_post_ids = set()
if metadata_file.exists():
try:
metadata = json.loads(metadata_file.read_text())
if "archived_post_ids" in metadata:
archived_post_ids = set(int(x) for x in metadata.get('archived_post_ids', []))
except Exception as e:
log.error("Failed to read/parse metadata file for %s: %s", site_url, e)
posts_json = http_get_json(site_url, '/posts.json')
posts = posts_json.get('latest_posts', [])
last_id = None
should_stop = False
# List to accumulate info for final README generation.
rendered_topics_overall = []
while posts:
log.info("Processing %d posts for %s", len(posts), site_url)
topics_to_render = {} # Unique topics in this batch.
for json_post in posts:
try:
post = Post.from_json(json_post)
except Exception as e:
log.warning("Failed to deserialize post %s: %s", json_post, e)
continue
if post.id in archived_post_ids:
log.debug("Post %s already archived, skipping", post.id)
continue
post.save(posts_dir)
archived_post_ids.add(post.id)
last_id = post.id
topic = post.get_topic()
topics_to_render[topic.id] = topic
# Update metadata right away so that already processed posts won't be lost on interrupt.
metadata['archived_post_ids'] = sorted(archived_post_ids)
update_metadata(metadata_file, metadata)
if topics_to_render:
log.info("Rendering %d topics concurrently for %s.", len(topics_to_render), site_url)
rendered = render_topics_concurrently(site_url, topics_to_render, topics_dir, max_workers=8)
rendered_topics_overall.extend(rendered)
if should_stop:
log.info("Stopping pagination loop based on sync date for %s.", site_url)
break
if last_id is None or last_id <= 1:
log.info("No valid last_id found for %s. Ending pagination loop.", site_url)
break
time.sleep(5)
posts = http_get_json(site_url, f'/posts.json?before={last_id - 1}').get('latest_posts', [])
while not posts and last_id and last_id >= 0:
last_id -= 49
posts = http_get_json(site_url, f'/posts.json?before={last_id}').get('latest_posts', [])
time.sleep(1)
# Final merge/update of README from all rendered topics.
if rendered_topics_overall:
existing = read_existing_readme(site_target_dir)
for new_topic in rendered_topics_overall:
if new_topic["id"] not in existing:
existing[new_topic["id"]] = new_topic
write_readme(site_target_dir, existing)
else:
log.info("No topics rendered for %s; skipping final README.md generation.", site_url)
def main() -> None:
parameters = args()
base_target_dir = parameters.target_dir
if not isinstance(base_target_dir, Path):
base_target_dir = Path(base_target_dir)
base_target_dir.mkdir(parents=True, exist_ok=True)
sites = parse_sites(parameters.urls)
if not sites:
log.error("No valid sites provided. Exiting.")
sys.exit(1)
for site_url in sites:
process_site(site_url, base_target_dir)
if __name__ == "__main__":
main()