BOSWatch 3
Python Script to receive and decode German BOS Information with rtl_fm and multimon-NG
 
Loading...
Searching...
No Matches
plugin.divera.BoswatchPlugin Class Reference

Description of the Plugin. More...

Public Member Functions

 __init__ (self, config)
 Do not change anything here!
 
 fms (self, bwPacket)
 Called on FMS alarm.
 
 pocsag (self, bwPacket)
 Called on POCSAG alarm.
 
 zvei (self, bwPacket)
 Called on ZVEI alarm.
 
 msg (self, bwPacket)
 Called on MSG packet.
 
- Public Member Functions inherited from plugin.pluginBase.PluginBase
 onLoad (self)
 Called by import of the plugin can be inherited.
 
 setup (self)
 Called before alarm can be inherited.
 
 teardown (self)
 Called after alarm can be inherited.
 
 onUnload (self)
 Called on shutdown of boswatch can be inherited.
 
 parseWildcards (self, msg)
 Return the message with parsed wildcards.
 

Protected Member Functions

 _makeRequests (self, apipath, apicall)
 
 _asyncRequests (self, url)
 
 _fetch (self, url, session)
 
- Protected Member Functions inherited from plugin.pluginBase.PluginBase
 _cleanup (self)
 Cleanup routine calls onUnload() directly.
 
 _run (self, bwPacket)
 start an complete running turn of an plugin.
 
 _getStatistics (self)
 Returns statistical information's from last plugin run.
 

Additional Inherited Members

- Data Fields inherited from plugin.pluginBase.PluginBase
 config
 
- Protected Attributes inherited from plugin.pluginBase.PluginBase
 _pluginName
 
 _bwPacket
 
 _sumTime
 
 _cumTime
 
 _setupTime
 
 _alarmTime
 
 _teardownTime
 
 _runCount
 
 _setupErrorCount
 
 _alarmErrorCount
 
 _teardownErrorCount
 
- Static Protected Attributes inherited from plugin.pluginBase.PluginBase
list _pluginsActive = []
 

Detailed Description

Description of the Plugin.

Constructor & Destructor Documentation

◆ __init__()

plugin.divera.BoswatchPlugin.__init__ (   self,
  config 
)

Do not change anything here!

Reimplemented from plugin.pluginBase.PluginBase.

32 def __init__(self, config):
33 r"""!Do not change anything here!"""
34 super().__init__(__name__, config) # you can access the config class on 'self.config'
35

Member Function Documentation

◆ fms()

plugin.divera.BoswatchPlugin.fms (   self,
  bwPacket 
)

Called on FMS alarm.

Parameters
bwPacketbwPacket instance Remove if not implemented

Reimplemented from plugin.pluginBase.PluginBase.

36 def fms(self, bwPacket):
37 r"""!Called on FMS alarm
38
39 @param bwPacket: bwPacket instance
40 Remove if not implemented"""
41 fms_data = self.config.get("fms")
42 apicall = urllib.parse.urlencode({
43 "accesskey": self.config.get("accesskey", default=""),
44 "vehicle_ric": self.parseWildcards(fms_data.get("vehicle", default="")),
45 "status_id": bwPacket.get("status"),
46 "status_note": bwPacket.get("directionText"),
47 "title": self.parseWildcards(fms_data.get("title", default="{FMS}")),
48 "text": self.parseWildcards(fms_data.get("message", default="{FMS}")),
49 "priority": fms_data.get("priority", default="false"),
50 })
51 apipath = "/api/fms"
52 self._makeRequests(apipath, apicall)
53

◆ pocsag()

plugin.divera.BoswatchPlugin.pocsag (   self,
  bwPacket 
)

Called on POCSAG alarm.

Parameters
bwPacketbwPacket instance Remove if not implemented

Reimplemented from plugin.pluginBase.PluginBase.

