cb — Bluetooth LE 機器への接続
cb(Core Bluetooth)モジュールを使えば、テキサスインスツルメンツ社製センサータグ(赤外線温度計や加速度計など、多くの種類のセンサーを含む、小型で安価なBluetooth LE機器)のようなBluetooth LE(Low Energy:低消費電力)の機器に接続することができるようになります。
このマニュアルにあげる例では、標準的なBluetooth LE 心拍数モニターや、テキサスインスツルメンツ社製センサータグを取り上げますが、モジュール自体はその他のタイプのBluetooth LE機器にも適用可能です。マイク付きヘッドホンやキーボードなどの、いわゆる「古典的な」Bluetoothについては対応していませんのでご注意下さい。
このモジュールはアップル社が提供しているCoreBluetoothフレームワークを使用していますが、その機能の全てをカバーしている訳ではありません。フレームワークの主たる役割のみ提供します。すなわち、機器への接続は可能ですが、機器そのもののデバイスの制御はサポートしていません。
クイックスタート
Bluetooth LE機器への接続やデータの読込みのための基本的な手順は以下の段階のようになっています。
見つけた機器やデバイス、サービス、サービス特性に通知や呼出しをするためのデリゲートなオブジェクトを定義します。実行できるコールバックの手法については後述します。
scan_for_peripherals()を呼び出してスキャンを開始します。多くの場合、デバイスを見つけられるように操作する必要があります。テキサスインスツルメンツ社製センサータグの場合、本体横のボタンを押す必要があります。
did_discover_peripheralコールバックをデリゲートオブジェクトが受け取ったら、デバイスの名前やUUID(Universally Unique IDentifier:デバイスの識別子)を調べた上で、接続したい周辺機器を見つけた場合にはconnect_peripheral()を呼び出します。接続を維持するために、見つかった周辺機器の名前やUUIDは持ち続けておく必要があります。さもなければ、オブジェクトがガーベージコレクションされた際に接続が遮断されてしまいます。
全てがスムーズに進んだ場合、the did_connect_peripheralコールバックが呼び出されます。この時点で、Peripheral.discover_services()を呼び出すことにより、周辺機器のサービス一覧を見ることができるようになります。
周辺機器のサービス一覧を見つけたら、did_discover_servicesコールバックを受け取ることになります。この段階で周辺機器のサービス属性にアクセスできるようになるので、目的のサービスを実行するためにPeripheral.discover_characteristics()を呼び出します。
もうお分かりかも知れませんが、これはdid_discover_characteristicsコールバックの結果です。この段階で(値をうまく読み込めた他のコールバックの結果として)Peripheral.read_characteristic_value()を呼び出すことで値を読み込んだり、Peripheral.set_notify_value()により通知を受けることが可能になります。
以下のコードは、Bluetooth LE 心拍数モニターにどのように接続するかを示しています。この例は、Polar社のデバイスを対象としていますが、他のブランドの心拍数モニターにも簡単に応用できるでしょう。(did_discover_peripheralメソッドを適宜変更するだけです):
import cb import sound import time import struct from __future__ import print_function class HeartRateManager (object): def __init__(self): self.peripheral = None def did_discover_peripheral(self, p): if p.name and 'Polar' in p.name and not self.peripheral: self.peripheral = p print('Connecting to heart rate monitor...') cb.connect_peripheral(p) def did_connect_peripheral(self, p): print('Connected:', p.name) print('Discovering services...') p.discover_services() def did_fail_to_connect_peripheral(self, p, error): print('Failed to connect: %s' % (error,)) def did_disconnect_peripheral(self, p, error): print('Disconnected, error: %s' % (error,)) self.peripheral = None def did_discover_services(self, p, error): for s in p.services: if s.uuid == '180D': print('Discovered heart rate service, discovering characteristitcs...') p.discover_characteristics(s) def did_discover_characteristics(self, s, error): print('Did discover characteristics...') for c in s.characteristics: if c.uuid == '2A37': self.peripheral.set_notify_value(c, True) def did_update_value(self, c, error): heart_rate = struct.unpack('<B', c.value[1])[0] self.values.append(heart_rate) print('Heart rate: %i' % heart_rate) mngr = HeartRateManager() cb.set_central_delegate(mngr) print('Scanning for peripherals...') cb.scan_for_peripherals() try: while True: pass except KeyboardInterrupt: cb.reset()
次の例はテキサスインスツルメンツ社製センサータグにどのように接続するかを示しています。温度センサーの(先ほどの心拍数モニターの例に良く似ていますが、1秒間に1回)値を記録し、センサータグのどれか2つのボタンが押されたら音を鳴らすというものです:
import cb import sound import struct class MyCentralManagerDelegate (object): def __init__(self): self.peripheral = None def did_discover_peripheral(self, p): print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid)) if p.name and 'Sensor Tag' in p.name and not self.peripheral: # Keep a reference to the peripheral, so it doesn't get garbage-collected: self.peripheral = p cb.connect_peripheral(self.peripheral) def did_connect_peripheral(self, p): print('*** Connected: %s' % p.name) print('Discovering services...') p.discover_services() def did_fail_to_connect_peripheral(self, p, error): print('Failed to connect') def did_disconnect_peripheral(self, p, error): print('Disconnected, error: %s' % (error,)) self.peripheral = None def did_discover_services(self, p, error): for s in p.services: if 'AA00' in s.uuid: print('+++ IR Thermometer found') p.discover_characteristics(s) elif 'FFE0' in s.uuid: print('+++ Simple Key Service found') p.discover_characteristics(s) def did_discover_characteristics(self, s, error): if 'AA00' in s.uuid: for c in s.characteristics: if 'AA02' in c.uuid: print('Enabling temperature sensor...') self.peripheral.write_characteristic_value(c, chr(0x01), True) elif 'AA01' in c.uuid: # Enable notification for the temperature sensor: print('Enabling temperature sensor notifications...') self.peripheral.set_notify_value(c, True) elif 'FFE0' in s.uuid: print('Enabling notifications for Simple Key Service...') key_characteristic = s.characteristics[0] self.peripheral.set_notify_value(key_characteristic, True) def did_write_value(self, c, error): # The temperature sensor has been activated (see did_discover_characteristic) print('Did enable temperature sensor') def did_update_value(self, c, error): if 'FFE1' == c.uuid: # A button on the SensorTag was pressed (or released): print('Button value: %s' % c.value.encode('hex')) sound.play_effect('Beep') else: # The temperature sensor has sent an updated value: tobj, mtmpamb = self.convert_temperature(c.value) print('Object temperature: %f -- Ambient: %f' % (tobj, mtmpamb)) def convert_temperature(self, raw_data): # This will convert the raw sensor data to temperature values in Celsius. # The details of this algorithm aren't important for this example, you can # find more information about this in the SensorTag user guide: # http://processors.wiki.ti.com/index.php/SensorTag_User_Guide rawT = struct.unpack('<h', raw_data[:2])[0] tmpAmb = struct.unpack('<H', raw_data[2:])[0] vobj2 = float(rawT) * 0.00000015625 mtmpamb = float(tmpAmb) / 128.0 tdie2 = mtmpamb + 273.15 s0, a1, a2 = 6.4E-14, 1.75E-3, -1.678E-5 b0, b1, b2, c2 = -2.94E-5, -5.7E-7, 4.63E-9, 13.4 Tref = 298.15 S = s0 * (1.0+a1*(tdie2 - Tref) + a2 * pow((tdie2 - Tref), 2)) vos = b0 + b1*(tdie2-Tref) + b2*pow((tdie2-Tref), 2) fobj = (vobj2 - vos) + c2 * pow((vobj2 - vos), 2) tobj = pow(pow(tdie2, 4) + (fobj/S), 0.25) - 273.15 return tobj, mtmpamb delegate = MyCentralManagerDelegate() print('Scanning for peripherals...') cb.set_central_delegate(delegate) cb.scan_for_peripherals() # Keep the connection alive until the 'Stop' button is pressed: try: while True: pass except KeyboardInterrupt: # Disconnect everything: cb.reset()
関数
cb.set_central_delegate(delegate)
機器を見つけた場合や、データを受け取った場合など、イベントのコールバックを受け取るためのオブジェクトを設定します。
引数部分の「delegate」は以下の方法で、その後の各メソッドの引数として使用できます:
class MyDelegate (object): def did_update_state(self): # State was updated (e.g. Bluetooth powered on/off) pass def did_discover_peripheral(self, p): # You would typically check the peripheral's name/uuid here, # and connect via cb.connect_peripheral(p). # You should also keep a reference to p (e.g. self.peripheral = p), # so that it doesn't get garbage-collected. # Note that this may get called multiple times for a single peripheral. pass def did_connect_peripheral(self, p): # You would typically call p.discover_services() here pass def did_fail_to_connect_peripheral(self, p, error): # `error` is a tuple of error code (integer) and description (string) pass def did_disconnect_peripheral(self, p, error): # error is a tuple of error code (integer) and description (string) pass def did_discover_services(self, p, error): # Here you would typically call discover_characteristics # for the services you're interested in. pass def did_discover_characteristics(self, s, error): # You can now read or write the characteristic's value pass def did_write_value(self, c, error): pass def did_update_value(self, c, error): # You can now access the characteristic value with c.value pass
cb.set_verbose(flag)
Trueに設定すると、コールバックイベントのデリゲート(set_central_delegate()を参照) が適用されたか否かを問わず、全てのコールバックイベントがコンソール画面に記録されます。これはデバッグに便利です。
cb.scan_for_peripherals()
アドバタイズされているサービスの周辺機器のスキャンを開始します。
cb.stop_scan()
周辺機器のスキャンを終了します。
cb.connect_peripheral(peripheral)
周辺機器(peripheral)と接続します。一般的にこの関数はdid_discover_peripheralコールバック処理から呼び出されます。
cb.cancel_peripheral_connection(peripheral)
アクティブな、又は中断している周辺機器(peripheral)との接続をキャンセルします。
cb.get_state()
セントラルマネージャーの現在の状態を返します。(どんな値があるかは Constants を参照)
cb.reset()
全ての周辺機器との接続を切断します。この関数は同時にセントラル・デリゲートをNoneに設定してしまうので、全てのコールバックを受け付けなくなります。
クラス
(注意 このモジュールのクラス群は、直接インスタンス化することを意図していません。代わりに、デリゲート実行するコールバック関数へのパラメーターとして異なるクラス群のインスタンスを取得します。(set_central_delegate()を参照))
周辺機器(Peripheral)
class cb.Peripheral
Peripheralクラスは scan_for_peripherals() 経由で見つけた周辺機器デバイスを表すクラスです。周辺機器は、UUID(Universally Unique IDentifiers)で識別されます。周辺機器は一つないし複数のサービス(Serciceオブジェクトで表せます)を実行できます。
周辺機器(Peripheral)メソッド
Peripheral.discover_services()
周辺機器のサービスを見つけます。見つけ終わると、セントラル・デリゲートはdid_discover_servicesコールバックを受け取ります。
Peripheral.discover_characteristics(service)
引数「service」で指定されたサービスの持つ「characteristic」を見つけます。このメソッドは、一般的にはdid_discover_servicesコールバックから呼び出されます。「characteristic」が見つかったら、セントラル・デリゲートはdid_discover_characteristicsコールバックを受け取ります。
Peripheral.set_notify_value(characteristic, flag=True)
引数のcharacteristicに関する通知のオン/オフを設定します。(注 全てのcharacteristicsに対応しているわけではありません)通知がオンの場合、セントラル・デリゲートでdid_update_valueコールバックで受け取ることができます。(set_central_delegate()を参照)
Peripheral.write_characteristic_value(characteristic, data, with_response)
Characteristic の値を書き込みます。引数の data は、1バイトの文字列でなければなりません。引数 with_response によって、書込みがが完了した時(又は失敗した時)にdid_write_valueコールバックが呼び出されるか否かを指定します。応答なしに書き込める値の全てをサポートしている訳ではないことにご注意下さい。
Peripheral.read_characteristic_value(characteristic)
引数characteristicで与えられる値を検索します。読み込まれたことがある値の場合は、セントラル・デリゲートは、characteristicの値の属性とともに値にアクセスできるポイントで、did_update_valueコールバックを受け取ります。全てのcharacteristicについて読込可能な値があるとは限りません。
周辺機器(Peripheral)の属性
Peripheral.manufacturer_data
周辺機器のアドバタイジングデータ(周辺機器が親機に対して発する存在をアピールする信号)の一部である、メーカーを特定するデータです。周辺機器との接続が確立しなくても読むことができます。(一般的には、did_discover_peripheral コールバック中で使用)
Peripheral.name
周辺機器の名称です。(文字列の場合とNoneの場合があります)
Peripheral.uuid
周辺機器のUUIDです。(16進数の文字列です)
Peripheral.state
周辺機器との接続の状態です。(0:未接続、1:接続中、2:接続済)
Peripheral.services
周辺機器から示されたサービスのリストです。(Serviceオブジェクトのリストです)このリストは通常、Peripheral.discover_services()を呼び出すまで(そしてdid_discover_servicesコールバックを受け取るまで)空の状態であることにご注意下さい。
サービス
class cb.Service
サービス(Service)オブジェクトは、周辺機器(Peripheral)のサービス、すなわちデバイス(あるいはその一部)の機能や特徴を実現するためのデータや関連する動作の集合体、を示します。サービス・オブジェクトは最初、あるいは二番目、ないしはCharacteristicクラスの中に含まれます。
サービスの属性
Service.characteristics
サービスのキャラクタリスティックです。(Characteristicオブジェクトのリストで、見つからないうちは空のリストになります)
Service.primary
サービスが1番目か2番目かを示す、ブール型のフラグです。
Service.uuid
サービスのUUIDです。(16進数の文字列)
キャラクタリスティック
class cb.Characteristic
Characteristic オブジェクトは周辺機器のサービスについての詳細情報を示します。一つのキャラクタリスティックは一つの値を持ちます。キャラクタリスティックの属性により、キャラクタリスティックの値がどのように使われるかが決まります。
キャラクタリスティックの属性
Characteristic.uuid
キャラクタリスティックのUUIDです。(16進数の文字列)
Characteristic.value
キャラクタリスティックの現在の値です。(1バイト文字列の場合もあれば、読み込む前や値がない時にはNone値となる場合もあります)
Characteristic.notifying
このキャラクタリスティックの通知を許可するか否かのフラグです。(リードオンリーにする場合は Peripheral.set_notify_value() を使って設定します)
Characteristic.properties
キャラクタリスティックの属性に対するビットマスクです。(後述の定数CH_PROP_*を参照)
定数
get_state() は以下の値のいずれかを返します:
Characteristicの属性です。(注 これらの値はビットマスクとして組合わせることができます。):