lora-decoder / maccommands.py /
toto@bin.kawi.fr first commit
39e89d1 4 months ago
1 contributor
498 lines | 14.887kb
#!/usr/bin/python3
import json

# Lookup table indexed by the 4-bit MaxEIRP field of TxParamSetupReq; the
# selected value is reported with a "dBm" suffix.
max_eirp_decoded = [
  8, 10, 12, 13, 14, 16, 18, 20,
  21, 24, 26, 27, 29, 30, 33, 36,
]
# Lookup table indexed by the 4-bit delay field of RXTimingSetupReq:
# index 0 and index 1 both map to 1, every other index maps to itself.
timing_setuq_req_delay = [max(1, idx) for idx in range(16)]
# Lookup table indexed by the class byte of DeviceModeInd / DeviceModeConf.
device_mode_class = ["Class-A", "RFU", "Class-C"]

# Table of the MAC commands known to the decoder.
# Each entry holds: cid (command identifier byte), name, size (payload length
# in bytes, CID excluded) and direction [0 uplink, 1 downlink, 2 both].
# A CID is shared by the request and its answer; the direction tells them apart.
mac_commands = [
  # All Classes
  {"cid": 0x02, "name": "LinkCheckReq",         "size": 0, "direction": 0},  # UL
  {"cid": 0x02, "name": "LinkCheckAns",         "size": 2, "direction": 1},  # DL
  {"cid": 0x03, "name": "LinkADRReq",           "size": 4, "direction": 1},  # DL
  {"cid": 0x03, "name": "LinkADRAns",           "size": 1, "direction": 0},  # UL
  {"cid": 0x04, "name": "DutyCycleReq",         "size": 1, "direction": 1},  # DL
  {"cid": 0x04, "name": "DutyCycleAns",         "size": 0, "direction": 0},  # UL
  {"cid": 0x05, "name": "RXParamSetupReq",      "size": 4, "direction": 1},  # DL
  {"cid": 0x05, "name": "RXParamSetupAns",      "size": 1, "direction": 0},  # UL
  {"cid": 0x06, "name": "DevStatusReq",         "size": 0, "direction": 1},  # DL
  {"cid": 0x06, "name": "DevStatusAns",         "size": 2, "direction": 0},  # UL
  {"cid": 0x07, "name": "NewChannelReq",        "size": 5, "direction": 1},  # DL
  {"cid": 0x07, "name": "NewChannelAns",        "size": 1, "direction": 0},  # UL
  {"cid": 0x08, "name": "RXTimingSetupReq",     "size": 1, "direction": 1},  # DL
  {"cid": 0x08, "name": "RXTimingSetupAns",     "size": 0, "direction": 0},  # UL
  {"cid": 0x09, "name": "TxParamSetupReq",      "size": 1, "direction": 1},  # DL
  {"cid": 0x09, "name": "TxParamSetupAns",      "size": 0, "direction": 0},  # UL
  {"cid": 0x0A, "name": "DlChannelReq",         "size": 4, "direction": 1},  # DL
  {"cid": 0x0A, "name": "DlChannelAns",         "size": 1, "direction": 0},  # UL
  {"cid": 0x0B, "name": "RekeyConf",            "size": 1, "direction": 1},  # DL
  {"cid": 0x0B, "name": "RekeyInd",             "size": 1, "direction": 0},  # UL
  {"cid": 0x0C, "name": "ADRParamSetupReq",     "size": 1, "direction": 1},  # DL
  {"cid": 0x0C, "name": "ADRParamSetupAns",     "size": 0, "direction": 0},  # UL
  {"cid": 0x0D, "name": "DeviceTimeReq",        "size": 0, "direction": 0},  # UL
  {"cid": 0x0D, "name": "DeviceTimeAns",        "size": 5, "direction": 1},  # DL
  {"cid": 0x0E, "name": "ForceRejoinReq",       "size": 2, "direction": 1},  # DL
  {"cid": 0x0F, "name": "RejoinParamSetupReq",  "size": 1, "direction": 1},  # DL
  {"cid": 0x0F, "name": "RejoinParamSetupAns",  "size": 1, "direction": 0},  # UL
  # Class-B
  {"cid": 0x10, "name": "PingSlotInfoReq",      "size": 1, "direction": 0},  # UL
  {"cid": 0x10, "name": "PingSlotInfoAns",      "size": 0, "direction": 1},  # DL
  {"cid": 0x11, "name": "PingSlotChannelReq",   "size": 4, "direction": 1},  # DL
  {"cid": 0x11, "name": "PingSlotChannelAns",   "size": 1, "direction": 0},  # UL
  {"cid": 0x12, "name": "BeaconTimingReq",      "size": 0, "direction": 0},  # UL
  # The answer is sent by the network: it was wrongly registered as uplink,
  # which made it undecodable in downlink frames.
  {"cid": 0x12, "name": "BeaconTimingAns",      "size": 3, "direction": 1},  # DL
  {"cid": 0x13, "name": "BeaconFreqReq",        "size": 3, "direction": 1},  # DL
  # NOTE(review): LoRaWAN 1.0.3+ appears to define a 1-byte status payload for
  # BeaconFreqAns - confirm against the targeted specification version.
  {"cid": 0x13, "name": "BeaconFreqAns",        "size": 0, "direction": 0},  # UL
  # Class-C
  {"cid": 0x20, "name": "DeviceModeConf",       "size": 1, "direction": 1},  # DL
  {"cid": 0x20, "name": "DeviceModeInd",        "size": 1, "direction": 0},  # UL
]
# Per-direction lookup lists; empty until mac_direction() fills them.
ul_cids = []
dl_cids = []

