lora-decoder / lora-decode.py /
toto@bin.kawi.fr first commit
39e89d1 4 months ago
1 contributor
223 lines | 6.711kb
#!/usr/bin/env python3
import json
import argparse
from binascii import unhexlify
from Crypto.Cipher import AES  # Requires pycryptodome
import maccommands

# JoinRequest: ./lora-decode.py -d 0013000001005f63204e000009015f63209df0b5ab08ba
# JoinRequest: ./lora-decode.py -d 0013000001005f63204e000009015f6320f94b7f8d2805
# JoinAccept: ./lora-decode.py -d 20100000020000503802040201a0af8c70b78c40bf8c10c78ce0ce8c00cf671b6d --clear (unciphered)
# JoinAccept: ./lora-decode.py -d 20c74d265d1cc3f3a96f3d2adb2d731ef6fbecf6a854b845f98531706f06b064c6 (ciphered)
#20110000020000503802040201a0af8c70b78c40bf8c10c78ce0ce8c0070cbe839
#207abde18dd2a63bf98b87d654899c5091368b23be326e4a66b9822fab06b064c6
# Uplink: ./lora-decode.py -d 4050380204820200070311f0ed825f95378b6434a6c6e633bcd01a91e4198fe4c32dfaf740d57bca3cae472bc9c40b9b0b

def decode_mhdr(mhdr_byte):
  """Split a LoRaWAN MAC header (MHDR) byte into its fields.

  Args:
    mhdr_byte: First byte of the PHYPayload, as an int in 0..255.

  Returns:
    dict with, in this order:
      'MHDR Byte'  - the byte as a zero-padded hex string (e.g. '0x00'),
      'MType'      - human-readable message type name,
      'MTypeValue' - numeric message type (bits 7..5),
      'Major'      - major version (bits 1..0) with a short label.
  """
  # Indexed by the 3-bit MType value, so every possible value has a name.
  mtype_names = (
    "Join Request",
    "Join Accept",
    "Unconfirmed Data Up",
    "Unconfirmed Data Down",
    "Confirmed Data Up",
    "Confirmed Data Down",
    "RFU (Reserved)",
    "Proprietary",
  )
  mtype = (mhdr_byte >> 5) & 0x07
  major = mhdr_byte & 0x03
  major_label = "LoRaWAN v1.0" if major == 0 else "Unknown"
  return {
    # Width 4 = '0x' prefix + two hex digits; '#02x' never padded anything.
    'MHDR Byte': f"{mhdr_byte:#04x}",
    'MType': mtype_names[mtype],
    'MTypeValue': mtype,
    'Major': f"{major} ({major_label})"
  }

def parse_join_request(payload):
  """Extract the fields of a Join Request PHYPayload.

  Args:
    payload: The full 23-byte frame (MHDR, AppEUI, DevEUI, DevNonce, MIC).

  Returns:
    dict of hex strings: 'AppEUI', 'DevEUI' and 'DevNonce' are byte-reversed
    relative to their on-air order; 'MIC' is kept in on-air order.

  Raises:
    ValueError: if the payload is not exactly 23 bytes long.
  """
  if len(payload) != 23:
    raise ValueError("Invalid Join Request length (expecting 23 bytes including MIC)")

  def reversed_hex(start, stop):
    # These fields are printed most-significant byte first.
    return payload[start:stop][::-1].hex()

  fields = {}
  fields['AppEUI'] = reversed_hex(1, 9)
  fields['DevEUI'] = reversed_hex(9, 17)
  fields['DevNonce'] = reversed_hex(17, 19)
  fields['MIC'] = payload[19:23].hex()
  return fields

def decrypt_join_accept(payload, appkey_hex):
  """Recover the cleartext of an encrypted Join Accept frame.

  Everything after the MHDR byte -- including the 4-byte MIC -- is encrypted,
  so the ciphertext is 16 bytes (no CFList) or 32 bytes (with CFList).
  The network produces it with an AES-ECB *decrypt* operation, which means
  the receiving side must run AES-ECB *encrypt* to get the cleartext back.

  Args:
    payload: Full PHYPayload (MHDR followed by the encrypted body).
    appkey_hex: AppKey as 32 hex characters.

  Returns:
    bytearray: MHDR byte followed by the cleartext body; the last four
    bytes are the MIC.

  Raises:
    ValueError: if the key is not 32 hex characters, or the encrypted body
      is empty or not a multiple of the AES block size.
  """
  if len(appkey_hex) != 32:
    raise ValueError("AppKey must be 16 bytes (32 hex characters)")

  # The MIC is part of the encrypted region, so take everything after MHDR.
  encrypted = bytes(payload[1:])
  if not encrypted or len(encrypted) % 16 != 0:
    raise ValueError(f"Encrypted Join Accept length {len(encrypted)} is not a multiple of 16")

  appkey = unhexlify(appkey_hex)
  cipher = AES.new(appkey, AES.MODE_ECB)

  # Encrypt, not decrypt: this inverts the network's AES-decrypt step.
  cleartext = cipher.encrypt(encrypted)
  return bytearray([payload[0]]) + cleartext



