diff --git a/gts_holmirdas.py b/gts_holmirdas.py
index 4b84f1d..642695e 100644
--- a/gts_holmirdas.py
+++ b/gts_holmirdas.py
@@ -1,155 +1,118 @@
 #!/usr/bin/env python3
+"""
+GTS-HolMirDas: RSS-based content discovery for GoToSocial
+
+Inspired by HolMirDas by @aliceif:
+- GitHub: https://github.com/aliceif/HolMirDas
+- Fediverse: @aliceif@mkultra.x27.one
+
+This GoToSocial adaptation extends the original RSS-to-ActivityPub concept
+with Docker deployment, multi-instance processing, and comprehensive monitoring.
+"""
+
 import os
+import sys
 import time
+import json
 import logging
 import requests
 import feedparser
-import json
-from urllib.parse import urlparse
-from datetime import datetime
+from datetime import timedelta
+from urllib.parse import quote_plus
 
 class GTSHolMirDas:
     def __init__(self):
-        # Setup logging first
-        self.setup_logging()
+        """Initialize the RSS fetcher with configuration"""
+        self.config = {
+            "server_url": os.getenv("GTS_SERVER_URL", "https://your-gts-instance"),
+            "access_token": os.getenv("GTS_ACCESS_TOKEN", ""),
+            "max_posts_per_run": int(os.getenv("MAX_POSTS_PER_RUN", "25")),
+            "delay_between_requests": int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")),
+            "healthcheck_url": os.getenv("HEALTHCHECK_URL", ""),
+            "log_level": os.getenv("LOG_LEVEL", "INFO")
+        }
 
-        # Load configuration
-        self.config = self.load_config()
-
-        # Setup rate limiting
-        self.last_request_time = 0
-        self.min_delay = int(os.getenv("DELAY_BETWEEN_REQUESTS", "2"))
-
-        # Track processed URLs
-        self.data_dir = "/app/data"
-        self.processed_urls_file = os.path.join(self.data_dir, "processed_urls.json")
-        self.instances_file = os.path.join(self.data_dir, "known_instances.json")
-
-        # Ensure data directory exists
-        os.makedirs(self.data_dir, exist_ok=True)
-
-        # Load processed URLs and known instances
-        self.processed_urls = self.load_processed_urls()
-        self.known_instances = self.load_known_instances()
-
-    def setup_logging(self):
-        """Setup logging configuration"""
+        # Setup logging FIRST
         logging.basicConfig(
-            level=logging.INFO,
-            format='%(asctime)s - %(levelname)s - %(message)s',
-            handlers=[logging.StreamHandler()]
+            level=getattr(logging, self.config["log_level"]),
+            format='%(asctime)s - %(levelname)s - %(message)s'
         )
         self.logger = logging.getLogger(__name__)
-
-    def load_config(self):
-        """Load configuration from environment variables or file"""
-        config = {}
 
-        # RSS URLs - try file first, then environment variable
+        # Load RSS URLs from file or environment
         rss_urls_file = os.getenv("RSS_URLS_FILE")
         if rss_urls_file and os.path.exists(rss_urls_file):
+            # Load from file
             try:
-                with open(rss_urls_file, 'r', encoding='utf-8') as f:
-                    rss_urls = []
-                    for line_num, line in enumerate(f, 1):
-                        line = line.strip()
-                        # Skip empty lines and comment-only lines
-                        if not line or line.startswith('#'):
-                            continue
-
-                        # Handle inline comments: split at # and take first part
-                        if '#' in line:
-                            url_part = line.split('#')[0].strip()
-                        else:
-                            url_part = line
-
-                        # Validate URL format
-                        if url_part and url_part.startswith('http'):
-                            # Remove any remaining control characters
-                            clean_url = ''.join(char for char in url_part if ord(char) >= 32)
-                            rss_urls.append(clean_url)
-                            self.logger.debug(f"Line {line_num}: Added URL: {clean_url}")
-                        elif url_part:
-                            self.logger.warning(f"Line {line_num}: Invalid URL format: {url_part}")
-
-                self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from file: {rss_urls_file}")
+                with open(rss_urls_file, 'r') as f:
+                    self.config["rss_urls"] = [
+                        line.strip() for line in f
+                        if line.strip() and not line.startswith('#')
+                    ]
+                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from file: {rss_urls_file}")
             except Exception as e:
                 self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}")
