サンプルコードの説明
Cloud IoT Coreサービスのコンソールからダウンロードした証明書を使用して、MQTTプロトコルでCloud IoT Coreサーバーにメッセージを発行(Publish)し、購読(Subscribe)する例を説明します。
コードは、Java、Python、Node JS(JavaScript)を基準に作成されています。
Java
- サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。
メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。
環境設定
このサンプルファイルを実行するためにはMQTT、TLSライブラリが必要です。
maven
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>${version}</version> </dependency> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <version>${version}</version> </dependency> </dependencies>
gradle
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:${version}'
implementation 'org.bouncycastle:bcpkix-jdk15on:${version}'
Javaのサンプルコード
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
public class MqttExample {
public static void main(String[] args) throws Exception {
String clientId = "myClientTest";
/*
*** Modify file path and server ***
*/
// MQTT server hostname
String iotServer = "ssl://msg01.cloudiot.ntruss.com";
String rootCaCertFilePath = "/<Your>/<file>/<path>/rootCaCert.pem";
String clientCaChainFilePath = "/<Your>/<file>/<path>/caChain.pem";
String clientCertFilePath = "/<Your>/<file>/<path>/cert.pem";
String clientKeyFilePath = "/<Your>/<file>/<path>/private.pem";
String password = "";
/*
Make Root CA TrustManager and Client Keystore
*/
MqttExample mqttExample = new MqttExample();
// CA certificate is used to **authenticate server**
TrustManagerFactory tmf = mqttExample.getTrustManagerFactory(rootCaCertFilePath);
// client key, certificates and cachain are sent to server to authenticate client
KeyManagerFactory kmf = mqttExample.getKeyManagerFactory(clientCaChainFilePath, clientCertFilePath, clientKeyFilePath, password);
/*
MQTT Connection
*/
// Set MQTT Client
MqttClient mqttClient = new MqttClient(iotServer, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setConnectionTimeout(60);
connOpts.setKeepAliveInterval(60);
connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
// create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
connOpts.setSocketFactory(context.getSocketFactory());
mqttClient.connect(connOpts);
System.out.println("=== Successfully Connected");
/*
Subscribe & Publish Message
Publish message on the device ---> IoT Platform(alert republish action) ---> Subscribe message on the device
*/
mqttClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("=== Connection lost.");
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("<<< Subscribe from IoT server. topic : " + s + ", message : " + mqttMessage.toString());
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println(">>> Publish to IoT server.");
}
});
/*
Topic & Message Example
*/
String sendTopic = "factory/room1/temperature";
// Test for MQTT republish
// In order to receive alert topic message, alert republish action must be set in Rule engine.
String alertTopic = "alert";
// Console guide example message
String message = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}";
/*
Subscribe message
*/
boolean isConnected = mqttClient.isConnected();
if (isConnected) {
mqttClient.subscribe(alertTopic);
// You can check whether the message is being delivered normally.
// mqttClient.subscribe(sendTopic);
}
/*
Publish message
*/
if (isConnected) {
for (int i=0; i < 5; i++) {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(0);
mqttMessage.setRetained(false);
try {
MqttTopic topic = mqttClient.getTopic(sendTopic);
MqttDeliveryToken token = topic.publish(mqttMessage);
token.waitForCompletion();
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
Enable disconnect() and close() if you needed
*/
// mqttClient.disconnect();
// mqttClient.close();
}
/*
Code for Certificate Access
*/
private MqttExample() {
init();
}
private KeyManagerFactory getKeyManagerFactory(String clientCaChainFilePath, String clientCertFilePath, String clientKeyFilePath, String password) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
// load client private key
KeyPair key = getClientKey(clientKeyFilePath);
// load client Cert
Certificate clientCert = getClientCert(clientCertFilePath);
// load client CA Chain
List<Certificate> caChain = getClientCaChain(clientCaChainFilePath, clientCert);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
int caChainSize = caChain.size();
Certificate[] caChainArray = caChain.toArray(new Certificate[caChainSize]);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), caChainArray);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
return kmf;
}
private TrustManagerFactory getTrustManagerFactory(String rootCaCertFilePath) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
// load CA certificate
X509Certificate rootCaCert = getRootCaCert(rootCaCertFilePath);
KeyStore rootCaKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
rootCaKeyStore.load(null, null);
rootCaKeyStore.setCertificateEntry("ca-certificate", rootCaCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(rootCaKeyStore);
return tmf;
}
private void init() {
Security.addProvider(new BouncyCastleProvider());
}
private KeyPair getClientKey(String clientKeyFilePath) throws IOException {
PEMParser pemParser = new PEMParser(new FileReader(clientKeyFilePath));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
KeyPair key = converter.getKeyPair((PEMKeyPair) object);
pemParser.close();
return key;
}
private List<Certificate> getClientCaChain(String clientCaChainFilePath, Certificate clientCert) throws CertificateException, IOException {
X509Certificate cert;
List<Certificate> caChain = new ArrayList<Certificate>();
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(clientCaChainFilePath));
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
caChain.add(cert);
}
caChain.add(0, clientCert);
return caChain;
}
private X509Certificate getClientCert(String clientCertFilePath) throws CertificateException, IOException {
return getX509Certificate(clientCertFilePath);
}
private X509Certificate getRootCaCert(String rootCaCertFilePath) throws CertificateException, IOException {
return getX509Certificate(rootCaCertFilePath);
}
private X509Certificate getX509Certificate(String rootCaCertFilePath) throws CertificateException, IOException {
X509Certificate caCert = null;
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(rootCaCertFilePath));
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
return caCert;
}
}
Javaサンプルコードの動作結果
=== Successfully Connected
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
Python
サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。
メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。
環境設定
このサンプルファイルを実行するためにはpaho-mqttライブラリのインストールが必要です。
pip install paho-mqtt  # python 2.x
python -m pip install paho-mqtt  # python 2.7.9+
python3 -m pip install paho-mqtt  # python 3.x
サンプルファイル内のfullCertChainの値は、証明書(cert.pem)とチェーン証明書(caChain.pem)を連結したFull Certificate Chainファイルの保存先パスを意味します。Pythonサンプルを実行すると、Full Certificate Chainファイルが自動的に生成され、このパスに保存されます。
Pythonサンプルコード
mqttTLSClient.py
# coding=utf-8
import argparse
import logging
import paho.mqtt.client as mqttClient
import ssl
import time
import json
def main():
    """Run the sample: connect over TLS, subscribe to "alert", publish five messages.

    Exits with status 1 when the broker connection is not established. On the
    normal path the client is disconnected and its network loop stopped before
    returning.
    """
    # init
    initLogger()
    initParameter()
    appendCert()

    # Connect MQTT Broker
    client = mqttClient.Client()
    if not connectTls(client, hostname, port, rootCa, fullCertChain, private):
        client.loop_stop()
        raise SystemExit(1)

    try:
        # Subscribe message (once is enough; the subscription stays active
        # for the whole session)
        client.subscribe("alert", 0)

        for _ in range(5):
            # Publish Message to Message broker
            publish(client, topic, payload)
            time.sleep(publishDelay)

        # Leave time for the remaining republished messages to arrive.
        time.sleep(5)
    finally:
        # Close the session cleanly, then stop the background network loop.
        client.disconnect()
        client.loop_stop()
