1 contributor
#!/usr/bin/python3
import json
# Lookup used by TxParamSetupReq: 4-bit MaxEIRP code -> value reported in dBm.
max_eirp_decoded = [
    8, 10, 12, 13,
    14, 16, 18, 20,
    21, 24, 26, 27,
    29, 30, 33, 36,
]
# Lookup used by RXTimingSetupReq: 4-bit delay code -> decoded delay.
# Codes 0 and 1 both decode to 1; every other code decodes to itself.
timing_setuq_req_delay = [1] + list(range(1, 16))
# Lookup used by DeviceModeInd/DeviceModeConf: class code -> label.
device_mode_class = ["Class-A", "RFU", "Class-C"]
# Table of known MAC commands.
# Each entry: cid (command identifier byte), name, size (payload bytes that
# follow the CID), direction [0 uplink, 1 downlink, 2 both].
# A request and its answer share one CID and are told apart by direction, so
# every (cid, direction) pair must appear at most once.
mac_commands = [
    # All Classes
    {"cid": 0x02, "name": "LinkCheckReq", "size": 0, "direction": 0},  # UL
    {"cid": 0x02, "name": "LinkCheckAns", "size": 2, "direction": 1},  # DL
    {"cid": 0x03, "name": "LinkADRReq", "size": 4, "direction": 1},  # DL
    {"cid": 0x03, "name": "LinkADRAns", "size": 1, "direction": 0},  # UL
    {"cid": 0x04, "name": "DutyCycleReq", "size": 1, "direction": 1},  # DL
    {"cid": 0x04, "name": "DutyCycleAns", "size": 0, "direction": 0},  # UL
    {"cid": 0x05, "name": "RXParamSetupReq", "size": 4, "direction": 1},  # DL
    {"cid": 0x05, "name": "RXParamSetupAns", "size": 1, "direction": 0},  # UL
    {"cid": 0x06, "name": "DevStatusReq", "size": 0, "direction": 1},  # DL
    {"cid": 0x06, "name": "DevStatusAns", "size": 2, "direction": 0},  # UL
    {"cid": 0x07, "name": "NewChannelReq", "size": 5, "direction": 1},  # DL
    {"cid": 0x07, "name": "NewChannelAns", "size": 1, "direction": 0},  # UL
    {"cid": 0x08, "name": "RXTimingSetupReq", "size": 1, "direction": 1},  # DL
    {"cid": 0x08, "name": "RXTimingSetupAns", "size": 0, "direction": 0},  # UL
    {"cid": 0x09, "name": "TxParamSetupReq", "size": 1, "direction": 1},  # DL
    {"cid": 0x09, "name": "TxParamSetupAns", "size": 0, "direction": 0},  # UL
    {"cid": 0x0A, "name": "DlChannelReq", "size": 4, "direction": 1},  # DL
    {"cid": 0x0A, "name": "DlChannelAns", "size": 1, "direction": 0},  # UL
    {"cid": 0x0B, "name": "RekeyConf", "size": 1, "direction": 1},  # DL
    {"cid": 0x0B, "name": "RekeyInd", "size": 1, "direction": 0},  # UL
    {"cid": 0x0C, "name": "ADRParamSetupReq", "size": 1, "direction": 1},  # DL
    {"cid": 0x0C, "name": "ADRParamSetupAns", "size": 0, "direction": 0},  # UL
    {"cid": 0x0D, "name": "DeviceTimeReq", "size": 0, "direction": 0},  # UL
    {"cid": 0x0D, "name": "DeviceTimeAns", "size": 5, "direction": 1},  # DL
    {"cid": 0x0E, "name": "ForceRejoinReq", "size": 2, "direction": 1},  # DL
    {"cid": 0x0F, "name": "RejoinParamSetupReq", "size": 1, "direction": 1},  # DL
    {"cid": 0x0F, "name": "RejoinParamSetupAns", "size": 1, "direction": 0},  # UL
    # Class-B
    {"cid": 0x10, "name": "PingSlotInfoReq", "size": 1, "direction": 0},  # UL
    {"cid": 0x10, "name": "PingSlotInfoAns", "size": 0, "direction": 1},  # DL
    {"cid": 0x11, "name": "PingSlotChannelReq", "size": 4, "direction": 1},  # DL
    {"cid": 0x11, "name": "PingSlotChannelAns", "size": 1, "direction": 0},  # UL
    {"cid": 0x12, "name": "BeaconTimingReq", "size": 0, "direction": 0},  # UL
    # Fixed: this answer travels downlink; it was previously tagged uplink,
    # which hid it from downlink decoding and duplicated CID 0x12 on uplink.
    {"cid": 0x12, "name": "BeaconTimingAns", "size": 3, "direction": 1},  # DL
    {"cid": 0x13, "name": "BeaconFreqReq", "size": 3, "direction": 1},  # DL
    {"cid": 0x13, "name": "BeaconFreqAns", "size": 0, "direction": 0},  # UL
    # Class-C
    {"cid": 0x20, "name": "DeviceModeConf", "size": 1, "direction": 1},  # DL
    {"cid": 0x20, "name": "DeviceModeInd", "size": 1, "direction": 0},  # UL
]
# Per-direction lookup tables; empty until mac_direction() has been called.
ul_cids = []
dl_cids = []