-                rss_urls = []
+                self.config["rss_urls"] = []
         else:
             # Fallback to environment variable
-            rss_urls_env = os.getenv("RSS_URLS", "")
-            rss_urls = [url.strip() for url in rss_urls_env.split(",") if url.strip()]
-            if rss_urls:
-                self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from environment variable")
-            else:
-                self.logger.warning("No RSS URLs found in file or environment variable")
-
-        config['rss_urls'] = rss_urls
-        config['gts_instance'] = os.getenv("GTS_SERVER_URL") or os.getenv("GTS_INSTANCE")
-        config['max_posts_per_run'] = int(os.getenv("MAX_POSTS_PER_RUN", "10"))
+            self.config["rss_urls"] = [
+                url.strip() for url in os.getenv("RSS_URLS", "").split(",")
+                if url.strip()
+            ]
+            if self.config["rss_urls"]:
+                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from environment")
 
-        return config
+        # Load processed URLs from persistent storage
+        self.processed_urls_file = "/app/data/processed_urls.json"
+        self.processed_urls = self.load_processed_urls()
+
+        # Statistics tracking
+        self.previous_instances = getattr(self, 'previous_instances', 0)
 
     def load_processed_urls(self):
-        """Load processed URLs from file"""
+        """Load previously processed URLs and instance count from file"""
        try:
             if os.path.exists(self.processed_urls_file):
                 with open(self.processed_urls_file, 'r') as f:
-                    return set(json.load(f))
+                    data = json.load(f)
+                    # Load previous instance count for statistics
+                    self.previous_instances = data.get('previous_instances', 0)
+                    return set(data.get('processed_urls', []))
         except Exception as e:
             self.logger.warning(f"Could not load processed URLs: {e}")
+
         return set()
 
-    def save_processed_urls(self):
-        """Save processed URLs to file"""
+    def save_processed_urls(self, current_instances=None):
+        """Save processed URLs and current instance count to file"""
         try:
+            os.makedirs(os.path.dirname(self.processed_urls_file), exist_ok=True)
+            data = {
+                'processed_urls': list(self.processed_urls),
+                'last_updated': time.time()
+            }
+            # Save current instance count for next run
+            if current_instances is not None and current_instances != 'unknown':
+                data['previous_instances'] = current_instances
+
             with open(self.processed_urls_file, 'w') as f:
-                json.dump(list(self.processed_urls), f)
+                json.dump(data, f, indent=2)
         except Exception as e:
             self.logger.error(f"Could not save processed URLs: {e}")
 
-    def load_known_instances(self):
-        """Load known instances from file"""
+    def fetch_rss_urls(self, rss_url):
+        """Fetch URLs from RSS feed"""
         try:
-            if os.path.exists(self.instances_file):
-                with open(self.instances_file, 'r') as f:
-                    return set(json.load(f))
-        except Exception as e:
-            self.logger.warning(f"Could not load known instances: {e}")
-        return set()
-
-    def save_known_instances(self):
-        """Save known instances to file"""
-        try:
-            with open(self.instances_file, 'w') as f:
-                json.dump(list(self.known_instances), f)
-        except Exception as e:
-            self.logger.error(f"Could not save known instances: {e}")
-
-    def rate_limit(self):
-        """Implement rate limiting between requests"""
-        current_time = time.time()
-        time_since_last = current_time - self.last_request_time
-
-        if time_since_last < self.min_delay:
-            sleep_time = self.min_delay - time_since_last
-            time.sleep(sleep_time)
-
-        self.last_request_time = time.time()
-
-    def fetch_rss_feed(self, url):
-        """Fetch and parse RSS feed"""
-        try:
-            self.logger.info(f"Fetching RSS feed: {url}")
-            self.rate_limit()
+            self.logger.info(f"Fetching RSS feed: {rss_url}")
 
-            response = requests.get(url, timeout=30)
-            response.raise_for_status()
+            # Parse RSS feed
+            feed = feedparser.parse(rss_url)
 
-            feed = feedparser.parse(response.content)
+            if feed.bozo:
+                self.logger.warning(f"RSS feed may have issues: {rss_url}")
+
+            # Extract URLs from entries
             urls = []
-
             for entry in feed.entries:
                 if hasattr(entry, 'link'):
                     urls.append(entry.link)
@@ -158,134 +121,161 @@ class GTSHolMirDas:
             return urls
 
         except Exception as e:
-            self.logger.error(f"Error fetching RSS feed {url}: {e}")
+            self.logger.error(f"Error fetching RSS feed {rss_url}: {e}")
             return []
 
-    def lookup_post(self, url):
-        """Lookup a post URL on the GTS instance"""
-        if not self.config['gts_instance']:
-            self.logger.warning("No GTS instance configured")
-            return False
-
+    def lookup_post(self, post_url):
+        """Look up a post URL using GTS search API"""
         try:
-            lookup_url = f"{self.config['gts_instance']}/api/v1/statuses/lookup"
-            params = {"uri": url}
+            # Prepare search API call
+            search_url = f"{self.config['server_url']}/api/v2/search"
+            params = {
+                'q': post_url,
+                'type': 'statuses',
+                'resolve': 'true',
+                'limit': 1
+            }
+            headers = {
+                'Authorization': f'Bearer {self.config["access_token"]}',
+                'Content-Type': 'application/json'
+            }
 
-            self.rate_limit()
-            response = requests.get(lookup_url, params=params, timeout=10)
+            # Make API call
+            response = requests.get(
+                search_url,
+                params=params,
+                headers=headers,
+                timeout=30
+            )
 
             if response.status_code == 200:
-                self.logger.info(f"Successfully looked up: {url}")
-                # Extract and store instance info
-                parsed_url = urlparse(url)
-                instance = f"{parsed_url.scheme}://{parsed_url.netloc}"
-                self.known_instances.add(instance)
-                return True
-            elif response.status_code == 404:
-                self.logger.warning(f"No results for: {url}")
-                return False
+                results = response.json()
+                if results.get('statuses') or results.get('accounts'):
+                    self.logger.info(f"Successfully looked up: {post_url}")
+                    return True
+                else:
+                    self.logger.warning(f"No results for: {post_url}")
+                    return False
             else:
-                self.logger.warning(f"Lookup failed for {url}: {response.status_code}")
+                self.logger.error(f"API error {response.status_code} for {post_url}: {response.text}")
                 return False
-
-        except Exception as e:
-            self.logger.error(f"Error looking up {url}: {e}")
-            return False
 
-    def get_instance_count(self):
-        """Get current instance count from API"""
-        if not self.config['gts_instance']:
-            return "unknown"
-
-        try:
-            api_url = f"{self.config['gts_instance']}/api/v1/instance"
-            response = requests.get(api_url, timeout=10)
-            if response.status_code == 200:
-                return len(self.known_instances)
-        except Exception as e:
-            self.logger.error(f"Failed to get instance count: {e}")
-
-        return "unknown"
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"Error looking up {post_url}: {e}")
+            return False
 
     def process_feeds(self):
-        """Process all RSS feeds"""
-        total_posts = 0
-        feeds_processed = 0
-        initial_instance_count = len(self.known_instances)
-
-        for rss_url in self.config['rss_urls']:
-            self.logger.info(f"Processing feed: {rss_url}")
-
-            # Fetch RSS feed
-            urls = self.fetch_rss_feed(rss_url)
-
-            # Filter new URLs
-            new_urls = [url for url in urls if url not in self.processed_urls]
-
-            if not new_urls:
-                self.logger.info("No new URLs to process")
-                continue
-
-            # Limit posts per run
-            limited_urls = new_urls[:self.config['max_posts_per_run']]
-            self.logger.info(f"Processing {len(limited_urls)} new URLs")
-
-            # Process each URL
-            for url in limited_urls[:self.config['max_posts_per_run']]:
-                if self.lookup_post(url):
-                    total_posts += 1
-
-                # Mark as processed regardless of success
-                self.processed_urls.add(url)
-
-            feeds_processed += 1
-
-        new_instances = len(self.known_instances) - initial_instance_count
-
-        return {
-            'total_posts': total_posts,
-            'feeds_processed': feeds_processed,
-            'instance_count': len(self.known_instances),
-            'new_instances': new_instances
-        }
+        """Process all configured RSS feeds"""
+        total_processed = 0
+
+        # Record start time for statistics
+        self.start_time = time.time()
+
+        # Ping healthcheck start
+        self.ping_healthcheck("/start")
 
-    def run(self):
-        """Main execution method"""
-        start_time = datetime.now()
         try:
