1 contributor
#!/usr/bin/python3
from datetime import datetime
import time
import argparse
import json
import paho.mqtt.client as mqtt
import sqlite3
import csv
import html
import threading
from userio import *
import web
# Shared SQLite connection and cursor; both are set by myDatabase_create().
conn = c = None

# Placeholders that the start-up code fills in from the JSON configuration.
topic = outfile = address_book_filename = ""
configuration = None

# SQLite database file used by myDatabase_create().
myDBFilename = "loraSMS.db"

# Web thread timing, in seconds: delay before the first start, then the
# pause between successive calls to web.web.start().
THDEFERSTART = 2
THSLEEPLOOP = 30
def myDatabase_create():
    """Open the SQLite database and make sure both tables exist.

    Stores the connection and its cursor in the module globals ``conn``
    and ``c``.  The ``address`` table maps a DevEUI to a subscriber id and
    a numeric DID; the ``sms`` table holds one row per relayed message,
    unique on its epoch timestamp.
    """
    global conn
    global c
    address_ddl = ('CREATE TABLE IF NOT EXISTS address\n'
                   '(DevEUI text,\n'
                   'SubID text,\n'
                   'DID int,\n'
                   'UNIQUE(DevEUI) )')
    sms_ddl = ('CREATE TABLE IF NOT EXISTS sms\n'
               '(epoch int,\n'
               'DevEUIfrom text,\n'
               'DevEUIto text,\n'
               'urgent int,\n'
               'content text,\n'
               'date text,\n'
               'UNIQUE(epoch) )')
    conn = sqlite3.connect(myDBFilename)
    c = conn.cursor()
    for statement in (address_ddl, sms_ddl):
        c.execute(statement)
def myDatabase_fill_address():
    """Load the CSV address book into the ``address`` table.

    Reads ``address_book_filename`` and expects one ``DevEUI,SubID,DID``
    record per line, where DID is an integer.  Records whose DevEUI is
    already present are left untouched (``INSERT OR IGNORE``).  Blank
    lines are skipped silently; records with fewer than three fields or a
    non-integer DID are skipped with a printed notice.

    Uses the module-level cursor ``c`` and connection ``conn``, so
    myDatabase_create() must have been called first.
    """
    entries = []
    # newline='' is what the csv module asks for when it is given a file.
    with open(address_book_filename, newline='') as csv_file:
        reader = csv.reader(csv_file, delimiter=',')
        for line_no, row in enumerate(reader, start=1):
            if not row:
                continue
            try:
                entries.append((row[0], row[1], int(row[2])))
            except (IndexError, ValueError):
                print("Address book line " + str(line_no)
                      + " ignored: " + repr(row))
    # Bound parameters keep quotes inside a field from altering the SQL.
    c.executemany("INSERT OR IGNORE INTO address VALUES (?,?,?);", entries)
    conn.commit()
def on_publish(client, userdata, result):
    """Publish callback registered on the MQTT client; deliberately a no-op."""
    return None
