All checks were successful
🚀 Docker Build and Push / build-and-push (push) Successful in 48s
89 lines
3.3 KiB
Python
89 lines
3.3 KiB
Python
#!/usr/bin/env python3
"""
CrowdSec Notification Webhook

Groups alerts by IP and sends formatted notifications via Apprise.

Author: Maxime Killinger
"""
|
|
|
|
import os
|
|
from flask import Flask, request, jsonify
|
|
import requests
|
|
|
|
# Flask application object that the route decorators in this file attach to.
app = Flask(__name__)

# Apprise endpoint that receives the notifications. The APPRISE_URL
# environment variable overrides the built-in default.
APPRISE_URL = os.getenv(
    "APPRISE_URL",
    "http://192.168.1.118:8000/notify/telegram_alerts",
)
|
|
|
|
# Two-letter country codes mapped to their French display names.
# Codes missing from this table are shown as-is by get_country_name().
COUNTRIES = dict(
    CN="Chine",
    RU="Russie",
    US="États-Unis",
    FR="France",
    DE="Allemagne",
    GB="Royaume-Uni",
    NL="Pays-Bas",
    BR="Brésil",
    IN="Inde",
    KR="Corée du Sud",
    JP="Japon",
    VN="Vietnam",
    ID="Indonésie",
    TW="Taïwan",
    HK="Hong Kong",
    SG="Singapour",
    TH="Thaïlande",
    PH="Philippines",
    MY="Malaisie",
    PK="Pakistan",
    BD="Bangladesh",
    IR="Iran",
    TR="Turquie",
    UA="Ukraine",
    PL="Pologne",
    RO="Roumanie",
    IT="Italie",
    ES="Espagne",
    AR="Argentine",
    MX="Mexique",
    CO="Colombie",
    CL="Chili",
    ZA="Afrique du Sud",
    EG="Égypte",
    NG="Nigeria",
    KE="Kenya",
    AU="Australie",
    NZ="Nouvelle-Zélande",
    CA="Canada",
    SE="Suède",
    NO="Norvège",
    FI="Finlande",
    DK="Danemark",
    BE="Belgique",
    CH="Suisse",
    AT="Autriche",
    CZ="Tchéquie",
    HU="Hongrie",
    BG="Bulgarie",
    GR="Grèce",
    PT="Portugal",
    IE="Irlande",
)
|
|
|
|
def get_country_name(code):
    """Return the French display name for a two-letter country code.

    Args:
        code: Country code taken from the alert payload. Case and
            surrounding whitespace are ignored. May be empty or ``None``.

    Returns:
        The matching entry of ``COUNTRIES``; ``"Inconnu"`` when no usable
        code is given; otherwise the (trimmed) code itself so the message
        still shows what was received.
    """
    if not code:
        return "Inconnu"
    if not isinstance(code, str):
        # The payload is untrusted JSON: a numeric or otherwise odd value
        # must not abort the notification with an AttributeError on .upper().
        return str(code)
    cleaned = code.strip()
    if not cleaned:
        return "Inconnu"
    return COUNTRIES.get(cleaned.upper(), cleaned)
|
|
|
|
def format_scenario(scenario):
    """Turn a raw scenario identifier into a short, readable label.

    Every ``crowdsecurity/`` occurrence is removed and underscores become
    spaces. A missing or empty scenario yields ``"unknown"``.
    """
    if scenario:
        label = scenario.replace("crowdsecurity/", "")
        label = label.replace("_", " ")
        return label
    return "unknown"
