Compare commits

...

2 commits

View file

#!/usr/bin/env python3
"""
GTS-HolMirDas: RSS-based content discovery for GoToSocial

Inspired by HolMirDas by @aliceif:
- GitHub: https://github.com/aliceif/HolMirDas
- Fediverse: @aliceif@mkultra.x27.one

This GoToSocial adaptation extends the original RSS-to-ActivityPub concept
with Docker deployment, multi-instance processing, and comprehensive monitoring.
"""

import os
import time
import json
import logging
from datetime import datetime
from urllib.parse import urlparse

import requests
import feedparser


class GTSHolMirDas:
    def __init__(self):
        """Initialize the RSS fetcher: logging, configuration, persistent state"""
        # Set up logging first so config loading can report problems
        self.setup_logging()

        # Load configuration from environment variables (and optional URL file)
        self.config = self.load_config()

        # Rate-limiting state
        self.last_request_time = 0
        self.min_delay = int(os.getenv("DELAY_BETWEEN_REQUESTS", "2"))

        # Persistent state: processed URLs and known instances
        self.data_dir = "/app/data"
        self.processed_urls_file = os.path.join(self.data_dir, "processed_urls.json")
        self.instances_file = os.path.join(self.data_dir, "known_instances.json")

        # Ensure the data directory exists
        os.makedirs(self.data_dir, exist_ok=True)

        # Load processed URLs and known instances from disk
        self.processed_urls = self.load_processed_urls()
        self.known_instances = self.load_known_instances()
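
    # State layout on disk (paths hardcoded above):
    #   /app/data/processed_urls.json   - JSON list of URLs already looked up
    #   /app/data/known_instances.json  - JSON list of discovered instance origins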

    def setup_logging(self):
        """Set up logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler()]
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load configuration from environment variables or file"""
        config = {}

        # RSS URLs: try a file first, then fall back to the environment variable
        rss_urls_file = os.getenv("RSS_URLS_FILE")
        if rss_urls_file and os.path.exists(rss_urls_file):
            try:
                rss_urls = []
                with open(rss_urls_file, 'r', encoding='utf-8') as f:
                    for line_num, line in enumerate(f, 1):
                        line = line.strip()
                        # Skip empty lines and comment-only lines
                        if not line or line.startswith('#'):
                            continue
                        # Handle inline comments: split at '#' and keep the first part
                        if '#' in line:
                            url_part = line.split('#')[0].strip()
                        else:
                            url_part = line
                        # Validate URL format
                        if url_part and url_part.startswith('http'):
                            # Remove any remaining control characters
                            clean_url = ''.join(char for char in url_part if ord(char) >= 32)
                            rss_urls.append(clean_url)
                            self.logger.debug(f"Line {line_num}: Added URL: {clean_url}")
                        elif url_part:
                            self.logger.warning(f"Line {line_num}: Invalid URL format: {url_part}")
                self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from file: {rss_urls_file}")
            except Exception as e:
                self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}")
                rss_urls = []
        else:
            # Fallback to environment variable
            rss_urls_env = os.getenv("RSS_URLS", "")
            rss_urls = [url.strip() for url in rss_urls_env.split(",") if url.strip()]
            if rss_urls:
                self.logger.info(f"Loaded {len(rss_urls)} RSS URLs from environment variable")
            else:
                self.logger.warning("No RSS URLs found in file or environment variable")

        config['rss_urls'] = rss_urls
        config['gts_instance'] = os.getenv("GTS_SERVER_URL") or os.getenv("GTS_INSTANCE")
        config['max_posts_per_run'] = int(os.getenv("MAX_POSTS_PER_RUN", "10"))

        return config
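
    # Example RSS_URLS_FILE contents accepted by the parser above
    # (the URLs are illustrative):
    #
    #   # hashtag feeds to poll
    #   https://mastodon.social/tags/gotosocial.rss   # inline comments are stripped
    #   https://fosstodon.org/tags/selfhosting.rss
    #
    # Blank lines and '#'-only lines are skipped, anything after an inline
    # '#' is dropped, and only URLs starting with 'http' are kept.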

    def load_processed_urls(self):
        """Load processed URLs from file"""
        try:
            if os.path.exists(self.processed_urls_file):
                with open(self.processed_urls_file, 'r') as f:
                    return set(json.load(f))
        except Exception as e:
            self.logger.warning(f"Could not load processed URLs: {e}")
        return set()

    def save_processed_urls(self):
        """Save processed URLs to file"""
        try:
            with open(self.processed_urls_file, 'w') as f:
                json.dump(list(self.processed_urls), f)
        except Exception as e:
            self.logger.error(f"Could not save processed URLs: {e}")

    def load_known_instances(self):
        """Load known instances from file"""
        try:
            if os.path.exists(self.instances_file):
                with open(self.instances_file, 'r') as f:
                    return set(json.load(f))
        except Exception as e:
            self.logger.warning(f"Could not load known instances: {e}")
        return set()

    def save_known_instances(self):
        """Save known instances to file"""
        try:
            with open(self.instances_file, 'w') as f:
                json.dump(list(self.known_instances), f)
        except Exception as e:
            self.logger.error(f"Could not save known instances: {e}")

    def rate_limit(self):
        """Enforce a minimum delay between outgoing requests"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.min_delay:
            sleep_time = self.min_delay - time_since_last
            time.sleep(sleep_time)
        self.last_request_time = time.time()
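
    # With the default DELAY_BETWEEN_REQUESTS=2, two back-to-back calls to
    # rate_limit() are spaced at least ~2 seconds apart; time spent doing
    # other work between calls counts toward the gap.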

    def fetch_rss_feed(self, url):
        """Fetch and parse an RSS feed, returning the entry links"""
        try:
            self.logger.info(f"Fetching RSS feed: {url}")
            self.rate_limit()

            response = requests.get(url, timeout=30)
            response.raise_for_status()
            feed = feedparser.parse(response.content)

            urls = []
            for entry in feed.entries:
                if hasattr(entry, 'link'):
                    urls.append(entry.link)
            return urls
        except Exception as e:
            self.logger.error(f"Error fetching RSS feed {url}: {e}")
            return []

    def lookup_post(self, url):
        """Look up a post URL on the GTS instance"""
        if not self.config['gts_instance']:
            self.logger.warning("No GTS instance configured")
            return False

        try:
            lookup_url = f"{self.config['gts_instance']}/api/v1/statuses/lookup"
            params = {"uri": url}

            self.rate_limit()
            response = requests.get(lookup_url, params=params, timeout=10)

            if response.status_code == 200:
                self.logger.info(f"Successfully looked up: {url}")
                # Record the origin instance of the post we just resolved
                parsed_url = urlparse(url)
                instance = f"{parsed_url.scheme}://{parsed_url.netloc}"
                self.known_instances.add(instance)
                return True
            elif response.status_code == 404:
                self.logger.warning(f"No results for: {url}")
                return False
            else:
                self.logger.warning(f"Lookup failed for {url}: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"Error looking up {url}: {e}")
            return False
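
    # The lookup is the discovery mechanism itself: asking the instance to
    # resolve a remote status URI is what pulls the post (and its author)
    # into the local database, per the RSS-to-ActivityPub idea in the module
    # docstring. known_instances only grows here, so the "new instances"
    # statistic below counts origins first seen during this run.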

    def get_instance_count(self):
        """Get current instance count"""
        if not self.config['gts_instance']:
            return "unknown"

        try:
            api_url = f"{self.config['gts_instance']}/api/v1/instance"
            response = requests.get(api_url, timeout=10)
            if response.status_code == 200:
                # The API call is only a reachability check; the count itself
                # comes from the locally tracked set of known instances
                return len(self.known_instances)
        except Exception as e:
            self.logger.error(f"Failed to get instance count: {e}")

        return "unknown"

    def process_feeds(self):
        """Process all RSS feeds and collect run statistics"""
        total_posts = 0
        feeds_processed = 0
        initial_instance_count = len(self.known_instances)

        for rss_url in self.config['rss_urls']:
            self.logger.info(f"Processing feed: {rss_url}")

            # Fetch the RSS feed
            urls = self.fetch_rss_feed(rss_url)

            # Filter out URLs we have already looked up
            new_urls = [url for url in urls if url not in self.processed_urls]
            if not new_urls:
                self.logger.info("No new URLs to process")
                continue

            # Limit posts per run
            limited_urls = new_urls[:self.config['max_posts_per_run']]
            self.logger.info(f"Processing {len(limited_urls)} new URLs")

            # Process each URL
            for url in limited_urls:
                if self.lookup_post(url):
                    total_posts += 1
                # Mark as processed regardless of success
                self.processed_urls.add(url)

            feeds_processed += 1

        new_instances = len(self.known_instances) - initial_instance_count
        return {
            'total_posts': total_posts,
            'feeds_processed': feeds_processed,
            'instance_count': len(self.known_instances),
            'new_instances': new_instances
        }
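
    # Example return value (the numbers are illustrative):
    #   {'total_posts': 12, 'feeds_processed': 3,
    #    'instance_count': 58, 'new_instances': 4}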

    def run(self):
        """Main execution method"""
        start_time = datetime.now()

        try:
            stats = self.process_feeds()

            # Save state
            self.save_processed_urls()
            self.save_known_instances()

            # Calculate runtime
            runtime = datetime.now() - start_time

            # Print summary statistics
            print("\n📊 GTS-HolMirDas Run Statistics:")
            print(f"   ⏱️ Runtime: {runtime}")
            print(f"   📄 Total posts processed: {stats['total_posts']}")
            print(f"   🌐 Current known instances: {stats['instance_count']}")
            print(f"   New instances discovered: +{stats['new_instances']}")
            print(f"   📡 RSS feeds processed: {stats['feeds_processed']}")
        except Exception as e:
            self.logger.error(f"Fatal error: {e}")
            raise


def main():
    """Main entry point"""
    try:
        print("Starting GTS-HolMirDas run...")
        fetcher = GTSHolMirDas()
        fetcher.run()
        print("GTS-HolMirDas run completed. Sleeping for 1 hour...")
    except Exception as e:
        logging.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
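
# A minimal sketch of how a run might be configured (the filename and all
# values below are illustrative assumptions; the variable names come from
# the code above):
#
#   GTS_SERVER_URL=https://gts.example.org \
#   RSS_URLS="https://mastodon.social/tags/gotosocial.rss" \
#   MAX_POSTS_PER_RUN=10 \
#   DELAY_BETWEEN_REQUESTS=2 \
#   python3 gts_holmirdas.py
#
# Each run makes one pass over the feeds and exits; the "Sleeping for 1 hour"
# message suggests scheduling is handled outside the script, e.g. by a
# container restart policy or cron.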