def on_connect(client, userdata, flags, rc):
    """Connection callback: record the result code and subscribe.

    Appends a timestamped ``CONNECT`` line carrying ``rc`` to the log file
    ``outfile``, shows the same line through ``say``, then subscribes the
    client to the module-level ``topic``.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = " : ".join(("CONNECT: " + stamp, str(rc)))
    with open(outfile, 'a+') as log:
        say(entry)
        log.write(entry + "\n")
    client.subscribe(topic)
def on_message(client, userdata, msg):
    """Message callback: classify an incoming MQTT payload, log it, relay SMS.

    The payload is decoded as text and parsed as JSON.  Its family is
    recognised from the top-level key it carries (see ``_MQTT_TYPE_KEYS``)
    and a one-line summary is displayed.  An uplink whose decoded payload
    has ``messageType == "SMS"`` is additionally stored in the ``sms``
    table, appended to the log file, and forwarded as a downlink to the
    destination device when the destination DID is in the address book.

    When ``configuration['store_all']`` is truthy, the summary line and the
    raw JSON of every parsed message are appended to the log file.

    Args:
        client: MQTT client; only its ``publish`` method is used.
        userdata: unused.
        msg: received message; ``msg.payload`` (bytes) and ``msg.topic``
            are read.
    """
    # ASCII is a subset of UTF-8, so ASCII payloads decode exactly as
    # before; undecodable bytes are replaced instead of raising here.
    json_data = msg.payload.decode('utf-8', errors='replace')
    dateExec = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(outfile, 'a+', encoding='utf-8') as f:
        try:
            parsed = json.loads(json_data)
        except ValueError:
            # Not JSON: keep the raw text so that nothing is lost.
            print(json_data)
            f.write(json_data + "\n")
            return
        mqttType = _mqtt_type_of(parsed)
        try:
            if mqttType == 3:
                csvLine = _handle_uplink(client, msg, parsed['DevEUI_uplink'],
                                         json_data, f)
            elif mqttType == 4:
                csvLine = _format_tpxle(msg, parsed)
                print(csvLine)
            elif mqttType in (1, 2, 5, 6, 7):
                csvLine = _format_event(mqttType, msg, parsed)
                say(csvLine)
            else:
                csvLine = "RAW: " + " : " + msg.topic + " : " + dateExec
                print(csvLine)
                print(parsed)
        except (KeyError, TypeError, ValueError) as err:
            # A recognised family with missing or malformed fields: report
            # it rather than letting the exception escape the callback.
            csvLine = _join("EXCEPT", msg.topic, dateExec, repr(err))
            print(csvLine)
            print(json_data)
        if configuration.get('store_all'):
            f.write(csvLine + " : " + json_data + "\n")


# Top-level payload keys and the numeric family code each one selects.
# They are checked in this order and the LAST key present wins.
_MQTT_TYPE_KEYS = (
    ('DevEUI_downlink', 1),
    ('DevEUI_downlink_Sent', 2),
    ('DevEUI_uplink', 3),
    ('deviceEUI', 4),
    ('DevEUI_notification', 5),
    ('DevEUI_downlink_Rejected', 6),
    ('DevEUI_location', 7),
)

# Destination recorded when the destination DID is not in the address book;
# no downlink is published for it.
_UNKNOWN_DESTINATION = "FFFFFFFFFFFFFFFF"


def _join(*parts):
    """Return the parts, converted to text, joined with `` : ``."""
    return " : ".join(str(part) for part in parts)


def _mqtt_type_of(parsed):
    """Return the family code of a parsed payload, or -1 when unrecognised."""
    if not isinstance(parsed, dict):
        return -1
    mqttType = -1
    for key, code in _MQTT_TYPE_KEYS:
        if key in parsed:
            mqttType = code
    return mqttType


def _format_event(mqttType, msg, parsed):
    """Build the summary line for families 1, 2, 5, 6 and 7."""
    if mqttType == 1:
        # Downlink queued
        event = parsed['DevEUI_downlink']
        return (_join("DL", msg.topic, event['DevEUI'])
                + " Queued: " + str(event['payload_hex']))
    if mqttType == 2:
        # Downlink sent indicator
        event = parsed['DevEUI_downlink_Sent']
        return _join("DLSI", msg.topic, event['Time'], event['DevEUI'])
    if mqttType == 5:
        # Notification
        event = parsed['DevEUI_notification']
        return _join("NOTIF", msg.topic, event['Time'], event['DevEUI'],
                     event['Type'])
    if mqttType == 6:
        # Downlink rejected
        event = parsed['DevEUI_downlink_Rejected']
        return _join("REJECT", msg.topic, event['Time'], event['DevEUI'],
                     event['DownlinkRejectionCause'])
    # Location report
    event = parsed['DevEUI_location']
    return _join("NETLOC", msg.topic, event['Time'], event['DevEUI'])


def _format_tpxle(msg, parsed):
    """Build the summary line for a payload carrying a ``deviceEUI`` key."""
    messageType = parsed['resolvedTracker']['messageType']
    try:
        uplinkMode = parsed['uplinkPayload']['deviceConfiguration']['mode']
    except (KeyError, TypeError):
        uplinkMode = "UNKNOWN"
    # Only position messages get the device mode appended.
    endOfLine = ""
    if messageType == "POSITION_MESSAGE":
        endOfLine = " : " + str(uplinkMode)
    FCnt = parsed['processedFeed']['sequenceNumber']
    return ("TPXLE: "
            + _join(msg.topic, parsed['time'],
                    str(parsed['deviceEUI']).upper(),
                    "#" + str(FCnt), messageType)
            + endOfLine)


def _handle_uplink(client, msg, uplink, json_data, f):
    """Display an uplink, relaying it when it is an SMS; return its summary."""
    try:
        messageType = uplink['payload']['messageType']
    except (KeyError, TypeError):
        # No decoded payload: either a join request or something unknown.
        if 'rawJoinRequest' in uplink:
            csvLine = _join("JOIN", msg.topic, uplink['Time'], uplink['DevEUI'])
            say(csvLine)
        else:
            csvLine = _join("EXCEPT", msg.topic, uplink['Time'],
                            uplink['DevEUI'])
            say(csvLine)
            print(json_data)
        return csvLine
    if messageType == "SMS":
        return _relay_sms(client, msg, uplink, json_data, f)
    csvLine = _join("UL", msg.topic, uplink['Time'], uplink['DevEUI'],
                    uplink['FCntUp'], messageType)
    say(csvLine)
    return csvLine


def _relay_sms(client, msg, uplink, json_data, f):
    """Store an SMS uplink, log it, and forward it to its destination."""
    # Read every field first so a malformed message fails before any
    # database write or publish happens.
    payload = uplink['payload']
    destinationID = payload['destinationID']
    # Stored HTML-escaped, quotes included.
    messageAscii = html.escape(str(payload['messageAscii']), quote=True)
    messageHex = payload['messageHex']
    CustomerID = uplink['CustomerID']
    payloadHex = uplink['payload_hex']
    DevEUISource = str(uplink['DevEUI'])
    # Everything from the first "." onwards is dropped before parsing.
    # NOTE: time.mktime() interprets the result as local time.
    DateSec = uplink['Time'].split(".")[0]
    DateEpoch = int(time.mktime(
        datetime.strptime(DateSec, "%Y-%m-%dT%H:%M:%S").timetuple()))

    # The DID is bound as text; the column's integer affinity makes the
    # comparison numeric.
    c.execute("SELECT DevEUI from address where did=?;", (str(destinationID),))
    row = c.fetchone()
    DevEUIDestination = row[0] if row else _UNKNOWN_DESTINATION

    csvLine = "\n".join((
        "----------------------------------",
        _join("SMS", msg.topic, uplink['Time'], DevEUISource),
        _join("SubID", CustomerID, "to: " + str(destinationID),
              DevEUIDestination),
        "Content: " + str(messageHex),
        "Content: " + messageAscii,
        "----------------------------------",
    ))
    c.execute("INSERT OR IGNORE INTO sms VALUES (?,?,?,0,?,?);",
              (DateEpoch, DevEUISource, DevEUIDestination, messageAscii,
               DateSec))
    conn.commit()
    print(csvLine)
    f.write(csvLine + " : " + json_data + "\n")

    if DevEUIDestination != _UNKNOWN_DESTINATION:
        # Forward the original payload as a downlink to the destination.
        pubTopic = (configuration['topic'].replace("#", "")
                    + "things/" + DevEUIDestination + "/downlink")
        pubMessage = {
            'DevEUI_downlink': {
                'DevEUI': DevEUIDestination,
                'FPort': 2,
                'Confirmed': 0,
                'FlushDownlinkQueue': 0,
                'payload_hex': payloadHex,
            }
        }
        client.publish(pubTopic, json.dumps(pubMessage))
    return csvLine
def configDump(configuration):
    """Show the broker endpoint, the subscribed topic and the log file name.

    Args:
        configuration: mapping providing ``brokerName``, ``brokerPort``
            and ``topic``.  The log file name comes from the module-level
            ``outfile``.
    """
    endpoint = configuration['brokerName'] + ":" + str(configuration['brokerPort'])
    report = (
        "MQTT broker: " + endpoint,
        "MQTT topic : " + configuration['topic'],
        "MQTT log : " + outfile,
    )
    for entry in report:
        say(entry)
def thCreateWeb():
    """Thread body for the web front end; never returns.

    Announces itself, waits THDEFERSTART seconds, then loops forever:
    call ``web.web.start`` with the module-level configuration and sleep
    THSLEEPLOOP seconds before calling it again.
    """
    label = "WEB"
    say("Thread " + label + " Starting")
    time.sleep(THDEFERSTART)
    pause = THSLEEPLOOP
    while True:
        web.web.start(configuration)
        time.sleep(pause)
if __name__ == "__main__":
    # Command line: the JSON configuration file is mandatory, so a missing
    # option yields a usage error.
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True, help="config file")
    args = parser.parse_args()
    jsonFile = args.config
    with open(jsonFile) as fJson:
        configuration = json.load(fJson)

    broker_address = configuration['brokerName']
    port = configuration['brokerPort']
    user = configuration['username']
    password = configuration['password']
    topic = configuration['topic']
    address_book_filename = configuration['address_book']
    # Log file name: broker and topic with "/" and "#" replaced by "_",
    # then every "__" pair removed.
    outfile = configuration['brokerName'] + "-" + topic
    outfile = outfile.replace("/", "_").replace("#", "_").replace("__", "") + ".log"

    # Create and populate the database before the web thread is started.
    myDatabase_create()
    myDatabase_fill_address()

    # Daemon thread: its endless loop must not keep the process alive once
    # the main thread exits.
    thWeb = threading.Thread(target=thCreateWeb, daemon=True)
    thWeb.start()

    client = mqtt.Client()
    client.on_publish = on_publish
    client.on_connect = on_connect
    client.on_message = on_message
    try:
        client.username_pw_set(user, password=password)
    except Exception:
        error("ERROR: MQTT Set failed")
        raise SystemExit(1)
    try:
        client.connect(broker_address, port, 60)
    except (OSError, ValueError):
        error("ERROR: MQTT connect failed")
        raise SystemExit(1)
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        # Ctrl-C: close the MQTT session cleanly before exiting.
        client.disconnect()