def mac_direction(commands=None):
    """Rebuild the uplink and downlink command tables.

    Each command is copied (cid, name, size) into ``ul_cids`` when its
    direction is 0, into ``dl_cids`` when it is 1, and into both when it is 2.
    Both lists are emptied first, so calling this repeatedly does not
    accumulate duplicate entries. The lists are mutated in place, so existing
    references to them stay valid.

    Args:
        commands: iterable of dicts with "cid", "name", "size" and
            "direction" keys. Defaults to the module-level ``mac_commands``.
    """
    if commands is None:
        commands = mac_commands
    ul_cids.clear()
    dl_cids.clear()
    for mac_command in commands:
        direction = mac_command["direction"]
        # Build a separate dict per table so the two lists never share entries.
        if direction in (0, 2):
            ul_cids.append({
                "cid": mac_command["cid"],
                "name": mac_command["name"],
                "size": mac_command["size"],
            })
        if direction in (1, 2):
            dl_cids.append({
                "cid": mac_command["cid"],
                "name": mac_command["name"],
                "size": mac_command["size"],
            })
def mac_get_cid(cid, direction=0):
    """Find the first table entry whose CID equals ``cid``.

    The downlink table is searched when ``direction`` is truthy, the uplink
    table otherwise.

    Returns:
        A ``(name, size, direction)`` tuple, or ``None`` when no entry matches.
    """
    table = dl_cids if direction else ul_cids
    match = next((entry for entry in table if entry['cid'] == cid), None)
    if match is None:
        return None
    return match['name'], match['size'], direction
def NewChannelReq(content):
    """Decode a 5-byte NewChannelReq payload.

    Byte 0 is the channel index, bytes 1-3 are a little-endian frequency
    field (reported multiplied by 100), byte 4 is DrRange with the minimum
    data rate in the low nibble and the maximum in the high nibble.
    """
    channel_index = content[0]
    raw_freq = sum(content[1 + k] << (8 * k) for k in range(3))
    dr_range = content[4]
    return {
        "ChIndex": channel_index,
        "Frequency": raw_freq * 100,
        "DrRange.MaxDR": (dr_range >> 4) & 0x0F,
        "DrRange.MinDR": dr_range & 0x0F,
    }
def PingSlotChannelReq(content):
    """Decode a 4-byte PingSlotChannelReq payload.

    The LoRaWAN Class B layout is Frequency (3 bytes, little-endian) followed
    by the DR byte. The previous code read the DR from byte 0 and the
    frequency from bytes 1-3, which scrambled both fields.

    Returns:
        dict with "DataRate" (low nibble of the DR byte) and "Frequency"
        (raw frequency field multiplied by 100).
    """
    chFreq = content[0] | (content[1] << 8) | (content[2] << 16)
    DataRate = content[3] & 0x0f  # upper nibble of the DR byte is RFU
    return {
        "DataRate": DataRate,
        "Frequency": 100 * chFreq
    }
def PingSlotChannelAns(content):
    """Decode the PingSlotChannelAns status byte.

    Bit 0 is reported as "FrequencyOK" and bit 1 as "DataRateOK".
    """
    flags = content[0]
    return {
        label: (flags >> bit) & 0x01
        for bit, label in enumerate(("FrequencyOK", "DataRateOK"))
    }
