heartbeat
/
heartbeat-srv.py
/
1 contributor
#!/usr/bin/env python3
import argparse
import socket
import threading
import json
import time
import paho.mqtt.client as pahomqtt
import logging
from logging.handlers import RotatingFileHandler
import base64
import requests
from datetime import datetime, timezone
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
# --- Logging -----------------------------------------------------------------
# The log file sits next to the script ("<script>.log"), rotates at 1 MiB with
# two backups kept, and every record is also echoed to the console.
log_file = __file__.replace(".py", ".log")
format = "%(asctime)s : %(levelname)s : %(message)s"
logging.basicConfig(
    handlers=[
        RotatingFileHandler(log_file, maxBytes=1024 * 1024, backupCount=2),
        logging.StreamHandler(),
    ],
    datefmt='%Y-%m-%dT%H:%M:%S%z',
    level=logging.INFO,
    format=format,
)

# --- Per-gateway state, keyed by MAC address ---------------------------------
last_seen = {}     # mac -> epoch of the most recent heartbeat
mac_status = {}    # mac -> True while considered connected, False once alarmed
uuid_value = {}    # mac -> gateway UUID derived from the MAC
uuid_details = {}  # mac -> gateway details found in the OSS (or None)
lock = threading.Lock()  # taken around last_seen / mac_status updates

# --- Timing ------------------------------------------------------------------
DELTA_THRESHOLD = 10  # default heartbeat silence threshold, in seconds
CHECK_INTERVAL = 5    # pause between two monitor passes, in seconds

# --- Values filled in at startup ---------------------------------------------
token_data = None
subid = None
configuration = None

# --- AWS IoT Core ------------------------------------------------------------
aws_mqtt_connection = None
aws_client_id = "GatewayAlarmProbe"
def oss_login(client_id: str, client_secret: str, timeout: float = 30):
    """Request a client-credentials token from the OSS authentication endpoint.

    Args:
        client_id: service-account identifier sent as ``client_id``.
        client_secret: service-account secret sent as ``client_secret``.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the token response (other functions in this
        file read ``access_token`` from it), or ``None`` when the request
        fails, the server answers with an HTTP error status, or the body is
        not valid JSON.
    """
    fqdn = configuration.get("oss").get("oss_fqdn")
    url = f"https://{fqdn}/users-auth/protocol/openid-connect/token"
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    data = {
        'client_id': client_id,
        'client_secret': client_secret,
        'grant_type': 'client_credentials'
    }
    try:
        # Without a timeout requests waits forever on an unresponsive server.
        response = requests.post(url, data=data, headers=headers, timeout=timeout)
        # An error status carries a JSON error body; returning it would look
        # like a valid (truthy) token to the caller.
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        logging.error(f"OSS login failed: {exc}")
        return None
def oss_gw_details(uuid: str):
    """Look up a gateway in the list previously fetched from the OSS.

    Args:
        uuid: gateway UUID to compare against each entry's ``lrrUUID``.

    Returns:
        The first matching entry of ``bs_details``, or ``None`` when no entry
        matches or the list is unavailable.
    """
    # bs_details only exists once main() has assigned it, and it is None when
    # the OSS reply carried no 'briefs'; treat both cases as "no gateways".
    known_gateways = globals().get("bs_details") or []
    for bs_detail in known_gateways:
        # .get(): an entry without 'lrrUUID' must not abort the whole lookup.
        if uuid == bs_detail.get('lrrUUID'):
            logging.info(f"{uuid} Gateway details found")
            return bs_detail
    logging.warning(f"{uuid} Gateway details not found")
    return None
