サンプルコードの説明

Cloud IoT Coreサービスのコンソールからダウンロードした証明書を使用して、MQTTプロトコルでCloud IoT Coreサーバーにメッセージを発行(パブリッシュ)し、サブスクライブする例を説明します。

コードは、Java、Python、Node JS(JavaScript)を基準に作成されています。

Java

  • サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。

メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。

環境設定

このサンプルファイルを実行するためにはMQTT、TLSライブラリが必要です。

  • maven

    <dependencies>
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>${version}</version>
    </dependency>
    <dependency>
      <groupId>org.bouncycastle</groupId>
      <artifactId>bcpkix-jdk15on</artifactId>
      <version>${version}</version>
    </dependency>
    </dependencies>
    
  • gradle

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:${version}'
    implementation 'org.bouncycastle:bcpkix-jdk15on:${version}'
    

Javaのサンプルコード

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;


public class MqttExample {

    public static void main(String[] args) throws Exception {
        String clientId = "myClientTest";

        /*
         *** Modify file path and server ***
         */
        // MQTT server hostname
        String iotServer = "ssl://msg01.cloudiot.ntruss.com";

        String rootCaCertFilePath = "/<Your>/<file>/<path>/rootCaCert.pem";
        String clientCaChainFilePath = "/<Your>/<file>/<path>/caChain.pem";
        String clientCertFilePath = "/<Your>/<file>/<path>/cert.pem";
        String clientKeyFilePath = "/<Your>/<file>/<path>/private.pem";
        String password = "";

        /*
        Make Root CA TrustManager and Client Keystore
         */
        MqttExample mqttExample = new MqttExample();
        // CA certificate is used to **authenticate server**
        TrustManagerFactory tmf = mqttExample.getTrustManagerFactory(rootCaCertFilePath);

        // client key, certificates and cachain are sent to server to authenticate client
        KeyManagerFactory kmf = mqttExample.getKeyManagerFactory(clientCaChainFilePath, clientCertFilePath, clientKeyFilePath, password);


        /*
        MQTT Connection
         */

        // Set MQTT Client
        MqttClient mqttClient = new MqttClient(iotServer, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setConnectionTimeout(60);
        connOpts.setKeepAliveInterval(60);
        connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        // create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        connOpts.setSocketFactory(context.getSocketFactory());
        mqttClient.connect(connOpts);

        System.out.println("=== Successfully Connected");


        /*
        Subscribe & Publish Message
        Publish message on the device ---> IoT Platform(alert republish action) ---> Subscribe message on the device
         */

        mqttClient.setCallback(new MqttCallback() {
            public void connectionLost(Throwable throwable) {
                System.out.println("=== Connection lost.");
            }

            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("<<< Subscribe from IoT server. topic : " + s + ", message : " + mqttMessage.toString());
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println(">>> Publish to IoT server.");
            }
        });

        /*
        Topic & Message Example
        */
        String sendTopic = "factory/room1/temperature";

        // Test for MQTT republish
        // In order to receive alert topic message, alert republish action must be set in Rule engine.
        String alertTopic = "alert";

        // Console guide example message
        String message = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}";


        /*
        Subscribe message
         */
        boolean isConnected = mqttClient.isConnected();
        if (isConnected) {
            mqttClient.subscribe(alertTopic);

            // You can check whether the message is being delivered normally.
//            mqttClient.subscribe(sendTopic);
        }

        /*
        Publish message
         */
        if (isConnected) {
            for (int i=0; i < 5; i++) {
                MqttMessage mqttMessage = new MqttMessage(message.getBytes());
                mqttMessage.setQos(0);
                mqttMessage.setRetained(false);

                try {
                    MqttTopic topic = mqttClient.getTopic(sendTopic);
                    MqttDeliveryToken token = topic.publish(mqttMessage);
                    token.waitForCompletion();
                    Thread.sleep(2000);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /*
        Enable disconnect() and close() if you needed
         */
//        mqttClient.disconnect();
//        mqttClient.close();
    }


    /*
    Code for Certificate Access
     */

    private MqttExample() {
        init();
    }

    private KeyManagerFactory getKeyManagerFactory(String clientCaChainFilePath, String clientCertFilePath, String clientKeyFilePath, String password) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {

        // load client private key
        KeyPair key = getClientKey(clientKeyFilePath);

        // load client Cert
        Certificate clientCert = getClientCert(clientCertFilePath);

        // load client CA Chain
        List<Certificate> caChain = getClientCaChain(clientCaChainFilePath, clientCert);

        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        int caChainSize = caChain.size();
        Certificate[] caChainArray = caChain.toArray(new Certificate[caChainSize]);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), caChainArray);
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        return kmf;
    }