def BeaconTimingAns(content):
    """Decode a 3-byte BeaconTimingAns payload.

    The LoRaWAN Class B layout is Delay (2 bytes, little-endian) followed by
    Channel (1 byte). The previous code took Channel from byte 0 and Delay
    from bytes 1-2, which mixed the two fields.

    Returns:
        dict with "Channel", the raw "Delay" and "DelayDecoded" (30 * Delay).
    """
    Delay = content[0] | (content[1] << 8)
    Channel = content[2]
    return {
        "Channel": Channel,
        "Delay": Delay,
        "DelayDecoded": 30 * Delay
    }
def BeaconFreqReq(content):
    """Decode a BeaconFreqReq payload: a 3-byte little-endian frequency field.

    The raw field is reported multiplied by 100 under "Frequency".
    """
    raw_freq = sum(content[k] << (8 * k) for k in range(3))
    return {"Frequency": raw_freq * 100}
def DlChannelReq(content):
    """Decode a 4-byte DlChannelReq payload.

    Byte 0 is the channel index; bytes 1-3 are a little-endian frequency
    field, reported multiplied by 100.
    """
    channel_index = content[0]
    raw_freq = sum(content[1 + k] << (8 * k) for k in range(3))
    return {"ChIndex": channel_index, "Frequency": raw_freq * 100}
def RXParamSetupReq(content):
    """Decode a 4-byte RXParamSetupReq payload.

    Byte 0 is DLSettings: bits 3:0 are RX2DataRate, bits 6:4 are RX1DRoffset
    and bit 7 is RFU. Bytes 1-3 are a little-endian frequency field, reported
    multiplied by 100.

    The offset is masked to three bits; the previous four-bit mask let the
    RFU bit leak into the reported offset.
    """
    dlSettings = content[0]
    chFreq = content[1] | (content[2] << 8) | (content[3] << 16)
    RX2DataRate = dlSettings & 0x0f
    RX1DRoffset = (dlSettings >> 4) & 0x07
    return {
        "RX1DRoffset": RX1DRoffset,
        "RX2DataRate": RX2DataRate,
        "Frequency": 100 * chFreq
    }
def RXTimingSetupReq(content):
    """Decode the RXTimingSetupReq settings byte.

    The low nibble is the delay code ("Delay"); "DelayDec" is that code
    looked up in ``timing_setuq_req_delay``.
    """
    delay_code = content[0] & 0x0F
    return {
        "Delay": delay_code,
        "DelayDec": timing_setuq_req_delay[delay_code],
    }
def LinkCheckAns(content):
    """Decode a 2-byte LinkCheckAns payload: byte 0 is "Margin", byte 1 is "GwCnt"."""
    margin, gateway_count = content[0], content[1]
    return {"Margin": margin, "GwCnt": gateway_count}
def LinkADRReq(content):
    """Decode a 4-byte LinkADRReq payload.

    Byte 0 carries TxPower in the low nibble and DataRate in the high nibble,
    bytes 1-2 are the little-endian channel mask (rendered as four uppercase
    hex digits), and byte 3 is returned unchanged as "Redundancy".
    """
    dr_txpower = content[0]
    mask = content[1] + (content[2] << 8)
    return {
        "TxPower": dr_txpower & 0x0F,
        "DataRate": (dr_txpower >> 4) & 0x0F,
        "ChMask": format(mask, "04X"),
        "Redundancy": content[3],
    }
def LinkADRAns(content):
    """Decode the LinkADRAns status byte.

    Bits 0, 1 and 2 are reported as "ChannelMaskACK", "DataRateACK" and
    "PowerACK" respectively.
    """
    flags = content[0]
    labels = ("ChannelMaskACK", "DataRateACK", "PowerACK")
    return {label: (flags >> bit) & 0x01 for bit, label in enumerate(labels)}
def DlChannelAns(content):
    """Decode the DlChannelAns status byte.

    Bit 0 is reported as "ChannelFrequencyOK" and bit 1 as
    "UplinkFrequencyExists".
    """
    flags = content[0]
    return {
        "ChannelFrequencyOK": flags & 0x01,
        "UplinkFrequencyExists": (flags & 0x02) >> 1,
    }
def NewChannelAns(content):
    """Decode the NewChannelAns status byte.

    Bit 0 is reported as "ChannelFrequencyOK" and bit 1 as "DataRateRangeOK".
    """
    flags = content[0]
    freq_ok = flags & 0x01
    range_ok = (flags >> 1) & 0x01
    return dict(ChannelFrequencyOK=freq_ok, DataRateRangeOK=range_ok)