def oss_gw_all_details(timeout: float = 30):
    """Fetch the gateway list of the authenticated partner from the OSS.

    Args:
        timeout: seconds to wait for the server before giving up.

    Returns:
        The ``briefs`` value of the JSON reply, or an empty list when the
        reply has no ``briefs`` (so callers can always iterate the result).

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    fqdn = configuration.get("oss").get("oss_fqdn")
    url = f"https://{fqdn}/thingpark/wireless/rest/partners/mine/bss"
    headers = {
        'accept': 'application/json',
        'authorization': f"Bearer {token_data.get('access_token')}"
    }
    # Without a timeout requests waits forever on an unresponsive server.
    response = requests.get(url, headers=headers, timeout=timeout)
    briefs = response.json().get('briefs')
    if briefs is None:
        logging.warning(f"No gateway list in OSS reply (HTTP {response.status_code})")
        return []
    return briefs
def oss_sub_details(timeout: float = 30):
    """Fetch the partner record from the OSS and store its ID in ``subid``.

    Args:
        timeout: seconds to wait for the server before giving up.

    Returns:
        The partner ``ID`` from the reply (also assigned to the module-level
        ``subid``), or ``None`` when the reply has no ``partner`` object.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    global subid
    fqdn = configuration.get("oss").get("oss_fqdn")
    url = f"https://{fqdn}/thingpark/wireless/rest/partners/mine"
    headers = {
        'accept': 'application/json',
        'authorization': f"Bearer {token_data.get('access_token')}"
    }
    # Without a timeout requests waits forever on an unresponsive server.
    response = requests.get(url, headers=headers, timeout=timeout)
    # An error reply has no 'partner'; avoid calling .get() on None.
    partner = response.json().get('partner') or {}
    subid = partner.get('ID')
    if subid is None:
        logging.error(f"No partner ID in OSS reply (HTTP {response.status_code})")
    return subid
