Compare commits
No commits in common. "2dd7e9162ec7d966b86b0a61ee3646a18a55d039" and "0cbac9b31c7f826dcc58514fb6796c5fa8846c09" have entirely different histories.
2dd7e9162e
...
0cbac9b31c
1 changed files with 206 additions and 216 deletions
422
gts_holmirdas.py
422
gts_holmirdas.py
|
@ -1,155 +1,118 @@
|
||||||
#!/usr/bin/env python3
"""
GTS-HolMirDas: RSS-based content discovery for GoToSocial

Inspired by HolMirDas by @aliceif:
- GitHub: https://github.com/aliceif/HolMirDas
- Fediverse: @aliceif@mkultra.x27.one

This GoToSocial adaptation extends the original RSS-to-ActivityPub concept
with Docker deployment, multi-instance processing, and comprehensive monitoring.
"""
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta
from urllib.parse import quote_plus, urlparse

import feedparser
import requests
class GTSHolMirDas:
    """RSS-based content discovery for GoToSocial.

    Reads configured RSS feeds and asks a GoToSocial instance to resolve
    each previously unseen post URL through its search API, so remote
    content gets pulled into the instance.
    """

    def __init__(self):
        """Initialize the RSS fetcher with configuration from the environment."""
        # All runtime settings come from environment variables (Docker-friendly).
        self.config = {
            "server_url": os.getenv("GTS_SERVER_URL", "https://your-gts-instance"),
            "access_token": os.getenv("GTS_ACCESS_TOKEN", ""),
            "max_posts_per_run": int(os.getenv("MAX_POSTS_PER_RUN", "25")),
            "delay_between_requests": int(os.getenv("DELAY_BETWEEN_REQUESTS", "2")),
            "healthcheck_url": os.getenv("HEALTHCHECK_URL", ""),
            "log_level": os.getenv("LOG_LEVEL", "INFO")
        }

        # Setup logging FIRST so the loaders below can use self.logger.
        # Fall back to INFO if LOG_LEVEL names no valid logging level.
        logging.basicConfig(
            level=getattr(logging, self.config["log_level"].upper(), logging.INFO),
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # RSS URLs: prefer a file (one URL per line, '#' starts a comment
        # line), fall back to the comma-separated RSS_URLS variable.
        rss_urls_file = os.getenv("RSS_URLS_FILE")
        if rss_urls_file and os.path.exists(rss_urls_file):
            try:
                with open(rss_urls_file, 'r', encoding='utf-8') as f:
                    self.config["rss_urls"] = [
                        line.strip() for line in f
                        if line.strip() and not line.strip().startswith('#')
                    ]
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from file: {rss_urls_file}")
            except Exception as e:
                self.logger.error(f"Could not load RSS URLs from file {rss_urls_file}: {e}")
                self.config["rss_urls"] = []
        else:
            # Fallback to environment variable
            self.config["rss_urls"] = [
                url.strip() for url in os.getenv("RSS_URLS", "").split(",")
                if url.strip()
            ]
            if self.config["rss_urls"]:
                self.logger.info(f"Loaded {len(self.config['rss_urls'])} RSS URLs from environment")

        # Persistent state: which post URLs were already handed to GTS.
        self.processed_urls_file = "/app/data/processed_urls.json"
        self.processed_urls = self.load_processed_urls()

        # Statistics tracking. load_processed_urls() may already have
        # restored previous_instances from disk; default to 0 on first run.
        self.previous_instances = getattr(self, 'previous_instances', 0)

    def load_processed_urls(self):
        """Load previously processed URLs and instance count from file.

        Side effect: restores self.previous_instances from the saved state.
        Returns a set of URL strings (empty on first run or read error).
        """
        try:
            if os.path.exists(self.processed_urls_file):
                with open(self.processed_urls_file, 'r') as f:
                    data = json.load(f)
                    # Load previous instance count for statistics
                    self.previous_instances = data.get('previous_instances', 0)
                    return set(data.get('processed_urls', []))
        except Exception as e:
            # Best effort: a corrupt/missing state file just means we start fresh.
            self.logger.warning(f"Could not load processed URLs: {e}")
        return set()

    def save_processed_urls(self, current_instances=None):
        """Save processed URLs and current instance count to file.

        current_instances: domain count from the instance API, stored so the
        next run can report how many new instances were discovered.
        """
        try:
            os.makedirs(os.path.dirname(self.processed_urls_file), exist_ok=True)
            data = {
                'processed_urls': list(self.processed_urls),
                'last_updated': time.time()
            }
            # Save current instance count for next run
            if current_instances is not None and current_instances != 'unknown':
                data['previous_instances'] = current_instances

            with open(self.processed_urls_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self.logger.error(f"Could not save processed URLs: {e}")

    def fetch_rss_urls(self, rss_url):
        """Fetch entry link URLs from one RSS feed.

        Returns a list of URLs; an empty list on any fetch/parse error.
        """
        try:
            self.logger.info(f"Fetching RSS feed: {rss_url}")

            # feedparser fetches and parses in one step.
            feed = feedparser.parse(rss_url)
            if feed.bozo:
                # bozo flags malformed feeds; entries may still be usable.
                self.logger.warning(f"RSS feed may have issues: {rss_url}")

            # Extract URLs from entries
            urls = []
            for entry in feed.entries:
                if hasattr(entry, 'link'):
                    urls.append(entry.link)

            return urls

        except Exception as e:
            self.logger.error(f"Error fetching RSS feed {rss_url}: {e}")
            return []

    def lookup_post(self, post_url):
        """Look up a post URL using the GTS search API.

        resolve=true makes the instance fetch the remote status, which is
        what actually federates the content in. Returns True when the
        search produced a status or account, False otherwise.
        """
        try:
            # Prepare search API call
            search_url = f"{self.config['server_url']}/api/v2/search"
            params = {
                'q': post_url,
                'type': 'statuses',
                'resolve': 'true',
                'limit': 1
            }
            headers = {
                'Authorization': f'Bearer {self.config["access_token"]}',
                'Content-Type': 'application/json'
            }

            # Make API call
            response = requests.get(
                search_url,
                params=params,
                headers=headers,
                timeout=30
            )

            if response.status_code == 200:
                results = response.json()
                if results.get('statuses') or results.get('accounts'):
                    self.logger.info(f"Successfully looked up: {post_url}")
                    return True
                else:
                    self.logger.warning(f"No results for: {post_url}")
                    return False
            else:
                self.logger.error(f"API error {response.status_code} for {post_url}: {response.text}")
                return False

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error looking up {post_url}: {e}")
            return False

    def process_feeds(self):
        """Process all configured RSS feeds and print run statistics.

        Pings the healthcheck URL at start/success/failure and persists
        processed URLs plus the current instance count at the end.
        """
        total_processed = 0

        # Record start time for statistics
        self.start_time = time.time()

        # Signal run start to the monitoring service.
        self.ping_healthcheck("/start")

        try:
            for rss_url in self.config["rss_urls"]:
                if not rss_url.strip():
                    continue

                self.logger.info(f"Processing feed: {rss_url}")

                # Get URLs from RSS
                urls = self.fetch_rss_urls(rss_url)

                # Filter out already processed URLs
                new_urls = [url for url in urls if url not in self.processed_urls]

                if not new_urls:
                    self.logger.info("No new URLs to process")
                    continue

                # Rate limiting: cap how many posts one run may resolve.
                urls_to_process = new_urls[:self.config["max_posts_per_run"]]
                self.logger.info(f"Processing {len(urls_to_process)} new URLs")

                for url in urls_to_process:
                    if self.lookup_post(url):
                        self.processed_urls.add(url)
                        total_processed += 1

                    # Rate limiting: delay between requests
                    time.sleep(self.config["delay_between_requests"])

            # Calculate runtime
            runtime_seconds = time.time() - self.start_time
            runtime_formatted = str(timedelta(seconds=int(runtime_seconds)))

            # Ask the instance API how many remote domains are known now.
            try:
                instance_info = requests.get(f"{self.config['server_url']}/api/v1/instance",
                    headers={'Authorization': f'Bearer {self.config["access_token"]}'},
                    timeout=10)
                if instance_info.status_code == 200:
                    current_instances = instance_info.json().get('stats', {}).get('domain_count', 'unknown')
                else:
                    current_instances = 'unknown'
            except Exception as e:
                self.logger.error(f"Failed to get instance count: {e}")
                current_instances = 'unknown'

            # Calculate new instances (only possible if we have previous data)
            new_instances = 'unknown'
            if self.previous_instances > 0 and current_instances != 'unknown':
                new_instances = current_instances - self.previous_instances

            # Print comprehensive statistics
            print(f"\n📊 GTS-HolMirDas Run Statistics:")
            print(f" ⏱️ Runtime: {runtime_formatted}")
            print(f" 📄 Total posts processed: {total_processed}")
            print(f" 🌐 Current known instances: {current_instances}")
            if new_instances != 'unknown' and new_instances > 0:
                print(f" ➕ New instances discovered: +{new_instances}")
            elif new_instances == 0:
                print(f" ➕ New instances discovered: +0")
            print(f" 📡 RSS feeds processed: {len(self.config['rss_urls'])}")
            if runtime_seconds > 60:
                print(f" ⚡ Posts per minute: {total_processed / (runtime_seconds / 60):.1f}")

            # Persist state (including instance count for the next run).
            self.save_processed_urls(current_instances)

            # Ping healthcheck success
            self.ping_healthcheck("")

        except Exception as e:
            self.logger.error(f"Error during processing: {e}")
            # Ping healthcheck failure
            self.ping_healthcheck("/fail")
            raise

    def ping_healthcheck(self, endpoint=""):
        """Ping healthchecks.io for monitoring.

        endpoint: "" for success, "/start" or "/fail" for run phases.
        No-op when HEALTHCHECK_URL is not configured; never raises.
        """
        if not self.config.get("healthcheck_url"):
            return

        try:
            url = self.config["healthcheck_url"] + endpoint
            requests.get(url, timeout=10)
        except Exception as e:
            self.logger.warning(f"Failed to ping healthcheck: {e}")
def main():
    """Main entry point: validate config, then run one fetch pass.

    Raises ValueError when GTS_ACCESS_TOKEN is missing; any fatal error
    is logged and re-raised so the container exits non-zero.
    """
    try:
        fetcher = GTSHolMirDas()

        # Validate required config before doing any network work.
        if not fetcher.config["access_token"]:
            raise ValueError("GTS_ACCESS_TOKEN environment variable is required")

        fetcher.process_feeds()

    except Exception as e:
        logging.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
Loading…
Add table
Add a link
Reference in a new issue