BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
module.descriptor.BoswatchModule Class Reference

Adds descriptions to bwPackets with CSV and Regex support.

Public Member Functions

 __init__ (self, config)
 Do not change anything here!
 
 onLoad (self)
 Called by import of the plugin.
 
 doWork (self, bwPacket)
 Start a run of the module.
 
 onUnload (self)
 Called by destruction of the plugin.
 

Data Fields

 unified_cache
 
- Data Fields inherited from module.moduleBase.ModuleBase
 config
 

Protected Member Functions

 _load_csv_data (self, csv_path, descriptor_key)
 Load CSV data for a descriptor and add to unified cache.
 
 _find_description (self, descriptor_key, scan_value, bw_packet)
 Find matching description for a scan value with Regex group support.
 
 _replace_wildcards (self, text, bw_packet)
 Replace all available wildcards in description text dynamically.
 
- Protected Member Functions inherited from module.moduleBase.ModuleBase
 _cleanup (self)
 Cleanup routine calls onUnload() directly.
 
 _run (self, bwPacket)
 Start a run of the module.
 
 _getStatistics (self)
 Returns statistical information from the last module run.
 

Additional Inherited Members

- Static Public Member Functions inherited from module.moduleBase.ModuleBase
 registerWildcard (newWildcard, bwPacketField)
 Register a new wildcard.
 
- Protected Attributes inherited from module.moduleBase.ModuleBase
 _moduleName
 
 _cumTime
 
 _moduleTime
 
 _runCount
 
 _moduleErrorCount
 
- Static Protected Attributes inherited from module.moduleBase.ModuleBase
list _modulesActive = []
 

Detailed Description

Adds descriptions to bwPackets with CSV and Regex support.

Constructor & Destructor Documentation

◆ __init__()

module.descriptor.BoswatchModule.__init__ (   self,
  config 
)

Do not change anything here!

Reimplemented from module.moduleBase.ModuleBase.

    def __init__(self, config):
        r"""!Do not change anything here!"""
        super().__init__(__name__, config)  # you can access the config class on 'self.config'

Member Function Documentation

◆ onLoad()

module.descriptor.BoswatchModule.onLoad (   self)

Called by import of the plugin.

Reimplemented from module.moduleBase.ModuleBase.

    def onLoad(self):
        r"""!Called by import of the plugin"""
        # Initialize unified cache for all descriptors
        self.unified_cache = {}

        # Process each descriptor configuration
        for descriptor_config in self.config:
            scan_field = descriptor_config.get("scanField")
            descr_field = descriptor_config.get("descrField")
            descriptor_key = f"{scan_field}_{descr_field}"

            # Register wildcard if specified
            if descriptor_config.get("wildcard", default=None):
                self.registerWildcard(descriptor_config.get("wildcard"), descr_field)

            # Initialize cache for this descriptor
            self.unified_cache[descriptor_key] = []

            # Load YAML descriptions first (for backward compatibility)
            yaml_descriptions = descriptor_config.get("descriptions", default=None)
            if yaml_descriptions:
                # yaml_descriptions is a Config object, we need to iterate properly
                for desc in yaml_descriptions:
                    entry = {
                        'for': str(desc.get("for", default="")),
                        'add': desc.get("add", default=""),
                        'isRegex': desc.get("isRegex", default=False)  # Default: False
                    }
                    # Handle string 'true'/'false' values
                    if isinstance(entry['isRegex'], str):
                        entry['isRegex'] = entry['isRegex'].lower() == 'true'

                    self.unified_cache[descriptor_key].append(entry)
                    logging.debug("Added YAML entry: %s -> %s", entry['for'], entry['add'])
                logging.info("Loaded %d YAML descriptions for %s", len(yaml_descriptions), descriptor_key)

            # Load CSV descriptions if csvPath is specified
            csv_path = descriptor_config.get("csvPath", default=None)
            if csv_path:
                self._load_csv_data(csv_path, descriptor_key)

            logging.info("Total entries for %s: %d", descriptor_key, len(self.unified_cache[descriptor_key]))

◆ _load_csv_data()

module.descriptor.BoswatchModule._load_csv_data (   self,
  csv_path,
  descriptor_key 
)
protected

