始める前に
IoT機器の準備
データ収集のためのIoT機器を準備します。 IoT機器は、通信に使用されるMQTTプロトコルをサポートする必要があり、OpenSSLのようなSSL関連のライブラリの使用が可能で、Java、Python、Node JS(JavaScript)などの言語をサポートしているすべての機器での使用が可能です。
Cloud IoT Core サービスの加入
IoT機器から発生するデータを Cloud IoT Coreで受信するためにサービスに加入します。 (Cloud IoT Core サービスの詳細説明及び料金はポータルで確認できます。)
1.Products & Services > Cloud IoT Coreを選択します。
2.Subscriptionメニューから、画面中央の商品の利用の申請を選択すると、サービス利用の申請が完了します。
メッセージ構造の設計
IoT機器から送信するデータを、どのような形で送信するか、データの構造とトピック(Topic)を定義します。メッセージは、JSON構造を使用して、以下の例を参考にして、メッセージの構造を自由に設計することができます。
トピック: factory/room1/temperature
メッセージ構造:
{
"deviceId": "device_1",
"deviceType": "temperature",
"value": 35,
"battery": 9,
"date": "2016-12-15",
"time": "15:12:00"
}
トピックとデータ構造は、ユーザーの任意の名前と構造で使用することができ、一部システムに影響を与える特殊文字の場合、制限がある場合があります。詳細については、その他 > 制限事項を参考にしてください。
証明書を作成する
IoT機器と Cloud IoT Core サービス間の通信は、証明書を基にした暗号化通信を使用します。 Cloud IoT Coreで発行できる証明書は、証明書、チェーン証明書、ルート証明書、プライベートキーの合計4つに区分されます。その証明書を使用して、ユーザーのIoT機器から接続するサーバーが、通常の Cloud IoT Core サーバーが正しいことを確認し、サーバーでは、正常に認証されたユーザーであることが正しいことを、双方向認証でもって確認することになります。相互間の認証が完了すると、セキュリティのための暗号化通信が行われます。(提供する証明書のうち、プライベートキーは、初期に1回だけダウンロードができるので、保管に注意してください。)
証明書の作成
1. 左側のメニューからCertificatesを選択します。
2. 画面中央の証明書の発行ボタンを選択します。
3. 証明書作成のポップアップで発行ボタンを選択します。
4. 証明書の作成に数分かかることがあります。作成が完了すると、自動で証明書のダウンロード画面に移動します。
5. 各証明書のダウンロードボタンを押して、証明書、チェーン証明書、ルート証明書、プライベートキーを自分のコンピュータに保存します。
参考 作成した証明書の有効、無効、削除などの機能と、証明書を仮想デバイスと接続して管理する方法は、 アドオンページで詳細を確認できます。
IoT機器に証明書をインストール
マイコンピュータに保存された証明書は、IoT機器に移動するための安全な保存媒体を利用します。(IoT機器でWebブラウジングが可能な場合、直接ダウンロードできます。)
リアルタイム処理のためのルールの作成
IoT機器と Cloud IoT Coreとを接続する準備ができたら、次のステップで、IoT機器から受信したデータをどのように処理するかを定義するルールを作成する必要があります。 ルールの作成手順は、IoT機器で Cloud IoT Coreに送信されたメッセージ内容のうち、事前に設定された条件に一致するかどうかを検査する「ルールクエリ」と、条件に合致すれば事前に定義された動作を実行する「アクション」とで構成されています。例えば、「メッセージに温度値が80度以上の場合、ユーザーにPush通知を送信(アクション)」のようなルールを作成することができます。ルールクエリとアクションは、少なくとも1つずつは定義させる必要があり、これによりルールの作成が可能になります。
ルールクエリの作成
IoT機器から送信されたメッセージの内容に、特定のルールクエリの条件を作成します。
1. 左側のメニューからRulesを選択します。
2. 画面中央のルールの作成ボタンを選択します。
3. ルールの作成画面で、ルールの名前(ユーザーが区別できる名前)、ルールの説明(ルールの詳細説明)、ルールクエリ(メッセージを検査できる条件)を入力します。
4. 「クエリの検証」ボタンを選択すると、入力したルールクエリの文法チェックを実行し、適切な場合、下段に、アクションを作成する画面が表示されます。
ルールクエリの作成のルール
ルールクエリを作成するには、SQL構文を理解しなければなりません。また、ルールクエリを作成するルールは、次のとおりです。
SELECT [<トピックAlias> <メッセージJSON Key1>、<トピックAlias> <メッセージJSON Key2>、...]FROM "[トピック]" AS [トピックAlias] WHERE [条件]
上段の[メッセージ構造の設計](#メッセージ-構造の-設計)で定義したメッセージの構造を例に、battery値が10%より少なく、残りの場合を検査したい場合、以下のようにクエリ文を作成します。
SELECT * FROM "factory/room1/temperature" AS t WHERE t.battery < 10
機器を区別できる機器のID値(deviceId)と、バッテリーの値(battery)のように、特定のフィールドのみを抽出する場合は、以下のように抽出するフィールド名を指定して、クエリ文を作成します。
SELECT t.deviceId, t.battery FROM "factory/room1/temperature" AS t WHERE t.battery < 10
参考 ルールクエリの詳細な作成ルールは、[その他]ページのルールクエリの詳細な作成ルールで確認できます。
参考 サポートするルールクエリの組み込み関数は、[アドオン]ページのルールクエリの組み込み関数で確認できます。
アクションの作成
アクションとは、ルールクエリに合致する場合、実行される動作です。現在 Cloud IoT Coreでは、「指定したトピックに再発行」、「Cloud Functionsにデータを転送」の2つの機能をサポートします。今後、さまざまなアクションの設定ができるように、機能が徐々に拡張される予定です。 アクションを作成するには、「クエリの検証」の後に作成が可能で、ルールには、必ず1つ以上のアクションを設定しなければなりません。
指定したトピックに再発行
「指定したトピックに再発行」アクションは、IoT機器で発行されるMQTTメッセージを、別の名前のトピックに再発行するときに使用します。 例えば、IoT機器が発行するメッセージのうち、ユーザーが事前に定義したルールクエリにマッチしたデータが検出された場合、alertというトピックでメッセージを再発行することができます。(alertの再発行メッセージを受信した機器では、ビープ音を鳴らすなど、フォローアップの操作が可能です。
1. アクションのドロップダウンメニューから「指定したトピックに再発行」を選択した後、追加ボタンを押します。
2. アクション作成のポップアップの再発行トピック項目に、メッセージを発行する新しいトピック名(例:alert)を入力します。
(トピック名は、メッセージが無限に再発行されるのを防ぐために、受信したメッセージと同じトピック名での設定はできません。)
3. 完了ボタンを押して作成を完了します。
注意 トピック名の最大文字数は255文字です。
Cloud Functionsにデータを転送
Cloud Functionsにデータを転送して、追加のアクションを実行するには、まず Cloud Functions 商品に加入しなければなりません。(登録されていない場合、利用申請のためのポップアップウィンドウが作成され、「Cloud Functions 申請」ボタンで Cloud Functions 商品に移動して、利用申請を行います。)
Cloud Functionsにデータを転送、をアクションに選択する前に、Cloud Functionsのトリガーを設定するステップを実行します。 IoT機器からCloud IoT Coreに転送したメッセージを分析して、先に定義したルールクエリに一致する場合、Cloud Functionsに定義したアクション(例:ITFFFを介した機器の制御)を実行する仕組みです。
1. アクションのドロップダウンメニューから「Cloud Functionsへデータを転送」を選択した後、追加ボタンを押します。
2. アクション連動ポップアップウィンドウが見えて、ユーザーがCloud Functionsで作成したCloud IoT Coreトリガーを照会して選択することができます。 トリガーを作りたい場合は、「Cloud Functionsトリガー作成」ボタンで作成できます。
バッチ設定
- 一定のバッチ時間(5~600秒)の間、データをマージしてCloud Functionsトリガーを実行します。
- 時間の設定に応じてCloud Functionsトリガーの実行回数を減らし、メッセージを大量に処理することができます。
バッチを設定する時、Cloud Functionsトリガーパラメータに伝達されるデータのフォーマットは以下のとおりです。
- "messages" JSON Arrayフィールドを持つJSON Object
- IoT Coreに収集されたデータをJSON Arrayの形でマージ
例
{ "messages": [ { "deviceId": "1", "temperature": 12.6, "eventTime": 1606286653000 }, { "deviceId": "1", "temperature": 12.5, "eventTime": 1606286658000 }, { "deviceId": "1", "temperature": 12.3, "eventTime": 1606286693000 } ] }
注意 IoT Coreアクションの実行は、実際にマージされたメッセージ数で課金されます。 例えば、1分のバッチ時間の間、100個のメッセージがマージされた場合、IoT Coreアクションの実行は100件で課金されます。
3. 完了ボタンを押して、アクションの作成を完了します。
エラーアクションの作成
エラーアクションは、ユーザーが定義したルールのクエリが失敗したり、アクションの実行が失敗した場合に実行されるアクションです。 例えば、「Cloud Functionsへデータを転送」アクションを要求している過程において、予想できない状況でCloud Functionsへの要求が失敗した場合、Cloud IoT Coreはアクションの失敗を認識し、「エラーアクション」で設定されたアクションを実行します。(他商品の実行の結果、例えば、Cloud Functionsの実行結果に対するエラー処理はサポートしていません。) エラーアクションも、アクションの設定と同じように、「指定したトピックに再発行」または「Cloud Functionsへデータを転送」に設定することができます。
エラーアクションのリスポンス
- エラーアクションは、JSONタイプで提供され、次のような形式があります。errorsListは、メッセージの一つが複数のアクションを実行するので、複数のアクション中にエラーが発生した、すべてのアクションのエラーに関するリストです。
- クエリでエラーが発生した場合、actionNameフィールドはなく、errorTypeはRULE_QUERYで表示されます。
{ "ruleName":"TriggerSensor", "payload":{ "temp":50, "high":"top" }, "topic" : "myhome/room1", "errorsList":[ { "errorType" : "CLOUD_FUNCTIONS", "actionName" : "aggregate", "message":"Function id invalid", "returnCode":621 }, ... ] }
参考 Cloud IoT CoreではCloud Functionsの実行の成功、失敗の有無を確認します。また、機能の実行結果は、Cloud Functions商品のコンソール画面で確認することができます。
参考 エラーアクション実行時に、そのエラーアクションにエラーが発生すると、追加のエラーアクションを呼び出すことはできません。
IoT機器の接続
IoT機器に証明書をインストールしてCloud IoT Core設定を完了すると、機器をCloud IoT Coreに接続作成し、メッセージを発行、さらにメッセージをサブスクリプションする過程が動作することを確認できます。
IoT機器からのメッセージを送信するためのサンプルコード(Java、Python、JavaScript)は、次のリンクを参照してください。 サンプルコードのリンク
本ガイドでは、Pythonのサンプルコードに基づいて説明します。
1. Cloud IoT Coreサービスの申請後、ダウンロードした証明書(xxxx.caChain.pem, xxxx.cert.pem, xxxx.private.pem, rootCaCert.pem)をIoT機器にコピーします。
2. paho-MQTTライブラリをPythonのバージョンに合わせてインストールします。
pip install paho-mqtt //python 2.x
python -m pip install paho-mqtt // python 2.7.9+
python3 -m pip install paho-mqtt //python 3.x
3. データを送信するためのコード(mqttTSLClient.py)の作成と実行
- PythonのサンプルコードmqttTLSClient.py ファイルを参照して、コードを作成します。
- サンプルコード内の証明書のパスと、ユーザーがダウンロードした証明書のパスが一致するように修正しなければなりません。
- サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。
- メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。
Python サンプルコード
# coding=utf-8
import argparse
import logging
import paho.mqtt.client as mqttClient
import ssl
import time
import json
def main():
# init
initLogger()
initParameter()
appendCert()
# Connect MQTT Broker
client = mqttClient.Client()
if not connectTls(client, hostname, port, rootCa, fullCertChain, private):
client.loop_stop()
exit(1)
attempts = 0
while attempts < 5:
# Subscribe message
client.subscribe("alert",0)
# Publish Message to Message broker
publish(client, topic, payload)
time.sleep(publishDelay)
attempts += 1
time.sleep(5)
def initLogger():
global log
log = logging.getLogger(__name__)
log.setLevel(logging.ERROR)
# stream handler (print to console)
streamHandler = logging.StreamHandler()
# file formatter
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] (%(filename)s:%(funcName)s:%(lineno)d) : %(message)s')
streamHandler.setFormatter(formatter)
log.addHandler(streamHandler)
def initParameter():
global rootCa
global caChain
global cert
global private
global fullCertChain
global hostname
global port
global publishDelay
global topic
global payload
global connected
rootCa = '/<Your>/<file>/<path>/rootCaCert.pem'
caChain = '/<Your>/<file>/<path>/caChain.pem'
cert = '/<Your>/<file>/<path>/cert.pem'
private = '/<Your>/<file>/<path>/private.pem'
hostname = 'msg01.cloudiot.ntruss.com'
port = 8883
publishDelay = 1 # sec
topic = "factory/room1/temperature"
payload = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"
connected = False
#fullCertChain is automatically generated by cert and caChain
fullCertChain = './fullCertChain.pem'
def appendCert():
filenames = [cert, caChain]
with open(fullCertChain,'w') as outfile:
for fname in filenames :
with open(fname) as infile:
outfile.write(infile.read()+"\n")
def connectTls(client, hostname, port, rootCa, fullCertChain, clientKey):
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.tls_set(ca_certs=rootCa, certfile=fullCertChain,
keyfile=clientKey, cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
ssl.match_hostname = lambda cert, hostname: True
client.tls_insecure_set(False)
client.connect(hostname, port=port)
client.loop_start()
attempts = 0
while not connected and attempts < 5: # Wait for connection
time.sleep(1)
attempts += 1
if not connected:
return False
return True
def on_connect(client, userdata, flags, rc):
if rc == 0:
log.info("=== Successfully Connected")
global connected # Use global variable
connected = True # Signal connection
else:
log.error("=== Connection lost")
def on_publish(client, userdata, result):
print(">>> Publish to IoT server.")
def publish(client, topic, payload):
try:
client.publish(topic, payload)
except Exception as e:
print("[ERROR] Could not publish data, error: {}".format(e))
def on_message(client, userdata, message):
print("<<< Subscribe from IoT server. topic : "+ message.topic + ", message : " + str(message.payload.decode("utf-8")))
if __name__ == "__main__":
main()
Python サンプルコードの動作結果
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}