#!/usr/bin/python3
"""MQTT listener that stores LoRa "SMS" uplinks in SQLite and relays them.

The script subscribes to the configured MQTT topic, logs every message it
receives, stores uplinks whose payload ``messageType`` is ``"SMS"`` in the
``sms`` table and republishes their payload as a downlink to the device
registered in the ``address`` table under the requested destination ID.
"""

import argparse
import csv
import html
import json
import sqlite3
import sys
import threading
import time
from datetime import datetime

import paho.mqtt.client as mqtt

from userio import *
import web

# Module-wide state, filled in by the ``__main__`` section below.
conn = None
c = None
topic = ""
outfile = ""
address_book_filename = ""
myDBFilename = "loraSMS.db"
configuration = None
THDEFERSTART = 2
THSLEEPLOOP = 30

# DevEUI recorded when the destination ID is not in the address book;
# no downlink is published for it.
_UNKNOWN_DEVEUI = "FFFFFFFFFFFFFFFF"

# Top-level JSON key -> numeric message type.  Every key is tested in this
# order and the last one present wins.
_MQTT_TYPE_KEYS = (
    ("DevEUI_downlink", 1),
    ("DevEUI_downlink_Sent", 2),
    ("DevEUI_uplink", 3),
    ("deviceEUI", 4),
    ("DevEUI_notification", 5),
    ("DevEUI_downlink_Rejected", 6),
    ("DevEUI_location", 7),
)