def initLogger():
    """Create the module-level logger `log` (level ERROR) that writes to the console."""
    global log

    # stream handler (print to console) with the sample's line format
    console = logging.StreamHandler()
    console.setFormatter(
        logging.Formatter('[%(asctime)s] [%(levelname)s] (%(filename)s:%(funcName)s:%(lineno)d) : %(message)s')
    )

    log = logging.getLogger(__name__)
    log.setLevel(logging.ERROR)
    log.addHandler(console)
def initParameter():
    """Set the module-level settings used by the sample (paths, broker, message)."""
    global rootCa, caChain, cert, private, fullCertChain
    global hostname, port
    global publishDelay, topic, payload
    global connected

    # Certificate and key files
    rootCa = '/<Your>/<file>/<path>/rootCaCert.pem'
    caChain = '/<Your>/<file>/<path>/caChain.pem'
    cert = '/<Your>/<file>/<path>/cert.pem'
    private = '/<Your>/<file>/<path>/private.pem'
    # fullCertChain is automatically generated by cert and caChain
    fullCertChain = './fullCertChain.pem'

    # Broker endpoint
    hostname = 'msg01.cloudiot.ntruss.com'
    port = 8883

    # Message to publish
    publishDelay = 1  # sec
    topic = "factory/room1/temperature"
    payload = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"

    # Connection flag, set to True by on_connect
    connected = False
def appendCert():
    """Write cert followed by caChain, each with a trailing newline, to fullCertChain."""
    with open(fullCertChain, 'w') as bundle:
        for source in (cert, caChain):
            with open(source) as pem:
                contents = pem.read()
            bundle.write(contents + "\n")
def connectTls(client, hostname, port, rootCa, fullCertChain, clientKey):
    """Configure TLS on `client`, connect, and wait for the connected flag.

    Returns True when the global `connected` flag is set within five
    one-second waits, otherwise False.
    """
    # Register the callbacks before connecting.
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_publish = on_publish

    client.tls_set(
        ca_certs=rootCa,
        certfile=fullCertChain,
        keyfile=clientKey,
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLSv1_2,
        ciphers=None
    )
    # NOTE(review): this replaces ssl.match_hostname for the whole process with a
    # function that always returns True, while tls_insecure_set(False) is called
    # right after - confirm whether skipping the hostname check is intended.
    ssl.match_hostname = lambda cert, hostname: True
    client.tls_insecure_set(False)

    client.connect(hostname, port=port)
    client.loop_start()

    # Wait for connection: at most five one-second waits.
    for _ in range(5):
        if connected:
            break
        time.sleep(1)

    return True if connected else False
