#!/usr/bin/python3
import sqlite3
import json
import argparse
import time
from datetime import datetime
import logging
from logging.handlers import RotatingFileHandler
from xml.etree.ElementTree import Element, SubElement, tostring, ElementTree
# Module-level configuration.
#
# Log records go both to a size-rotated file and to the default stream
# handler. NOTE: the name ``format`` shadows the builtin of the same name; it
# is kept because it is a module-level name other code may reference.
format = "%(asctime)s : %(levelname)s : %(message)s"
outputlog = 'create_table.log'

logging.basicConfig(
    level=logging.INFO,
    format=format,
    datefmt='%Y-%m-%dT%H:%M:%S%z',
    handlers=[
        # Rotate at 10485760 bytes, keeping five old files.
        RotatingFileHandler(outputlog, maxBytes=10485760, backupCount=5),
        logging.StreamHandler(),
    ],
)

# SQLite database file and the table the uplink records are read from.
db_path = "http-okay.db"
table_name = "content"
def colorSnr(f):
    """Map an SNR value to a lowercase label from "sf12" down to "sf7".

    The value is compared against ascending upper bounds; the first bound the
    value is strictly below selects the label. Values of 2.5 and above (and
    values that compare below none of the bounds) yield "sf7".
    """
    bands = (
        (-7.5, "sf12"),
        (-5, "sf11"),
        (-2.5, "sf10"),
        (0, "sf9"),
        (2.5, "sf8"),
    )
    for upper_bound, label in bands:
        if f < upper_bound:
            return label
    return "sf7"
def colorRssi(f):
    """Map an RSSI value to a lowercase label from "sf12" down to "sf7".

    The first threshold the value is strictly below selects the label;
    values of -80 and above yield "sf7". Note that no band returns "sf8"
    here: the result goes straight from "sf9" to "sf7".
    """
    thresholds = (
        (-105, "sf12"),
        (-95, "sf11"),
        (-85, "sf10"),
        (-80, "sf9"),
    )
    return next((label for limit, label in thresholds if f < limit), "sf7")
def calcSF(f):
    """Map an SNR value to an uppercase label from "SF12" down to "SF7".

    Uses the same cut-offs as the other SNR helpers in this file
    (-7.5, -5, -2.5, 0, 2.5). The lower the value, the higher the number in
    the returned label.
    """
    cutoffs = (-7.5, -5, -2.5, 0, 2.5)
    labels = ("SF7", "SF8", "SF9", "SF10", "SF11", "SF12")
    # The cut-offs ascend, so the number of cut-offs the value lies strictly
    # below identifies its band: 0 -> "SF7", ..., 5 -> "SF12".
    below = sum(1 for cutoff in cutoffs if f < cutoff)
    return labels[below]
def colorSnrValue(f):
    """Map an SNR value to an 8-hex-digit colour string.

    Uses the cut-offs -7.5, -5, -2.5, 0 and 2.5. The colour names in the
    comments below read the digits as AARRGGBB.

    NOTE(review): the caller writes this string into a KML <color> element,
    and the KML reference defines that element as aabbggrr. Confirm which
    byte order is intended.
    """
    very_poor, poor, weak, fair, good = -7.5, -5, -2.5, 0, 2.5

    if f < very_poor:
        return "ffff0000"  # Red
    if f < poor:
        return "ffff5f1f"  # Orange
    if f < weak:
        return "ff7a8500"  # Olive
    if f < fair:
        return "ff52ad00"  # Green
    if f < good:
        return "ff408000"  # Dark Green
    return "ff15db00"  # Bright Green