def RXParamSetupAns(content):
    """Decode the RXParamSetupAns status byte.

    Bits 0, 1 and 2 are reported as "ChannelACK", "RX2DataRateACK" and
    "RX1DROffsetACK" respectively.
    """
    flags = content[0]
    result = {}
    for bit, label in ((0, "ChannelACK"), (1, "RX2DataRateACK"), (2, "RX1DROffsetACK")):
        result[label] = (flags >> bit) & 0x01
    return result
def TxParamSetupReq(content):
    """Decode the TxParamSetupReq settings byte.

    Bits 3:0 are the MaxEIRP code, bit 4 is UplinkDwellTime and bit 5 is
    DownlinkDwellTime. "MaxEIRPdBm" is the code looked up in
    ``max_eirp_decoded`` with a "dBm" suffix.
    """
    settings = content[0]
    eirp_code = settings & 0x0F
    eirp_dbm = max_eirp_decoded[eirp_code]
    return {
        "DownlinkDwellTime": (settings >> 5) & 0x01,
        "UplinkDwellTime": (settings >> 4) & 0x01,
        "MaxEIRP": eirp_code,
        "MaxEIRPdBm": f"{eirp_dbm}dBm",
    }
def RekeyConf(content):
    """Decode a RekeyConf payload: the low nibble of byte 0 is "LoRaWanVersion"."""
    return {"LoRaWanVersion": content[0] & 0x0F}
def RekeyInd(content):
    """Decode a RekeyInd payload: the low nibble of byte 0 is "LoRaWanVersion"."""
    version_byte = content[0]
    return dict(LoRaWanVersion=version_byte & 0x0F)
def DutyCycleReq(content):
    """Decode a DutyCycleReq payload: the low nibble of byte 0 is "MaxDutyCycle"."""
    return {"MaxDutyCycle": content[0] & 0x0F}
def ADRParamSetupReq(content):
    """Decode the ADRParamSetupReq parameter byte.

    The low nibble is DelayExp and the high nibble is LimitExp; AdrAckDelay
    and AdrAckLimit are 2 raised to the respective exponent.
    """
    params = content[0]
    delay_exp = params & 0x0F
    limit_exp = (params >> 4) & 0x0F
    return {
        "DelayExp": delay_exp,
        "AdrAckDelay": 2 ** delay_exp,
        "LimitExp": limit_exp,
        "AdrAckLimit": 2 ** limit_exp,
    }
def PingSlotInfoReq(content):
    """Decode a PingSlotInfoReq payload: bits 2:0 of byte 0 are "Periodicity"."""
    return {"Periodicity": content[0] & 0x07}
def ForceRejoinReq(content):
    """Decode a 2-byte ForceRejoinReq payload.

    Byte 0: bits 3:0 are DataRate, bits 6:4 are RejoinType.
    Byte 1: bits 2:0 are MaxRetries, bits 5:3 are Period.
    "MaxRetriesDecoded" is MaxRetries plus one.
    """
    low_byte, high_byte = content[0], content[1]
    retries = high_byte & 0x07
    return {
        "DataRate": low_byte & 0x0F,
        "RejoinType": (low_byte >> 4) & 0x07,
        "MaxRetries": retries,
        "MaxRetriesDecoded": retries + 1,
        "Period": (high_byte >> 3) & 0x07,
    }
def RejoinParamSetupReq(content):
    """Decode the RejoinParamSetupReq periodicity byte.

    Bits 3:0 are MaxCountN (C) and bits 7:4 are MaxTimeN (T). Per the
    LoRaWAN 1.1 specification the device must rejoin at least every
    2^(C+4) uplinks and at least every 2^(T+10) seconds.

    The count was previously decoded as 2^C, omitting the +4 offset.
    """
    status = content[0]
    MaxCountN = status & 0x0f
    MaxTimeN = (status >> 4) & 0x0f
    MaxCountNDecoded = 1 << (MaxCountN + 4)
    MaxTimeNDecoded = 1 << (MaxTimeN + 10)
    return {
        "MaxCountN": MaxCountN,
        "MaxCountNDecoded": MaxCountNDecoded,
        "MaxTimeN": MaxTimeN,
        "MaxTimeNDecoded": MaxTimeNDecoded
    }
