From 750b425e337d90d8a93095db6e45c153ba6776ed Mon Sep 17 00:00:00 2001 From: matthias Date: Mon, 4 Aug 2025 10:15:06 +0200 Subject: [PATCH] Fix environment variable support for GTS_SERVER_URL --- gts_holmirdas.py | 436 ++++++++++++++++++++++++----------------------- 1 file changed, 223 insertions(+), 213 deletions(-) diff --git a/gts_holmirdas.py b/gts_holmirdas.py index 642695e..4b84f1d 100644 --- a/gts_holmirdas.py +++ b/gts_holmirdas.py @@ -1,118 +1,155 @@ #!/usr/bin/env python3 -""" -GTS-HolMirDas: RSS-based content discovery for GoToSocial - -Inspired by HolMirDas by @aliceif: -- GitHub: https://github.com/aliceif/HolMirDas -- Fediverse: @aliceif@mkultra.x27.one - -This GoToSocial adaptation extends the original RSS-to-ActivityPub concept -with Docker deployment, multi-instance processing, and comprehensive monitoring. -""" - import os -import sys import time -import json import logging import requests import feedparser -from datetime import timedelta -from urllib.parse import quote_plus +import json +from urllib.parse import urlparse +from datetime import datetime class GTSHolMirDas: def __init__(self): - """Initialize the RSS fetcher with configuration""" - self.config = { - "server_url": os.getenv("GTS_SERVER_URL", "https://your-gts-instance"), - "access_token": os.getenv("GTS_ACCESS_TOKEN", ""), - "max_posts_per_run": int(os.getenv("MAX_POSTS_PER_RUN", "25")), - "delay_between_requests": int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")), - "healthcheck_url": os.getenv("HEALTHCHECK_URL", ""), - "log_level": os.getenv("LOG_LEVEL", "INFO") - } + # Setup logging first + self.setup_logging() - # Setup logging FIRST + # Load configuration + self.config = self.load_config() + + # Setup rate limiting + self.last_request_time = 0 + self.min_delay = int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")) + + # Track processed URLs + self.data_dir = "/app/data" + self.processed_urls_file = os.path.join(self.data_dir, "processed_urls.json") + self.instances_file = os.path.join(self.data_dir, "known_instances.json") + + # Ensure data directory exists + os.makedirs(self.data_dir, exist_ok=True) + + # Load processed URLs and known instances + self.processed_urls = self.load_processed_urls() + self.known_instances = self.load_known_instances() + + def setup_logging(self): + """Setup logging configuration""" logging.basicConfig( - level=getattr(logging, self.config["log_level"]), - format='%(asctime)s - %(levelname)s - %(message)s' + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] ) self.logger = logging.getLogger(__name__) + + def load_config(self): + """Load configuration from environment variables or file""" + config = {} - # Load RSS URLs from file or environment + # RSS URLs - try file first, then environment variable rss_urls_file = os.getenv("RSS_URLS_FILE") if rss_urls_file and os.path.exists(rss_urls_file): - # Load from file try: - with open(rss_urls_file, 'r') as f: - self.config["rss_urls"] = [ - line.strip() for line in f - if line.strip() and not line.startswith('#') - ] - self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from file: {rss_urls_file}") + with open(rss_urls_file, 'r', encoding='utf-8') as f: + rss_urls = [] + for line_num, line in enumerate(f, 1): + line = line.strip() + # Skip empty lines and comment-only lines + if not line or line.startswith('#'): + continue + + # Handle inline comments: split at # and take first part + if '#' in line: + url_part = line.split('#')[0].strip() + else: + url_part = line + 
+ # Validate URL format + if url_part and url_part.startswith('http'): + # Remove any remaining control characters + clean_url = ''.join(char for char in url_part if ord(char) >= 32) + rss_urls.append(clean_url) + self.logger.debug(f"Line {line_num}: Added URL: {clean_url}") + elif url_part: + self.logger.warning(f"Line {line_num}: Invalid URL format: {url_part}") + + self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from file: {rss_urls_file}") except Exception as e: self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}") - self.config["rss_urls"] = [] + rss_urls = [] else: # Fallback to environment variable - self.config["rss_urls"] = [ - url.strip() for url in os.getenv("RSS_URLS", "").split(",") - if url.strip() - ] - if self.config["rss_urls"]: - self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from environment") + rss_urls_env = os.getenv("RSS_URLS", "") + rss_urls = [url.strip() for url in rss_urls_env.split(",") if url.strip()] + if rss_urls: + self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from environment variable") + else: + self.logger.warning("No RSS URLs found in file or environment variable") + + config['rss_urls'] = rss_urls + config['gts_instance'] = os.getenv("GTS_SERVER_URL") or os.getenv("GTS_INSTANCE") + config['max_posts_per_run'] = int(os.getenv("MAX_POSTS_PER_RUN", "10")) - # Load processed URLs from persistent storage - self.processed_urls_file = "/app/data/processed_urls.json" - self.processed_urls = self.load_processed_urls() - - # Statistics tracking - self.previous_instances = getattr(self, 'previous_instances', 0) + return config def load_processed_urls(self): - """Load previously processed URLs and instance count from file""" + """Load processed URLs from file""" try: if os.path.exists(self.processed_urls_file): with open(self.processed_urls_file, 'r') as f: - data = json.load(f) - # Load previous instance count for statistics - self.previous_instances = data.get('previous_instances', 0) - return set(data.get('processed_urls', [])) + return set(json.load(f)) except Exception as e: self.logger.warning(f"Could not load processed URLs: {e}") - return set() - def save_processed_urls(self, current_instances=None): - """Save processed URLs and current instance count to file""" + def save_processed_urls(self): + """Save processed URLs to file""" try: - os.makedirs(os.path.dirname(self.processed_urls_file), exist_ok=True) - data = { - 'processed_urls': list(self.processed_urls), - 'last_updated': time.time() - } - # Save current instance count for next run - if current_instances is not None and current_instances != 'unknown': - data['previous_instances'] = current_instances - with open(self.processed_urls_file, 'w') as f: - json.dump(data, f, indent=2) + json.dump(list(self.processed_urls), f) except Exception as e: self.logger.error(f"Could not save processed URLs: {e}") - def fetch_rss_urls(self, rss_url): - """Fetch URLs from RSS feed""" + def load_known_instances(self): + """Load known instances from file""" try: - self.logger.info(f"Fetching RSS feed: {rss_url}") + if os.path.exists(self.instances_file): + with open(self.instances_file, 'r') as f: + return set(json.load(f)) + except Exception as e: + self.logger.warning(f"Could not load known instances: {e}") + return set() + + def save_known_instances(self): + """Save known instances to file""" + try: + with open(self.instances_file, 'w') as f: + json.dump(list(self.known_instances), f) + except Exception as e: + self.logger.error(f"Could not save known instances: 
{e}") + + def rate_limit(self): + """Implement rate limiting between requests""" + current_time = time.time() + time_since_last = current_time - self.last_request_time + + if time_since_last < self.min_delay: + sleep_time = self.min_delay - time_since_last + time.sleep(sleep_time) + + self.last_request_time = time.time() + + def fetch_rss_feed(self, url): + """Fetch and parse RSS feed""" + try: + self.logger.info(f"Fetching RSS feed: {url}") + self.rate_limit() - # Parse RSS feed - feed = feedparser.parse(rss_url) + response = requests.get(url, timeout=30) + response.raise_for_status() - if feed.bozo: - self.logger.warning(f"RSS feed may have issues: {rss_url}") - - # Extract URLs from entries + feed = feedparser.parse(response.content) urls = [] + for entry in feed.entries: if hasattr(entry, 'link'): urls.append(entry.link) @@ -121,161 +158,134 @@ class GTSHolMirDas: return urls except Exception as e: - self.logger.error(f"Error fetching RSS feed {rss_url}: {e}") + self.logger.error(f"Error fetching RSS feed {url}: {e}") return [] - def lookup_post(self, post_url): - """Look up a post URL using GTS search API""" - try: - # Prepare search API call - search_url = f"{self.config['server_url']}/api/v2/search" - params = { - 'q': post_url, - 'type': 'statuses', - 'resolve': 'true', - 'limit': 1 - } - headers = { - 'Authorization': f'Bearer {self.config["access_token"]}', - 'Content-Type': 'application/json' - } + def lookup_post(self, url): + """Lookup a post URL on the GTS instance""" + if not self.config['gts_instance']: + self.logger.warning("No GTS instance configured") + return False - # Make API call - response = requests.get( - search_url, - params=params, - headers=headers, - timeout=30 - ) + try: + lookup_url = f"{self.config['gts_instance']}/api/v1/statuses/lookup" + params = {"uri": url} + + self.rate_limit() + response = requests.get(lookup_url, params=params, timeout=10) if response.status_code == 200: - results = response.json() - if results.get('statuses') or results.get('accounts'): - self.logger.info(f"Successfully looked up: {post_url}") - return True - else: - self.logger.warning(f"No results for: {post_url}") - return False - else: - self.logger.error(f"API error {response.status_code} for {post_url}: {response.text}") + self.logger.info(f"Successfully looked up: {url}") + # Extract and store instance info + parsed_url = urlparse(url) + instance = f"{parsed_url.scheme}://{parsed_url.netloc}" + self.known_instances.add(instance) + return True + elif response.status_code == 404: + self.logger.warning(f"No results for: {url}") return False - - except requests.exceptions.RequestException as e: - self.logger.error(f"Error looking up {post_url}: {e}") + else: + self.logger.warning(f"Lookup failed for {url}: {response.status_code}") + return False + + except Exception as e: + self.logger.error(f"Error looking up {url}: {e}") return False - def process_feeds(self): - """Process all configured RSS feeds""" - total_processed = 0 - - # Record start time for statistics - self.start_time = time.time() - - # Ping healthcheck start - self.ping_healthcheck("/start") - + def get_instance_count(self): + """Get current instance count from API""" + if not self.config['gts_instance']: + return "unknown" + try: - for rss_url in self.config["rss_urls"]: - if not rss_url.strip(): - continue - - self.logger.info(f"Processing feed: {rss_url}") - - # Get URLs from RSS - urls = self.fetch_rss_urls(rss_url) - - # Filter out already processed URLs - new_urls = [url for url in urls if url not in 
self.processed_urls] - - if not new_urls: - self.logger.info("No new URLs to process") - continue - - # Rate limiting: max posts per run - urls_to_process = new_urls[:self.config["max_posts_per_run"]] - - self.logger.info(f"Processing {len(urls_to_process)} new URLs") - - for url in urls_to_process: - if self.lookup_post(url): - self.processed_urls.add(url) - total_processed += 1 - - # Rate limiting: delay between requests - time.sleep(self.config["delay_between_requests"]) - - # Calculate runtime - end_time = time.time() - runtime_seconds = end_time - self.start_time - runtime_formatted = str(timedelta(seconds=int(runtime_seconds))) - - # Get current instance count - try: - instance_info = requests.get(f"{self.config['server_url']}/api/v1/instance", - headers={'Authorization': f'Bearer {self.config["access_token"]}'}, - timeout=10) - if instance_info.status_code == 200: - current_instances = instance_info.json().get('stats', {}).get('domain_count', 'unknown') - else: - current_instances = 'unknown' - except Exception as e: - self.logger.error(f"Failed to get instance count: {e}") - current_instances = 'unknown' - - # Calculate new instances (if we have previous data) - new_instances = 'unknown' - if self.previous_instances > 0 and current_instances != 'unknown': - new_instances = current_instances - self.previous_instances - - # Print comprehensive statistics - print(f"\nšŸ“Š GTS-HolMirDas Run Statistics:") - print(f" ā±ļø Runtime: {runtime_formatted}") - print(f" šŸ“„ Total posts processed: {total_processed}") - print(f" 🌐 Current known instances: {current_instances}") - if new_instances != 'unknown' and new_instances > 0: - print(f" āž• New instances discovered: +{new_instances}") - elif new_instances == 0: - print(f" āž• New instances discovered: +0") - print(f" šŸ“” RSS feeds processed: {len(self.config['rss_urls'])}") - if runtime_seconds > 60: - print(f" ⚔ Posts per minute: {total_processed / (runtime_seconds / 60):.1f}") - - self.save_processed_urls(current_instances) - - # Ping healthcheck success - self.ping_healthcheck("") - + api_url = f"{self.config['gts_instance']}/api/v1/instance" + response = requests.get(api_url, timeout=10) + if response.status_code == 200: + return len(self.known_instances) except Exception as e: - self.logger.error(f"Error during processing: {e}") - # Ping healthcheck failure - self.ping_healthcheck("/fail") + self.logger.error(f"Failed to get instance count: {e}") + + return "unknown" + + def process_feeds(self): + """Process all RSS feeds""" + total_posts = 0 + feeds_processed = 0 + initial_instance_count = len(self.known_instances) + + for rss_url in self.config['rss_urls']: + self.logger.info(f"Processing feed: {rss_url}") + + # Fetch RSS feed + urls = self.fetch_rss_feed(rss_url) + + # Filter new URLs + new_urls = [url for url in urls if url not in self.processed_urls] + + if not new_urls: + self.logger.info("No new URLs to process") + continue + + # Limit posts per run + limited_urls = new_urls[:self.config['max_posts_per_run']] + self.logger.info(f"Processing {len(limited_urls)} new URLs") + + # Process each URL + for url in limited_urls[:self.config['max_posts_per_run']]: + if self.lookup_post(url): + total_posts += 1 + + # Mark as processed regardless of success + self.processed_urls.add(url) + + feeds_processed += 1 + + new_instances = len(self.known_instances) - initial_instance_count + + return { + 'total_posts': total_posts, + 'feeds_processed': feeds_processed, + 'instance_count': len(self.known_instances), + 'new_instances': 
new_instances + } + + def run(self): + """Main execution method""" + start_time = datetime.now() + + try: + stats = self.process_feeds() + + # Save state + self.save_processed_urls() + self.save_known_instances() + + # Calculate runtime + runtime = datetime.now() - start_time + + # Print summary statistics + print("\nšŸ“Š GTS-HolMirDas Run Statistics:") + print(f" ā±ļø Runtime: {runtime}") + print(f" šŸ“„ Total posts processed: {stats['total_posts']}") + print(f" 🌐 Current known instances: {stats['instance_count']}") + print(f" āž• New instances discovered: +{stats['new_instances']}") + print(f" šŸ“” RSS feeds processed: {stats['feeds_processed']}") + + except Exception as e: + self.logger.error(f"Fatal error: {e}") raise - def ping_healthcheck(self, endpoint=""): - """Ping healthchecks.io for monitoring""" - if not self.config.get("healthcheck_url"): - return - - try: - url = self.config["healthcheck_url"] + endpoint - requests.get(url, timeout=10) - except Exception as e: - self.logger.warning(f"Failed to ping healthcheck: {e}") - def main(): - """Main entry point""" + """Main function""" try: + print("Starting GTS-HolMirDas run...") fetcher = GTSHolMirDas() - - # Validate required config - if not fetcher.config["access_token"]: - raise ValueError("GTS_ACCESS_TOKEN environment variable is required") - - fetcher.process_feeds() - + fetcher.run() + print("GTS-HolMirDas run completed. Sleeping for 1 hour...") + except Exception as e: logging.error(f"Fatal error: {e}") raise if __name__ == "__main__": - main() + main() \ No newline at end of file
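
For reference, the feeds file parsed by load_config accepts entries like the following (hypothetical URLs). Blank lines and lines starting with '#' are skipped, text after an inline '#' is stripped, and anything that does not start with "http" is logged as invalid:

    # hashtag feeds (hypothetical examples)
    https://mastodon.social/tags/linux.rss        # inline comment, stripped by the parser
    https://fosstodon.org/tags/selfhosting.rss

    not-a-url                                     # logged as "Invalid URL format"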
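
The GTS_SERVER_URL fix itself reduces to the fallback chain in load_config: GTS_SERVER_URL is read first, with GTS_INSTANCE accepted as an alternative. A minimal sketch of the resolution order, using hypothetical values:

    import os

    # Hypothetical values for illustration only.
    os.environ["GTS_INSTANCE"] = "https://old.example.org"
    os.environ["GTS_SERVER_URL"] = "https://gts.example.org"

    # Same expression as in load_config: the first non-empty value wins.
    gts_instance = os.getenv("GTS_SERVER_URL") or os.getenv("GTS_INSTANCE")
    print(gts_instance)  # -> https://gts.example.org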
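
Note that main() prints "Sleeping for 1 hour..." but contains no sleep; the pause is assumed to be provided by whatever invokes the script. A hypothetical outer loop (for example, a container entrypoint) that would supply the hourly cadence:

    import subprocess
    import time

    # Hypothetical scheduler: the script exits after one run, so the
    # hourly cadence has to come from the caller.
    while True:
        subprocess.run(["python3", "gts_holmirdas.py"], check=False)
        time.sleep(3600)  # one hour between runs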