253 lines
10 KiB
Python
253 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Watchtower Telegram Notifier - Notificaciones personalizadas en español
|
||
Monitorea los logs de Watchtower y envía notificaciones detalladas a Telegram
|
||
"""
|
||
|
||
import docker
|
||
import requests
|
||
import time
|
||
import re
|
||
import os
|
||
from datetime import datetime
|
||
from threading import Thread
|
||
import logging
|
||
|
||
# Configuración
|
||
TELEGRAM_BOT_TOKEN = "7676713419:AAG-tfwzgrA9JaU5xNjvG5iBlFeZpz2ahiY"
|
||
TELEGRAM_CHAT_ID = "7940222048"
|
||
WATCHTOWER_CONTAINER_NAME = "watchtower"
|
||
# Obtener el nombre del host de una variable de entorno, con un valor por defecto
|
||
HOSTNAME = os.environ.get("WATCHTOWER_NOTIFICATIONS_HOSTNAME", "Servidor-Easypanel")
|
||
|
||
# Configurar logging
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class WatchtowerTelegramNotifier:
|
||
def __init__(self):
|
||
self.client = docker.from_env()
|
||
self.telegram_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
|
||
self.container_updates = {}
|
||
self.scan_in_progress = False
|
||
|
||
def send_telegram_message(self, message):
|
||
"""Envía mensaje a Telegram"""
|
||
try:
|
||
payload = {
|
||
'chat_id': TELEGRAM_CHAT_ID,
|
||
'text': message,
|
||
'parse_mode': 'HTML'
|
||
}
|
||
response = requests.post(self.telegram_url, data=payload, timeout=10)
|
||
if response.status_code == 200:
|
||
logger.info("✅ Mensaje enviado a Telegram exitosamente")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ Error enviando mensaje: {response.status_code}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"❌ Error conectando a Telegram: {e}")
|
||
return False
|
||
|
||
def format_startup_message(self):
|
||
"""Mensaje de inicio del sistema"""
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
message = f"""🔄 <b>WATCHTOWER INICIADO</b>
|
||
|
||
🏠 <b>Servidor:</b> {HOSTNAME}
|
||
📅 <b>Fecha:</b> {now}
|
||
⚙️ <b>Estado:</b> Monitoreo activo
|
||
🕐 <b>Intervalo:</b> Cada 24 horas
|
||
|
||
📋 <b>Contenedores monitoreados:</b>"""
|
||
|
||
try:
|
||
containers = self.client.containers.list()
|
||
for container in containers:
|
||
if container.name != WATCHTOWER_CONTAINER_NAME:
|
||
status_emoji = "🟢" if container.status == "running" else "🔴"
|
||
message += f"\n{status_emoji} {container.name}"
|
||
except Exception as e:
|
||
logger.error(f"Error obteniendo contenedores: {e}")
|
||
|
||
message += "\n\n✅ <b>Sistema listo para detectar actualizaciones</b>"
|
||
return message
|
||
|
||
def format_scan_start_message(self):
|
||
"""Mensaje cuando inicia un escaneo"""
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
return f"""🔍 <b>ESCANEO DE ACTUALIZACIONES INICIADO</b>
|
||
|
||
🏠 <b>Servidor:</b> {HOSTNAME}
|
||
📅 <b>Fecha:</b> {now}
|
||
⏳ <b>Estado:</b> Verificando imágenes...
|
||
|
||
<i>Buscando actualizaciones disponibles...</i>"""
|
||
|
||
def format_update_report(self, updated_containers, checked_containers):
|
||
"""Formato del reporte final de actualizaciones"""
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
if updated_containers:
|
||
message = f"""🎉 <b>ACTUALIZACIONES COMPLETADAS</b>
|
||
|
||
🏠 <b>Servidor:</b> {HOSTNAME}
|
||
📅 <b>Fecha:</b> {now}
|
||
📦 <b>Contenedores verificados:</b> {len(checked_containers)}
|
||
|
||
<b>📥 ACTUALIZACIONES REALIZADAS:</b>"""
|
||
|
||
for container_info in updated_containers:
|
||
message += f"\n✅ <b>{container_info['name']}</b>"
|
||
message += f"\n 📋 Imagen: <code>{container_info['image']}</code>"
|
||
if container_info.get('old_image'):
|
||
message += f"\n 🔄 Desde: <code>{container_info['old_image']}</code>"
|
||
|
||
unchanged = [c for c in checked_containers if c not in [u['name'] for u in updated_containers]]
|
||
if unchanged:
|
||
message += f"\n\n<b>ℹ️ SIN CAMBIOS:</b>"
|
||
for container in unchanged:
|
||
message += f"\n🔘 {container}"
|
||
|
||
message += f"\n\n🎯 <b>Total actualizados:</b> {len(updated_containers)}"
|
||
message += "\n✨ <b>¡Todos los servicios están actualizados!</b>"
|
||
|
||
else:
|
||
message = f"""ℹ️ <b>VERIFICACIÓN COMPLETADA</b>
|
||
|
||
🏠 <b>Servidor:</b> {HOSTNAME}
|
||
📅 <b>Fecha:</b> {now}
|
||
📦 <b>Contenedores verificados:</b> {len(checked_containers)}
|
||
|
||
<b>📋 ESTADO:</b>"""
|
||
|
||
for container in checked_containers:
|
||
message += f"\n🔘 <b>{container}</b> - Sin actualizaciones"
|
||
|
||
message += "\n\n✅ <b>Todos los contenedores están al día</b>"
|
||
|
||
return message
|
||
|
||
def parse_watchtower_logs(self, log_line):
|
||
"""Analiza líneas de log de Watchtower"""
|
||
log_line = log_line.strip()
|
||
|
||
# Detectar inicio de escaneo
|
||
if "Checking all containers" in log_line and not self.scan_in_progress:
|
||
self.scan_in_progress = True
|
||
self.container_updates.clear()
|
||
self.send_telegram_message(self.format_scan_start_message())
|
||
return
|
||
|
||
# Detectar actualizaciones
|
||
update_patterns = [
|
||
r'Found new.*image for (.+)',
|
||
r'Updating (.+?) with.*',
|
||
r'Updated (.+?) \(.*\)'
|
||
]
|
||
|
||
for pattern in update_patterns:
|
||
match = re.search(pattern, log_line, re.IGNORECASE)
|
||
if match:
|
||
container_name = match.group(1)
|
||
if container_name not in self.container_updates:
|
||
self.container_updates[container_name] = {
|
||
'name': container_name,
|
||
'image': 'Detectando...',
|
||
'updated': True
|
||
}
|
||
logger.info(f"📦 Actualización detectada: {container_name}")
|
||
break
|
||
|
||
# Detectar fin de proceso
|
||
if any(phrase in log_line.lower() for phrase in ["session done", "watchtower will check again", "sleeping"]):
|
||
if self.scan_in_progress:
|
||
self.send_update_report()
|
||
self.scan_in_progress = False
|
||
|
||
def send_update_report(self):
|
||
"""Envía el reporte final de actualizaciones"""
|
||
try:
|
||
# Obtener lista actual de contenedores
|
||
containers = self.client.containers.list(all=True)
|
||
container_names = [c.name for c in containers if c.name != WATCHTOWER_CONTAINER_NAME]
|
||
|
||
# Completar información de contenedores actualizados
|
||
updated_list = []
|
||
for container_name, info in self.container_updates.items():
|
||
try:
|
||
container = self.client.containers.get(container_name)
|
||
info['image'] = container.image.tags[0] if container.image.tags else "Sin tag"
|
||
except:
|
||
info['image'] = "Imagen no encontrada"
|
||
updated_list.append(info)
|
||
|
||
# Enviar reporte
|
||
report = self.format_update_report(updated_list, container_names)
|
||
self.send_telegram_message(report)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error generando reporte: {e}")
|
||
error_msg = f"❌ <b>Error generando reporte de actualizaciones</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n⚠️ <b>Error:</b> {str(e)}"
|
||
self.send_telegram_message(error_msg)
|
||
|
||
def monitor_watchtower(self):
|
||
"""Monitorea logs de Watchtower en tiempo real"""
|
||
logger.info("🔍 Iniciando monitoreo de Watchtower...")
|
||
|
||
try:
|
||
container = self.client.containers.get(WATCHTOWER_CONTAINER_NAME)
|
||
logger.info(f"✅ Contenedor Watchtower encontrado: {container.id[:12]}")
|
||
|
||
# Enviar mensaje de inicio
|
||
self.send_telegram_message(self.format_startup_message())
|
||
|
||
# Monitorear logs en tiempo real
|
||
for log in container.logs(stream=True, follow=True, tail=10):
|
||
try:
|
||
log_line = log.decode('utf-8').strip()
|
||
if log_line:
|
||
logger.debug(f"Log: {log_line}")
|
||
self.parse_watchtower_logs(log_line)
|
||
except Exception as e:
|
||
logger.error(f"Error procesando log: {e}")
|
||
continue
|
||
|
||
except docker.errors.NotFound:
|
||
error_msg = f"❌ <b>ERROR: Contenedor Watchtower no encontrado</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n⚠️ <b>Nombre esperado:</b> {WATCHTOWER_CONTAINER_NAME}"
|
||
self.send_telegram_message(error_msg)
|
||
logger.error("❌ Contenedor Watchtower no encontrado")
|
||
except Exception as e:
|
||
error_msg = f"❌ <b>ERROR EN MONITOREO</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n⚠️ <b>Error:</b> {str(e)}"
|
||
self.send_telegram_message(error_msg)
|
||
logger.error(f"❌ Error en monitoreo: {e}")
|
||
|
||
def main():
|
||
logger.info("🚀 Iniciando Watchtower Telegram Notifier...")
|
||
|
||
notifier = WatchtowerTelegramNotifier()
|
||
|
||
# Verificar conexión a Telegram
|
||
test_message = f"🤖 <b>Bot de notificaciones iniciado</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n📅 <b>Fecha:</b> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n✅ <b>Conexión establecida correctamente</b>"
|
||
|
||
if notifier.send_telegram_message(test_message):
|
||
logger.info("✅ Conexión a Telegram verificada")
|
||
else:
|
||
logger.error("❌ Error de conexión a Telegram")
|
||
return
|
||
|
||
# Iniciar monitoreo
|
||
try:
|
||
notifier.monitor_watchtower()
|
||
except KeyboardInterrupt:
|
||
logger.info("🛑 Monitoreo interrumpido por el usuario")
|
||
farewell_msg = f"👋 <b>Monitoreo detenido</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n📅 <b>Fecha:</b> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n⚠️ <b>Bot desconectado</b>"
|
||
notifier.send_telegram_message(farewell_msg)
|
||
except Exception as e:
|
||
logger.error(f"❌ Error fatal: {e}")
|
||
error_msg = f"💥 <b>ERROR FATAL EN BOT</b>\n\n🏠 <b>Servidor:</b> {HOSTNAME}\n⚠️ <b>Error:</b> {str(e)}\n\n🔄 <b>Reinicia el script</b>"
|
||
notifier.send_telegram_message(error_msg)
|
||
|
||
if __name__ == "__main__":
|
||
main() |