Compare commits


No commits in common. "2dd7e9162ec7d966b86b0a61ee3646a18a55d039" and "0cbac9b31c7f826dcc58514fb6796c5fa8846c09" have entirely different histories.


#!/usr/bin/env python3
"""
GTS-HolMirDas: RSS-based content discovery for GoToSocial

Inspired by HolMirDas by @aliceif:
- GitHub: https://github.com/aliceif/HolMirDas
- Fediverse: @aliceif@mkultra.x27.one

This GoToSocial adaptation extends the original RSS-to-ActivityPub concept
with Docker deployment, multi-instance processing, and comprehensive monitoring.
"""
import os
import time
import json
import logging
import requests
import feedparser
from datetime import timedelta
from urllib.parse import quote_plus


class GTSHolMirDas:
    def __init__(self):
        """Initialize the RSS fetcher with configuration"""
        self.config = {
            "server_url": os.getenv("GTS_SERVER_URL", "https://your-gts-instance"),
            "access_token": os.getenv("GTS_ACCESS_TOKEN", ""),
            "max_posts_per_run": int(os.getenv("MAX_POSTS_PER_RUN", "25")),
            "delay_between_requests": int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")),
            "healthcheck_url": os.getenv("HEALTHCHECK_URL", ""),
            "log_level": os.getenv("LOG_LEVEL", "INFO")
        }

        # Setup logging FIRST
        logging.basicConfig(
            level=getattr(logging, self.config["log_level"]),
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # Load RSS URLs from file or environment
        rss_urls_file = os.getenv("RSS_URLS_FILE")
        if rss_urls_file and os.path.exists(rss_urls_file):
            try:
                with open(rss_urls_file, 'r') as f:
                    self.config["rss_urls"] = [
                        line.strip() for line in f
                        if line.strip() and not line.startswith('#')
                    ]
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from file: {rss_urls_file}")
            except Exception as e:
                self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}")
                self.config["rss_urls"] = []
        else:
            # Fallback to environment variable
            self.config["rss_urls"] = [
                url.strip() for url in os.getenv("RSS_URLS", "").split(",")
                if url.strip()
            ]
            if self.config["rss_urls"]:
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from environment")

        # Load processed URLs from persistent storage
        self.processed_urls_file = "/app/data/processed_urls.json"
        self.processed_urls = self.load_processed_urls()

        # Statistics tracking
        self.previous_instances = getattr(self, 'previous_instances', 0)

    def load_processed_urls(self):
        """Load previously processed URLs and instance count from file"""
        try:
            if os.path.exists(self.processed_urls_file):
                with open(self.processed_urls_file, 'r') as f:
                    data = json.load(f)
                    # Load previous instance count for statistics
                    self.previous_instances = data.get('previous_instances', 0)
                    return set(data.get('processed_urls', []))
        except Exception as e:
            self.logger.warning(f"Could not load processed URLs: {e}")
        return set()

    def save_processed_urls(self, current_instances=None):
        """Save processed URLs and current instance count to file"""
        try:
            os.makedirs(os.path.dirname(self.processed_urls_file), exist_ok=True)
            data = {
                'processed_urls': list(self.processed_urls),
                'last_updated': time.time()
            }
            # Save current instance count for next run
            if current_instances is not None and current_instances != 'unknown':
                data['previous_instances'] = current_instances
            with open(self.processed_urls_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self.logger.error(f"Could not save processed URLs: {e}")

    def fetch_rss_urls(self, rss_url):
        """Fetch URLs from RSS feed"""
        try:
            self.logger.info(f"Fetching RSS feed: {rss_url}")

            # Parse RSS feed
            feed = feedparser.parse(rss_url)
            if feed.bozo:
                self.logger.warning(f"RSS feed may have issues: {rss_url}")

            # Extract URLs from entries
            urls = []
            for entry in feed.entries:
                if hasattr(entry, 'link'):
                    urls.append(entry.link)

            return urls
        except Exception as e:
            self.logger.error(f"Error fetching RSS feed {rss_url}: {e}")
            return []

    def lookup_post(self, post_url):
        """Look up a post URL using GTS search API"""
        try:
            # Prepare search API call
            search_url = f"{self.config['server_url']}/api/v2/search"
            params = {
                'q': post_url,
                'type': 'statuses',
                'resolve': 'true',
                'limit': 1
            }
            headers = {
                'Authorization': f'Bearer {self.config["access_token"]}',
                'Content-Type': 'application/json'
            }

            # Make API call
            response = requests.get(
                search_url,
                params=params,
                headers=headers,
                timeout=30
            )

            if response.status_code == 200:
                results = response.json()
                if results.get('statuses') or results.get('accounts'):
                    self.logger.info(f"Successfully looked up: {post_url}")
                    return True
                else:
                    self.logger.warning(f"No results for: {post_url}")
                    return False
            else:
                self.logger.error(f"API error {response.status_code} for {post_url}: {response.text}")
                return False

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error looking up {post_url}: {e}")
            return False

    def process_feeds(self):
        """Process all configured RSS feeds"""
        total_processed = 0

        # Record start time for statistics
        self.start_time = time.time()

        # Ping healthcheck start
        self.ping_healthcheck("/start")

        try:
            for rss_url in self.config["rss_urls"]:
                if not rss_url.strip():
                    continue

                self.logger.info(f"Processing feed: {rss_url}")

                # Get URLs from RSS
                urls = self.fetch_rss_urls(rss_url)

                # Filter out already processed URLs
                new_urls = [url for url in urls if url not in self.processed_urls]

                if not new_urls:
                    self.logger.info("No new URLs to process")
                    continue

                # Rate limiting: max posts per run
                urls_to_process = new_urls[:self.config["max_posts_per_run"]]
                self.logger.info(f"Processing {len(urls_to_process)} new URLs")

                for url in urls_to_process:
                    if self.lookup_post(url):
                        self.processed_urls.add(url)
                        total_processed += 1

                    # Rate limiting: delay between requests
                    time.sleep(self.config["delay_between_requests"])

            # Calculate runtime
            end_time = time.time()
            runtime_seconds = end_time - self.start_time
            runtime_formatted = str(timedelta(seconds=int(runtime_seconds)))

            # Get current instance count
            try:
                instance_info = requests.get(
                    f"{self.config['server_url']}/api/v1/instance",
                    headers={'Authorization': f'Bearer {self.config["access_token"]}'},
                    timeout=10
                )
                if instance_info.status_code == 200:
                    current_instances = instance_info.json().get('stats', {}).get('domain_count', 'unknown')
                else:
                    current_instances = 'unknown'
            except Exception as e:
                self.logger.error(f"Failed to get instance count: {e}")
                current_instances = 'unknown'

            # Calculate new instances (if we have previous data)
            new_instances = 'unknown'
            if self.previous_instances > 0 and current_instances != 'unknown':
                new_instances = current_instances - self.previous_instances

            # Print comprehensive statistics
            print(f"\n📊 GTS-HolMirDas Run Statistics:")
            print(f"   ⏱️  Runtime: {runtime_formatted}")
            print(f"   📄 Total posts processed: {total_processed}")
            print(f"   🌐 Current known instances: {current_instances}")
            if new_instances != 'unknown' and new_instances > 0:
                print(f"   New instances discovered: +{new_instances}")
            elif new_instances == 0:
                print(f"   New instances discovered: +0")
            print(f"   📡 RSS feeds processed: {len(self.config['rss_urls'])}")
            if runtime_seconds > 60:
                print(f"   ⚡ Posts per minute: {total_processed / (runtime_seconds / 60):.1f}")

            self.save_processed_urls(current_instances)

            # Ping healthcheck success
            self.ping_healthcheck("")

        except Exception as e:
            self.logger.error(f"Error during processing: {e}")
            # Ping healthcheck failure
            self.ping_healthcheck("/fail")
            raise

    def ping_healthcheck(self, endpoint=""):
        """Ping healthchecks.io for monitoring"""
        if not self.config.get("healthcheck_url"):
            return

        try:
            url = self.config["healthcheck_url"] + endpoint
            requests.get(url, timeout=10)
        except Exception as e:
            self.logger.warning(f"Failed to ping healthcheck: {e}")


def main():
    """Main entry point"""
    try:
        fetcher = GTSHolMirDas()

        # Validate required config
        if not fetcher.config["access_token"]:
            raise ValueError("GTS_ACCESS_TOKEN environment variable is required")

        fetcher.process_feeds()

    except Exception as e:
        logging.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
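
The script is configured entirely through the environment variables it reads in __init__ and validates in main(). A minimal smoke-test sketch, assuming the file above is saved as gts_holmirdas.py on the import path and that a reachable GoToSocial instance and API token exist (the module name, server URL, token, and feed URL below are placeholders, not part of the repository):

import os

# Placeholder values; substitute a real instance and token.
os.environ["GTS_SERVER_URL"] = "https://your-gts-instance"
os.environ["GTS_ACCESS_TOKEN"] = "YOUR_TOKEN"        # required, main() raises ValueError without it
os.environ["RSS_URLS"] = "https://example.social/tags/gotosocial.rss"  # hypothetical feed; RSS_URLS_FILE works too
os.environ["MAX_POSTS_PER_RUN"] = "5"
os.environ["LOG_LEVEL"] = "DEBUG"

from gts_holmirdas import GTSHolMirDas  # assumed module name

fetcher = GTSHolMirDas()
fetcher.process_feeds()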