BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
plugin.telegram.TelegramSender Class Reference

Public Member Functions

 __init__ (self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None)
 
 send_message (self, text)
 
 send_location (self, latitude, longitude)
 

Data Fields

 bot_token
 
 chat_ids
 
 max_retries
 
 initial_delay
 
 max_delay
 
 parse_mode
 
 msg_queue
 

Protected Member Functions

 _worker_loop (self)
 
 _send_to_telegram (self, msg_type, chat_id, content)
 

Protected Attributes

 _worker
 

Constructor & Destructor Documentation

◆ __init__()

plugin.telegram.TelegramSender.__init__ (   self,
  bot_token,
  chat_ids,
  max_retries = None,
  initial_delay = None,
  max_delay = None,
  parse_mode = None 
)
38 def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None):
39 self.bot_token = bot_token
40 self.chat_ids = chat_ids
41 self.max_retries = max_retries if max_retries is not None else 5
42 self.initial_delay = initial_delay if initial_delay is not None else 2
43 self.max_delay = max_delay if max_delay is not None else 300
44 self.parse_mode = parse_mode
45 self.msg_queue = queue.Queue()
46 self._worker = threading.Thread(target=self._worker_loop, daemon=True)
47 self._worker.start()
48

Member Function Documentation

◆ send_message()

plugin.telegram.TelegramSender.send_message (   self,
  text 
)
def send_message(self, text):
    """Queue a text message for every configured chat.

    One ``("text", chat_id, text, 0)`` tuple is put on ``msg_queue`` per
    entry in ``chat_ids``; the trailing 0 is the initial retry count.
    """
    enqueue = self.msg_queue.put
    for recipient in self.chat_ids:
        enqueue(("text", recipient, text, 0))

◆ send_location()

plugin.telegram.TelegramSender.send_location (   self,
  latitude,
  longitude 
)
def send_location(self, latitude, longitude):
    """Queue a location message for every configured chat.

    One ``("location", chat_id, {...}, 0)`` tuple is put on ``msg_queue``
    per entry in ``chat_ids``; the trailing 0 is the initial retry count.
    """
    for recipient in self.chat_ids:
        # A fresh dict per recipient, so queued items never share state.
        coordinates = {"latitude": latitude, "longitude": longitude}
        self.msg_queue.put(("location", recipient, coordinates, 0))

◆ _worker_loop()

plugin.telegram.TelegramSender._worker_loop (   self)
protected
def _worker_loop(self):
    """Consume ``msg_queue`` forever and deliver each item to Telegram.

    Each queue item is a ``(msg_type, chat_id, content, retry_count)``
    tuple. Successful sends reset the backoff delay. Permanent failures and
    items that reached ``max_retries`` are logged and dropped. Everything
    else is re-queued with an incremented retry count, after which the
    worker sleeps and doubles the backoff delay (capped at ``max_delay``).
    """
    backoff = self.initial_delay

    while True:
        try:
            kind, target, body, attempts = self.msg_queue.get()

            ok, fatal, server_delay = self._send_to_telegram(kind, target, body)

            if ok:
                backoff = self.initial_delay
                continue

            if fatal:
                logger.error("Permanenter Fehler – Nachricht wird verworfen.")
                continue

            if attempts >= self.max_retries:
                logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.")
                continue

            next_attempt = attempts + 1
            logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {next_attempt}).")
            self.msg_queue.put((kind, target, body, next_attempt))

            # A wait time supplied by Telegram (retry_after) takes precedence
            # over the locally tracked backoff delay.
            time.sleep(backoff if server_delay is None else server_delay)

            # Exponential backoff for the next failed attempt.
            backoff = min(backoff * 2, self.max_delay)

        except Exception as e:
            # Keep the worker alive on unexpected errors.
            logger.exception(f"Fehler im Telegram-Worker: {e}")
            time.sleep(5)

◆ _send_to_telegram()

plugin.telegram.TelegramSender._send_to_telegram (   self,
  msg_type,
  chat_id,
  content 
)
protected
def _send_to_telegram(self, msg_type, chat_id, content):
    """Send one message to the Telegram Bot API.

    Args:
        msg_type: ``"text"`` (uses ``sendMessage``) or ``"location"``
            (uses ``sendLocation``).
        chat_id: Target chat ID placed in the request payload.
        content: The text for ``"text"``; a mapping merged into the payload
            for ``"location"``.

    Returns:
        A ``(success, permanent_failure, custom_delay)`` tuple.
        ``custom_delay`` is only set for HTTP 429 responses and holds the
        wait time in seconds; otherwise it is ``None``.
    """
    if msg_type == "text":
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': content,
        }
        if self.parse_mode:
            payload['parse_mode'] = self.parse_mode
    elif msg_type == "location":
        url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
        payload = {
            'chat_id': chat_id,
            **content
        }
    else:
        logger.error("Unbekannter Nachrichtentyp.")
        return False, True, None  # unknown message type = permanent failure

    try:
        response = requests.post(url, data=payload, timeout=10)

        if response.status_code == 429:
            # Prefer the wait time from the response body. A non-JSON or
            # unexpectedly shaped body must not raise out of this method,
            # so it falls back to the 5 second default instead.
            try:
                custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
            except (ValueError, AttributeError):
                custom_delay = 5
            logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.")
            return False, False, custom_delay

        if response.status_code == 400:
            logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.")
            return False, True, None  # permanent failure

        if response.status_code == 401:
            logger.critical("Ungültiger Bot-Token – bitte prüfen!")
            return False, True, None  # permanent failure

        response.raise_for_status()
        logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
        return True, False, None

    except requests.RequestException as e:
        # The exception text contains the request URL, which embeds the bot
        # token, so the token is masked before the text is logged.
        reason = str(e)
        if self.bot_token:
            reason = reason.replace(str(self.bot_token), "***")
        logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {reason}")
        return False, False, None
135
136# ===========================
137# BoswatchPlugin-Class
138# ===========================
139
140

Field Documentation

◆ bot_token

plugin.telegram.TelegramSender.bot_token

◆ chat_ids

plugin.telegram.TelegramSender.chat_ids

◆ max_retries

plugin.telegram.TelegramSender.max_retries

◆ initial_delay

plugin.telegram.TelegramSender.initial_delay

◆ max_delay

plugin.telegram.TelegramSender.max_delay

◆ parse_mode

plugin.telegram.TelegramSender.parse_mode

◆ msg_queue

plugin.telegram.TelegramSender.msg_queue

◆ _worker

plugin.telegram.TelegramSender._worker
protected