def RejoinParamSetupAns(content):
    """Decode a RejoinParamSetupAns payload: bit 0 of byte 0 is "TimeOK"."""
    return {"TimeOK": content[0] & 0x01}
def DevStatusAns(content):
    """Decode a 2-byte DevStatusAns payload.

    Byte 0 is the battery value; 0 and 255 are replaced by the labels
    "ExternalPower" and "ErrorMeasuring", any other value is passed through.
    Bits 5:0 of byte 1 are read as a 6-bit two's-complement number and
    reported as "RadioStatus".
    """
    level = content[0]
    margin_bits = content[1] & 0x3F
    special_levels = {0: "ExternalPower", 255: "ErrorMeasuring"}
    # Values above 0x1f have the 6-bit sign bit set, so shift them negative.
    margin = margin_bits - 64 if margin_bits > 0x1F else margin_bits
    return {
        "Battery": special_levels.get(level, level),
        "RadioStatus": margin,
    }
def DeviceModeInd(content):
    """Decode a DeviceModeInd payload: byte 0 is the device class code.

    "DeviceClassDecoded" is the code looked up in ``device_mode_class``.
    Codes beyond the end of that table are reported as "RFU" instead of
    raising IndexError, so an unexpected byte cannot abort decoding.
    """
    DeviceClass = content[0]
    if 0 <= DeviceClass < len(device_mode_class):
        DeviceClassDecoded = device_mode_class[DeviceClass]
    else:
        DeviceClassDecoded = "RFU"
    return {
        "DeviceClass": DeviceClass,
        "DeviceClassDecoded": DeviceClassDecoded
    }
def DeviceModeConf(content):
    """Decode a DeviceModeConf payload: byte 0 is the device class code.

    "DeviceClassDecoded" is the code looked up in ``device_mode_class``.
    Codes beyond the end of that table are reported as "RFU" instead of
    raising IndexError, so an unexpected byte cannot abort decoding.
    """
    DeviceClass = content[0]
    if 0 <= DeviceClass < len(device_mode_class):
        DeviceClassDecoded = device_mode_class[DeviceClass]
    else:
        DeviceClassDecoded = "RFU"
    return {
        "DeviceClass": DeviceClass,
        "DeviceClassDecoded": DeviceClassDecoded
    }
def DeviceTimeAns(content):
    """Decode a 5-byte DeviceTimeAns payload.

    Bytes 0-3 form a little-endian 32-bit value reported as "Epoch"; byte 4
    is scaled by 1/256 and reported as "FractionalSeconds".
    """
    epoch = sum(content[k] << (8 * k) for k in range(4))
    # 1/256 is exactly representable, so this equals multiplying by 0.00390625.
    fraction = float(content[4]) / 256.0
    return {"Epoch": epoch, "FractionalSeconds": fraction}