def bs_alarm_format_cnx(lrrID,lrrName,uuid,tsStart,tsEnd):
    """Build the ``BS_alarm`` message with State 1 for a gateway seen again.

    ``tsStart`` and ``tsEnd`` are epoch seconds; they are rendered as UTC
    timestamps in ``CreationTime`` / ``LastUpdateTime`` and their distance, in
    whole minutes, is reported as a string in ``AddInfo1``. ``PartnerID`` and
    ``CustomerID`` both come from the module-level ``subid``.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"
    created, updated = (
        datetime.fromtimestamp(ts, tz=timezone.utc).strftime(stamp_format)
        for ts in (tsStart, tsEnd)
    )
    elapsed_minutes = int((tsEnd - tsStart) / 60)

    body = {}
    body["ID"] = 102
    body["State"] = 1
    body["CreationTime"] = created
    body["LastUpdateTime"] = updated
    body["Occurrence"] = 1
    body["Acked"] = False
    body["AddInfo1"] = str(elapsed_minutes)
    body["LrrID"] = lrrID
    body["LrrUUID"] = uuid
    body["PartnerID"] = subid
    body["CustomerID"] = subid
    body["BaseStationData"] = {"name": lrrName}
    body["ModelCfg"] = "1:TPX_00000000-0000-0000-0000-000000000000"
    body["CustomerRealmID"] = "tpw-users-actility-tpe-ope"
    body["downlinkUrl"] = f"https://{configuration.get('oss').get('oss_fqdn')}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000"
    return {"BS_alarm": body}
def bs_alarm_format_disc(lrrID,lrrName,uuid,tsStart):
    """Build the ``BS_alarm`` message with State 6 for a gateway gone silent.

    ``tsStart`` is an epoch in seconds; its UTC rendering is used for both
    ``CreationTime`` and ``LastUpdateTime``. ``PartnerID`` and ``CustomerID``
    both come from the module-level ``subid``.
    """
    raised_at = datetime.fromtimestamp(tsStart, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    body = {}
    body["ID"] = 102
    body["State"] = 6
    body["CreationTime"] = raised_at
    body["LastUpdateTime"] = raised_at
    body["Occurrence"] = 1
    body["Acked"] = False
    body["LrrID"] = lrrID
    body["LrrUUID"] = uuid
    body["PartnerID"] = subid
    body["CustomerID"] = subid
    body["BaseStationData"] = {"name": lrrName}
    body["ModelCfg"] = "1:TPX_00000000-0000-0000-0000-000000000000"
    body["CustomerRealmID"] = "tpw-users-actility-tpe-ope"
    body["downlinkUrl"] = f"https://{configuration.get('oss').get('oss_fqdn')}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000"
    return {"BS_alarm": body}
def mqtt_connect(broker_host, broker_port, user, password):
    """Open a paho MQTT session and start its background network loop.

    Credentials are only configured when ``user`` is truthy. The connection
    uses a 60 second keep-alive. Returns the connected client.
    """
    keepalive_seconds = 60
    session = pahomqtt.Client()
    if user:
        session.username_pw_set(user, password)
    session.connect(broker_host, broker_port, keepalive_seconds)
    session.loop_start()
    return session
def mac_to_uuid(mac):
    """Derive the gateway UUID from a colon-separated MAC address.

    MACs starting with one of the known vendor prefixes yield
    ``<OUI>-<MAC>`` (for example ``7076FF-7076FF0204D1``); any other MAC
    yields the MAC alone. Colons are removed and the result is upper-cased.
    The vendor prefix is matched case-insensitively, so ``70:76:FF:...`` and
    ``70:76:ff:...`` map to the same UUID.

    Args:
        mac: MAC address such as ``"70:76:ff:02:04:d1"``.

    Returns:
        The UUID string.
    """
    # Kerlink:   7076FF-7076FF0204D1
    # Milesight: 24E124-24E124F78436
    # Tektelic:  647FDA-647FDA016D51
    vendor_prefixes = ("70:76:ff", "24:e1:24", "64:7f:da")
    compact = mac.replace(':', '').upper()
    if mac.lower().startswith(vendor_prefixes):
        # The matched prefix is exactly the first three octets, i.e. the
        # first six hex digits of the compact form.
        return f"{compact[:6]}-{compact}"
    return compact
def _gateway_identity(mac):
    """Return ``(name, lrrID)`` for ``mac``: OSS details if cached, else UUID-derived."""
    details = uuid_details.get(mac)
    if details is not None:
        return details.get('name', None), details.get('lrrID', None)
    # No OSS record: use the UUID as name and its last 8 characters as LRR ID.
    fallback = uuid_value.get(mac)
    return fallback, fallback[-8:]


def handle_client(conn, addr, mqtt_client, mqtt_topic):
    """Serve one heartbeat TCP connection until it closes or fails.

    The peer sends newline-delimited JSON objects from which ``mac``,
    ``status`` and ``epoch`` are read. For each heartbeat the gateway UUID and
    OSS details are resolved (and cached per MAC) and ``last_seen`` is
    updated. A heartbeat from a MAC currently marked disconnected publishes a
    reconnection alarm to MQTT and AWS. When the connection ends, a
    disconnection alarm is published for the last gateway that sent a valid
    heartbeat and that MAC is marked disconnected.

    Args:
        conn: accepted client socket; closed when the read loop ends.
        addr: peer address, used in log messages.
        mqtt_client: client whose ``publish(topic, payload)`` receives alarms.
        mqtt_topic: MQTT topic for the alarms.
    """
    buffer = b""
    # Last gateway that produced a valid heartbeat on this connection. Stays
    # None when the peer disconnects without sending one, in which case there
    # is nothing to raise an alarm about.
    mac = None
    uuid = None
    with conn:
        while True:
            try:
                data = conn.recv(1024)
                if not data:
                    break
                buffer += data
                while b"\n" in buffer:
                    line, buffer = buffer.split(b"\n", 1)
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        msg = json.loads(line.decode())
                        hb_mac = msg.get("mac")
                        status = msg.get("status")
                        epoch = msg.get("epoch", int(time.time()))

                        hb_uuid = uuid_value.get(hb_mac)
                        if hb_uuid is None:
                            hb_uuid = mac_to_uuid(hb_mac)
                            uuid_value[hb_mac] = hb_uuid
                        # Only remember the gateway once its UUID is known, so
                        # the disconnect path never sees a half-parsed message.
                        mac, uuid = hb_mac, hb_uuid

                        # A None result is not cached as "known": the OSS
                        # lookup is retried on every heartbeat until it succeeds.
                        if uuid_details.get(mac) is None:
                            uuid_details[mac] = oss_gw_details(uuid)
                        gw_name, gw_lrrid = _gateway_identity(mac)
                        logging.info(f"Heartbeat received {mac} : uuid: {uuid} lrc: {status} SubID: {subid} LRRID: {gw_lrrid} Name: {gw_name}")

                        # Update last_seen and status
                        with lock:
                            last_time = last_seen.get(mac)
                            last_seen[mac] = epoch
                            if mac_status.get(mac) is False:
                                mac_status[mac] = True
                                # A MAC can be marked disconnected without ever
                                # having a last_seen entry; fall back to this
                                # heartbeat's epoch instead of int(None).
                                outage_start = last_time if last_time is not None else epoch
                                alarm_message = bs_alarm_format_cnx(gw_lrrid,
                                                                    gw_name,
                                                                    uuid,
                                                                    int(outage_start),
                                                                    int(time.time()))
                                mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
                                aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
                                logging.info(f"Reconnection alert: {alarm_message}")
                            else:
                                mac_status.setdefault(mac, True)
                    except json.JSONDecodeError as e:
                        # A malformed line is skipped; the connection stays up.
                        logging.error(f"Invalid JSON from {addr}: {e}")
            except ConnectionResetError:
                logging.error(f"Connection reset")
                break
            except Exception as e:
                logging.error(f"Generic: {e}")
                break

    if mac is None:
        logging.warning(f"Connection from {addr} closed before any valid heartbeat")
        return

    gw_name, gw_lrrid = _gateway_identity(mac)
    alarm_message = bs_alarm_format_disc(gw_lrrid,
                                         gw_name,
                                         uuid,
                                         int(time.time()))
    mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
    logging.warning(f"Disconnection alert: {alarm_message}")
    aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
    # Same lock as every other mac_status writer.
    with lock:
        mac_status[mac] = False
def tcp_server(host, port, mqtt_client, mqtt_topic):
    """Accept heartbeat connections forever, one daemon thread per client.

    Args:
        host: address to bind the listening socket to.
        port: TCP port to listen on.
        mqtt_client: passed through to ``handle_client``.
        mqtt_topic: passed through to ``handle_client``.

    The function returns only after a ``KeyboardInterrupt``; any other error
    from ``bind``/``listen``/``accept`` propagates to the caller.
    """
    # The context manager closes the socket on every exit path, including a
    # failing bind()/listen().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)
        logging.info(f"Listening on {host}:{port}")
        try:
            while True:
                conn, addr = server.accept()
                worker = threading.Thread(target=handle_client,
                                          args=(conn, addr, mqtt_client, mqtt_topic),
                                          daemon=True)
                worker.start()
        except KeyboardInterrupt:
            logging.info("Server shutting down.")
def timeout_monitor(mqtt_client, mqtt_topic, delta_threshold):
    """Periodically raise a disconnection alarm for gateways gone silent.

    Every ``CHECK_INTERVAL`` seconds, each MAC in ``last_seen`` that is not
    already marked disconnected and whose last heartbeat is older than
    ``delta_threshold`` seconds gets an alarm published to MQTT and AWS, and
    is then marked disconnected. This function never returns.

    Args:
        mqtt_client: client whose ``publish(topic, payload)`` receives alarms.
        mqtt_topic: MQTT topic for the alarms.
        delta_threshold: allowed silence in seconds; ``None`` selects
            ``DELTA_THRESHOLD``.
    """
    # Comparing an int against None raises TypeError, which would end this
    # thread on its first pass; fall back to the module default.
    if delta_threshold is None:
        delta_threshold = DELTA_THRESHOLD
    logging.info(f"Monitor thread {delta_threshold}s")
    while True:
        now = int(time.time())
        with lock:
            for mac, ts in last_seen.items():
                if mac_status.get(mac, True) and (now - ts) > delta_threshold:
                    details = uuid_details.get(mac)
                    # Without OSS details, use the UUID as name and its last
                    # 8 characters as LRR ID.
                    gw_name = uuid_value.get(mac)
                    gw_lrrid = gw_name[-8:]
                    if details is not None:
                        gw_name = details.get('name', None)
                        gw_lrrid = details.get('lrrID', None)
                    uuid = uuid_value.get(mac)
                    alarm_message = bs_alarm_format_disc(gw_lrrid,
                                                         gw_name,
                                                         uuid,
                                                         int(time.time()))
                    mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
                    logging.info(f"Timeout alert: {alarm_message}")
                    aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
                    # Marked disconnected so the alarm is not repeated on the
                    # next pass.
                    mac_status[mac] = False
        # Sleep outside the lock so client threads are not blocked.
        time.sleep(CHECK_INTERVAL)
def aws_connect():
    """Create the mutual-TLS MQTT connection to AWS IoT and wait for it.

    Endpoint, certificate, private key and CA paths are read from the ``aws``
    section of ``configuration``. The resulting connection is stored in the
    module-level ``aws_mqtt_connection``; the call blocks until the connect
    future completes.
    """
    global aws_mqtt_connection
    loop_group = io.EventLoopGroup(1)
    resolver = io.DefaultHostResolver(loop_group)
    bootstrap = io.ClientBootstrap(loop_group, resolver)
    aws_cfg = configuration['aws']
    aws_mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=aws_cfg['aws_endpoint'],
        cert_filepath=aws_cfg['aws_path_crt'],
        pri_key_filepath=aws_cfg['aws_path_key'],
        client_bootstrap=bootstrap,
        ca_filepath=aws_cfg['aws_path_ca'],
        client_id=aws_client_id,
        clean_session=False,
        keep_alive_secs=6,
    )
    logging.info(f"AWS Connecting to {configuration['aws']['aws_endpoint']} with client ID {aws_client_id}")
    # Block until the connect future completes (or raises).
    aws_mqtt_connection.connect().result()
    logging.info("AWS connected")
def aws_disconnect():
    """Disconnect from AWS IoT and block until the disconnect has completed."""
    # The module-level connection is only read here, never rebound.
    aws_mqtt_connection.disconnect().result()
def aws_push(topic, message):
    """Publish ``message`` as JSON to ``topic`` on AWS IoT (QoS at-least-once).

    Connects first when no connection exists yet. A failing publish is logged
    with its traceback and otherwise ignored, so an AWS problem does not stop
    the caller. The "OK" line is logged only when the publish call did not
    raise.

    Args:
        topic: AWS IoT topic to publish to.
        message: JSON-serialisable payload.
    """
    global aws_mqtt_connection
    if not aws_mqtt_connection:
        aws_connect()
    logging.info(f"AWS topic: {topic} : {message}")
    try:
        aws_mqtt_connection.publish(topic=topic, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
    except Exception:
        # Best effort: record the failure (with traceback) and carry on.
        logging.exception(f"AWS published: {topic} : EXCEPTION")
    else:
        logging.info(f"AWS published : {topic} : OK")
def main():
    """Load the configuration, log in to the OSS and run the heartbeat service.

    Steps: parse ``--config`` and load the JSON file, obtain an OSS token
    (exit status 1 if none is obtained), fetch the partner ID and gateway
    list, connect to AWS IoT and the MQTT broker, start the timeout monitor
    thread, then serve TCP heartbeats until interrupted.
    """
    import sys  # function-scope import: only the failed-login exit needs it

    global token_data
    global bs_details
    global configuration
    global aws_client_id

    parser = argparse.ArgumentParser(description="Heartbeat TCP → MQTT forwarder")
    parser.add_argument("-c","--config", required=True, help="Configuration File")
    args = parser.parse_args()
    with open(args.config, "r", encoding="utf-8") as f:
        configuration = json.load(f)

    mqtt_cfg = configuration.get("mqtt")
    broker_host = mqtt_cfg.get("broker_name", "0.0.0.0")
    broker_port = mqtt_cfg.get("broker_port", 9999)
    broker_topic = mqtt_cfg.get("broker_topic", "heartbeat")
    broker_user = mqtt_cfg.get("broker_user")
    broker_password = mqtt_cfg.get("broker_password")

    oss_cfg = configuration.get("oss")
    username = oss_cfg.get("oss_said")
    password = oss_cfg.get("oss_sasecret")

    token_data = oss_login(username, password)
    # A rejected login can still return a JSON error body, which is truthy;
    # require the access token itself.
    if not isinstance(token_data, dict) or not token_data.get("access_token"):
        logging.error("Access Token not found")
        # sys.exit() rather than quit(): quit is injected by the site module
        # and is not guaranteed to exist.
        sys.exit(1)

    oss_sub_details()
    logging.info(f"SubID: {subid}")
    bs_details = oss_gw_all_details()

    aws_client_id = f"GatewayAlarmProbe-{subid}"
    aws_connect()

    logging.info(f"MQTT broker: {broker_host}:{broker_port} {broker_topic}")
    mqtt_client = mqtt_connect(broker_host, broker_port, broker_user, broker_password)

    # Without "delta" in the configuration the monitor would receive None and
    # fail on its first comparison; use the module default instead.
    delta = configuration.get("delta")
    if delta is None:
        delta = DELTA_THRESHOLD
    monitor_thread = threading.Thread(target=timeout_monitor,
                                      args=(mqtt_client, broker_topic, delta),
                                      daemon=True)
    monitor_thread.start()

    tcp_server(configuration.get("server"), configuration.get("port"), mqtt_client, broker_topic)
# Run the service only when executed as a script, not when imported.
if __name__ == "__main__":
    main()