def mac_direction():
  """Fill ``ul_cids`` and ``dl_cids`` from the ``mac_commands`` table.

  Each list receives ``{"cid", "name", "size"}`` dictionaries for the
  commands of its direction (0 uplink, 1 downlink). A direction of 2
  ("both", as documented for the table format) is registered in both lists.

  The lists are rebuilt in place, so calling this function several times
  does not duplicate entries and existing references to the lists stay valid.
  """
  uplink = []
  downlink = []
  for mac_command in mac_commands:
    direction = mac_command["direction"]
    if direction in (0, 2):
      uplink.append({key: mac_command[key] for key in ("cid", "name", "size")})
    if direction in (1, 2):
      downlink.append({key: mac_command[key] for key in ("cid", "name", "size")})
  # Slice assignment replaces the content without rebinding the module globals.
  ul_cids[:] = uplink
  dl_cids[:] = downlink
       
def mac_get_cid(cid,direction=0):
  """Look up a command identifier in the list matching ``direction``.

  A truthy ``direction`` searches ``dl_cids``, otherwise ``ul_cids``.
  Returns ``(name, size, direction)`` for the first entry whose cid matches,
  or ``None`` when the cid is not registered for that direction.
  """
  table = dl_cids if direction else ul_cids
  found = next((entry for entry in table if entry['cid'] == cid), None)
  if found is None:
    return None
  return found['name'], found['size'], direction

def NewChannelReq(content):
  """Decode a NewChannelReq payload.

  Reads byte 0 as the channel index, bytes 1-3 as a little-endian frequency
  value (reported multiplied by 100) and byte 4 as the data-rate range
  (low nibble = minimum, high nibble = maximum).
  """
  index = content[0]
  raw_freq = sum(content[1 + pos] << (8 * pos) for pos in range(3))
  dr_range = content[4]
  return {
    "ChIndex": index,
    "Frequency": raw_freq * 100,
    "DrRange.MaxDR": (dr_range >> 4) & 0x0f,
    "DrRange.MinDR": dr_range & 0x0f
  }
  
def PingSlotChannelReq(content):
  """Decode a PingSlotChannelReq payload.

  The LoRaWAN payload layout is Frequency (3 bytes, little-endian, in units
  of 100 Hz) followed by the DR byte, whose low nibble is the data rate.
  The frequency is therefore read from bytes 0-2 and the data rate from
  byte 3 (the previous implementation read them in the opposite order).

  Returns a dict with "DataRate" and "Frequency" (in Hz).
  """
  chFreq=(content[0]<<0)+(content[1]<<8)+(content[2]<<16)
  # Bits 7:4 of the DR byte are RFU, only the low nibble carries the data rate.
  DataRate=content[3]&0x0f
  return {
    "DataRate": DataRate,
    "Frequency": 100*chFreq
  }
  
def PingSlotChannelAns(content):
  """Decode a PingSlotChannelAns payload.

  Bit 0 of the status byte is reported as "FrequencyOK" and bit 1 as
  "DataRateOK".
  """
  flags = content[0]
  labels = ("FrequencyOK", "DataRateOK")
  return {label: (flags >> bit) & 0x01 for bit, label in enumerate(labels)}
  
