#!/usr/bin/env python3 import argparse import socket import threading import json import time import paho.mqtt.client as pahomqtt import logging from logging.handlers import RotatingFileHandler import base64 import requests from datetime import datetime, timezone from awscrt import io, mqtt, auth, http from awsiot import mqtt_connection_builder log_file = __file__.replace(".py",".log") format = "%(asctime)s : %(levelname)s : %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt='%Y-%m-%dT%H:%M:%S%z', handlers=[RotatingFileHandler(log_file,backupCount=2,maxBytes=1048576),logging.StreamHandler()]) # Store last seen timestamps per MAC last_seen = {} mac_status = {} uuid_value = {} uuid_details = {} lock = threading.Lock() # Default delta threshold in seconds DELTA_THRESHOLD = 10 CHECK_INTERVAL = 5 # seconds token_data=None subid=None configuration=None # AWS IotCore aws_mqtt_connection = None aws_client_id="GatewayAlarmProbe" def oss_login(client_id: str, client_secret: str): fqdn=configuration.get("oss").get("oss_fqdn") url=f"https://{fqdn}/users-auth/protocol/openid-connect/token" headers = { 'content-type': 'application/x-www-form-urlencoded' } data={ 'client_id': client_id, 'client_secret': client_secret, 'grant_type': 'client_credentials' } response = requests.post(url, data=data, headers =headers) return response.json() def oss_gw_details(uuid: str): for bs_detail in bs_details: if uuid == bs_detail['lrrUUID']: logging.info(f"{uuid} Gateway details found") return bs_detail logging.warning(f"{uuid} Gateway details not found") return None def oss_gw_all_details(): fqdn=configuration.get("oss").get("oss_fqdn") url=f"https://{fqdn}/thingpark/wireless/rest/partners/mine/bss" headers = { 'accept': 'application/json', 'authorization': f"Bearer {token_data.get('access_token')}" } response = requests.get(url,headers =headers) return response.json().get('briefs',None) def oss_sub_details(): global subid fqdn=configuration.get("oss").get("oss_fqdn") url=f"https://{fqdn}/thingpark/wireless/rest/partners/mine" headers = { 'accept': 'application/json', 'authorization': f"Bearer {token_data.get('access_token')}" } response = requests.get(url,headers =headers) subid=response.json().get('partner').get('ID') return subid def bs_alarm_format_cnx(lrrID,lrrName,uuid,tsStart,tsEnd): tsStartH=datetime.fromtimestamp(tsStart, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") tsEndH=datetime.fromtimestamp(tsEnd, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") addInfo1=int((tsEnd-tsStart)/60) alarm_msg={ "BS_alarm": { "ID": 102, "State": 1, "CreationTime": tsStartH, "LastUpdateTime": tsEndH, "Occurrence": 1, "Acked": False, "AddInfo1": f"{addInfo1}", "LrrID": lrrID, "LrrUUID": uuid, "PartnerID": subid, "CustomerID": subid, "BaseStationData": { "name": lrrName }, "ModelCfg": "1:TPX_00000000-0000-0000-0000-000000000000", "CustomerRealmID": "tpw-users-actility-tpe-ope", "downlinkUrl": f"https://{configuration.get('oss').get('oss_fqdn')}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000" } } return alarm_msg def bs_alarm_format_disc(lrrID,lrrName,uuid,tsStart): tsStartH=datetime.fromtimestamp(tsStart, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") # ~ tsEndH=datetime.fromtimestamp(tsEnd, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") alarm_msg={ "BS_alarm": { "ID": 102, "State": 6, "CreationTime": tsStartH, "LastUpdateTime": tsStartH, "Occurrence": 1, "Acked": False, "LrrID": lrrID, "LrrUUID": uuid, "PartnerID": subid, "CustomerID": subid, "BaseStationData": { "name": lrrName }, "ModelCfg": "1:TPX_00000000-0000-0000-0000-000000000000", "CustomerRealmID": "tpw-users-actility-tpe-ope", "downlinkUrl": f"https://{configuration.get('oss').get('oss_fqdn')}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000" } } return alarm_msg def mqtt_connect(broker_host, broker_port, user, password): client = pahomqtt.Client() if user: client.username_pw_set(user, password) client.connect(broker_host, broker_port, 60) client.loop_start() return client def mac_to_uuid(mac): #logging.info(f"DEBUG: mac_to_uuid {mac}") #Kerlink: 7076FF-7076FF0204D1 #Milesight: 24E124-24E124F78436 #Tektelic: 647FDA-647FDA016D51 if mac.startswith("70:76:ff"): uuid=f"7076FF-{mac.replace(':','')}".upper() elif mac.startswith("24:e1:24"): uuid=f"24E124-{mac.replace(':','')}".upper() elif mac.startswith("64:7f:da"): uuid=f"647FDA-{mac.replace(':','')}".upper() else: uuid=mac.replace(':','').upper() return uuid def handle_client(conn, addr, mqtt_client, mqtt_topic): global mac_status global last_seen global uuid_value global uuid_details #logging.info(f"Connection from {addr[0]}:{addr[1]}") buffer = b"" with conn: while True: try: data = conn.recv(1024) if not data: break buffer += data while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) line = line.strip() if not line: continue try: msg = json.loads(line.decode()) mac = msg.get("mac") status = msg.get("status") epoch = msg.get("epoch", int(time.time())) # ~ logging.info(msg) if None == uuid_value.get(mac): uuid=mac_to_uuid(mac) uuid_value[mac]=uuid else: uuid=uuid_value.get(mac) if None == uuid_details.get(mac): details=oss_gw_details(uuid) uuid_details[mac]=details else: details=uuid_details.get(mac) gw_name=uuid_value.get(mac) gw_lrrid=gw_name[-8:] if None != details: gw_name=details.get('name',None) gw_lrrid=details.get('lrrID',None) logging.info(f"Heartbeat received {mac} : uuid: {uuid} lrc: {status} SubID: {subid} LRRID: {gw_lrrid} Name: {gw_name}") #mqtt_client.publish(mqtt_topic, json.dumps(msg)) # Update last_seen and status with lock: last_time=last_seen.get(mac) last_seen[mac] = epoch if mac_status.get(mac) is False: mac_status[mac] = True alarm_message=bs_alarm_format_cnx(gw_lrrid, gw_name, uuid, int(last_time), int(time.time())) mqtt_client.publish(mqtt_topic, json.dumps(alarm_message,indent=2)) aws_push(f"tpx/things/{gw_lrrid}/uplink",alarm_message) logging.info(f"Reconnection alert: {alarm_message}") else: mac_status.setdefault(mac, True) #logging.info(f"Status: {mac_status[mac]} last_seen: {last_seen[mac]}") except json.JSONDecodeError as e: logging.error(f"Invalid JSON from {addr}: {e}") except ConnectionResetError: logging.error(f"Connection reset") break except Exception as e: logging.error(f"Generic: {e}") break details=uuid_details.get(mac) gw_name=uuid_value.get(mac) gw_lrrid=gw_name[-8:] if None != details: gw_name=details.get('name',None) gw_lrrid=details.get('lrrID',None) alarm_message=bs_alarm_format_disc(gw_lrrid, gw_name, uuid, int(time.time())) mqtt_client.publish(mqtt_topic, json.dumps(alarm_message,indent=2)) logging.warning(f"Disconnection alert: {alarm_message}") aws_push(f"tpx/things/{gw_lrrid}/uplink",alarm_message) mac_status[mac]=False def tcp_server(host, port, mqtt_client, mqtt_topic): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((host, port)) server.listen(5) logging.info(f"Listening on {host}:{port}") try: while True: conn, addr = server.accept() t = threading.Thread(target=handle_client, args=(conn, addr, mqtt_client, mqtt_topic), daemon=True) t.start() except KeyboardInterrupt: logging.info("Server shutting down.") finally: server.close() def timeout_monitor(mqtt_client, mqtt_topic, delta_threshold): logging.info(f"Monitor thread {delta_threshold}s") while True: now = int(time.time()) with lock: for mac, ts in last_seen.items(): #print(now - ts) if mac_status.get(mac, True) and (now - ts) > delta_threshold: details=uuid_details.get(mac) gw_name=uuid_value.get(mac) gw_lrrid=gw_name[-8:] if None != details: gw_name=details.get('name',None) gw_lrrid=details.get('lrrID',None) uuid=uuid_value.get(mac) alarm_message=bs_alarm_format_disc(gw_lrrid, gw_name, uuid, int(time.time())) mqtt_client.publish(mqtt_topic, json.dumps(alarm_message,indent=2)) logging.info(f"Timeout alert: {alarm_message}") aws_push(f"tpx/things/{gw_lrrid}/uplink",alarm_message) mac_status[mac] = False time.sleep(CHECK_INTERVAL) def aws_connect(): global aws_mqtt_connection event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) aws_mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=configuration['aws']['aws_endpoint'], cert_filepath=configuration['aws']['aws_path_crt'], pri_key_filepath=configuration['aws']['aws_path_key'], client_bootstrap=client_bootstrap, ca_filepath=configuration['aws']['aws_path_ca'], client_id=aws_client_id, clean_session=False, keep_alive_secs=6 ) logging.info(f"AWS Connecting to {configuration['aws']['aws_endpoint']} with client ID {aws_client_id}") connect_future = aws_mqtt_connection.connect() connect_future.result() logging.info("AWS connected") def aws_disconnect(): global aws_mqtt_connection disconnect_future = aws_mqtt_connection.disconnect() disconnect_future.result() def aws_push(topic, message): global aws_mqtt_connection if not aws_mqtt_connection: aws_connect() logging.info(f"AWS topic: {topic} : {message}") try: aws_mqtt_connection.publish(topic=topic, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE) except: logging.error(f"AWS published: {topic} : EXCEPTION") pass logging.info(f"AWS published : {topic} : OK") #aws_disconnect() def main(): global delta_threshold global token_data global bs_details global configuration global aws_client_id parser = argparse.ArgumentParser(description="Heartbeat TCP → MQTT forwarder") parser.add_argument("-c","--config", required=True, help="Configuration File") args = parser.parse_args() with open(args.config, "r", encoding="utf-8") as f: configuration = json.load(f) broker_host=configuration.get("mqtt").get("broker_name","0.0.0.0") broker_port=configuration.get("mqtt").get("broker_port",9999) broker_topic=configuration.get("mqtt").get("broker_topic","heartbeat") broker_user=configuration.get("mqtt").get("broker_user") broker_password=configuration.get("mqtt").get("broker_password") platform=configuration.get("oss").get("oss_fqdn") username=configuration.get("oss").get("oss_said") password=configuration.get("oss").get("oss_sasecret") token_data = oss_login(username, password) if not token_data: logging.error("Access Token not found") quit(1) oss_sub_details() logging.info(f"SubID: {subid}") bs_details=oss_gw_all_details() aws_client_id=f"GatewayAlarmProbe-{subid}" aws_connect() logging.info(f"MQTT broker: {broker_host}:{broker_port} {broker_topic}") mqtt_client = mqtt_connect(broker_host, broker_port, broker_user, broker_password) monitor_thread = threading.Thread(target=timeout_monitor, args=(mqtt_client, broker_topic, configuration.get("delta")), daemon=True) monitor_thread.start() tcp_server(configuration.get("server"), configuration.get("port"), mqtt_client, broker_topic) if __name__ == "__main__": main()