loraSMS / loraSMS.py /
Yanik Cawidrone Push everything
cbf75b6 a year ago
1 contributor
306 lines | 9.46kb
#!/usr/bin/python3
from datetime import datetime
import time
import argparse
import json
import paho.mqtt.client as mqtt
import sqlite3
import csv
import html
import threading
from userio import * 
import web

# Shared module state. The database handles are assigned by
# myDatabase_create(); the remaining values are filled in by the __main__
# section from the JSON configuration file.
configuration = None
conn = None
c = None

topic = ""
outfile = ""
address_book_filename = ""
myDBFilename = "loraSMS.db"

# Seconds thCreateWeb() waits before its first web.web.start() call.
THDEFERSTART = 2
# Seconds thCreateWeb() sleeps after each web.web.start() call returns.
THSLEEPLOOP = 30

def myDatabase_create():
  """Open the SQLite file named by ``myDBFilename`` and ensure both tables exist.

  The connection and its cursor are stored in the module globals ``conn``
  and ``c``. The ``address`` and ``sms`` tables are only created when they
  are missing (CREATE TABLE IF NOT EXISTS).
  """
  global conn, c
  address_ddl = '''CREATE TABLE IF NOT EXISTS address
             (DevEUI text,
             SubID text,
             DID int,
             UNIQUE(DevEUI) )'''
  sms_ddl = '''CREATE TABLE IF NOT EXISTS sms
             (epoch int,
             DevEUIfrom text,
             DevEUIto text,
             urgent int,
             content text,
             date text,
             UNIQUE(epoch) )'''
  conn = sqlite3.connect(myDBFilename)
  c = conn.cursor()
  for ddl in (address_ddl, sms_ddl):
    c.execute(ddl)
             
def myDatabase_fill_address():
  """Load the address book CSV into the ``address`` table.

  Reads the comma-separated file named by the module global
  ``address_book_filename``. The first three columns of each row supply
  DevEUI, SubID and DID; extra columns are ignored, and rows with fewer than
  three columns (including blank lines) are skipped. Rows whose DevEUI is
  already stored are left untouched (INSERT OR IGNORE). The module-level
  cursor ``c`` and connection ``conn`` must already be open; one commit is
  issued after all rows are inserted.

  Raises:
    ValueError: if the third column of a row is not an integer.
  """
  # newline='' lets the csv module do its own line-ending handling.
  with open(address_book_filename, newline='') as csv_file:
    entries = [
      (row[0], row[1], int(row[2]))
      for row in csv.reader(csv_file, delimiter=',')
      if len(row) >= 3
    ]
  # Bound parameters keep quotes or SQL fragments in the CSV from altering
  # the statement.
  c.executemany("INSERT OR IGNORE INTO address VALUES (?, ?, ?)", entries)
  conn.commit()

def on_publish(client,userdata,result):
  """MQTT publish callback; intentionally does nothing."""
  return None
          
def on_connect(client, userdata, flags, rc):
  """MQTT connect callback: record the result code, then subscribe to ``topic``.

  A "CONNECT" line holding the current local time and ``rc`` is passed to
  say() and appended to the log file named by ``outfile``; afterwards the
  client subscribes to the module-level ``topic``.
  """
  stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  with open(outfile, 'a+') as log_file:
    connect_line = f"CONNECT: {stamp} : {rc}"
    say(connect_line)
    log_file.write(f"{connect_line}\n")
  client.subscribe(topic)

# Top-level JSON keys that identify the kind of MQTT message, paired with the
# numeric type used for dispatch in on_message(). Checked in this order; when
# several keys are present the last match wins.
_MQTT_TYPE_KEYS = (
  ('DevEUI_downlink', 1),
  ('DevEUI_downlink_Sent', 2),
  ('DevEUI_uplink', 3),
  ('deviceEUI', 4),
  ('DevEUI_notification', 5),
  ('DevEUI_downlink_Rejected', 6),
  ('DevEUI_location', 7),
)

# DevEUI used when the destination ID is not found in the address table; no
# downlink is published for it.
_UNKNOWN_DESTINATION = "FFFFFFFFFFFFFFFF"


def _mqtt_message_type(parsed):
  """Return the numeric message type of a decoded JSON document, or -1."""
  kind = -1
  # Only a JSON object can carry the identifying keys; lists, strings and
  # numbers are reported as unknown.
  if isinstance(parsed, dict):
    for key, value in _MQTT_TYPE_KEYS:
      if key in parsed:
        kind = value
  return kind