54 def pocsag(self, bwPacket):
55 r"""!Called on POCSAG alarm
56
57 @param bwPacket: bwPacket instance
58 Remove if not implemented"""
59 poc_data = self.config.get("pocsag")
60 apicall = urllib.parse.urlencode({
61 "accesskey": self.config.get("accesskey", default=""),
62 "title": self.parseWildcards(poc_data.get("title", default="{RIC}({SRIC})\n{MSG}")),
63 "ric": self.parseWildcards(poc_data.get("ric", default="")),
64 "text": self.parseWildcards(poc_data.get("message", default="{MSG}")),
65 "priority": poc_data.get("priority", default="false"),
66 })
67 apipath = "/api/alarm"
68 self._makeRequests(apipath, apicall)
69

◆ zvei()

plugin.divera.BoswatchPlugin.zvei (   self,
  bwPacket 
)

Called on ZVEI alarm.

Parameters
bwPacketbwPacket instance Remove if not implemented

Reimplemented from plugin.pluginBase.PluginBase.

70 def zvei(self, bwPacket):
71 r"""!Called on ZVEI alarm
72
73 @param bwPacket: bwPacket instance
74 Remove if not implemented"""
75 zvei_data = self.config.get("zvei")
76 apicall = urllib.parse.urlencode({
77 "accesskey": self.config.get("accesskey", default=""),
78 "title": self.parseWildcards(zvei_data.get("title", default="{TONE}")),
79 "ric": self.parseWildcards(zvei_data.get("ric", default="{TONE}")),
80 "text": self.parseWildcards(zvei_data.get("message", default="{TONE}")),
81 "priority": zvei_data.get("priority", default="false"),
82 })
83 apipath = "/api/alarm"
84 self._makeRequests(apipath, apicall)
85

◆ msg()

plugin.divera.BoswatchPlugin.msg (   self,
  bwPacket 
)

Called on MSG packet.

Parameters
bwPacketbwPacket instance Remove if not implemented

Reimplemented from plugin.pluginBase.PluginBase.

86 def msg(self, bwPacket):
87 r"""!Called on MSG packet
88
89 @param bwPacket: bwPacket instance
90 Remove if not implemented"""
91 msg_data = self.config.get("msg")
92 apicall = urllib.parse.urlencode({
93 "accesskey": self.config.get("accesskey", default=""),
94 "title": self.parseWildcards(msg_data.get("title", default="{MSG}")),
95 "ric": self.parseWildcards(msg_data.get("ric", default="")),
96 "text": self.parseWildcards(msg_data.get("message", default="{MSG}")),
97 "priority": msg_data.get("priority", default="false"),
98 })
99 apipath = "/api/alarm"
100 self._makeRequests(apipath, apicall)
101

◆ _makeRequests()

plugin.divera.BoswatchPlugin._makeRequests (   self,
  apipath,
  apicall 
)
protected
Parses wildcard urls and handles asynchronus requests

@param urls: array of urls
102 def _makeRequests(self, apipath, apicall):
103 """Parses wildcard urls and handles asynchronus requests
104
105 @param urls: array of urls"""
106 url = "https://www.divera247.com"
107 request = url + apipath + "?" + apicall
108
109 loop = asyncio.get_event_loop()
110
111 future = asyncio.ensure_future(self._asyncRequests(request))
112 loop.run_until_complete(future)
113

◆ _asyncRequests()

plugin.divera.BoswatchPlugin._asyncRequests (   self,
  url 
)
protected
Handles asynchronus requests

@param urls: array of urls to send requests to
114 async def _asyncRequests(self, url):
115 """Handles asynchronus requests
116
117 @param urls: array of urls to send requests to"""
118 tasks = []
119
120 async with ClientSession() as session:
121 logging.debug("Generated URL: [{}]".format(url))
122 task = asyncio.ensure_future(self._fetch(url, session))
123 tasks.append(task)
124
125 responses = asyncio.gather(*tasks)
126 await responses
127

◆ _fetch()

plugin.divera.BoswatchPlugin._fetch (   self,
  url,
  session 
)
protected
Fetches requests

@param url: url

@param session: Clientsession instance
128 async def _fetch(self, url, session):
129 """Fetches requests
130
131 @param url: url
132
133 @param session: Clientsession instance"""
134 logging.debug("Post URL: [{}]".format(url))
135 async with session.post(url) as response:
136 logging.info("{} returned [{}]".format(response.url, response.status))
137 return await response.read()