    private TrustManagerFactory getTrustManagerFactory(String rootCaCertFilePath) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        // load CA certificate
        X509Certificate rootCaCert = getRootCaCert(rootCaCertFilePath);

        KeyStore rootCaKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        rootCaKeyStore.load(null, null);
        rootCaKeyStore.setCertificateEntry("ca-certificate", rootCaCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(rootCaKeyStore);

        return tmf;
    }


    private void init() {
        Security.addProvider(new BouncyCastleProvider());
    }

    private KeyPair getClientKey(String clientKeyFilePath) throws IOException {
        PEMParser pemParser = new PEMParser(new FileReader(clientKeyFilePath));
        Object object = pemParser.readObject();
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");

        KeyPair key = converter.getKeyPair((PEMKeyPair) object);
        pemParser.close();
        return key;
    }

    private List<Certificate> getClientCaChain(String clientCaChainFilePath, Certificate clientCert) throws CertificateException, IOException {
        X509Certificate cert;
        List<Certificate> caChain = new ArrayList<Certificate>();

        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(clientCaChainFilePath));
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
            caChain.add(cert);
        }

        caChain.add(0, clientCert);
        return caChain;
    }

    private X509Certificate getClientCert(String clientCertFilePath) throws CertificateException, IOException {
        return getX509Certificate(clientCertFilePath);
    }

    private X509Certificate getRootCaCert(String rootCaCertFilePath) throws CertificateException, IOException {
        return getX509Certificate(rootCaCertFilePath);
    }

    private X509Certificate getX509Certificate(String rootCaCertFilePath) throws CertificateException, IOException {
        X509Certificate caCert = null;

        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(rootCaCertFilePath));
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }
        return caCert;
    }
}

Javaサンプルコードの動作結果

=== Successfully Connected
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}

Python

サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。

メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。

環境設定

このサンプルファイルを実行するためにはpaho-mqttライブラリのインストールが必要です。

pip install paho-mqtt              # python 2.x
python -m pip install paho-mqtt    # python 2.7.9+
python3 -m pip install paho-mqtt   # python 3.x

サンプルファイル内のfullCertChainの値は、証明書(cert.pem)とチェーン証明書(caChain.pem)を連結したFull Certificate Chainファイルの保存先パスを意味します。Pythonサンプルを実行すると、Full Certificate Chainファイルが自動的に生成され、このパスに保存されます。

Pythonサンプルコード

mqttTLSClient.py

# coding=utf-8
import argparse
import logging
import paho.mqtt.client as mqttClient
import ssl
import time
import json

def main():
    """Run the sample: connect over TLS, subscribe to "alert", publish five messages.

    Exits with status 1 when the broker connection is not established.
    """
    # init
    initLogger()
    initParameter()
    appendCert()

    # Connect MQTT Broker
    # paho-mqtt 2.x exposes CallbackAPIVersion and expects it as the first
    # constructor argument; VERSION1 keeps the callback signatures used in this
    # file. paho-mqtt 1.x has no such attribute and takes no argument.
    callbackApi = getattr(mqttClient, "CallbackAPIVersion", None)
    if callbackApi is not None:
        client = mqttClient.Client(callbackApi.VERSION1)
    else:
        client = mqttClient.Client()

    if not connectTls(client, hostname, port, rootCa, fullCertChain, private):
        client.loop_stop()
        raise SystemExit(1)

    try:
        # Subscribe message (once is enough; the subscription lasts for the session)
        client.subscribe("alert", 0)

        for _ in range(5):
            # Publish Message to Message broker
            publish(client, topic, payload)
            time.sleep(publishDelay)

        # Give the last republished alert messages time to arrive.
        time.sleep(5)
    finally:
        # Close the connection and stop the network loop thread cleanly.
        client.disconnect()
        client.loop_stop()