def _handle_sms_uplink(client, msg, uplink, json_data, log_file):
  """Store an SMS uplink, log it, forward it as a downlink; return the log text."""
  payload = uplink['payload']
  destinationID = payload['destinationID']
  messageHex = payload['messageHex']
  CustomerID = uplink['CustomerID']
  payloadHex = uplink['payload_hex']
  devEUI = uplink['DevEUI']
  # Drop everything after the seconds (fraction and anything following it).
  DateSec = uplink['Time'].split(".")[0]
  DateEpoch = int(time.mktime(datetime.strptime(DateSec, "%Y-%m-%dT%H:%M:%S").timetuple()))

  # The text is stored HTML-escaped.
  messageAscii = html.escape(payload['messageAscii'], quote=True)

  # Values taken from the MQTT message are passed as bound parameters so
  # they can never change the SQL statement.
  c.execute("SELECT DevEUI FROM address WHERE did = ?", (destinationID,))
  row = c.fetchone()
  DevEUIDestination = row[0] if row else _UNKNOWN_DESTINATION

  csvLine = "----------------------------------\n"
  csvLine += "SMS    : " + msg.topic + " : " + uplink['Time'] + " : " + devEUI + "\n"
  csvLine += "SubID  : " + CustomerID + " : to: " + str(destinationID) + " : " + DevEUIDestination + "\n"
  csvLine += "Content: " + messageHex + "\n"
  csvLine += "Content: " + messageAscii + "\n"
  csvLine += "----------------------------------"

  c.execute(
    "INSERT OR IGNORE INTO sms VALUES (?, ?, ?, 0, ?, ?)",
    (DateEpoch, devEUI, DevEUIDestination, messageAscii, DateSec),
  )
  conn.commit()

  print(csvLine)
  # SMS messages are always logged, independently of 'store_all'.
  log_file.write(csvLine + " : " + json_data + "\n")

  if DevEUIDestination != _UNKNOWN_DESTINATION:
    # Forward the original payload to the destination device as a downlink.
    pubTopic = configuration['topic'].replace("#", "") + "things/" + DevEUIDestination + "/downlink"
    pubMessage = {
      'DevEUI_downlink': {
        'DevEUI': DevEUIDestination,
        'FPort': 2,
        'Confirmed': 0,
        'FlushDownlinkQueue': 0,
        'payload_hex': payloadHex,
      }
    }
    client.publish(pubTopic, json.dumps(pubMessage))
  return csvLine


def _format_tpxle(msg, parsed, json_data):
  """Build the log line for a message that has a top-level 'deviceEUI' key."""
  messageType = parsed['resolvedTracker']['messageType']
  try:
    uplinkMode = parsed['uplinkPayload']['deviceConfiguration']['mode']
  except (KeyError, TypeError, IndexError):
    uplinkMode = "UNKNOWN"
  endOfLine = ""
  if messageType == "POSITION_MESSAGE":
    try:
      endOfLine = " : " + uplinkMode
    except TypeError:
      # The mode is present but is not a string: show the raw message.
      print(json_data)
  FCnt = parsed['processedFeed']['sequenceNumber']
  return ("TPXLE: " + msg.topic + " : " + parsed['time'] + " : " + parsed['deviceEUI'].upper()
          + " : #" + str(FCnt) + " : " + messageType + endOfLine)


