diff --git a/brozzler/frontier.py b/brozzler/frontier.py
index 6e64e51..3866edf 100644
--- a/brozzler/frontier.py
+++ b/brozzler/frontier.py
@@ -23,11 +23,70 @@
 import time
 import datetime
+from functools import lru_cache
 import rethinkdb as r
 import doublethink
+import psycopg
 import urlcanon
 
 class UnexpectedDbResult(Exception):
     pass
 
+@lru_cache(maxsize=256)
+def skip_page(url, site, conn):
+    """
+    Check whether url and site are already present in the captures table of
+    conn, returning True if they are and False if not. Results are memoized
+    with lru_cache to avoid repeated queries for the same page.
+    """
+    query = "SELECT exists(SELECT 1 FROM captures WHERE url = %s AND site = %s LIMIT 1);"
+    cur = conn.cursor()
+    try:
+        cur.execute(query, (url, site))
+    except Exception as e:
+        logging.warning("exception querying for %s and %s: %s", url, site, e)
+        return False
+    result = cur.fetchone()
+    if result and result[0]:
+        logging.info("skipping page for url %s", url)
+        return True
+    return False
+
+class LimitPagesPGMixin:
+    """
+    Limit pages added to brozzler frontier to one per canonical url
+    """
+    def __init__(self, datasource):
+        # datasource like "postgresql://user@db_host/db_name"
+        # table name captures
+        self._conn = None
+        try:
+            self._conn = psycopg.connect(datasource, prepare_threshold=None)
+        except Exception as e:
+            self.logger.warning("db connection failure: %s", e)
+
+    def limit_pages(self, url, site):
+        """
+        Tracks captured urls, aka completed pages, per site; returns True
+        when we've captured this url before, else False.
+        """
+        if self._conn is None:
+            return False
+        self.logger.info("%s", skip_page.cache_info())
+        return skip_page(url, site, self._conn)
+
+    def note_page(self, url, site):
+        """
+        Add page url and site to the captures table.
+        """
+        if self._conn is None:
+            return
+        query = "INSERT INTO captures (url, site) VALUES (%s, %s);"
+        cur = self._conn.cursor()
+        try:
+            cur.execute(query, (url, site))
+            self._conn.commit()
+        except Exception as e:
+            logging.warning("exception inserting %s and %s: %s", url, site, e)
+
 class RethinkDbFrontier:
     logger = logging.getLogger(__module__ + "." + __qualname__)
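
Review note, not part of the patch: the mixin assumes a Postgres "captures" table with url and site columns already exists at the datasource it connects to. A minimal, hypothetical sketch of creating that table, with an index so the existence check in skip_page stays cheap, might look like the following; the real schema, column types, and index are assumptions and may differ.

    # Hypothetical setup sketch (not part of this change): create the captures
    # table and index that LimitPagesPGMixin assumes to exist.
    import psycopg

    def ensure_captures_table(datasource):
        # datasource like "postgresql://user@db_host/db_name"
        with psycopg.connect(datasource) as conn, conn.cursor() as cur:
            cur.execute(
                "CREATE TABLE IF NOT EXISTS captures ("
                "url text NOT NULL, site text NOT NULL)"
            )
            # index so the url/site existence check in skip_page stays cheap
            cur.execute(
                "CREATE INDEX IF NOT EXISTS captures_url_site_idx "
                "ON captures (url, site)"
            )
            conn.commit()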