Fix inline comment parsing in RSS feeds file

- Fixed control character errors when using inline comments in rss_feeds.txt
- Comments after # are now properly stripped from RSS URLs
- Minimal fix using split('#', 1)[0].strip() approach
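
A minimal sketch of the new parsing step (the feed URL below is illustrative):

    line = "https://example.social/tags/linux.rss  # Linux tag feed"
    url = line.split('#', 1)[0].strip()
    # url == "https://example.social/tags/linux.rss"

Blank lines and comment-only lines are still skipped before this step.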
This commit is contained in:
matthias 2025-08-04 11:19:38 +02:00
parent 2dd7e9162e
commit d2601cd83f

View file

#!/usr/bin/env python3
"""
GTS-HolMirDas: RSS-based content discovery for GoToSocial
Inspired by HolMirDas by @aliceif:
- GitHub: https://github.com/aliceif/HolMirDas
- Fediverse: @aliceif@mkultra.x27.one
This GoToSocial adaptation extends the original RSS-to-ActivityPub concept
with Docker deployment, multi-instance processing, and comprehensive monitoring.
"""
import os
import sys
import time
import json
import logging
import requests
import feedparser
from datetime import timedelta
class GTSHolMirDas:
    def __init__(self):
        """Initialize the RSS fetcher with configuration"""
        self.config = {
            "server_url": os.getenv("GTS_SERVER_URL", "https://your-gts-instance"),
            "access_token": os.getenv("GTS_ACCESS_TOKEN", ""),
            "max_posts_per_run": int(os.getenv("MAX_POSTS_PER_RUN", "25")),
            "delay_between_requests": int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")),
            "healthcheck_url": os.getenv("HEALTHCHECK_URL", ""),
            "log_level": os.getenv("LOG_LEVEL", "INFO")
        }

        # Setup logging (needs log_level from config, so config comes first)
        self.setup_logging()

        # Load RSS URLs from file or environment
        rss_urls_file = os.getenv("RSS_URLS_FILE")
        if rss_urls_file and os.path.exists(rss_urls_file):
            try:
                with open(rss_urls_file, 'r') as f:
                    # Skip blank and comment-only lines, then strip
                    # inline comments: everything after the first '#'
                    self.config["rss_urls"] = [
                        line.split('#', 1)[0].strip() for line in f
                        if line.strip() and not line.strip().startswith('#')
                    ]
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from file: {rss_urls_file}")
            except Exception as e:
                self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}")
                self.config["rss_urls"] = []
        else:
            # Fallback to comma-separated environment variable
            self.config["rss_urls"] = [
                url.strip() for url in os.getenv("RSS_URLS", "").split(",")
                if url.strip()
            ]
            if self.config["rss_urls"]:
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from environment")
            else:
                self.logger.warning("No RSS URLs found in file or environment variable")

        # Load processed URLs from persistent storage
        self.processed_urls_file = "/app/data/processed_urls.json"
        self.processed_urls = self.load_processed_urls()

        # Statistics tracking
        self.previous_instances = getattr(self, 'previous_instances', 0)

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=getattr(logging, self.config["log_level"]),
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    def load_processed_urls(self):
        """Load previously processed URLs and instance count from file"""
        try:
            if os.path.exists(self.processed_urls_file):
                with open(self.processed_urls_file, 'r') as f:
                    data = json.load(f)
                    # Load previous instance count for statistics
                    self.previous_instances = data.get('previous_instances', 0)
                    return set(data.get('processed_urls', []))
        except Exception as e:
            self.logger.warning(f"Could not load processed URLs: {e}")
        return set()

    def save_processed_urls(self, current_instances=None):
        """Save processed URLs and current instance count to file"""
        try:
            os.makedirs(os.path.dirname(self.processed_urls_file), exist_ok=True)
            data = {
                'processed_urls': list(self.processed_urls),
                'last_updated': time.time()
            }
            # Save current instance count for next run
            if current_instances is not None and current_instances != 'unknown':
                data['previous_instances'] = current_instances
            with open(self.processed_urls_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self.logger.error(f"Could not save processed URLs: {e}")
    def fetch_rss_urls(self, rss_url):
        """Fetch URLs from RSS feed"""
        try:
            self.logger.info(f"Fetching RSS feed: {rss_url}")
            feed = feedparser.parse(rss_url)
            if feed.bozo:
                self.logger.warning(f"RSS feed may have issues: {rss_url}")
            # Extract URLs from entries
            urls = []
            for entry in feed.entries:
                if hasattr(entry, 'link'):
                    urls.append(entry.link)
            return urls
        except Exception as e:
            self.logger.error(f"Error fetching RSS feed {rss_url}: {e}")
            return []
    def lookup_post(self, post_url):
        """Look up a post URL using GTS search API"""
        try:
            # Prepare search API call
            search_url = f"{self.config['server_url']}/api/v2/search"
            params = {
                'q': post_url,
                'type': 'statuses',
                'resolve': 'true',
                'limit': 1
            }
            headers = {
                'Authorization': f'Bearer {self.config["access_token"]}',
                'Content-Type': 'application/json'
            }

            # Make API call
            response = requests.get(
                search_url,
                params=params,
                headers=headers,
                timeout=30
            )

            if response.status_code == 200:
                results = response.json()
                if results.get('statuses') or results.get('accounts'):
                    self.logger.info(f"Successfully looked up: {post_url}")
                    return True
                else:
                    self.logger.warning(f"No results for: {post_url}")
                    return False
            else:
                self.logger.error(f"API error {response.status_code} for {post_url}: {response.text}")
                return False
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error looking up {post_url}: {e}")
            return False
    def process_feeds(self):
        """Process all configured RSS feeds"""
        total_processed = 0

        # Record start time for statistics
        self.start_time = time.time()

        # Ping healthcheck start
        self.ping_healthcheck("/start")

        try:
            for rss_url in self.config["rss_urls"]:
                if not rss_url.strip():
                    continue

                self.logger.info(f"Processing feed: {rss_url}")

                # Get URLs from RSS
                urls = self.fetch_rss_urls(rss_url)

                # Filter out already processed URLs
                new_urls = [url for url in urls if url not in self.processed_urls]

                if not new_urls:
                    self.logger.info("No new URLs to process")
                    continue

                # Rate limiting: max posts per run
                urls_to_process = new_urls[:self.config["max_posts_per_run"]]
                self.logger.info(f"Processing {len(urls_to_process)} new URLs")

                for url in urls_to_process:
                    if self.lookup_post(url):
                        self.processed_urls.add(url)
                        total_processed += 1

                    # Rate limiting: delay between requests
                    time.sleep(self.config["delay_between_requests"])

            # Calculate runtime
            end_time = time.time()
            runtime_seconds = end_time - self.start_time
            runtime_formatted = str(timedelta(seconds=int(runtime_seconds)))

            # Get current instance count
            try:
                instance_info = requests.get(
                    f"{self.config['server_url']}/api/v1/instance",
                    headers={'Authorization': f'Bearer {self.config["access_token"]}'},
                    timeout=10
                )
                if instance_info.status_code == 200:
                    current_instances = instance_info.json().get('stats', {}).get('domain_count', 'unknown')
                else:
                    current_instances = 'unknown'
            except Exception as e:
                self.logger.error(f"Failed to get instance count: {e}")
                current_instances = 'unknown'

            # Calculate new instances (if we have previous data)
            new_instances = 'unknown'
            if self.previous_instances > 0 and current_instances != 'unknown':
                new_instances = current_instances - self.previous_instances

            # Print comprehensive statistics
            print("\n📊 GTS-HolMirDas Run Statistics:")
            print(f"   ⏱️ Runtime: {runtime_formatted}")
            print(f"   📄 Total posts processed: {total_processed}")
            print(f"   🌐 Current known instances: {current_instances}")
            if new_instances != 'unknown' and new_instances > 0:
                print(f"   New instances discovered: +{new_instances}")
            elif new_instances == 0:
                print("   New instances discovered: +0")
            print(f"   📡 RSS feeds processed: {len(self.config['rss_urls'])}")
            if runtime_seconds > 60:
                print(f"   ⚡ Posts per minute: {total_processed / (runtime_seconds / 60):.1f}")

            self.save_processed_urls(current_instances)

            # Ping healthcheck success
            self.ping_healthcheck("")
        except Exception as e:
            self.logger.error(f"Error during processing: {e}")
            # Ping healthcheck failure
            self.ping_healthcheck("/fail")
            raise
def ping_healthcheck(self, endpoint=""):
"""Ping healthchecks.io for monitoring"""
if not self.config.get("healthcheck_url"):
return
try:
url = self.config["healthcheck_url"] + endpoint
requests.get(url, timeout=10)
except Exception as e:
self.logger.warning(f"Failed to ping healthcheck: {e}")
def main():
    """Main entry point"""
    try:
        print("Starting GTS-HolMirDas run...")
        fetcher = GTSHolMirDas()

        # Validate required config
        if not fetcher.config["access_token"]:
            raise ValueError("GTS_ACCESS_TOKEN environment variable is required")

        fetcher.process_feeds()
    except Exception as e:
        logging.error(f"Fatal error: {e}")
        raise

if __name__ == "__main__":
    main()
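
For reference, a hypothetical rss_feeds.txt that exercises the fixed parser (hosts and tags are placeholders):

    # Tag feeds
    https://example.social/tags/selfhosting.rss   # inline comment, now stripped
    https://other.example/tags/gotosocial.rss

    https://example.social/tags/fediverse.rss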