def on_message(client, userdata, msg):
  """MQTT message callback: classify, log and (for SMS uplinks) relay a message.

  The payload is decoded as JSON and classified by its top-level key. SMS
  uplinks are stored in the ``sms`` table, appended to the log file and
  re-published as a downlink to the destination found in the ``address``
  table. Every other recognised kind produces a one-line summary. When the
  configuration enables 'store_all', the summary and the raw JSON are
  appended to the log file named by ``outfile``.

  Args:
    client: MQTT client, used to publish the downlink for SMS uplinks.
    userdata: unused.
    msg: received message; its ``topic`` and ``payload`` attributes are read.
  """
  # Decode leniently: a payload with non-ASCII bytes must not abort the callback.
  json_data = msg.payload.decode('utf-8', errors='replace')
  dateExec = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

  with open(outfile, 'a+', encoding='utf-8') as f:
    try:
      parsed = json.loads(json_data)
    except ValueError:
      # Not JSON: keep the raw text in the log.
      print(json_data)
      f.write(json_data + "\n")
      return

    mqttType = _mqtt_message_type(parsed)
    if mqttType == 3:
      uplink = parsed['DevEUI_uplink']
      payload = uplink.get('payload') if isinstance(uplink, dict) else None
      if not isinstance(payload, dict) or 'messageType' not in payload:
        # Uplinks without a decoded message type (e.g. join requests) are
        # only echoed to stdout and never written to the log file.
        print(json_data)
        return
      if payload['messageType'] == "SMS":
        csvLine = _handle_sms_uplink(client, msg, uplink, json_data, f)
      else:
        # Any other uplink message
        csvLine = ("UL     : " + msg.topic + " : " + uplink['Time'] + " : " + uplink['DevEUI']
                   + " : " + str(uplink['FCntUp']) + " : " + payload['messageType'])
        say(csvLine)
    elif mqttType == 1:
      # Downlink queued
      downlink = parsed['DevEUI_downlink']
      csvLine = "DL     : " + msg.topic + " : " + downlink['DevEUI'] + " Queued: " + downlink['payload_hex']
      say(csvLine)
    elif mqttType == 2:
      # Downlink sent indicator
      sent = parsed['DevEUI_downlink_Sent']
      csvLine = "DLSI   : " + msg.topic + " : " + sent['Time'] + " : " + sent['DevEUI']
      say(csvLine)
    elif mqttType == 5:
      # Notification
      notification = parsed['DevEUI_notification']
      csvLine = ("NOTIF  : " + msg.topic + " : " + notification['Time'] + " : " + notification['DevEUI']
                 + " : " + notification['Type'])
      say(csvLine)
    elif mqttType == 6:
      # Downlink rejected
      rejected = parsed['DevEUI_downlink_Rejected']
      csvLine = ("REJECT : " + msg.topic + " : " + rejected['Time'] + " : " + rejected['DevEUI']
                 + " : " + rejected['DownlinkRejectionCause'])
      say(csvLine)
    elif mqttType == 7:
      # Location
      location = parsed['DevEUI_location']
      csvLine = "NETLOC : " + msg.topic + " : " + location['Time'] + " : " + location['DevEUI']
      say(csvLine)
    elif mqttType == 4:
      # TPX LE output
      csvLine = _format_tpxle(msg, parsed, json_data)
      print(csvLine)
    else:
      csvLine = "RAW:      " + " : " + msg.topic + " : " + dateExec
      print(csvLine)
      print(parsed)

    # NOTE: an SMS uplink was already written by its handler, so with
    # 'store_all' enabled it appears twice in the log.
    if configuration.get('store_all'):
      f.write(csvLine + " : " + json_data + "\n")


def configDump(configuration):
  """Report the broker endpoint, the MQTT topic and the log file name via say()."""
  broker_endpoint = configuration['brokerName'] + ":" + str(configuration['brokerPort'])
  say("MQTT broker: " + broker_endpoint)
  subscribed_topic = configuration['topic']
  say("MQTT topic : " + subscribed_topic)
  say("MQTT log   : " + outfile)


def thCreateWeb():
  """Thread body: call web.web.start() again every time it returns.

  Announces itself via say(), waits THDEFERSTART seconds, then loops forever
  calling web.web.start(configuration) followed by a THSLEEPLOOP-second sleep.
  """
  name = "WEB"
  pause = THSLEEPLOOP
  say(f"Thread {name} Starting")
  time.sleep(THDEFERSTART)
  while 1:
    web.web.start(configuration)
    time.sleep(pause)
    
if __name__ == "__main__":
  # Script entry point: load the JSON configuration, start the web thread,
  # prepare the database, then run the MQTT client loop.
  import sys

  parser = argparse.ArgumentParser()
  parser.add_argument("-c", "--config", help="config file")
  args = parser.parse_args()
  if not args.config:
    # Without this check open(None) fails with an unhelpful TypeError.
    parser.error("a config file is required (-c/--config)")

  # The context manager closes the configuration file once it is parsed.
  with open(args.config) as fJson:
    configuration = json.load(fJson)
  broker_address = configuration['brokerName']
  port = configuration['brokerPort']
  user = configuration['username']
  password = configuration['password']
  # These module globals are read by the MQTT callbacks and the DB helpers.
  topic = configuration['topic']
  address_book_filename = configuration['address_book']
  # Log file name: "<broker>-<topic>" with '/' and '#' mapped to '_' and
  # every "__" pair removed, plus ".log".
  outfile = configuration['brokerName'] + "-" + topic
  outfile = outfile.replace("/", "_").replace("#", "_").replace("__", "") + ".log"

  # Daemon thread: its endless loop must not keep the process alive after
  # the main thread exits (Ctrl-C or a failed MQTT setup below).
  thWeb = threading.Thread(target=thCreateWeb, daemon=True)
  thWeb.start()

  myDatabase_create()
  myDatabase_fill_address()

  client = mqtt.Client()
  client.on_publish = on_publish
  client.on_connect = on_connect
  client.on_message = on_message

  # 'except Exception' keeps the catch-all reporting but lets
  # KeyboardInterrupt and SystemExit through.
  try:
    client.username_pw_set(user, password=password)
  except Exception:
    error("ERROR: MQTT Set failed")
    sys.exit(1)

  try:
    client.connect(broker_address, port, 60)
  except Exception:
    error("ERROR: MQTT connect failed")
    sys.exit(1)

  try:
    client.loop_forever()
  except KeyboardInterrupt:
    # Ctrl-C: close the MQTT session and exit.
    client.disconnect()