def on_connect(client, userdata, flags, rc):
    """Connect callback: set the global `connected` flag when rc is 0, else log an error."""
    global connected  # Use global variable

    if rc != 0:
        log.error("=== Connection lost")
        return

    log.info("=== Successfully Connected")
    connected = True  # Signal connection
def on_publish(client, userdata, result):
    """Publish callback: print a fixed notice line."""
    notice = ">>> Publish to IoT server."
    print(notice)
def publish(client, topic, payload):
    """Publish `payload` to `topic`; print an error line instead of raising on failure."""
    try:
        client.publish(topic, payload)
    except Exception as err:
        report = "[ERROR] Could not publish data, error: {}".format(err)
        print(report)
def on_message(client, userdata, message):
    """Message callback: print the topic and the UTF-8 decoded payload."""
    topic_name = message.topic
    body = message.payload.decode("utf-8")
    print("<<< Subscribe from IoT server. topic : " + topic_name + ", message : " + str(body))
# Script entry point: run the sample only when this file is executed directly.
if __name__ == "__main__":
    main()
Pythonサンプルコードの動作結果
mqttTLSClient.pyの実行結果
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
Node JS(JavaScript)
サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。
メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。
環境設定
Node JS(JavaScript)のサンプルは、mqttライブラリのインストールが必要です。ライブラリをインストールする前に、npm(node package manager)がインストールされている必要があります。次のpackage.jsonファイルとMqttClient.jsファイルを同じフォルダに配置し、npm installコマンドでライブラリパッケージをダウンロードした後、実行できます。
package.json
{
"name": "cloud-iot-mqtt-sample",
"version": "0.1.0",
"dependencies": { "mqtt": "^1.14.1" }
}
Node JS(JavaScript)サンプルコード
MqttClient.js
const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');
// Broker endpoint.
const HOST = "msg01.cloudiot.ntruss.com";
const PORT = 8883;
// Topic this sample publishes to, and topic it subscribes to.
const PUBLISH_TOPIC = `factory/room1/temperature`;
const SUBSCRIBE_TOPIC = `alert`;
const QOS = 0;
// Folder holding the certificate files; each file is read synchronously at load time.
const certFolderPath = "/<Your>/<file>/<path>/";
const KEY = fs.readFileSync(certFolderPath + `private.pem`);
const CERT = fs.readFileSync(certFolderPath + `cert.pem`);
const CA_CHAIN = fs.readFileSync(certFolderPath + `caChain.pem`);
const TRUSTED_CA_LIST = fs.readFileSync(certFolderPath + `rootCaCert.pem`);
// Client certificate followed by the CA chain, joined with a newline.
const FULL_CERTIFICATION = CERT+"\n"+CA_CHAIN;
// Number of messages received so far; the 'message' handler increments it.
let receivedIndex = 0;
// Options passed to mqtt.connect().
const connectionInfo = {
port: PORT,
host: HOST,
key: KEY,
cert: FULL_CERTIFICATION,
ca: TRUSTED_CA_LIST,
rejectUnauthorized: true,
protocol: 'mqtts',
connectTimeout: 60 * 1000,
// NOTE(review): connectTimeout above is written as 60 * 1000 while keepalive is a
// bare 1000 - confirm the unit the mqtt library expects for keepalive.
keepalive: 1000,
}
// Start connecting to the broker with the options above.
const client = mqtt.connect(connectionInfo);
// Connection to mqtt message broker: once connected, publish five messages,
// waiting one second before each.
client.on('connect', async () => {
  console.log('=== Successfully Connected');
  let remaining = 5;
  while (remaining > 0) {
    await sleep(1000);
    sendMessage();
    remaining -= 1;
  }
});
// Publish Message: log a notice, then publish the sample payload to PUBLISH_TOPIC.
function sendMessage() {
  console.log(">>> Publish to IoT server.");
  const payload = getMsg();
  client.publish(PUBLISH_TOPIC, payload);
}
// Subscribe to SUBSCRIBE_TOPIC with the configured QoS.
client.subscribe(SUBSCRIBE_TOPIC, {qos:QOS});
// Receiving Message: log every message and end the client after the fifth one.
client.on('message', async (topic, message) => {
  const text = message.toString();
  console.log(`<<< Subscribe from IoT server. topic : "${topic}", message(${receivedIndex}) : ${text}`);
  receivedIndex += 1;
  if (receivedIndex !== 5) {
    return;
  }
  client.end();
  console.log("=== Complete.");
});
// Log when the connection is closed.
client.on('close', () => {
  console.log("connection Close");
});

// Log any client error as text.
client.on('error', (err) => {
  const text = err.toString();
  console.log(text);
});
/**
 * Returns the fixed sample payload (JSON text) that the sample publishes.
 * @returns {string} the message body
 */
function getMsg() {
  return `{"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}`;
}
// Returns a promise that resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
Node JS(JavaScript)サンプルコードの動作結果
=== Successfully Connected
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(0) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(1) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(2) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(3) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(4) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
=== Complete.
connection Close