def generate_html_table(db_path, table_name,last):
# Build the HTML page for the most recent uplink records and, as a side
# effect, write "positions.kml" and "positions.geojson" in the current
# directory.
#
# Parameters:
#   db_path    - path of the SQLite database file to open.
#   table_name - table whose "content" column holds one JSON document per row.
#   last       - row count placed after LIMIT; rows are taken by descending id.
# Returns the assembled HTML as a single string.
#
# NOTE(review): several string literals below are split across lines and
# appear to have lost their HTML markup; as shown they would not parse.
# Restore the markup from the original file before relying on this block.
dateExec = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(f"Exec : {dateExec}")
# Fetch all requested rows up front, then close the connection before any
# processing happens.
# NOTE(review): table_name and last are interpolated directly into the SQL
# text; nothing here validates them. The connection is also not closed if
# execute()/fetchall() raises.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
query = f"SELECT content FROM {table_name} ORDER BY id DESC LIMIT {last}"
cursor.execute(query)
records = cursor.fetchall()
conn.close()
# Column titles of the generated table, in display order.
headers = ["Time",
"DevEUI",
"FPort",
"FCntUp",
"FCntDn",
"Payload",
"SF",
"RSSI",
"SNR",
"LC",
"LRRID",
"Late",
"Count",
"MAC"
]
html="""
ApplicationServer
Application Server
KML
GeoJSON
Map
"""
html += f"Generated at {dateExec}
"
html += "\n"
html += "" + "".join(f"\n {header} " for header in headers) + "\n \n"
stats="""
ToggleAll
"""
html_table=""
# First pass: add each distinct DevEUI to `stats` once, in first-seen order.
deveuis=[]
for record in records:
try:
data = json.loads(record[0])
uplink = data.get("DevEUI_uplink", {})
deveui = uplink.get("DevEUI", "N/A")
if deveui not in deveuis:
stats += f"
{deveui}
\n"
deveuis.append(deveui)
except json.JSONDecodeError:
# Rows whose content is not valid JSON are skipped silently.
pass
stats+="
\n \n"
# Containers for the two map exports: a KML document tree and a GeoJSON
# FeatureCollection. `styles` records which KML Style ids already exist.
kml = Element('kml', xmlns="http://www.opengis.net/kml/2.2")
document = SubElement(kml, 'Document')
styles = {}
geojson = {
"type": "FeatureCollection",
"features": []
}
# Second pass: one table row per record, plus a KML placemark and a GeoJSON
# feature for records that carry a position.
for record in records:
try:
data = json.loads(record[0])
uplink = data.get("DevEUI_uplink", {})
# `time` is a local here and hides the imported `time` module inside this
# function only.
time = uplink.get("Time", "N/A")
deveui = uplink.get("DevEUI", "N/A")
fport = uplink.get("FPort", "N/A")
FCntUp = uplink.get("FCntUp", "N/A")
FCntDn = uplink.get("FCntDn", "N/A")
payload_hex = uplink.get("payload_hex", "N/A")
SpFact = uplink.get("SpFact", "na")
LrrRSSI = uplink.get("LrrRSSI", -199)
LrrSNR = uplink.get("LrrSNR", -99)
# The "LC" text is removed and the remainder converted to int; a missing
# key becomes -1.
# NOTE(review): assumes Channel is a string - a non-string value or a
# non-numeric remainder raises an error that the except below does not catch.
Channel = uplink.get("Channel", "-1")
Channel = int(Channel.replace("LC",""))
Lrrid = uplink.get("Lrrid", "N/A")
Late = uplink.get("Late", -1)
DevLrrCnt = uplink.get("DevLrrCnt", -1)
rawMacCommands = uplink.get("rawMacCommands", "N/A")
# Records with Late == 1 get the "arrow-up-gray" type instead of "arrow-up".
uplink_type="arrow-up"
if int(Late) == 1:
uplink_type="arrow-up-gray"
#table_line = f"
{time}{deveui} {fport} {FCntUp} {FCntDn} {payload_hex} "
collapse=json.dumps(data,indent=2)
table_line = f"{collapse}
{time}{deveui} {fport} {FCntUp} {FCntDn} {payload_hex} "
table_line += f"{SpFact} {LrrSNR} "
table_line += f"{Channel} {Lrrid} {Late} {DevLrrCnt} {rawMacCommands} "
logline=f"{time} {deveui} {fport} {FCntUp} {FCntDn} {payload_hex} {SpFact} {LrrRSSI} {LrrSNR} {Channel} {Lrrid}"
logging.info(logline)
html_table += table_line+"\n"
# The table row is already recorded above; everything below only concerns
# the map exports, so records without a usable position stop here.
payload = uplink.get("payload", {})
if payload.get("messageType") != "EXTENDED_POSITION_MESSAGE":
continue
lat = payload.get("gpsLatitude")
lon = payload.get("gpsLongitude")
alt = payload.get("gpsAltitude", 0)
# LrrSNR defaults to -99 above, so it is None only when the key is present
# with a null value.
if lat is None or lon is None or LrrSNR is None:
continue
logging.info(f"{lat},{lon}")
color = colorSnrValue(LrrSNR)
style_id = f"snr_{color}"
calc_sf = calcSF(LrrSNR)
# Create each KML Style once per distinct colour and reuse it afterwards.
# NOTE(review): the KML reference defines <color> as aabbggrr, while L213's
# comment treats the same string as ARGB - confirm the intended byte order.
if style_id not in styles:
style = SubElement(document, "Style", id=style_id)
icon_style = SubElement(style, "IconStyle")
SubElement(icon_style, "color").text = color
SubElement(icon_style, "scale").text = "1.2"
icon = SubElement(icon_style, "Icon")
SubElement(icon, "href").text = "http://maps.google.com/mapfiles/kml/shapes/placemark_circle.png"
# Hide label
label_style = SubElement(style, "LabelStyle")
SubElement(label_style, "scale").text = "0"
styles[style_id] = True
# Create Placemark
placemark = SubElement(document, 'Placemark')
SubElement(placemark, 'name').text = ""
# NOTE(review): "SF" and "CalculatedSF" below both print calc_sf; the
# GeoJSON properties use SpFact for the reported value - possibly SpFact was
# intended for the "SF" line here too.
desc = f"""
Time: {time}
DevEUI: {deveui}
FCntUp: {FCntUp}
SNR: {LrrSNR}
SF: {calc_sf}
CalculatedSF: {calc_sf}
""".strip()
SubElement(placemark, 'description').text = desc
SubElement(placemark, 'styleUrl').text = f"#{style_id}"
point = SubElement(placemark, 'Point')
# Coordinates are written longitude first, then latitude, then altitude.
SubElement(point, 'coordinates').text = f"{lon},{lat},{alt}"
feature = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [lon, lat, alt]
},
"properties": {
"DevEUI": deveui,
"Time": time,
"FCntUp": FCntUp,
"SNR": LrrSNR,
"SpFact": SpFact,
"SF": calc_sf,
"style": {
"marker-color": "#" + color[-6:], # Remove alpha from KML ARGB
"marker-symbol": "circle",
"marker-size": "medium"
}
}
}
geojson["features"].append(feature)
except json.JSONDecodeError:
# Only malformed JSON is tolerated; any other exception raised in the loop
# body propagates to the caller.
#html_table += "Invalid JSON Data "
pass
# Write both map exports, overwriting any existing files.
tree = ElementTree(kml)
tree.write("positions.kml", encoding='utf-8', xml_declaration=True)
with open("positions.geojson", "w") as f:
json.dump(geojson, f, indent=2)
# Final page order: header, device list, then the table rows.
html += stats
html += html_table
html += "
"
html += """
"""
html += ""
html += ""
return html
if __name__ == "__main__":
    # Command-line entry point: regenerate index.html (and, through
    # generate_html_table, the KML/GeoJSON exports) every 30 seconds until
    # the process is stopped.
    parser = argparse.ArgumentParser()
    # --last is placed directly after LIMIT in the SQL query. Parsing it as
    # an int rejects arbitrary text, and requiring it reports a usage error
    # up front instead of failing on "LIMIT None" inside the first query.
    parser.add_argument(
        "-l",
        "--last",
        type=int,
        required=True,
        help="N last records",
    )
    args = parser.parse_args()
    while True:
        html_output = generate_html_table(db_path, table_name, args.last)
        with open("index.html", "w") as f:
            f.write(html_output)
        time.sleep(30)