예제 코드 설명

Cloud IoT Core 서비스 콘솔에서 다운로드 인증서를 사용하여 MQTT 프로토콜 기반으로 Cloud IoT Core 서버로 메시지를 발행하고 구독하는 예제를 설명합니다.

코드는 Java, Python, node.js를 기준으로 작성되어 있습니다.

Java

예제 코드는 Cloud IoT Core에 접속 후 총 5번의 메시지 발행 및 구독을 반복한 후 프로그램이 종료합니다.

메시지 발행은 콘솔 설명서의 예제와 같은 factory/room1/temperature 토픽으로 발생하며, IoT 서비스를 거쳐 재발행된 alert 토픽을 구독합니다.

Java 예제 코드

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;


public class MqttExample {

    public static void main(String[] args) throws Exception {
        String clientId = "myClientTest";

        /*
         *** Modify file path and server ***
         */
        // MQTT server hostname
        String iotServer = "ssl://msg01.cloudiot.ntruss.com";

        String rootCaCertFilePath = "/<Your>/<file>/<path>/rootCert.pem";
        String clientCaChainFilePath = "/<Your>/<file>/<path>/caChain.pem";
        String clientCertFilePath = "/<Your>/<file>/<path>/cert.pem";
        String clientKeyFilePath = "/<Your>/<file>/<path>/private.pem";
        String password = "";

        /*
        Make Root CA TrustManager and Client Keystore
         */
        MqttExample mqttExample = new MqttExample();
        // CA certificate is used to **authenticate server**
        TrustManagerFactory tmf = mqttExample.getTrustManagerFactory(rootCaCertFilePath);

        // client key, certificates and cachain are sent to server to authenticate client
        KeyManagerFactory kmf = mqttExample.getKeyManagerFactory(clientCaChainFilePath, clientCertFilePath, clientKeyFilePath, password);


        /*
        MQTT Connection
         */

        // Set MQTT Client
        MqttClient mqttClient = new MqttClient(iotServer, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setConnectionTimeout(60);
        connOpts.setKeepAliveInterval(60);
        connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        // create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        connOpts.setSocketFactory(context.getSocketFactory());
        mqttClient.connect(connOpts);

        System.out.println("=== Successfully Connected");


        /*
        Subscribe & Publish Message
        Publish message on the device ---> IoT Platform(alert republish action) ---> Subscribe message on the device
         */

        mqttClient.setCallback(new MqttCallback() {
            public void connectionLost(Throwable throwable) {
                System.out.println("=== Connection lost.");
            }

            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("<<< Subscribe from IoT server. topic : " + s + ", message : " + mqttMessage.toString());
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println(">>> Publish to IoT server.");
            }
        });

        /*
        Topic & Message Example
        */
        String sendTopic = "factory/room1/temperature";

        // Test for MQTT republish
        // In order to receive alert topic message, alert republish action must be set in Rule engine.
        String alertTopic = "alert";

        // Console guide example message
        String message = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}";


        /*
        Subscribe message
         */
        boolean isConnected = mqttClient.isConnected();
        if (isConnected) {
            mqttClient.subscribe(alertTopic);

            // You can check whether the message is being delivered normally.
//            mqttClient.subscribe(sendTopic);
        }

        /*
        Publish message
         */
        if (isConnected) {
            for (int i=0; i < 5; i++) {
                MqttMessage mqttMessage = new MqttMessage(message.getBytes());
                mqttMessage.setQos(0);
                mqttMessage.setRetained(false);

                try {
                    MqttTopic topic = mqttClient.getTopic(sendTopic);
                    MqttDeliveryToken token = topic.publish(mqttMessage);
                    token.waitForCompletion();
                    Thread.sleep(2000);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /*
        Enable disconnect() and close() if you needed
         */
//        mqttClient.disconnect();
//        mqttClient.close();
    }


    /*
    Code for Certificate Access
     */

    private MqttExample() {
        init();
    }

    private KeyManagerFactory getKeyManagerFactory(String clientCaChainFilePath, String clientCertFilePath, String clientKeyFilePath, String password) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {

        // load client private key
        KeyPair key = getClientKey(clientKeyFilePath);

        // load client Cert
        Certificate clientCert = getClientCert(clientCertFilePath);

        // load client CA Chain
        List<Certificate> caChain = getClientCaChain(clientCaChainFilePath, clientCert);

        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        int caChainSize = caChain.size();
        Certificate[] caChainArray = caChain.toArray(new Certificate[caChainSize]);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), caChainArray);
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        return kmf;
    }

    private TrustManagerFactory getTrustManagerFactory(String rootCaCertFilePath) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
        // load CA certificate
        X509Certificate rootCaCert = getRootCaCert(rootCaCertFilePath);