|
|
|
|
def _group_alerts_by_ip(alerts):
    """Group alert entries by source IP with their country and scenario labels."""
    ip_groups = {}
    for alert in alerts:
        if not isinstance(alert, dict):
            # Untrusted payload: ignore entries that are not JSON objects.
            continue

        source = alert.get("Source")
        if not isinstance(source, dict):
            # A missing or null "Source" must not break the whole batch.
            source = {}

        ip = str(source.get("IP") or "unknown")
        country = source.get("Cn")
        if not isinstance(country, str):
            country = ""
        scenario = alert.get("Scenario")
        if not isinstance(scenario, str):
            scenario = "unknown"

        group = ip_groups.setdefault(ip, {"country": country, "scenarios": []})
        if country and not group["country"]:
            # An earlier alert for this IP carried no country; use this one.
            group["country"] = country
        group["scenarios"].append(format_scenario(scenario))
    return ip_groups


def _build_notification(ip_groups):
    """Render the grouped alerts as a ``(title, body)`` pair for Apprise."""
    lines = []
    for ip, data in ip_groups.items():
        country_name = get_country_name(data["country"])
        whois_link = f"https://who.is/whois-ip/ip-address/{ip}"
        lines.append(f"🚫 {ip} ({country_name})")
        lines.append(f" 🔗 {whois_link}")
        lines.extend(f" └ {scenario}" for scenario in data["scenarios"])
        lines.append("")  # blank separator between IP sections

    body = "\n".join(lines).strip()
    num_ips = len(ip_groups)
    plural = "s" if num_ips > 1 else ""
    title = f"🛡️ CrowdSec - {num_ips} IP{plural} bannie{plural}"
    return title, body


@app.route('/crowdsec', methods=['POST'])
def handle_crowdsec():
    """Receive CrowdSec alerts and forward one grouped notification to Apprise.

    The request body is expected to be a JSON list of alert objects; a single
    alert object is accepted too. From each alert the handler reads
    ``Source.IP``, ``Source.Cn`` and ``Scenario``.

    Returns:
        A ``(json, status)`` pair:
        * ``{"status": "no data"}``, 200 - empty, unparseable or alert-free body.
        * ``{"status": "sent", "ips": n}``, 200 - notification accepted by Apprise.
        * ``{"status": "error", ...}``, 400 - JSON that is neither a list nor an object.
        * ``{"status": "error", ...}``, 502 - Apprise unreachable or answered 4xx/5xx.
        * ``{"status": "error", ...}``, 500 - any other unexpected failure.
    """
    try:
        # force=True: parse the body whatever Content-Type the sender used.
        # silent=True: a malformed body yields None instead of an HTTP error
        # that the broad handler below would have reported as a 500.
        alerts = request.get_json(force=True, silent=True)
        if not alerts:
            return jsonify({"status": "no data"}), 200

        if isinstance(alerts, dict):
            # A single alert object: treat it as a one-element batch.
            alerts = [alerts]
        elif not isinstance(alerts, list):
            return jsonify({"status": "error", "message": "expected a JSON list of alerts"}), 400

        ip_groups = _group_alerts_by_ip(alerts)
        if not ip_groups:
            return jsonify({"status": "no data"}), 200

        title, body = _build_notification(ip_groups)
        response = requests.post(APPRISE_URL, json={"title": title, "body": body}, timeout=10)
        # Without this check a 4xx/5xx from Apprise was still reported as "sent".
        response.raise_for_status()
        return jsonify({"status": "sent", "ips": len(ip_groups)}), 200

    except requests.RequestException as exc:
        app.logger.error("Apprise notification failed: %s", exc)
        return jsonify({"status": "error", "message": str(exc)}), 502
    except Exception as exc:
        # Top-level boundary: keep answering with JSON, but leave a traceback.
        app.logger.exception("Unexpected error while handling CrowdSec alerts")
        return jsonify({"status": "error", "message": str(exc)}), 500
|
|
|
|
@app.route("/health", methods=["GET"])
def health():
    """Report that the service is up: ``{"status": "ok"}`` with HTTP 200."""
    payload = jsonify(status="ok")
    return payload, 200
|
|
|
|
if __name__ == "__main__":
    # Executed directly as a script: listen on every interface, port 5000.
    app.run(port=5000, host="0.0.0.0")
|