def BeaconTimingAns(content):
  """Decode a BeaconTimingAns payload.

  The LoRaWAN payload layout is Delay (2 bytes, little-endian) followed by
  Channel (1 byte). The delay is therefore read from bytes 0-1 and the
  channel from byte 2 (the previous implementation read the channel first).

  Returns a dict with "Channel", the raw "Delay" and "DelayDecoded"
  (the raw delay multiplied by 30, i.e. expressed in 30 ms steps).
  """
  Delay=(content[0]<<0)+(content[1]<<8)
  Channel=content[2]
  DelayDecoded=30*Delay
  return {
    "Channel": Channel,
    "Delay": Delay,
    "DelayDecoded": DelayDecoded
  }
  
def BeaconFreqReq(content):
  """Decode a BeaconFreqReq payload.

  Bytes 0-2 form a little-endian frequency value, reported multiplied by 100.
  """
  raw_freq = sum(content[pos] << (8 * pos) for pos in range(3))
  return {"Frequency": raw_freq * 100}
  
def DlChannelReq(content):
  """Decode a DlChannelReq payload.

  Byte 0 is the channel index; bytes 1-3 form a little-endian frequency
  value, reported multiplied by 100.
  """
  index = content[0]
  raw_freq = sum(content[1 + pos] << (8 * pos) for pos in range(3))
  return {
    "ChIndex": index,
    "Frequency": raw_freq * 100
  }

def RXParamSetupReq(content):
  """Decode an RXParamSetupReq payload.

  Byte 0 is DLsettings: bits 3:0 hold RX2DataRate, bits 6:4 hold RX1DRoffset
  and bit 7 is RFU. Bytes 1-3 form a little-endian frequency in units of
  100 Hz.

  Returns a dict with "RX1DRoffset", "RX2DataRate" and "Frequency" (in Hz).
  """
  dlSettings=content[0]
  chFreq=(content[1]<<0)+(content[2]<<8)+(content[3]<<16)
  RX2DataRate=dlSettings & 0x0f
  # RX1DRoffset is only 3 bits wide; a 4-bit mask would leak the RFU bit 7.
  RX1DRoffset=(dlSettings >> 4) & 0x07
  return {
    "RX1DRoffset": RX1DRoffset,
    "RX2DataRate": RX2DataRate,
    "Frequency": 100*chFreq
  }
  
def RXTimingSetupReq(content):
  """Decode an RXTimingSetupReq payload.

  The low nibble of byte 0 is the raw delay; "DelayDec" is the matching
  entry of the ``timing_setuq_req_delay`` table.
  """
  raw_delay = content[0] & 0x0f
  return {
    "Delay": raw_delay,
    "DelayDec": timing_setuq_req_delay[raw_delay]
  }
  
def LinkCheckAns(content):
  """Decode a LinkCheckAns payload: byte 0 is "Margin", byte 1 is "GwCnt"."""
  link_margin, gateway_count = content[0], content[1]
  return {"Margin": link_margin, "GwCnt": gateway_count}
  
def LinkADRReq(content):
  """Decode a LinkADRReq payload.

  Byte 0 packs the data rate (high nibble) and TX power (low nibble),
  bytes 1-2 are the little-endian channel mask (reported as four uppercase
  hex digits) and byte 3 is returned unchanged as "Redundancy".
  """
  dr_power, mask_low, mask_high, redundancy_byte = (content[pos] for pos in range(4))
  mask = mask_low + (mask_high << 8)
  return {
    "TxPower": dr_power & 0x0f,
    "DataRate": (dr_power >> 4) & 0x0f,
    "ChMask": format(mask, "04X"),
    "Redundancy": redundancy_byte
  }
  
def LinkADRAns(content):
  """Decode a LinkADRAns payload.

  Bits 0, 1 and 2 of the status byte are reported as "ChannelMaskACK",
  "DataRateACK" and "PowerACK" respectively.
  """
  flags = content[0]
  labels = ("ChannelMaskACK", "DataRateACK", "PowerACK")
  return {label: (flags >> bit) & 0x01 for bit, label in enumerate(labels)}
  