-            stats = self.process_feeds()
-
-            # Save state
-            self.save_processed_urls()
-            self.save_known_instances()
-
+            for rss_url in self.config["rss_urls"]:
+                if not rss_url.strip():
+                    continue
+
+                self.logger.info(f"Processing feed: {rss_url}")
+
+                # Get URLs from RSS
+                urls = self.fetch_rss_urls(rss_url)
+
+                # Filter out already processed URLs
+                new_urls = [url for url in urls if url not in self.processed_urls]
+
+                if not new_urls:
+                    self.logger.info("No new URLs to process")
+                    continue
+
+                # Rate limiting: max posts per run
+                urls_to_process = new_urls[:self.config["max_posts_per_run"]]
+
+                self.logger.info(f"Processing {len(urls_to_process)} new URLs")
+
+                for url in urls_to_process:
+                    if self.lookup_post(url):
+                        self.processed_urls.add(url)
+                        total_processed += 1
+
+                    # Rate limiting: delay between requests
+                    time.sleep(self.config["delay_between_requests"])
+
             # Calculate runtime
-            runtime = datetime.now() - start_time
+            end_time = time.time()
+            runtime_seconds = end_time - self.start_time
+            runtime_formatted = str(timedelta(seconds=int(runtime_seconds)))
 
-            # Print summary statistics
-            print("\nšŸ“Š GTS-HolMirDas Run Statistics:")
-            print(f"   ā±ļø Runtime: {runtime}")
-            print(f"   šŸ“„ Total posts processed: {stats['total_posts']}")
-            print(f"   🌐 Current known instances: {stats['instance_count']}")
-            print(f"   āž• New instances discovered: +{stats['new_instances']}")
-            print(f"   šŸ“” RSS feeds processed: {stats['feeds_processed']}")
+            # Get current instance count
+            try:
+                instance_info = requests.get(f"{self.config['server_url']}/api/v1/instance",
+                                             headers={'Authorization': f'Bearer {self.config["access_token"]}'},
+                                             timeout=10)
+                if instance_info.status_code == 200:
+                    current_instances = instance_info.json().get('stats', {}).get('domain_count', 'unknown')
+                else:
+                    current_instances = 'unknown'
+            except Exception as e:
+                self.logger.error(f"Failed to get instance count: {e}")
+                current_instances = 'unknown'
 
+            # Calculate new instances (if we have previous data)
+            new_instances = 'unknown'
+            if self.previous_instances > 0 and current_instances != 'unknown':
+                new_instances = current_instances - self.previous_instances
+
+            # Print comprehensive statistics
+            print(f"\nšŸ“Š GTS-HolMirDas Run Statistics:")
+            print(f"   ā±ļø Runtime: {runtime_formatted}")
+            print(f"   šŸ“„ Total posts processed: {total_processed}")
+            print(f"   🌐 Current known instances: {current_instances}")
+            if new_instances != 'unknown' and new_instances > 0:
+                print(f"   āž• New instances discovered: +{new_instances}")
+            elif new_instances == 0:
+                print(f"   āž• New instances discovered: +0")
+            print(f"   šŸ“” RSS feeds processed: {len(self.config['rss_urls'])}")
+            if runtime_seconds > 60:
+                print(f"   ⚔ Posts per minute: {total_processed / (runtime_seconds / 60):.1f}")
+
+            self.save_processed_urls(current_instances)
+
+            # Ping healthcheck success
+            self.ping_healthcheck("")
 
         except Exception as e:
-            self.logger.error(f"Fatal error: {e}")
+            self.logger.error(f"Error during processing: {e}")
+            # Ping healthcheck failure
+            self.ping_healthcheck("/fail")
             raise
 
+    def ping_healthcheck(self, endpoint=""):
+        """Ping healthchecks.io for monitoring"""
+        if not self.config.get("healthcheck_url"):
+            return
+
+        try:
+            url = self.config["healthcheck_url"] + endpoint
+            requests.get(url, timeout=10)
+        except Exception as e:
+            self.logger.warning(f"Failed to ping healthcheck: {e}")
+
 def main():
-    """Main function"""
+    """Main entry point"""
     try:
-        print("Starting GTS-HolMirDas run...")
         fetcher = GTSHolMirDas()
-        fetcher.run()
-        print("GTS-HolMirDas run completed. Sleeping for 1 hour...")
-
+
+        # Validate required config
+        if not fetcher.config["access_token"]:
+            raise ValueError("GTS_ACCESS_TOKEN environment variable is required")
+
+        fetcher.process_feeds()
+
     except Exception as e:
         logging.error(f"Fatal error: {e}")
         raise
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()