def initLogger():
    """Create the module-level logger `log` (level ERROR) that writes to the console."""
    global log

    # console handler with a timestamp / level / location prefix
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logging.Formatter(
        '[%(asctime)s] [%(levelname)s] (%(filename)s:%(funcName)s:%(lineno)d) : %(message)s'
    ))

    log = logging.getLogger(__name__)
    log.setLevel(logging.ERROR)
    log.addHandler(consoleHandler)

def initParameter():
    """Populate the module-level settings used by the rest of the sample."""
    global rootCa, caChain, cert, private, fullCertChain
    global hostname, port, publishDelay
    global topic, payload
    global connected

    # Broker endpoint and pacing
    hostname = 'msg01.cloudiot.ntruss.com'
    port = 8883
    publishDelay = 1 # sec

    # Certificate material downloaded from the console
    rootCa = '/<Your>/<file>/<path>/rootCaCert.pem'
    caChain = '/<Your>/<file>/<path>/caChain.pem'
    cert = '/<Your>/<file>/<path>/cert.pem'
    private = '/<Your>/<file>/<path>/private.pem'

    # fullCertChain is automatically generated by cert and caChain
    fullCertChain = './fullCertChain.pem'

    # Message to publish
    topic = "factory/room1/temperature"
    payload = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"

    # Connection flag, set to True by on_connect
    connected = False


def appendCert():
    """Write cert followed by caChain, each with a trailing newline, to fullCertChain."""
    with open(fullCertChain, 'w') as bundle:
        for pemPath in (cert, caChain):
            with open(pemPath) as pemFile:
                bundle.write(pemFile.read() + "\n")

def connectTls(client, hostname, port, rootCa, fullCertChain, clientKey):
    """Configure TLS on `client`, connect to the broker and wait for the connection.

    Registers the on_connect / on_message / on_publish callbacks, starts the
    network loop, then checks the module-level `connected` flag up to five
    times, sleeping one second between checks.

    Returns True when `connected` was set by on_connect, otherwise False.
    """
    # callbacks
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_publish = on_publish

    # mutual TLS: root CA verifies the server, cert chain + key identify the client
    client.tls_set(
        ca_certs=rootCa,
        certfile=fullCertChain,
        keyfile=clientKey,
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLSv1_2,
        ciphers=None,
    )

    # NOTE(review): this replaces ssl.match_hostname process-wide with a function
    # that accepts any hostname, which looks at odds with tls_insecure_set(False)
    # below - confirm whether the hostname check really has to be bypassed.
    ssl.match_hostname = lambda cert, hostname: True
    client.tls_insecure_set(False)
    client.connect(hostname, port=port)
    client.loop_start()

    # Wait for connection
    for _ in range(5):
        if connected:
            break
        time.sleep(1)

    return True if connected else False

def on_connect(client, userdata, flags, rc):
    """Connect callback: flag the connection as established when rc is 0.

    A non-zero rc is logged as an error together with the code, and the
    module-level `connected` flag is left untouched.
    """
    if rc == 0:
        log.info("=== Successfully Connected")
        global connected  # Use global variable
        connected = True  # Signal connection
    else:
        # This callback reports the outcome of a connect attempt, so a non-zero
        # rc is a failed connect rather than a lost connection; keep the code
        # in the message so the cause can be looked up.
        log.error("=== Connection failed (rc=%s)", rc)

def on_publish(client, userdata, result):
    """Publish callback: print a fixed notice line; the arguments are not used."""
    notice = ">>> Publish to IoT server."
    print(notice)

def publish(client, topic, payload):
    """Publish `payload` on `topic` via `client`; print an error line if that raises."""
    failure = None
    try:
        client.publish(topic, payload)
    except Exception as err:
        failure = err

    if failure is not None:
        print("[ERROR] Could not publish data, error: {}".format(failure))

def on_message(client, userdata, message):
    """Message callback: print the topic and the UTF-8 decoded payload of `message`."""
    body = message.payload.decode("utf-8")
    line = "".join([
        "<<< Subscribe from IoT server. topic : ",
        message.topic,
        ", message : ",
        body,
    ])
    print(line)


