#!/usr/bin/python from __future__ import print_function import dbus import dbus.exceptions import dbus.mainloop.glib import dbus.service import array import binascii import argparse try: from gi.repository import GObject # python3 except ImportError: import gobject as GObject # python2 from random import randint mainloop = None BLUEZ_SERVICE_NAME = 'org.bluez' LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' class NotSupportedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotSupported' class NotPermittedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotPermitted' class InvalidValueLengthException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.InvalidValueLength' class FailedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.Failed' class Advertisement(dbus.service.Object): PATH_BASE = '/org/bluez/example/advertisement' def __init__(self, bus, index, advertising_type): self.path = self.PATH_BASE + str(index) self.bus = bus self.ad_type = advertising_type self.service_uuids = None self.manufacturer_data = None self.solicit_uuids = None self.service_data = None self.local_name = None self.include_tx_power = None self.data = None dbus.service.Object.__init__(self, bus, self.path) def get_properties(self): properties = dict() properties['Type'] = self.ad_type if self.service_uuids is not None: properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, signature='s') if self.solicit_uuids is not None: properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, signature='s') if self.manufacturer_data is not None: properties['ManufacturerData'] = dbus.Dictionary( self.manufacturer_data, signature='qv') if self.service_data is not None: properties['ServiceData'] = dbus.Dictionary(self.service_data, signature='sv') if self.local_name is not None: properties['LocalName'] = dbus.String(self.local_name) if self.include_tx_power is not None: properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power) if self.data is not None: properties['Data'] = dbus.Dictionary( self.data, signature='yv') return {LE_ADVERTISEMENT_IFACE: properties} def get_path(self): return dbus.ObjectPath(self.path) def add_service_uuid(self, uuid): if not self.service_uuids: self.service_uuids = [] self.service_uuids.append(uuid) def add_solicit_uuid(self, uuid): if not self.solicit_uuids: self.solicit_uuids = [] self.solicit_uuids.append(uuid) def add_manufacturer_data(self, manuf_code, data): if not self.manufacturer_data: self.manufacturer_data = dbus.Dictionary({}, signature='qv') self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y') def add_service_data(self, uuid, data): if not self.service_data: self.service_data = dbus.Dictionary({}, signature='sv') self.service_data[uuid] = dbus.Array(data, signature='y') def add_local_name(self, name): if not self.local_name: self.local_name = "" self.local_name = dbus.String(name) def add_data(self, ad_type, data): if not self.data: self.data = dbus.Dictionary({}, signature='yv') self.data[ad_type] = dbus.Array(data, signature='y') @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): print('GetAll') if interface != LE_ADVERTISEMENT_IFACE: raise InvalidArgsException() print('returning props') return self.get_properties()[LE_ADVERTISEMENT_IFACE] @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature='', out_signature='') def Release(self): print('%s: Released!' % self.path) class TestAdvertisement(Advertisement): def __init__(self, bus, index, service_uuid, payload): Advertisement.__init__(self, bus, index, 'peripheral') self.add_service_uuid(service_uuid) self.add_service_data(service_uuid, payload) def register_ad_cb(): print('Advertisement registered') def register_ad_error_cb(error): print('Failed to register advertisement: ' + str(error)) mainloop.quit() def find_adapter(bus): remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) objects = remote_om.GetManagedObjects() for o, props in objects.items(): if LE_ADVERTISING_MANAGER_IFACE in props: return o return None def main(): global mainloop parser = argparse.ArgumentParser(description='Fake BLE Beacon for Trackers') parser.add_argument('-t', dest='X', type=int, required=True, help='type of beacon (from 1 to 5)') parser.add_argument('-i', dest='ID', type=str, required=True, help='beacon ID in hex without 0x prefix') args = parser.parse_args() dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() adapter = find_adapter(bus) if not adapter: print('LEAdvertisingManager1 interface not found') return adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), "org.freedesktop.DBus.Properties"); adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), LE_ADVERTISING_MANAGER_IFACE) #test_advertisement = TestAdvertisement(bus, 0) beacon_service_uuid = 'feaa' beacon_namespace = [0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99] beacon_instance = [0x54, 0x43, 0x4C] #beacon_type = [ 0x05 ] #beacon_id = [ 0xab, 0xcd ] beacon_type = [] beacon_type.append(args.X) beacon_id = bytearray.fromhex(args.ID) payload = [ 0x00, 0xE8 ] payload.extend(beacon_namespace) payload.extend(beacon_instance) payload.extend(beacon_type) payload.extend(beacon_id) print("UUID : ", beacon_service_uuid) print("Namespace : ", binascii.hexlify(bytearray(beacon_namespace))) print("Instance : ", binascii.hexlify(bytearray(beacon_instance))) print("Type : ", binascii.hexlify(bytearray(beacon_type))) print("ID : ", binascii.hexlify(bytearray(beacon_id))) test_advertisement = TestAdvertisement(bus, 0, beacon_service_uuid, payload) mainloop = GObject.MainLoop() ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, reply_handler=register_ad_cb, error_handler=register_ad_error_cb) mainloop.run() if __name__ == '__main__': main()