Load CSV data for a descriptor and add to unified cache.

    def _load_csv_data(self, csv_path, descriptor_key):
        r"""!Load CSV data for a descriptor and add to unified cache"""
        try:
            if not os.path.isfile(csv_path):
                logging.error("CSV file not found: %s", csv_path)
                return

            csv_count = 0
            with open(csv_path, 'r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    # Set default values if columns are missing
                    raw_for = str(row.get('for', '')).strip()
                    # Remove enclosing quotes
                    clean_for = raw_for.strip().strip('"').strip("'")
                    entry = {
                        'for': clean_for,
                        'add': row.get('add', '').strip(),
                        'isRegex': row.get('isRegex', 'false').lower() == 'true'  # Default: False
                    }
                    logging.debug("CSV row read: %s", row)
                    self.unified_cache[descriptor_key].append(entry)
                    csv_count += 1

            logging.info("Loaded %d entries from CSV: %s for %s", csv_count, csv_path, descriptor_key)

        except Exception as e:
            logging.error("Error loading CSV file %s: %s", csv_path, str(e))

◆ _find_description()

module.descriptor.BoswatchModule._find_description (   self,
  descriptor_key,
  scan_value,
  bw_packet 
)
protected

Find matching description for a scan value with Regex group support.

The search is performed in two passes for performance optimization:

  1. First pass: Check for exact string matches (fast, no regex compilation)
  2. Second pass: Check regex patterns only if no exact match was found

Regex patterns support capture groups that can be referenced in the description using standard regex backreferences (\1, \2, etc.) via match.expand().

Example:

  Pattern: r"(\d{7})"
  Input: "1234567"
  Description template: "RIC: \1"
  Result: "RIC: 1234567"

Parameters
  descriptor_key  Cache key identifying the descriptor configuration
  scan_value      Value to search for in the descriptor cache
  bw_packet       BOSWatch packet for wildcard replacement
Returns
  Matched description string, or None if no match is found
    def _find_description(self, descriptor_key, scan_value, bw_packet):
        r"""!Find matching description for a scan value with Regex group support.

        The search is performed in two passes for performance optimization:
        1. First pass: Check for exact string matches (fast, no regex compilation)
        2. Second pass: Check regex patterns only if no exact match was found

        Regex patterns support capture groups that can be referenced in the description
        using standard regex backreferences (\1, \2, etc.) via match.expand().

        Example:
            Pattern: r"(\d{7})"
            Input: "1234567"
            Description template: "RIC: \1"
            Result: "RIC: 1234567"

        @param descriptor_key: Cache key identifying the descriptor configuration
        @param scan_value: Value to search for in the descriptor cache
        @param bw_packet: BOSWatch packet for wildcard replacement
        @return: Matched description string or None if no match found
        """

        descriptions = self.unified_cache.get(descriptor_key, [])
        scan_value_str = str(scan_value).strip()

        # First pass: Search for exact matches (performance optimization)
        # Exact matches are checked first because they don't require regex compilation
        for desc in descriptions:
            if not desc.get('isRegex', False):
                if desc['for'] == scan_value_str:
                    description_text = desc.get('add', '')
                    final_description = self._replace_wildcards(description_text, bw_packet)
                    return final_description

        # Second pass: Search for regex matches
        # Only executed if no exact match was found in the first pass
        for desc in descriptions:
            if desc.get('isRegex', False):
                match_pattern = desc.get('for', '')
                try:
                    match = re.search(match_pattern, scan_value_str)
                    if match:
                        description_text = desc.get('add', '')
                        # match.expand() replaces backreferences (\1, \2, etc.) with captured groups
                        # Example: pattern="(\d+)-(\d+)", input="123-456", template="First: \1, Second: \2"
                        # result="First: 123, Second: 456"
                        expanded_description = match.expand(description_text)
                        final_description = self._replace_wildcards(expanded_description, bw_packet)
                        return final_description
                except re.error as e:
                    logging.error("Invalid regex pattern '%s': %s", match_pattern, e)

        return None

◆ _replace_wildcards()

module.descriptor.BoswatchModule._replace_wildcards (   self,
  text,
  bw_packet 
)
protected

Replace all available wildcards in description text dynamically.

    def _replace_wildcards(self, text, bw_packet):
        r"""!Replace all available wildcards in description text dynamically."""
        if not text or '{' not in text:
            return text

        result = text

        # Search for wildcards in the format {KEY} and replace them with values from the bw_packet
        found_wildcards = re.findall(r"\{([A-Z0-9_]+)\}", result)

        for key in found_wildcards:
            key_lower = key.lower()
            value = bw_packet.get(key_lower)

            if value is not None:
                result = result.replace(f"{{{key}}}", str(value))
                logging.debug("Replaced wildcard {%s} with value '%s'", key, value)

        return result

◆ doWork()

module.descriptor.BoswatchModule.doWork (   self,
  bwPacket 
)

Start a run of the module.

Parameters
  bwPacket  A BOSWatch packet instance

Reimplemented from module.moduleBase.ModuleBase.

    def doWork(self, bwPacket):
        r"""!Start a run of the module.

        @param bwPacket: A BOSWatch packet instance"""
        logging.debug("Processing packet with mode: %s", bwPacket.get("mode"))

        # Process each descriptor configuration
        for descriptor_config in self.config:
            scan_field = descriptor_config.get("scanField")
            descr_field = descriptor_config.get("descrField")
            descriptor_key = f"{scan_field}_{descr_field}"

            logging.debug("Processing descriptor: scanField='%s', descrField='%s'", scan_field, descr_field)

            # Check if scanField is present in packet
            scan_value = bwPacket.get(scan_field)
            if scan_value is None:
                logging.debug("scanField '%s' not found in packet, skipping", scan_field)
                continue  # scanField not available in this packet - try next descriptor

            # Set default value (content of scanField)
            bwPacket.set(descr_field, str(scan_value))
            logging.debug("Set default value '%s' for field '%s'", scan_value, descr_field)

            # Search for matching description in unified cache
            description = self._find_description(descriptor_key, scan_value, bwPacket)

            if description:
                bwPacket.set(descr_field, description)
                logging.info("Description set: '%s' -> '%s'", scan_value, description)
            else:
                logging.debug("No description found for value '%s' in field '%s'", scan_value, scan_field)

        logging.debug("Returning modified packet")
        return bwPacket

◆ onUnload()

module.descriptor.BoswatchModule.onUnload (   self)

Called by destruction of the plugin.

Reimplemented from module.moduleBase.ModuleBase.

    def onUnload(self):
        r"""!Called by destruction of the plugin"""
        pass

Field Documentation

◆ unified_cache

module.descriptor.BoswatchModule.unified_cache