def myDatabase_create():
    """Open the SQLite database and create the tables if they are missing.

    Sets the module-level ``conn`` and ``c`` (connection and cursor).
    """
    global conn
    global c
    conn = sqlite3.connect(myDBFilename)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS address (DevEUI text, SubID text, DID int, UNIQUE(DevEUI) )''')
    c.execute('''CREATE TABLE IF NOT EXISTS sms (epoch int, DevEUIfrom text, DevEUIto text, urgent int, content text, date text, UNIQUE(epoch) )''')


def myDatabase_fill_address():
    """Load the CSV address book into the ``address`` table.

    Each row supplies ``DevEUI, SubID, DID`` in its first three columns.
    Rows with fewer than three columns (e.g. blank lines) are skipped, and
    rows whose DevEUI already exists are ignored by ``INSERT OR IGNORE``.
    """
    with open(address_book_filename, newline="") as csv_file:
        entries = [
            # The DID is stripped so that " 12" still converts to an integer
            # through the column's INTEGER affinity.
            (row[0], row[1], row[2].strip())
            for row in csv.reader(csv_file, delimiter=',')
            if len(row) >= 3
        ]
    # Bound parameters: address-book content is never spliced into SQL.
    c.executemany("INSERT OR IGNORE INTO address VALUES (?, ?, ?);", entries)
    conn.commit()


def on_publish(client, userdata, result):
    """MQTT publish callback; intentionally does nothing."""
    pass


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: log the result code and subscribe to ``topic``."""
    dateExec = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    csvLine = "CONNECT: " + dateExec + " : " + str(rc)
    with open(outfile, 'a+') as f:
        say(csvLine)
        f.write(csvLine + "\n")
    client.subscribe(topic)


def _mqtt_type(parsed):
    """Return the numeric message type of a decoded JSON document (-1 if unknown)."""
    mqttType = -1
    # Valid JSON that is not an object (list, string, number) is never a
    # known message; the guard also avoids substring matches on JSON strings.
    if isinstance(parsed, dict):
        for key, code in _MQTT_TYPE_KEYS:
            if key in parsed:
                mqttType = code
    return mqttType


def _process_sms(client, f, msg, uplink, json_data):
    """Store an SMS uplink, log it and relay it to its destination device.

    Returns the log line that was printed and written to ``f``.
    """
    payload = uplink['payload']
    destinationID = payload['destinationID']
    messageHex = payload['messageHex']
    CustomerID = uplink['CustomerID']
    payloadHex = uplink['payload_hex']
    # Drop everything from the first "." on, then convert to an epoch.
    # time.mktime() interprets the value as local time.
    DateSec = uplink['Time'].split(".")[0]
    DateEpoch = int(time.mktime(datetime.strptime(DateSec, "%Y-%m-%dT%H:%M:%S").timetuple()))

    # The message text is stored HTML-escaped.
    messageAscii = html.escape(payload['messageAscii'], quote=True)

    # Resolve the destination ID to a DevEUI through the address book.
    c.execute("SELECT DevEUI FROM address WHERE did = ?;", (destinationID,))
    row = c.fetchone()
    DevEUIDestination = row[0] if row else _UNKNOWN_DEVEUI

    csvLine = "----------------------------------\n"
    csvLine += "SMS : " + msg.topic + " : " + uplink['Time'] + " : " + uplink['DevEUI'] + "\n"
    csvLine += "SubID : " + CustomerID + " : to: " + str(destinationID) + " : " + DevEUIDestination + "\n"
    csvLine += "Content: " + messageHex + "\n"
    csvLine += "Content: " + messageAscii + "\n"
    csvLine += "----------------------------------"

    # Bound parameters: payload content is never spliced into SQL.
    c.execute(
        "INSERT OR IGNORE INTO sms VALUES (?, ?, ?, 0, ?, ?);",
        (DateEpoch, uplink['DevEUI'], DevEUIDestination, messageAscii, DateSec),
    )
    conn.commit()
    print(csvLine)
    f.write(csvLine + " : " + json_data + "\n")

    if DevEUIDestination != _UNKNOWN_DEVEUI:
        # Sending Downlink: forward the original payload to the destination.
        pubTopic = configuration['topic'].replace("#", "")
        pubTopic += "things/"
        pubTopic += DevEUIDestination
        pubTopic += "/downlink"
        pubMessage = {
            'DevEUI_downlink': {
                'DevEUI': DevEUIDestination,
                'FPort': 2,
                'Confirmed': 0,
                'FlushDownlinkQueue': 0,
                'payload_hex': payloadHex,
            }
        }
        client.publish(pubTopic, json.dumps(pubMessage))
    return csvLine


def _process_uplink(client, f, msg, uplink, json_data):
    """Handle a ``DevEUI_uplink`` document.

    Returns the log line, or ``None`` when the uplink carries no decoded
    ``messageType`` (join request or unrecognised content) and processing
    of the message must stop.
    """
    try:
        messageType = uplink['payload']['messageType']
    except (KeyError, TypeError):
        is_join = isinstance(uplink, dict) and 'rawJoinRequest' in uplink
        csvLine = "JOIN : " if is_join else "EXCEPT : "
        csvLine += msg.topic + " : " + uplink['Time'] + " : " + uplink['DevEUI']
        # The line used to be built and then discarded; show it.
        say(csvLine)
        if not is_join:
            # NOTE(review): the raw JSON dump is assumed to belong to the
            # unrecognised-uplink case only - confirm against the original
            # layout of this file.
            print(json_data)
        return None

    if messageType == "SMS":
        return _process_sms(client, f, msg, uplink, json_data)

    # Is Other Uplink message
    csvLine = ("UL : " + msg.topic + " : " + uplink['Time'] + " : " + uplink['DevEUI']
               + " : " + str(uplink['FCntUp']) + " : " + messageType)
    say(csvLine)
    return csvLine


def _process_tpxle(msg, parsed, json_data):
    """Build and print the log line for a document keyed by ``deviceEUI``."""
    # TPX LE output
    messageType = parsed['resolvedTracker']['messageType']
    try:
        uplinkMode = parsed['uplinkPayload']['deviceConfiguration']['mode']
    except (KeyError, TypeError):
        uplinkMode = "UNKNOWN"
    endOfLine = ""
    if messageType == "POSITION_MESSAGE":
        try:
            endOfLine = " : " + uplinkMode
        except TypeError:
            # The mode is not a string; dump the message for inspection.
            print(json_data)
    FCnt = parsed['processedFeed']['sequenceNumber']
    csvLine = ("TPXLE: " + msg.topic + " : " + parsed['time'] + " : " + parsed['deviceEUI'].upper()
               + " : #" + str(FCnt) + " : " + messageType + endOfLine)
    print(csvLine)
    return csvLine


def _process_message(client, f, msg, parsed, mqttType, json_data, dateExec):
    """Dispatch on ``mqttType``; return the log line or ``None`` to stop."""
    if mqttType == 3:
        # Is a pure uplink
        return _process_uplink(client, f, msg, parsed['DevEUI_uplink'], json_data)
    if mqttType == 4:
        return _process_tpxle(msg, parsed, json_data)

    if mqttType == 1:
        # Is pure downlink
        body = parsed['DevEUI_downlink']
        csvLine = "DL : " + msg.topic + " : " + body['DevEUI'] + " Queued: " + body['payload_hex']
    elif mqttType == 2:
        # Is pure downlink sent indicator
        body = parsed['DevEUI_downlink_Sent']
        csvLine = "DLSI : " + msg.topic + " : " + body['Time'] + " : " + body['DevEUI']
    elif mqttType == 5:
        # Is pure notification
        body = parsed['DevEUI_notification']
        csvLine = "NOTIF : " + msg.topic + " : " + body['Time'] + " : " + body['DevEUI'] + " : " + body['Type']
    elif mqttType == 6:
        # Is a rejected downlink
        body = parsed['DevEUI_downlink_Rejected']
        csvLine = ("REJECT : " + msg.topic + " : " + body['Time'] + " : " + body['DevEUI']
                   + " : " + body['DownlinkRejectionCause'])
    elif mqttType == 7:
        # Is a location report
        body = parsed['DevEUI_location']
        csvLine = "NETLOC : " + msg.topic + " : " + body['Time'] + " : " + body['DevEUI']
    else:
        csvLine = "RAW: " + " : " + msg.topic + " : " + dateExec
        print(csvLine)
        print(parsed)
        return csvLine

    say(csvLine)
    return csvLine


def on_message(client, userdata, msg):
    """MQTT message callback: classify, log and (for SMS) store and relay.

    The payload is decoded as UTF-8 (undecodable bytes are replaced) and
    parsed as JSON.  Payloads that are not JSON are printed and appended to
    the log file.  Recognised messages that lack an expected field are
    reported through ``error()`` and dropped instead of raising out of the
    callback.  When ``configuration['store_all']`` is true the log line and
    the raw JSON are appended to the log file.
    """
    json_data = msg.payload.decode('utf-8', errors='replace')
    dateExec = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(outfile, 'a+') as f:
        try:
            parsed = json.loads(json_data)
        except ValueError:
            print(msg)
            # Write the payload text; the message object itself is not a str.
            f.write(json_data + "\n")
            return

        # MQTT message type
        mqttType = _mqtt_type(parsed)
        try:
            csvLine = _process_message(client, f, msg, parsed, mqttType, json_data, dateExec)
        except (KeyError, IndexError, TypeError, ValueError) as exc:
            error("ERROR: malformed MQTT message on " + msg.topic + ": " + repr(exc))
            print(json_data)
            return

        if csvLine is None:
            return
        if configuration['store_all']:
            f.write(csvLine + " : " + json_data + "\n")


def configDump(configuration):
    """Display the broker, topic and log file in use."""
    say("MQTT broker: " + configuration['brokerName'] + ":" + str(configuration['brokerPort']))
    say("MQTT topic : " + configuration['topic'])
    say("MQTT log : " + outfile)


def thCreateWeb():
    """Thread body: call ``web.web.start(configuration)`` in an endless loop.

    Waits ``THDEFERSTART`` seconds before the first call and
    ``THSLEEPLOOP`` seconds after each call returns.
    """
    thName = "WEB"
    thSleep = THSLEEPLOOP
    say("Thread " + thName + " Starting")
    time.sleep(THDEFERSTART)
    while True:
        web.web.start(configuration)
        time.sleep(thSleep)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", help="config file")
    args = parser.parse_args()
    if args.config is None:
        parser.error("a config file is required (-c/--config)")

    jsonFile = args.config
    with open(jsonFile) as fJson:
        configuration = json.load(fJson)

    broker_address = configuration['brokerName']
    port = configuration['brokerPort']
    user = configuration['username']
    password = configuration['password']
    topic = configuration['topic']
    address_book_filename = configuration['address_book']
    outfile = configuration['brokerName'] + "-" + topic
    outfile = outfile.replace("/", "_").replace("#", "_").replace("__", "") + ".log"

    # Daemon thread: it must not keep the process alive once the main
    # thread exits (MQTT failure below, or Ctrl-C).
    thWeb = threading.Thread(target=thCreateWeb, daemon=True)
    thWeb.start()

    myDatabase_create()
    myDatabase_fill_address()

    client = mqtt.Client()
    client.on_publish = on_publish
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        client.username_pw_set(user, password=password)
    except Exception:
        error("ERROR: MQTT Set failed")
        sys.exit(1)

    try:
        client.connect(broker_address, port, 60)
    except Exception:
        error("ERROR: MQTT connect failed")
        sys.exit(1)

    client.loop_forever()