def mac_decode(direction, payload_hex):
    """Decode a hex string of back-to-back MAC commands.

    Commands are looked up by CID with ``mac_get_cid`` for the given
    direction, so the lookup tables must already be populated.

    Args:
        direction: passed to ``mac_get_cid``; falsy selects uplink, truthy
            selects downlink.
        payload_hex: hex string of the concatenated commands.

    Returns:
        A list with one dict per command, holding "cid" (formatted as
        "0xNN"), "name", "raw" (hex of the payload bytes) and "length".
        A "data" dict is added when the command has a decoder, or a warning
        when it has a payload but no decoder. Decoding stops at the first
        unknown CID (reported as "Unknown" with the remaining bytes as
        "raw") or at a command whose payload is cut short.

    Raises:
        binascii.Error: if ``payload_hex`` is not valid hex.
    """
    from binascii import unhexlify

    # Built inside the function so the decoder names are resolved at call time.
    decoders = {
        "NewChannelReq": NewChannelReq,
        "NewChannelAns": NewChannelAns,
        "DevStatusAns": DevStatusAns,
        "LinkCheckAns": LinkCheckAns,
        "LinkADRReq": LinkADRReq,
        "LinkADRAns": LinkADRAns,
        "DutyCycleReq": DutyCycleReq,
        "RXParamSetupAns": RXParamSetupAns,
        "RXParamSetupReq": RXParamSetupReq,
        "DlChannelReq": DlChannelReq,
        "DlChannelAns": DlChannelAns,
        "TxParamSetupReq": TxParamSetupReq,
        "RXTimingSetupReq": RXTimingSetupReq,
        "DeviceTimeAns": DeviceTimeAns,
        "RekeyConf": RekeyConf,
        "RekeyInd": RekeyInd,
        # The next one has a decoder but was missing from the old dispatch.
        "ADRParamSetupReq": ADRParamSetupReq,
        "ForceRejoinReq": ForceRejoinReq,
        "RejoinParamSetupReq": RejoinParamSetupReq,
        # Likewise previously undispatched.
        "RejoinParamSetupAns": RejoinParamSetupAns,
        "PingSlotInfoReq": PingSlotInfoReq,
        "PingSlotChannelReq": PingSlotChannelReq,
        "PingSlotChannelAns": PingSlotChannelAns,
        "BeaconTimingAns": BeaconTimingAns,
        "BeaconFreqReq": BeaconFreqReq,
        "DeviceModeInd": DeviceModeInd,
        "DeviceModeConf": DeviceModeConf,
    }

    payload = bytearray(unhexlify(payload_hex))
    i = 0
    result = []
    while i < len(payload):
        cid = payload[i]
        cmd = mac_get_cid(cid, direction)
        if not cmd:
            # Without a known size the rest of the buffer cannot be split,
            # so report it as one blob. "length" is included so consumers
            # that read it (such as mac_print) do not hit a KeyError.
            remainder = payload[i + 1:]
            result.append({
                "cid": f"0x{cid:02X}",
                "name": "Unknown",
                "raw": remainder.hex(),
                "length": len(remainder),
            })
            break
        name, length, _ = cmd
        data = payload[i + 1:i + 1 + length]
        decoded = {"cid": f"0x{cid:02X}", "name": name, "raw": data.hex(), "length": length}
        if len(data) < length:
            # The buffer ends mid-command; the decoders index fixed offsets
            # and would raise IndexError, so flag it and stop instead.
            decoded["data"] = {"warning": "Truncated command"}
            result.append(decoded)
            break
        decoder = decoders.get(name)
        if decoder is not None:
            decoded["data"] = decoder(data)
        elif length != 0:
            decoded["data"] = {"warning": "Not decoded currently"}
        result.append(decoded)
        i += 1 + length
    return result
def mac_print(direction, payload, cmds):
    """Print decoded MAC commands in a human-readable form.

    Args:
        direction: falsy prints the "UL" header, truthy prints "DL".
        payload: the original hex string, echoed in the header line.
        cmds: list of command dicts with "cid" and "name", and optionally
            "raw", "length" and "data".

    Entries without a "length" key (for example "Unknown" commands) no
    longer raise KeyError; their raw bytes are shown whenever "raw" is
    non-empty.
    """
    label = "DL" if direction else "UL"
    print(f"{label} : {payload}")
    for cmd in cmds:
        print(f" CID: {cmd['cid']} : Name: {cmd['name']}")
        # Fall back to the raw string when the entry carries no length.
        has_payload = cmd["length"] if "length" in cmd else bool(cmd.get("raw"))
        if has_payload:
            print(" Raw:", cmd["raw"])
        if "data" in cmd:
            print(" Data:")
            for k, v in cmd["data"].items():
                print(" ", k + ":", v)
    print("--------------")
if __name__ == "__main__":
    import argparse

    # Populate the per-direction lookup tables before decoding anything.
    mac_direction()

    parser = argparse.ArgumentParser(description="Parse LoRaWAN MAC Commands")
    parser.add_argument('-d', '--data', required=True, help='MAC payload in hex string')
    # type=int with a default: omitting --direction used to crash on int(None).
    parser.add_argument('--direction', type=int, default=0,
                        help='uplink: 0, downlink: 1 (default: 0)')
    args = parser.parse_args()

    # Sample payloads for manual testing (--direction, --data):
    #   1  0707b0d68c5006
    #   0  06f01f
    #   1  021f03
    #   1  0302ffff05
    #   1  0707b0d68c5006021f030302ffff05
    #   1  0dcb9e9555ef0502805b8d0707b0d68c50
    #   1  0a00d0248d0a01a02c8d0a0270348d0a03403c8d
    #   0  0a030a030a030a03
    #   0  05070703
    #   0  0703

    mac_content = args.data
    direction = args.direction
    cmds = mac_decode(direction, mac_content)
    mac_print(direction, mac_content, cmds)