        KeyStore rootCaKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        rootCaKeyStore.load(null, null);
        rootCaKeyStore.setCertificateEntry("ca-certificate", rootCaCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(rootCaKeyStore);

        return tmf;
    }


    private void init() {
        Security.addProvider(new BouncyCastleProvider());
    }

    private KeyPair getClientKey(String clientKeyFilePath) throws IOException {
        PEMParser pemParser = new PEMParser(new FileReader(clientKeyFilePath));
        Object object = pemParser.readObject();
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");

        KeyPair key = converter.getKeyPair((PEMKeyPair) object);
        pemParser.close();
        return key;
    }

    private List<Certificate> getClientCaChain(String clientCaChainFilePath, Certificate clientCert) throws CertificateException, IOException {
        X509Certificate cert;
        List<Certificate> caChain = new ArrayList<Certificate>();

        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(clientCaChainFilePath));
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
            caChain.add(cert);
        }

        caChain.add(0, clientCert);
        return caChain;
    }

    private X509Certificate getClientCert(String clientCertFilePath) throws CertificateException, IOException {
        return getX509Certificate(clientCertFilePath);
    }

    private X509Certificate getRootCaCert(String rootCaCertFilePath) throws CertificateException, IOException {
        return getX509Certificate(rootCaCertFilePath);
    }

    private X509Certificate getX509Certificate(String rootCaCertFilePath) throws CertificateException, IOException {
        X509Certificate caCert = null;

        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(rootCaCertFilePath));
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }
        return caCert;
    }
}

Java 예제 코드 동작 결과

=== Successfully Connected
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}

Python

예제 코드는 Cloud IoT Core에 접속 후 총 5번의 메시지 발행 및 구독을 반복한 후 프로그램이 종료합니다.

메시지 발행은 콘솔 설명서의 예제와 같은 factory/room1/temperature 토픽으로 발생하며, IoT 서비스를 거쳐 재발행된 alert 토픽을 구독합니다.

환경 설정

이 예제 파일을 실행하기 위해서는 paho-mqtt 라이브러리 설치가 필요합니다.

pip install paho-mqtt //python 2.x
python -m pip install paho-mqtt // python 2.7.9+
python3 -m pip install paho-mqtt //python 3.x

예제 파일 내 Full Certificate Chain 값의 경우, 인증서와 체인 인증서를 연결한 Full Certificate Chain 파일을 저장할 경로를 의미합니다. Python 예제 실행 시 Full Certificate Chain 파일이 설정된 경로에 자동으로 별도 저장됩니다.

Python 예제 코드

mqttTLSClient.py

# coding=utf-8
from config import *
import argparse
import logging
import paho.mqtt.client as mqttClient
import ssl
import time
import json

def main():
    # init
    initLogger()
    initParameter()
    appendCert()

    # Connect MQTT Broker
    client = mqttClient.Client()
    if not connectTls(client, hostname, port, rootCa, fullCertChain, private):
        client.loop_stop()
        exit(1)

    attempts = 0
    while attempts < 5:
        # Subscribe message
        client.subscribe("alert",0)
        # Publish Message to Message broker
        publish(client, topic, payload)
        time.sleep(publishDelay)
        attempts += 1

    time.sleep(5)

def initLogger():
    global log
    log = logging.getLogger(__name__)
    log.setLevel(logging.ERROR)

    # stream handler (print to console)
    streamHandler = logging.StreamHandler()

    # file formatter
    formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] (%(filename)s:%(funcName)s:%(lineno)d) : %(message)s')
    streamHandler.setFormatter(formatter)

    log.addHandler(streamHandler)

def initParameter():
    global rootCa
    global caChain
    global cert
    global private
    global fullCertChain

    global hostname
    global port
    global publishDelay

    global topic
    global payload

    global connected

    rootCa = '/<Your>/<file>/<path>/rootCert.pem'
    caChain = '/<Your>/<file>/<path>/caChain.pem'
    cert = '/<Your>/<file>/<path>/cert.pem'
    private = '/<Your>/<file>/<path>/private.pem'
    fullCertChain = '/<Your>/<file>/<path>/fullCertChain.pem'

    hostname = 'msg01.cloudiot.ntruss.com'
    port = 8883
    publishDelay = 1 # sec

    topic = "factory/room1/temperature"
    payload = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"

    connected = False

def appendCert():
    filenames = [cert, caChain]
    with open(fullCertChain,'w') as outfile:
         for fname in filenames :
            with open(fname) as infile:
                outfile.write(infile.read()+"\n")

