Files
crowdsec-notify/app.py
Maxime Killinger d180cb700c
All checks were successful
🚀 Docker Build and Push / build-and-push (push) Successful in 59s
refactor: optimize docker, robust config & country handling
- optimize(docker): use distroless/python3-debian12 & multi-stage build
- feat(app): replace Gunicorn with Waitress (pure python)
- feat(countries): replace static list with pycountry (dynamic & complete)
- feat(config): enforce mandatory APPRISE_URL (fatal if missing)
- feat(logging): add structured logging and graceful shutdown signal handling
- docs: update README with required env var and correct usage
2025-12-31 18:46:21 +01:00

141 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
CrowdSec Notification Webhook
Groups alerts by IP and sends formatted notifications via Apprise
Author: Maxime Killinger
"""
import os
import logging
from flask import Flask, request, jsonify
import requests
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
import pycountry
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
APPRISE_URL = os.environ.get("APPRISE_URL")
def get_country_name(code):
if not code or code.lower() == 'unknown':
logger.warning(f"Country code is unknown or empty: {code}")
return "Unknown"
try:
# Standardize code to alpha_2
# Clean string just in case
clean_code = code.strip().upper()
if len(clean_code) != 2:
logger.warning(f"Invalid country code format: {code}")
return clean_code
country = pycountry.countries.get(alpha_2=clean_code)
if country:
return country.name
logger.warning(f"Country code not found in pycountry: {clean_code}")
return clean_code
except Exception as e:
logger.warning(f"Error resolving country code {code}: {e}")
return code
def format_scenario(scenario):
if not scenario:
return "unknown"
return scenario.replace("crowdsecurity/", "").replace("_", " ")
@app.route('/crowdsec', methods=['POST'])
def handle_crowdsec():
try:
alerts = request.json
if not alerts:
logger.debug("Received empty alert payload")
return jsonify({"status": "no data"}), 200
# Group by IP
ip_groups = {}
for alert in alerts:
ip = alert.get("Source", {}).get("IP", "unknown")
country = alert.get("Source", {}).get("Cn", "")
scenario = alert.get("Scenario", "unknown")
if ip not in ip_groups:
ip_groups[ip] = {"country": country, "scenarios": []}
ip_groups[ip]["scenarios"].append(format_scenario(scenario))
# Format message
num_ips = len(ip_groups)
lines = []
for ip, data in ip_groups.items():
country_name = get_country_name(data["country"])
whois_link = f"https://who.is/whois-ip/ip-address/{ip}"
lines.append(f"🚫 {ip} ({country_name})")
lines.append(f" 🔗 {whois_link}")
for scenario in data["scenarios"]:
lines.append(f"{scenario}")
lines.append("")
body = "\n".join(lines).strip()
title = f"🛡️ CrowdSec - {num_ips} IP{'s' if num_ips > 1 else ''} bannie{'s' if num_ips > 1 else ''}"
# Send to Apprise
response = requests.post(APPRISE_URL, json={"title": title, "body": body}, timeout=10)
if response.ok:
ips_list = ", ".join(ip_groups.keys())
logger.info(f"Notification sent: {num_ips} IP(s) banned [{ips_list}]")
else:
logger.error(f"Apprise error: {response.status_code} - {response.text}")
return jsonify({"status": "sent", "ips": num_ips}), 200
except Exception as e:
logger.exception(f"Error processing CrowdSec alert: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({"status": "ok"}), 200
if __name__ == '__main__':
from waitress import serve
import sys
import signal
if not APPRISE_URL:
logger.critical("APPRISE_URL environment variable is not set")
sys.exit(1)
def handle_signal(signum, frame):
logger.info(f"Received signal {signum}. Shutting down gracefully...")
# Waitress does not have a public stop method when run this way easily,
# but sys.exit(0) from a signal handler usually works to terminate.
# However, waitress catches signals if passed to serve?
# Actually waitress handles signals by itself if running in main.
# But we want to LOG it.
sys.exit(0)
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
logger.info(f"Starting CrowdSec Notify - Apprise URL: {APPRISE_URL}")
serve(app, host='0.0.0.0', port=5000)