# heartbeat / heartbeat-srv.py /
# Yanik Cawidrone Init
# 2af1abf 3 months ago
# 1 contributor
# 378 lines | 13.018kb
#!/usr/bin/env python3
import argparse
import socket
import threading
import json
import time
import paho.mqtt.client as pahomqtt
import logging
from logging.handlers import RotatingFileHandler
import base64
import requests
from datetime import datetime, timezone
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder

# Log file sits next to this script: same path with the trailing ".py"
# swapped for ".log". Only the suffix is touched, so a ".py" elsewhere in the
# path is left alone and a script without the suffix never logs into itself.
log_file = (__file__[:-3] if __file__.endswith(".py") else __file__) + ".log"
# Named LOG_FORMAT rather than ``format`` so the builtin is not shadowed.
LOG_FORMAT = "%(asctime)s : %(levelname)s :  %(message)s"
# Records go to a rotating file (1 MiB per file, 2 backups) and to a stream handler.
logging.basicConfig(format=LOG_FORMAT,
  level=logging.INFO,
  datefmt='%Y-%m-%dT%H:%M:%S%z',
  handlers=[RotatingFileHandler(log_file,backupCount=2,maxBytes=1048576),logging.StreamHandler()])
  
# Shared runtime state, keyed by gateway MAC address
last_seen = {}      # MAC -> epoch of the latest heartbeat
mac_status = {}     # MAC -> True while the gateway is considered up, False after a disconnect/timeout
uuid_value = {}     # MAC -> UUID computed by mac_to_uuid()
uuid_details = {}   # MAC -> record returned by oss_gw_details() (None when not found)
lock = threading.Lock()  # taken around last_seen / mac_status updates

# Default delta threshold in seconds
DELTA_THRESHOLD = 10
CHECK_INTERVAL = 5  # seconds
token_data=None     # JSON reply of oss_login(), set by main()
subid=None          # set by oss_sub_details()
configuration=None  # parsed JSON configuration file, set by main()
# Gateway inventory scanned by oss_gw_details(); main() replaces it with the
# result of oss_gw_all_details(). Declared here so the name always exists.
bs_details = []

# AWS IotCore
aws_mqtt_connection = None             # created by aws_connect()
aws_client_id="GatewayAlarmProbe"      # main() appends the subscriber id

def oss_login(client_id: str, client_secret: str, timeout: float = 30):
  """Request a client-credentials token from the OSS platform.

  The host name is read from ``configuration["oss"]["oss_fqdn"]``.

  Args:
    client_id: value sent as ``client_id``.
    client_secret: value sent as ``client_secret``.
    timeout: seconds to wait for the HTTP exchange; without it a stalled
      server would block start-up forever.

  Returns:
    The decoded JSON reply, or ``None`` when the server answers with an HTTP
    error status, so that the caller's ``if not token_data`` check can fire.
  """
  fqdn = configuration.get("oss").get("oss_fqdn")
  url = f"https://{fqdn}/users-auth/protocol/openid-connect/token"
  headers = {'content-type': 'application/x-www-form-urlencoded'}
  data = {
    'client_id': client_id,
    'client_secret': client_secret,
    'grant_type': 'client_credentials'
  }
  response = requests.post(url, data=data, headers=headers, timeout=timeout)
  if not response.ok:
    logging.error(f"OSS login failed: HTTP {response.status_code}")
    return None
  return response.json()

def oss_gw_details(uuid: str):
  """Look up a gateway record by UUID in the module-level ``bs_details`` list.

  Args:
    uuid: value compared against each record's ``'lrrUUID'`` entry.

  Returns:
    The first matching record, or ``None`` when nothing matches.
  """
  # ``bs_details`` is only bound once main() has stored the inventory, and may
  # be None if the OSS reply carried no list; both cases mean "no records".
  inventory = globals().get("bs_details") or ()
  for bs_detail in inventory:
    # Records without an 'lrrUUID' entry are skipped instead of raising KeyError.
    if 'lrrUUID' in bs_detail and uuid == bs_detail['lrrUUID']:
      logging.info(f"{uuid} Gateway details found")
      return bs_detail
  logging.warning(f"{uuid} Gateway details not found")
  return None
  