def parse_join_accept(payload):
  """Extract the fields of a cleartext Join Accept PHYPayload.

  Args:
    payload: Full frame: MHDR byte, body, 4-byte MIC. The body must already
      be decrypted.

  Returns:
    dict with 'AppNonce', 'NetID', 'DevAddr' (byte-reversed hex strings),
    'DLSettings' (zero-padded hex string such as '0x02'), 'RxDelay' (int),
    'CFList' (hex string of any bytes after the 12 fixed ones, else None)
    and 'MIC' (hex string in on-air order).

  Raises:
    ValueError: if fewer than 12 body bytes sit between MHDR and MIC.
  """
  body = payload[1:-4]
  if len(body) < 12:
    raise ValueError("Join Accept payload too short")

  dl_settings = body[10]
  trailing = body[12:]

  return {
    'AppNonce': body[0:3][::-1].hex(),
    'NetID': body[3:6][::-1].hex(),
    'DevAddr': body[6:10][::-1].hex(),
    # Width 4 = '0x' prefix + two hex digits; '#02x' never padded anything.
    'DLSettings': f"{dl_settings:#04x}",
    'RxDelay': body[11],
    'CFList': trailing.hex() if trailing else None,
    'MIC': payload[-4:].hex()
  }

def parse_fopts(fopts_bytes):
  """Decode the MAC commands carried in a frame's FOpts field.

  The bytes are converted to a hex string and handed to
  maccommands.mac_decode, whose result is returned unchanged.

  Args:
    fopts_bytes: Raw FOpts bytes (may be empty).

  Returns:
    Whatever maccommands.mac_decode returns for the given input.
  """
  # NOTE(review): the leading 0 is always passed as mac_decode's first
  # argument; presumably a direction selector -- confirm in maccommands.
  return maccommands.mac_decode(0, fopts_bytes.hex())

def parse_data_frame(payload):
  """Extract the fields of a data frame (MType 2..5) PHYPayload.

  Layout between the MHDR byte and the 4-byte MIC:
  DevAddr (4) | FCtrl (1) | FCnt (2) | FOpts (0..15) | [FPort (1) | FRMPayload].

  Args:
    payload: Full frame including MHDR and MIC.

  Returns:
    dict with, in this order: 'MIC', 'DevAddr', 'FCtrl', 'FOptsLen', 'FCnt',
    'FOpts (raw)', 'FOpts (decoded)' (JSON text of parse_fopts' result),
    'FPort' and 'FRMPayload' (both None when nothing follows FOpts).

  Raises:
    ValueError: if the frame is too short to hold the fixed header, or if
      FCtrl announces more FOpts bytes than the frame contains.
  """
  mac_payload = payload[1:-4]

  # Fixed header part: DevAddr (4) + FCtrl (1) + FCnt (2).
  fopts_start = 7
  if len(mac_payload) < fopts_start:
    raise ValueError(
      "Data frame too short (expecting at least 12 bytes including MHDR and MIC)"
    )

  fctrl = mac_payload[4]
  fopts_len = fctrl & 0x0F  # low nibble of FCtrl is the FOpts length
  fopts_end = fopts_start + fopts_len
  if fopts_end > len(mac_payload):
    # Slicing would silently truncate FOpts and mis-decode the commands.
    raise ValueError(
      f"FOptsLen {fopts_len} exceeds the {len(mac_payload) - fopts_start} bytes available after FCnt"
    )
  fopts_raw = mac_payload[fopts_start:fopts_end]

  frame = {
    'MIC': payload[-4:].hex(),
    'DevAddr': mac_payload[0:4][::-1].hex(),
    'FCtrl': fctrl,
    'FOptsLen': fopts_len,
    'FCnt': int.from_bytes(mac_payload[5:7], byteorder='little'),
    'FOpts (raw)': fopts_raw.hex(),
    'FOpts (decoded)': json.dumps(parse_fopts(fopts_raw), indent=2),
  }

  if fopts_end < len(mac_payload):
    # Anything after FOpts starts with the port byte.
    frame['FPort'] = mac_payload[fopts_end]
    frame['FRMPayload'] = mac_payload[fopts_end + 1:].hex()
  else:
    frame['FPort'] = None
    frame['FRMPayload'] = None

  return frame