def DlChannelAns(content):
  """Decode a DlChannelAns payload.

  Bit 0 of the status byte is "ChannelFrequencyOK", bit 1 is
  "UplinkFrequencyExists".
  """
  flags = content[0]
  frequency_ok = flags & 0x01
  uplink_exists = (flags & 0x02) >> 1
  return {
    "ChannelFrequencyOK": frequency_ok,
    "UplinkFrequencyExists": uplink_exists
  }
  
def NewChannelAns(content):
  """Decode a NewChannelAns payload.

  Bit 0 of the status byte is "ChannelFrequencyOK", bit 1 is
  "DataRateRangeOK".
  """
  flags = content[0]
  decoded = {}
  decoded["ChannelFrequencyOK"] = flags & 0x01
  decoded["DataRateRangeOK"] = (flags >> 1) & 0x01
  return decoded
  
def RXParamSetupAns(content):
  """Decode an RXParamSetupAns payload.

  Bits 0, 1 and 2 of the status byte are reported as "ChannelACK",
  "RX2DataRateACK" and "RX1DROffsetACK" respectively.
  """
  flags = content[0]
  labels = ("ChannelACK", "RX2DataRateACK", "RX1DROffsetACK")
  return {label: (flags >> bit) & 0x01 for bit, label in enumerate(labels)}
  
def TxParamSetupReq(content):
  """Decode a TxParamSetupReq payload.

  The low nibble of byte 0 is the MaxEIRP index (translated through
  ``max_eirp_decoded`` and suffixed with "dBm"); bit 4 is the uplink dwell
  time flag and bit 5 the downlink dwell time flag.
  """
  field = content[0]
  eirp_index = field & 0x0f
  eirp_value = max_eirp_decoded[eirp_index]
  uplink_dwell = (field >> 4) & 0x01
  downlink_dwell = (field >> 5) & 0x01
  return {
    "DownlinkDwellTime": downlink_dwell,
    "UplinkDwellTime": uplink_dwell,
    "MaxEIRP": eirp_index,
    "MaxEIRPdBm": f"{eirp_value}dBm"
  }
  
def RekeyConf(content):
  """Decode a RekeyConf payload: the low nibble of byte 0 is the version."""
  return {"LoRaWanVersion": content[0] & 0x0f}
def RekeyInd(content):
  """Decode a RekeyInd payload: the low nibble of byte 0 is the version."""
  version_byte = content[0]
  decoded = {}
  decoded["LoRaWanVersion"] = version_byte & 0x0f
  return decoded
  
def DutyCycleReq(content):
  """Decode a DutyCycleReq payload: the low nibble of byte 0 is MaxDutyCycle."""
  return {"MaxDutyCycle": content[0] & 0x0f}
def ADRParamSetupReq(content):
  """Decode an ADRParamSetupReq payload.

  The high nibble of byte 0 is the limit exponent and the low nibble the
  delay exponent; the decoded limit and delay are 2 raised to those exponents.
  """
  # divmod by 16 splits the byte into (high nibble, low nibble).
  upper, lower = divmod(content[0], 16)
  limit_exp = upper & 0x0f
  delay_exp = lower
  return {
    "DelayExp": delay_exp,
    "AdrAckDelay": 2 ** delay_exp,
    "LimitExp": limit_exp,
    "AdrAckLimit": 2 ** limit_exp
  }
  
def PingSlotInfoReq(content):
  """Decode a PingSlotInfoReq payload: bits 2:0 of byte 0 are the periodicity."""
  return {"Periodicity": content[0] & 0x07}
  
def ForceRejoinReq(content):
  """Decode a ForceRejoinReq payload.

  Byte 0: bits 3:0 data rate, bits 6:4 rejoin type.
  Byte 1: bits 2:0 max retries (also reported incremented by one as
  "MaxRetriesDecoded"), bits 5:3 period.
  """
  first, second = content[0], content[1]
  retries = second & 0x07
  return {
    "DataRate": first & 0x0f,
    "RejoinType": (first >> 4) & 0x07,
    "MaxRetries": retries,
    "MaxRetriesDecoded": retries + 1,
    "Period": (second >> 3) & 0x07
  }
  