def connectTls(client, hostname, port, rootCa, fullCertChain, clientKey):
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_publish = on_publish

    client.tls_set(ca_certs=rootCa, certfile=fullCertChain,
                   keyfile=clientKey, cert_reqs=ssl.CERT_REQUIRED,
                  tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)

    ssl.match_hostname = lambda cert, hostname: True
    client.tls_insecure_set(False)
    client.connect(hostname, port=port)
    client.loop_start()

    attempts = 0

    while not connected and attempts < 5:  # Wait for connection
        time.sleep(1)
        attempts += 1

    if not connected:
        return False

    return True

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        log.info("=== Successfully Connected")
        global connected  # Use global variable
        connected = True  # Signal connection
    else:
        log.error("=== Connection lost")

def on_publish(client, userdata, result):
     print(">>> Publish to IoT server.")

def publish(client, topic, payload):
    try:
        client.publish(topic, payload)
    except Exception as e:
        print("[ERROR] Could not publish data, error: {}".format(e))

def on_message(client, userdata, message):
    print("<<< Subscribe from IoT server. topic : "+ message.topic + ", message : " + str(message.payload.decode("utf-8")))


if __name__ == "__main__":
    main()

Python 예제 코드 동작 결과

mqttTLSClient.py 실행 결과

>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}

Node JS

예제 코드는 Cloud IoT Core에 접속 후 총 5번의 메시지 발행 및 구독을 반복한 후 프로그램이 종료합니다.

메시지 발행은 콘솔 설명서의 예제와 같은 factory/room1/temperature 토픽으로 발생하며, IoT 서비스를 거쳐 재발행된 alert 토픽을 구독합니다.

환경설정

Node Js 예제는 mqtt library 라이브러리설치가 필요합니다. 라이브러리 설치전 사전에 npm(node package manager)이 설치되어 있어야합니다. 다음 package.json 파일과 MqttClient.js 파일을 같은 폴더에 위치하고, npm install 명령어로 node library 패키지를 다운로드한 후 실행할 수 있습니다.

package.json

{
  "name": "cloud-iot-mqtt-sample",
  "version": "0.1.0",
  "dependencies": { "mqtt": "^1.14.1" }
}

Node JS 예제 코드

MqttClient.js

const mqtt = require('mqtt');
const fs = require('fs');
const path = require('path');

const HOST = "msg01.cloudiot.ntruss.com";
const PORT = 8883;
const PUBLISH_TOPIC = `factory/room1/temperature`;
const SUBSCRIBE_TOPIC = `alert`;
const QOS = 0;

const certFolderPath = "/<Your>/<file>/<path>/";
const KEY = fs.readFileSync(certFolderPath + `private.pem`);
const CERT = fs.readFileSync(certFolderPath + `cert.pem`);
const CA_CHAIN = fs.readFileSync(certFolderPath + `caChain.pem`);
const TRUSTED_CA_LIST = fs.readFileSync(certFolderPath + `rootCert.pem`);
const FULL_CERTIFICATION = CERT+"\n"+CA_CHAIN;

let receivedIndex = 0;

const connectionInfo = {
  port: PORT,
  host: HOST,
  key: KEY,
  cert: FULL_CERTIFICATION,
  ca: TRUSTED_CA_LIST,
  rejectUnauthorized: true,
  protocol: 'mqtts',
  connectTimeout: 60 * 1000,
  keepalive: 1000,
}

const client = mqtt.connect(connectionInfo);

// Connetion to mqtt message broker //
client.on('connect', async function () {
  console.log('=== Successfully Connected');

  for (let i = 0; i < 5; i++) {
    await sleep(1000);
    sendMessage();
  }
})

// Subscribe to Message //
function sendMessage() {
  console.log(">>> Publish to IoT server.")
  client.publish(PUBLISH_TOPIC, getMsg())
}

// Subscribe to Message //
client.subscribe(SUBSCRIBE_TOPIC, {qos:QOS});

// Receiving Message //
client.on('message', async function (topic, message) {
  console.log(`<<< Subscribe from IoT server. topic : "${topic}", message(${receivedIndex}) : ${message.toString()}`);
  receivedIndex++;

  if (receivedIndex == 5) {
    client.end();
    console.log("=== Complete.")
  }
})

client.on('close', function () {
  console.log("connection Close");
})

client.on('error', function (err) {
  console.log(err.toString());
})

function getMsg (type, publishIndex) {
  const timeStamp = Math.floor(new Date() / 1000);
  const msg = `{"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}`;
  return msg;
}

const sleep = (ms) => {
     return new Promise(resolve=>{
         setTimeout(resolve,ms);
     })
 }

Node JS 예제 코드 동작 결과

=== Successfully Connected
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(0) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(1) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(2) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(3) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(4) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
=== Complete.
connection Close

""에 대한 건이 검색되었습니다.

    ""에 대한 검색 결과가 없습니다.

    처리중...