def oss_gw_all_details(timeout: float = 30):
  """Fetch the ``briefs`` list from the OSS ``partners/mine/bss`` endpoint.

  Uses ``configuration["oss"]["oss_fqdn"]`` as host and the ``access_token``
  stored in ``token_data`` as bearer token.

  Args:
    timeout: seconds to wait for the HTTP exchange, so a stalled server
      cannot block the caller forever.

  Returns:
    The value of the reply's ``'briefs'`` entry, or an empty list when the
    entry is absent or empty (so callers can always iterate the result).
    Presumably one record per base station -- verify against the OSS API.
  """
  fqdn = configuration.get("oss").get("oss_fqdn")
  url = f"https://{fqdn}/thingpark/wireless/rest/partners/mine/bss"
  headers = {
    'accept': 'application/json',
    'authorization': f"Bearer {token_data.get('access_token')}"
  }
  response = requests.get(url, headers=headers, timeout=timeout)
  return response.json().get('briefs') or []

def oss_sub_details(timeout: float = 30):
  """Read the partner ID from the OSS ``partners/mine`` endpoint.

  The result is stored in the module-level ``subid`` and also returned.

  Args:
    timeout: seconds to wait for the HTTP exchange, so a stalled server
      cannot block the caller forever.

  Returns:
    The ``ID`` entry of the reply's ``partner`` object.
  """
  global subid
  fqdn = configuration.get("oss").get("oss_fqdn")
  url = f"https://{fqdn}/thingpark/wireless/rest/partners/mine"
  headers = {
    'accept': 'application/json',
    'authorization': f"Bearer {token_data.get('access_token')}"
  }
  response = requests.get(url, headers=headers, timeout=timeout)
  subid = response.json().get('partner').get('ID')
  return subid


  
def bs_alarm_format_cnx(lrrID,lrrName,uuid,tsStart,tsEnd):
  """Build the ``BS_alarm`` message (State 1) sent when a gateway is heard again.

  Args:
    lrrID: value placed in ``LrrID``.
    lrrName: value placed in ``BaseStationData.name``.
    uuid: value placed in ``LrrUUID``.
    tsStart: epoch seconds rendered as ``CreationTime`` (UTC).
    tsEnd: epoch seconds rendered as ``LastUpdateTime`` (UTC).

  Returns:
    A dict with a single ``"BS_alarm"`` entry. ``AddInfo1`` carries the number
    of minutes between ``tsStart`` and ``tsEnd`` as a string.
  """
  stamp_layout = "%Y-%m-%dT%H:%M:%SZ"
  created_at = datetime.fromtimestamp(tsStart, tz=timezone.utc).strftime(stamp_layout)
  updated_at = datetime.fromtimestamp(tsEnd, tz=timezone.utc).strftime(stamp_layout)
  elapsed_minutes = int((tsEnd-tsStart)/60)

  base_station = {"name": lrrName}
  oss_host = configuration.get('oss').get('oss_fqdn')
  downlink = f"https://{oss_host}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000"

  # Key order is kept as-is: it is the order json.dumps() emits.
  alarm = {
    "ID": 102,
    "State": 1,
    "CreationTime": created_at,
    "LastUpdateTime": updated_at,
    "Occurrence": 1,
    "Acked": False,
    "AddInfo1": str(elapsed_minutes),
    "LrrID": lrrID,
    "LrrUUID": uuid,
    "PartnerID": subid,
    "CustomerID": subid,
    "BaseStationData": base_station,
    "ModelCfg": "1:TPX_00000000-0000-0000-0000-000000000000",
    "CustomerRealmID": "tpw-users-actility-tpe-ope",
    "downlinkUrl": downlink,
  }
  return {"BS_alarm": alarm}
  