def RejoinParamSetupReq(content):
  """Decode a RejoinParamSetupReq payload.

  Byte 0 packs MaxTimeN (bits 7:4) and MaxCountN (bits 3:0).

  Per LoRaWAN 1.1 the device must send a Rejoin-request at least every
  2^(MaxCountN + 4) uplinks and every 2^(MaxTimeN + 10) seconds; the decoded
  values follow those formulas (the count was previously decoded as
  2^MaxCountN, omitting the +4 offset).

  Returns a dict with the raw fields and their decoded counterparts.
  """
  status=content[0]
  MaxCountN=status & 0x0f
  MaxTimeN=(status>>4) & 0x0f
  MaxCountNDecoded=1<<(MaxCountN+4)  # number of uplinks
  MaxTimeNDecoded=1<<(MaxTimeN+10)   # seconds
  return {
    "MaxCountN": MaxCountN,
    "MaxCountNDecoded": MaxCountNDecoded,
    "MaxTimeN": MaxTimeN,
    "MaxTimeNDecoded": MaxTimeNDecoded
  }
  
def RejoinParamSetupAns(content):
  """Decode a RejoinParamSetupAns payload: bit 0 of byte 0 is "TimeOK"."""
  return {"TimeOK": content[0] & 0x01}
  
def DevStatusAns(content):
  """Decode a DevStatusAns payload.

  Byte 0 is the battery level: 0 is reported as "ExternalPower", 255 as
  "ErrorMeasuring", any other value is returned as-is.
  Byte 1 carries a 6-bit two's-complement value in its low bits, reported
  as a signed integer in "RadioStatus".
  """
  special_levels = {0: "ExternalPower", 255: "ErrorMeasuring"}
  level = content[0]
  raw_status = content[1]
  # Sign-extend the 6-bit field: values above 0x1f become negative.
  signed_status = ((raw_status & 0x3f) ^ 0x20) - 0x20
  return {
    "Battery": special_levels.get(level, level),
    "RadioStatus": signed_status
  }
  
def DeviceModeInd(content):
  """Decode a DeviceModeInd payload.

  Byte 0 is the device class, translated through ``device_mode_class``.
  Values that fall outside that table are reported as "RFU" instead of
  raising IndexError, so a malformed frame does not abort the decoding.

  Returns a dict with the raw "DeviceClass" and "DeviceClassDecoded".
  """
  DeviceClass=content[0]
  if 0 <= DeviceClass < len(device_mode_class):
    DeviceClassDecoded=device_mode_class[DeviceClass]
  else:
    DeviceClassDecoded="RFU"
  return {
    "DeviceClass": DeviceClass,
    "DeviceClassDecoded": DeviceClassDecoded
  }
  
def DeviceModeConf(content):
  """Decode a DeviceModeConf payload.

  Byte 0 is the device class, translated through ``device_mode_class``.
  Values that fall outside that table are reported as "RFU" instead of
  raising IndexError, so a malformed frame does not abort the decoding.

  Returns a dict with the raw "DeviceClass" and "DeviceClassDecoded".
  """
  DeviceClass=content[0]
  if 0 <= DeviceClass < len(device_mode_class):
    DeviceClassDecoded=device_mode_class[DeviceClass]
  else:
    DeviceClassDecoded="RFU"
  return {
    "DeviceClass": DeviceClass,
    "DeviceClassDecoded": DeviceClassDecoded
  }
  
def DeviceTimeAns(content):
  """Decode a DeviceTimeAns payload.

  Bytes 0-3 form a little-endian integer reported as "Epoch"; byte 4 is a
  fraction of a second in 1/256 steps, reported as a float.
  """
  # 55A66DF5 (note kept from the original code; presumably a sample epoch value)
  seconds = sum(content[pos] << (8 * pos) for pos in range(4))
  fraction = content[4] / 256
  return {
    "Epoch": seconds,
    "FractionalSeconds": fraction
  }
  