def parse_lorawan_frame(hex_payload, appkey_hex=None, clear_join=False):
  """Decode a hex-encoded LoRaWAN PHYPayload into a result dict.

  Args:
    hex_payload: The whole frame as a hex string.
    appkey_hex: Optional AppKey (hex) used to decrypt a Join Accept.
    clear_join: When true, a Join Accept is parsed as already-cleartext and
      appkey_hex is ignored.

  Returns:
    dict that always contains 'MHDR', 'Type' and 'Fields'. Join Accept
    results may also carry 'Info', 'Decrypted' or 'Warning'.

  Raises:
    ValueError: if the payload is shorter than 5 bytes, plus whatever the
      per-type parsers raise.
  """
  payload = bytearray(unhexlify(hex_payload))
  if len(payload) < 5:
    raise ValueError("Payload too short")

  mhdr_info = decode_mhdr(payload[0])
  mtype = mhdr_info['MTypeValue']

  result = {'MHDR': mhdr_info}

  if mtype == 0:
    result['Type'] = 'Join Request'
    result['Fields'] = parse_join_request(payload)
  elif mtype == 1:
    result['Type'] = 'Join Accept'
    if clear_join:
      result['Info'] = 'Processing Join Accept as cleartext'
      result['Fields'] = parse_join_accept(payload)
    elif appkey_hex:
      decrypted_payload = decrypt_join_accept(payload, appkey_hex)
      result['Decrypted'] = decrypted_payload.hex()
      result['Fields'] = parse_join_accept(decrypted_payload)
    else:
      # Without a key the body is opaque; still return 'Fields' so callers
      # that iterate over it do not fail with a KeyError.
      result['Warning'] = 'Join Accept is encrypted; provide --appkey to decrypt it or --clear if it is already cleartext'
      result['Fields'] = {}
  elif mtype in (2, 3, 4, 5):
    result['Type'] = 'Data Frame'
    result['Fields'] = parse_data_frame(payload)
  else:
    result['Type'] = 'Unknown/Unsupported'
    result['Fields'] = {}

  return result

def main():
  """Command-line entry point: decode one PHYPayload and print its fields.

  Reads the frame from -d/--data, optionally decrypting a Join Accept with
  --appkey or treating it as cleartext with --clear. Any exception raised
  while decoding or printing is reported as a single "Error: ..." line.
  """
  parser = argparse.ArgumentParser(description="Parse a LoRaWAN PHYPayload.")
  parser.add_argument('-d', '--data', required=True, help='LoRaWAN payload in hex string')
  parser.add_argument('--appkey', help='AppKey (hex) to decrypt Join Accept')
  parser.add_argument('--clear', action='store_true',help='Treat Join Accept as already decrypted (cleartext)')
  args = parser.parse_args()
  maccommands.mac_direction()
  try:
    result = parse_lorawan_frame(args.data, args.appkey, args.clear)

    print("MHDR:")
    for k, v in result['MHDR'].items():
      # MTypeValue is an internal dispatch value; the name is shown instead.
      if k != 'MTypeValue':
        print(f"  {k}: {v}")

    print(f"\nFrame Type: {result['Type']}")

    if 'Warning' in result:
      print(f"  Warning: {result['Warning']}")

    if 'Decrypted' in result:
      # The hard-coded "Should be" reference line was debug output tied to
      # one specific test frame and has been removed.
      print(f"  Decrypted Payload: {result['Decrypted']}")

    print("Fields:")
    for key, value in result['Fields'].items():
      if isinstance(value, list):
        print(f"  {key}:")
        for item in value:
          print(f"    - {item}")
      else:
        print(f"  {key}: {value}")

  except Exception as e:
    # Top-level boundary: report the problem instead of dumping a traceback.
    print(f"Error: {e}")

# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
  main()