def bs_alarm_format_disc(lrrID,lrrName,uuid,tsStart):
  """Build the ``BS_alarm`` message (State 6) sent when a gateway goes silent.

  Args:
    lrrID: value placed in ``LrrID``.
    lrrName: value placed in ``BaseStationData.name``.
    uuid: value placed in ``LrrUUID``.
    tsStart: epoch seconds, rendered in UTC and used for both
      ``CreationTime`` and ``LastUpdateTime``.

  Returns:
    A dict with a single ``"BS_alarm"`` entry.
  """
  raised_at = datetime.fromtimestamp(tsStart, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
  oss_host = configuration.get('oss').get('oss_fqdn')
  downlink = f"https://{oss_host}/iot-flow/downlinkMessages/00000000-0000-0000-0000-000000000000"

  # Key order is kept as-is: it is the order json.dumps() emits.
  alarm = {
    "ID": 102,
    "State": 6,
    "CreationTime": raised_at,
    "LastUpdateTime": raised_at,
    "Occurrence": 1,
    "Acked": False,
    "LrrID": lrrID,
    "LrrUUID": uuid,
    "PartnerID": subid,
    "CustomerID": subid,
    "BaseStationData": {"name": lrrName},
    "ModelCfg": "1:TPX_00000000-0000-0000-0000-000000000000",
    "CustomerRealmID": "tpw-users-actility-tpe-ope",
    "downlinkUrl": downlink,
  }
  return {"BS_alarm": alarm}


def mqtt_connect(broker_host, broker_port, user, password):
    """Create a paho MQTT client, connect it and start its network loop.

    Credentials are only set when ``user`` is truthy. The connection uses a
    keep-alive of 60.

    Returns:
        The connected client, with ``loop_start()`` already called.
    """
    keepalive = 60
    mqtt_client = pahomqtt.Client()
    if user:
        mqtt_client.username_pw_set(user, password)
    mqtt_client.connect(broker_host, broker_port, keepalive)
    mqtt_client.loop_start()
    return mqtt_client

def mac_to_uuid(mac):
  """Derive the gateway UUID from a colon-separated MAC address.

  For the three known vendor prefixes the UUID is ``<OUI>-<MAC>``:
    Kerlink:   7076FF-7076FF0204D1
    Milesight: 24E124-24E124F78436
    Tektelic:  647FDA-647FDA016D51
  Any other MAC yields just the MAC without colons. The result is always
  upper case, and the prefix check ignores the case of the input so that
  ``70:76:FF:...`` and ``70:76:ff:...`` produce the same UUID.

  Args:
    mac: MAC address string such as ``"70:76:ff:02:04:d1"``.

  Returns:
    The UUID string.
  """
  vendor_prefixes = ("70:76:ff", "24:e1:24", "64:7f:da")
  compact = mac.replace(':', '').upper()
  if mac.lower().startswith(vendor_prefixes):
    # The first six hex digits of the compact MAC are the vendor OUI.
    return f"{compact[:6]}-{compact}"
  return compact
  
def _gateway_identity(mac):
  """Return ``(name, lrrID)`` for *mac*: OSS details if cached, else derived from the UUID."""
  details = uuid_details.get(mac)
  if details is not None:
    return details.get('name', None), details.get('lrrID', None)
  fallback = uuid_value.get(mac)
  # Without OSS details the UUID doubles as name and its last 8 characters as LRR id.
  return fallback, fallback[-8:]


def handle_client(conn, addr, mqtt_client, mqtt_topic):
  """Serve one heartbeat TCP connection until the peer goes away.

  The peer sends newline-terminated JSON objects carrying ``mac`` and
  optionally ``status`` and ``epoch``. Each valid heartbeat refreshes
  ``last_seen``; if the gateway was previously marked down, a reconnection
  alarm is published to MQTT and AWS. When the connection ends, a
  disconnection alarm is published for the last MAC seen on it and the
  gateway is marked down.

  Args:
    conn: connected socket; closed on return.
    addr: peer address, used in log messages only.
    mqtt_client: client whose ``publish(topic, payload)`` receives alarms.
    mqtt_topic: MQTT topic for alarms.
  """
  mac = None  # MAC of the last valid heartbeat received on this connection
  buffer = b""
  with conn:
    while True:
      try:
        data = conn.recv(1024)
        if not data:
          break
        buffer += data

        while b"\n" in buffer:
          line, buffer = buffer.split(b"\n", 1)
          line = line.strip()
          if not line:
            continue
          try:
            msg = json.loads(line.decode())
          except ValueError as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logging.error(f"Invalid JSON from {addr}: {e}")
            continue
          if not isinstance(msg, dict) or not isinstance(msg.get("mac"), str):
            logging.error(f"Heartbeat without a MAC from {addr}: {msg}")
            continue

          mac = msg["mac"]
          status = msg.get("status")
          # The epoch comes from the peer; a non-numeric value stored in
          # last_seen would later break the arithmetic in the timeout monitor.
          try:
            epoch = int(msg.get("epoch", time.time()))
          except (TypeError, ValueError, OverflowError):
            epoch = int(time.time())

          uuid = uuid_value.get(mac)
          if uuid is None:
            uuid = mac_to_uuid(mac)
            uuid_value[mac] = uuid

          if uuid_details.get(mac) is None:
            uuid_details[mac] = oss_gw_details(uuid)

          gw_name, gw_lrrid = _gateway_identity(mac)

          logging.info(f"Heartbeat received {mac} : uuid: {uuid} lrc: {status} SubID: {subid} LRRID: {gw_lrrid} Name: {gw_name}")

          # Update last_seen and status
          with lock:
            last_time = last_seen.get(mac)
            last_seen[mac] = epoch
            if mac_status.get(mac) is False:
              mac_status[mac] = True
              # No earlier timestamp on record: start the outage at this heartbeat.
              outage_start = epoch if last_time is None else last_time
              alarm_message = bs_alarm_format_cnx(gw_lrrid,
                                                  gw_name,
                                                  uuid,
                                                  int(outage_start),
                                                  int(time.time()))
              mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
              aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
              logging.info(f"Reconnection alert: {alarm_message}")
            else:
              mac_status.setdefault(mac, True)
      except ConnectionResetError:
        logging.error(f"Connection reset")
        break
      except Exception as e:
        logging.error(f"Generic: {e}")
        break

  if mac is None:
    # The peer left without ever identifying itself: there is no gateway to
    # raise a disconnection alarm for.
    return
  uuid = uuid_value.get(mac)
  if uuid is None:
    return

  gw_name, gw_lrrid = _gateway_identity(mac)
  alarm_message = bs_alarm_format_disc(gw_lrrid,
                                       gw_name,
                                       uuid,
                                       int(time.time()))
  mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
  logging.warning(f"Disconnection alert: {alarm_message}")
  aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
  with lock:
    mac_status[mac] = False

def tcp_server(host, port, mqtt_client, mqtt_topic):
  """Accept heartbeat connections forever, one daemon thread per client.

  The listening socket is managed by a ``with`` block so it is closed on
  every exit path, including a failing ``bind()`` or ``listen()``.

  Args:
    host: address to bind to.
    port: TCP port to listen on.
    mqtt_client: passed through to ``handle_client``.
    mqtt_topic: passed through to ``handle_client``.
  """
  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(5)
    logging.info(f"Listening on {host}:{port}")

    try:
      while True:
        conn, addr = server.accept()
        worker = threading.Thread(target=handle_client,
                                  args=(conn, addr, mqtt_client, mqtt_topic),
                                  daemon=True)
        worker.start()
    except KeyboardInterrupt:
      logging.info("Server shutting down.")

def timeout_monitor(mqtt_client, mqtt_topic, delta_threshold):
  """Periodically raise a timeout alarm for gateways that stopped sending heartbeats.

  Every ``CHECK_INTERVAL`` seconds, each gateway still marked up whose last
  heartbeat is older than ``delta_threshold`` seconds gets a disconnection
  alarm published to MQTT and AWS and is marked down. Runs forever.

  Args:
    mqtt_client: client whose ``publish(topic, payload)`` receives alarms.
    mqtt_topic: MQTT topic for alarms.
    delta_threshold: maximum heartbeat age in seconds; ``None`` falls back
      to ``DELTA_THRESHOLD``.
  """
  if delta_threshold is None:
    # A missing configuration value would otherwise make the comparison
    # below raise TypeError and silently end this thread.
    delta_threshold = DELTA_THRESHOLD
  logging.info(f"Monitor thread {delta_threshold}s")
  while True:
    now = int(time.time())
    # Alarms are sent while holding the lock so that they cannot be
    # overtaken by a reconnection alarm for the same gateway.
    with lock:
      for mac, ts in last_seen.items():
        if not (mac_status.get(mac, True) and (now - ts) > delta_threshold):
          continue
        details = uuid_details.get(mac)
        uuid = uuid_value.get(mac)
        # Without OSS details the UUID doubles as name and its last 8
        # characters as LRR id, as in the per-connection disconnect path.
        gw_name = uuid
        gw_lrrid = gw_name[-8:]
        if details is not None:
          gw_name = details.get('name', None)
          gw_lrrid = details.get('lrrID', None)
        # The alarm is raised whether or not OSS details are known.
        alarm_message = bs_alarm_format_disc(gw_lrrid,
                                             gw_name,
                                             uuid,
                                             int(time.time()))
        mqtt_client.publish(mqtt_topic, json.dumps(alarm_message, indent=2))
        logging.info(f"Timeout alert: {alarm_message}")
        aws_push(f"tpx/things/{gw_lrrid}/uplink", alarm_message)
        mac_status[mac] = False
    time.sleep(CHECK_INTERVAL)

def aws_connect():
  """Open the mutual-TLS MQTT connection to AWS IoT and store it globally.

  Endpoint and certificate/key/CA paths are read from
  ``configuration['aws']``; the client id is ``aws_client_id``. The new
  connection is assigned to ``aws_mqtt_connection`` and the call blocks
  until the connect future has completed.
  """
  global aws_mqtt_connection
  loop_group = io.EventLoopGroup(1)
  resolver = io.DefaultHostResolver(loop_group)
  bootstrap = io.ClientBootstrap(loop_group, resolver)

  aws_cfg = configuration['aws']
  builder_kwargs = {
    "endpoint": aws_cfg['aws_endpoint'],
    "cert_filepath": aws_cfg['aws_path_crt'],
    "pri_key_filepath": aws_cfg['aws_path_key'],
    "client_bootstrap": bootstrap,
    "ca_filepath": aws_cfg['aws_path_ca'],
    "client_id": aws_client_id,
    "clean_session": False,
    "keep_alive_secs": 6,
  }
  aws_mqtt_connection = mqtt_connection_builder.mtls_from_path(**builder_kwargs)

  logging.info(f"AWS Connecting to {configuration['aws']['aws_endpoint']} with client ID {aws_client_id}")

  # Block until the connection attempt has finished (raises on failure).
  aws_mqtt_connection.connect().result()
  logging.info("AWS connected")
  
  
def aws_disconnect():
  """Close the AWS IoT MQTT connection, if one exists.

  Does nothing when no connection has been created. After a disconnect the
  module-level ``aws_mqtt_connection`` is reset to ``None`` so that the next
  ``aws_push`` opens a fresh connection instead of publishing on a closed one.
  """
  global aws_mqtt_connection
  if aws_mqtt_connection is None:
    return
  try:
    # Block until the disconnect has completed.
    aws_mqtt_connection.disconnect().result()
  finally:
    aws_mqtt_connection = None
  
def aws_push(topic, message):
  """Publish *message* as JSON to AWS IoT on a best-effort basis.

  Connects first if no connection exists yet. Any failure while connecting
  or publishing is logged and swallowed so that alarm handling in the calling
  thread continues. The "OK" line is logged only when ``publish()`` returned
  without raising; the broker acknowledgement is not awaited.

  Args:
    topic: AWS IoT topic.
    message: JSON-serialisable payload.
  """
  global aws_mqtt_connection
  try:
    if not aws_mqtt_connection:
      aws_connect()
    logging.info(f"AWS topic: {topic} : {message}")
    aws_mqtt_connection.publish(topic=topic, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
  except Exception as e:
    # Exception rather than a bare except, so KeyboardInterrupt and
    # SystemExit still propagate.
    logging.error(f"AWS published: {topic} : EXCEPTION {e}")
  else:
    logging.info(f"AWS published : {topic} : OK")
       
def main():
  """Load the configuration, log in to OSS, connect to AWS/MQTT and serve.

  Steps:
    1. Parse ``-c/--config`` and load the JSON configuration file.
    2. Obtain an OSS token, the subscriber id and the gateway inventory.
    3. Connect to AWS IoT and to the MQTT broker.
    4. Start the timeout monitor thread and run the TCP server.

  Exits with status 1 when the OSS login yields no access token.
  """
  global token_data
  global bs_details
  global configuration
  global aws_client_id
  parser = argparse.ArgumentParser(description="Heartbeat TCP → MQTT forwarder")
  parser.add_argument("-c","--config", required=True, help="Configuration File")
  args = parser.parse_args()

  with open(args.config, "r", encoding="utf-8") as f:
    configuration = json.load(f)

  # Every MQTT setting has a default, so a missing "mqtt" section is tolerated.
  mqtt_cfg = configuration.get("mqtt") or {}
  broker_host = mqtt_cfg.get("broker_name", "0.0.0.0")
  broker_port = mqtt_cfg.get("broker_port", 9999)
  broker_topic = mqtt_cfg.get("broker_topic", "heartbeat")
  broker_user = mqtt_cfg.get("broker_user")
  broker_password = mqtt_cfg.get("broker_password")

  oss_cfg = configuration.get("oss")
  username = oss_cfg.get("oss_said")
  password = oss_cfg.get("oss_sasecret")
  token_data = oss_login(username, password)
  # An error reply is still a non-empty JSON object, so also require the
  # access_token entry that the later OSS calls read.
  if not isinstance(token_data, dict) or not token_data.get("access_token"):
    logging.error("Access Token not found")
    # SystemExit instead of quit(): quit() is provided by the site module
    # and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)
  oss_sub_details()
  logging.info(f"SubID: {subid}")
  bs_details = oss_gw_all_details()

  aws_client_id = f"GatewayAlarmProbe-{subid}"
  aws_connect()

  logging.info(f"MQTT broker: {broker_host}:{broker_port} {broker_topic}")

  # Fall back to the module default when "delta" is absent or null.
  delta = configuration.get("delta")
  if delta is None:
    delta = DELTA_THRESHOLD

  mqtt_client = mqtt_connect(broker_host, broker_port, broker_user, broker_password)
  monitor_thread = threading.Thread(target=timeout_monitor,
                                    args=(mqtt_client, broker_topic, delta),
                                    daemon=True)
  monitor_thread.start()
  tcp_server(configuration.get("server"), configuration.get("port"), mqtt_client, broker_topic)

if __name__ == "__main__":
    main()