#!/usr/bin/env python3 """ CrowdSec Notification Webhook Groups alerts by IP and sends formatted notifications via Apprise Author: Maxime Killinger """ import os import logging from flask import Flask, request, jsonify import requests # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) app = Flask(__name__) import pycountry # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) app = Flask(__name__) APPRISE_URL = os.environ.get("APPRISE_URL") def get_country_name(code): if not code or code.lower() == 'unknown': logger.warning(f"Country code is unknown or empty: {code}") return "Unknown" try: # Standardize code to alpha_2 # Clean string just in case clean_code = code.strip().upper() if len(clean_code) != 2: logger.warning(f"Invalid country code format: {code}") return clean_code country = pycountry.countries.get(alpha_2=clean_code) if country: return country.name logger.warning(f"Country code not found in pycountry: {clean_code}") return clean_code except Exception as e: logger.warning(f"Error resolving country code {code}: {e}") return code def format_scenario(scenario): if not scenario: return "unknown" return scenario.replace("crowdsecurity/", "").replace("_", " ") @app.route('/crowdsec', methods=['POST']) def handle_crowdsec(): try: alerts = request.json if not alerts: logger.debug("Received empty alert payload") return jsonify({"status": "no data"}), 200 # Group by IP ip_groups = {} for alert in alerts: ip = alert.get("Source", {}).get("IP", "unknown") country = alert.get("Source", {}).get("Cn", "") scenario = alert.get("Scenario", "unknown") if ip not in ip_groups: ip_groups[ip] = {"country": country, "scenarios": set()} ip_groups[ip]["scenarios"].add(format_scenario(scenario)) # Format message num_ips = len(ip_groups) lines = [] for ip, data in ip_groups.items(): country_name = get_country_name(data["country"]) whois_link = f"https://who.is/whois-ip/ip-address/{ip}" lines.append(f"🚫 {ip} ({country_name})") lines.append(f" 🔗 {whois_link}") for scenario in sorted(data["scenarios"]): lines.append(f" └ {scenario}") lines.append("") body = "\n".join(lines).strip() title = f"🛡️ CrowdSec - {num_ips} IP{'s' if num_ips > 1 else ''} bannie{'s' if num_ips > 1 else ''}" # Send to Apprise response = requests.post(APPRISE_URL, json={"title": title, "body": body}, timeout=10) if response.ok: ips_list = ", ".join(ip_groups.keys()) logger.info(f"Notification sent: {num_ips} IP(s) banned [{ips_list}]") else: logger.error(f"Apprise error: {response.status_code} - {response.text}") return jsonify({"status": "sent", "ips": num_ips}), 200 except Exception as e: logger.exception(f"Error processing CrowdSec alert: {e}") return jsonify({"status": "error", "message": str(e)}), 500 @app.route('/health', methods=['GET']) def health(): return jsonify({"status": "ok"}), 200 if __name__ == '__main__': from waitress import serve import sys import signal if not APPRISE_URL: logger.critical("APPRISE_URL environment variable is not set") sys.exit(1) def handle_signal(signum, frame): logger.info(f"Received signal {signum}. Shutting down gracefully...") # Waitress does not have a public stop method when run this way easily, # but sys.exit(0) from a signal handler usually works to terminate. # However, waitress catches signals if passed to serve? # Actually waitress handles signals by itself if running in main. # But we want to LOG it. sys.exit(0) signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) logger.info(f"Starting CrowdSec Notify - Apprise URL: {APPRISE_URL}") serve(app, host='0.0.0.0', port=5000)