BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
plugin.telegram.TelegramSender Class Reference

Public Member Functions

 __init__ (self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None)
 
 send_message (self, text)
 
 send_location (self, latitude, longitude)
 

Data Fields

 bot_token
 
 chat_ids
 
 max_retries
 
 initial_delay
 
 max_delay
 
 msg_queue
 

Protected Member Functions

 _worker_loop (self)
 
 _send_to_telegram (self, msg_type, chat_id, content)
 

Protected Attributes

 _worker
 

Constructor & Destructor Documentation

◆ __init__()

plugin.telegram.TelegramSender.__init__ (   self,
  bot_token,
  chat_ids,
  max_retries = None,
  initial_delay = None,
  max_delay = None 
)
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
    """Store the settings, create the message queue and start the worker.

    Args:
        bot_token: Bot token of the Telegram bot.
        chat_ids: Iterable of chat IDs; every message is queued once per ID.
        max_retries: Retry limit per message; 5 is used when None.
        initial_delay: Starting retry delay; 2 is used when None.
        max_delay: Upper bound for the retry delay; 300 is used when None.
    """
    self.bot_token = bot_token
    self.chat_ids = chat_ids

    # Only None selects the built-in default; an explicit 0 is kept as given.
    self.max_retries = 5 if max_retries is None else max_retries
    self.initial_delay = 2 if initial_delay is None else initial_delay
    self.max_delay = 300 if max_delay is None else max_delay

    # The queue must exist before the worker starts, because the worker
    # reads from it immediately.
    self.msg_queue = queue.Queue()

    background = threading.Thread(target=self._worker_loop, daemon=True)
    self._worker = background
    background.start()
47

Member Function Documentation

◆ send_message()

plugin.telegram.TelegramSender.send_message (   self,
  text 
)
def send_message(self, text):
    """Queue a text message for every configured chat ID.

    Args:
        text: Message text to deliver.
    """
    enqueue = self.msg_queue.put
    for recipient in self.chat_ids:
        # Last tuple element is the retry counter; new messages start at 0.
        job = ("text", recipient, text, 0)
        enqueue(job)
51

◆ send_location()

plugin.telegram.TelegramSender.send_location (   self,
  latitude,
  longitude 
)
def send_location(self, latitude, longitude):
    """Queue a location message for every configured chat ID.

    Args:
        latitude: Latitude value placed under the "latitude" key.
        longitude: Longitude value placed under the "longitude" key.
    """
    for recipient in self.chat_ids:
        # A fresh dict per recipient, so queued items never share state.
        coordinates = {"latitude": latitude, "longitude": longitude}
        # Last tuple element is the retry counter; new messages start at 0.
        self.msg_queue.put(("location", recipient, coordinates, 0))
55

◆ _worker_loop()

plugin.telegram.TelegramSender._worker_loop (   self)
protected
def _worker_loop(self):
    """Process queued messages forever, retrying failed sends with backoff.

    Each queue item is a tuple (msg_type, chat_id, content, retry_count).
    A successful send resets the backoff delay. A permanent failure, or a
    message that has reached max_retries, is logged and dropped. Any other
    failure re-queues the message, sleeps, and doubles the delay up to
    max_delay.
    """
    backoff = self.initial_delay

    while True:
        try:
            kind, target, payload, attempts = self.msg_queue.get()
            sent, fatal, server_delay = self._send_to_telegram(kind, target, payload)

            if sent:
                backoff = self.initial_delay
                continue

            if fatal:
                logger.error("Permanenter Fehler – Nachricht wird verworfen.")
                continue

            if attempts >= self.max_retries:
                logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.")
                continue

            next_attempt = attempts + 1
            logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {next_attempt}).")
            self.msg_queue.put((kind, target, payload, next_attempt))

            # Prefer the wait time supplied by Telegram (retry_after), if any.
            time.sleep(backoff if server_delay is None else server_delay)

            # Exponential backoff for the next failed attempt.
            backoff = min(backoff * 2, self.max_delay)

        except Exception as e:
            # Keep the worker alive: log the error, pause, and carry on.
            logger.exception(f"Fehler im Telegram-Worker: {e}")
            time.sleep(5)
88

◆ _send_to_telegram()

plugin.telegram.TelegramSender._send_to_telegram (   self,
  msg_type,
  chat_id,
  content 
)
protected
def _send_to_telegram(self, msg_type, chat_id, content):
    """Send one message to the Telegram Bot API and classify the outcome.

    Args:
        msg_type: "text" (uses sendMessage) or "location" (uses sendLocation).
        chat_id: Target chat ID.
        content: The message text for "text"; for "location" a mapping
            whose items are merged into the request payload.

    Returns:
        A tuple (success, permanent_failure, custom_delay). custom_delay is
        the wait time taken from an HTTP 429 response, otherwise None.
    """
    if msg_type == "text":
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': content
        }
    elif msg_type == "location":
        url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
        payload = {
            'chat_id': chat_id,
            **content
        }
    else:
        logger.error("Unbekannter Nachrichtentyp.")
        return False, True, None  # unknown type: retrying cannot help

    try:
        response = requests.post(url, data=payload, timeout=10)

        if response.status_code == 429:
            try:
                custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
            except (ValueError, AttributeError):
                # The body was not the expected JSON object. Use the default
                # wait so the message is retried instead of being lost to an
                # unexpected exception.
                custom_delay = 5
            logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.")
            return False, False, custom_delay  # Telegram dictates the wait

        if response.status_code == 400:
            logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.")
            return False, True, None  # permanent failure

        if response.status_code == 401:
            logger.critical("Ungültiger Bot-Token – bitte prüfen!")
            return False, True, None  # permanent failure

        response.raise_for_status()
        logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
        return True, False, None

    except requests.RequestException as e:
        # The exception text can contain the request URL, which embeds the
        # bot token. Redact the token so the secret never reaches the log.
        detail = str(e)
        token = str(self.bot_token) if self.bot_token else ""
        if token:
            detail = detail.replace(token, "<bot_token>")
        logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {detail}")
        return False, False, None
131
132
133# ===========================
134# BoswatchPlugin-Klasse
135# ===========================
136
137

Field Documentation

◆ bot_token

plugin.telegram.TelegramSender.bot_token

◆ chat_ids

plugin.telegram.TelegramSender.chat_ids

◆ max_retries

plugin.telegram.TelegramSender.max_retries

◆ initial_delay

plugin.telegram.TelegramSender.initial_delay

◆ max_delay

plugin.telegram.TelegramSender.max_delay

◆ msg_queue

plugin.telegram.TelegramSender.msg_queue

◆ _worker

plugin.telegram.TelegramSender._worker
protected