# Run the sample only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Pythonサンプルコードの動作結果

mqttTLSClient.pyの実行結果

>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}

Node JS(JavaScript)

サンプルコードは、Cloud IoT Coreに接続した後、合計5回のメッセージの発行と、サブスクリプションを繰り返した後にプログラムが終了します。

メッセージの発行は、コンソール説明書の例のようなfactory/room1/temperatureトピックで発生し、IoTサービスを経て再発行されたalertトピックをサブスクリプションします。

環境設定

Node JS(JavaScript)のサンプルは、mqttライブラリのインストールが必要です。ライブラリをインストールする前に、npm(node package manager)がインストールされている必要があります。次のpackage.jsonファイルとMqttClient.jsファイルを同じフォルダに配置し、npm installコマンドで必要なパッケージをダウンロードした後、実行することができます。

package.json

{
  "name": "cloud-iot-mqtt-sample",
  "version": "0.1.0",
  "dependencies": { "mqtt": "^1.14.1" }
}

Node JS(JavaScript)サンプルコード

MqttClient.js

const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');

// Broker endpoint, topics and QoS used by this sample
const HOST = "msg01.cloudiot.ntruss.com";
const PORT = 8883;
const PUBLISH_TOPIC = `factory/room1/temperature`;
const SUBSCRIBE_TOPIC = `alert`;
const QOS = 0;

// Certificate material downloaded from the console
const certFolderPath = "/<Your>/<file>/<path>/";
const KEY = fs.readFileSync(certFolderPath + `private.pem`);
const CERT = fs.readFileSync(certFolderPath + `cert.pem`);
const CA_CHAIN = fs.readFileSync(certFolderPath + `caChain.pem`);
const TRUSTED_CA_LIST = fs.readFileSync(certFolderPath + `rootCaCert.pem`);
// Client certificate followed by its CA chain, sent to the server as one PEM bundle
const FULL_CERTIFICATION = CERT+"\n"+CA_CHAIN;

// Number of messages received so far
let receivedIndex = 0;

const connectionInfo = {
  port: PORT,
  host: HOST,
  key: KEY,
  cert: FULL_CERTIFICATION,
  ca: TRUSTED_CA_LIST,
  rejectUnauthorized: true,
  protocol: 'mqtts',
  connectTimeout: 60 * 1000, // milliseconds
  keepalive: 60,             // seconds (not milliseconds); matches the Java sample's keep-alive
}

const client = mqtt.connect(connectionInfo);

// Connection to mqtt message broker: publish five messages, one second apart //
client.on('connect', async () => {
  console.log('=== Successfully Connected');

  let remaining = 5;
  while (remaining > 0) {
    await sleep(1000);
    sendMessage();
    remaining -= 1;
  }
});

// Publish one message to the publish topic //
function sendMessage() {
  console.log(">>> Publish to IoT server.");
  const body = getMsg();
  client.publish(PUBLISH_TOPIC, body);
}

// Subscribe to the alert topic with the configured QoS //
client.subscribe(
  SUBSCRIBE_TOPIC,
  { qos: QOS }
);

// Receiving Message: print each message and close the client after the fifth one //
client.on('message', async (topic, message) => {
  const text = message.toString();
  console.log(`<<< Subscribe from IoT server. topic : "${topic}", message(${receivedIndex}) : ${text}`);
  receivedIndex += 1;

  if (receivedIndex !== 5) {
    return;
  }

  client.end();
  console.log("=== Complete.");
});

// Report that the connection was closed //
client.on('close', () => {
  console.log("connection Close");
});

// Print client errors //
client.on('error', (err) => {
  const description = err.toString();
  console.log(description);
});

// Build the fixed JSON payload published by this sample //
function getMsg () {
  // The unused timestamp local was removed; the payload never contained it.
  const msg = `{"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}`;
  return msg;
}

// Return a promise that resolves after `ms` milliseconds //
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

Node JS(JavaScript)サンプルコードの動作結果

=== Successfully Connected
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(0) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(1) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(2) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(3) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "alert", message(4) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
=== Complete.
connection Close

に対する検索結果は~件です。 ""

    に対する検索結果がありません。 ""

    処理中...