def mac_decode(direction, payload_hex):
  """Decode a hex string holding a sequence of MAC commands.

  ``direction`` selects the command table through ``mac_get_cid`` (falsy for
  uplink, truthy for downlink) and ``payload_hex`` is the concatenation of
  ``CID + payload`` items. ``mac_direction()`` must have been called before,
  otherwise every CID is reported as unknown.

  Returns a list with one dict per command, holding "cid" (formatted as
  ``0xNN``), "name", "raw" (hex of the command payload) and "length".
  Commands with a field decoder also carry a "data" dict. Decoding stops
  at the first unknown CID (its "raw" holds every remaining byte) or at a
  command whose payload is shorter than its declared size (reported with a
  warning in "data" instead of raising IndexError in the field decoder).

  Raises ``binascii.Error`` when ``payload_hex`` is not valid hexadecimal.
  """
  from binascii import unhexlify

  # Command name -> field decoder. Built inside the function so the decoder
  # names are only resolved when a payload is actually decoded.
  decoders = {
    decoder.__name__: decoder
    for decoder in (
      NewChannelReq, NewChannelAns, DevStatusAns, LinkCheckAns,
      LinkADRReq, LinkADRAns, DutyCycleReq, RXParamSetupAns,
      RXParamSetupReq, DlChannelReq, DlChannelAns, TxParamSetupReq,
      RXTimingSetupReq, DeviceTimeAns, RekeyConf, RekeyInd,
      # These two had a decoder but were never dispatched.
      ADRParamSetupReq, RejoinParamSetupAns,
      ForceRejoinReq, RejoinParamSetupReq, PingSlotInfoReq,
      PingSlotChannelReq, PingSlotChannelAns, BeaconTimingAns,
      BeaconFreqReq, DeviceModeInd, DeviceModeConf,
    )
  }

  payload = bytearray(unhexlify(payload_hex))
  i = 0
  result = []

  while i < len(payload):
    cid = payload[i]
    cmd = mac_get_cid(cid,direction)
    if not cmd:
      # The size of an unknown command is unknown too: dump the rest and stop.
      remainder = payload[i+1:]
      result.append({
        "cid": f"0x{cid:02X}",
        "name": "Unknown",
        "raw": remainder.hex(),
        "length": len(remainder)
      })
      break

    name, length, _ = cmd
    data = payload[i+1:i+1+length]
    decoded = {"cid": f"0x{cid:02X}", "name": name, "raw": data.hex(), "length": length}

    if len(data) < length:
      # Truncated frame: the field decoders index fixed offsets and would fail.
      decoded["data"]={"warning": f"Truncated payload: expected {length} byte(s), got {len(data)}"}
      result.append(decoded)
      break

    decoder = decoders.get(name)
    if decoder is not None:
      decoded["data"]=decoder(data)
    elif length!=0:
      decoded["data"]={"warning": "Not decoded currently"}

    result.append(decoded)
    i += 1 + length

  return result

def mac_print(direction,payload,cmds):
  """Print a human-readable dump of decoded MAC commands.

  ``direction`` is falsy for uplink ("UL") and truthy for downlink ("DL"),
  ``payload`` is the original hex string (printed as-is) and ``cmds`` is a
  list of dicts holding at least "cid" and "name", plus optional "raw",
  "length" and "data" (a dict of decoded fields).

  Entries without a "length" key are accepted: their raw bytes are printed
  whenever "raw" is non-empty, instead of raising KeyError.
  """
  label = "DL" if direction else "UL"
  print(f"{label} : {payload}")
  for cmd in cmds:
    print(f"  CID: {cmd['cid']} : Name: {cmd['name']}")
    # Fall back on the size of the raw dump when no length was recorded.
    if cmd.get("length", len(cmd.get("raw", ""))):
      print("    Raw:", cmd["raw"])
    if "data" in cmd:
      print("    Data:")
      for k, v in cmd["data"].items():
        print("    ", k + ":", v)
  print("--------------")

if __name__ == "__main__":
  # Command-line entry point: decode one MAC payload and print the result.
  import argparse

  # Sample payloads kept from development (direction, hex string):
  #   DL 0707b0d68c5006
  #   UL 06f01f
  #   DL 021f03
  #   DL 0302ffff05
  #   DL 0707b0d68c5006021f030302ffff05
  #   DL 0dcb9e9555ef0502805b8d0707b0d68c50
  #   DL 0a00d0248d0a01a02c8d0a0270348d0a03403c8d
  #   UL 0a030a030a030a03
  #   UL 05070703
  #   UL 0703

  # Build the per-direction lookup lists before decoding anything.
  mac_direction()
  parser = argparse.ArgumentParser(description="Parse LoRaWAN MAC Commands")
  parser.add_argument('-d', '--data', required=True, help='MAC payload in hex string')
  # Parsed as an int by argparse and defaulting to uplink, so omitting the
  # option no longer fails with "int() argument must be ... not 'NoneType'".
  parser.add_argument('--direction', type=int, default=0,
                      help='uplink: 0 (default), downlink: 1')
  args = parser.parse_args()

  mac_content=args.data
  direction=args.direction
  cmds=mac_decode(direction, mac_content)
  mac_print(direction,mac_content,cmds)