VPC環境で利用できます。

目次

Cloud Data Streaming Serviceを使用する

Cloud Data Streaming Serviceはインストールタイプの商品で、別途の利用申請/解除はなく、クラスタ作成時に契約が成立します。

クラスタが削除されると契約は終了します。

Cloud Data Streaming Serviceクラスタを作成する

Cloud Data Streaming ServiceメニューでCluster作成ボタンをクリックします。

Cloud Data Streaming Serviceクラスタ設定情報を入力する

① クラスタ名を入力します。

② Applicationバージョンを選択します。

③ Applicationバージョンを選択する画面です。

④ ACG情報です。

  • Cloud Data Streaming Serviceで使用するACGは cdss-クラスタ名で自動的に作成されます。

⑤ Kafka Broker Portです。

  • 9092が基本設定で、変更はできません。

⑥ Kafka Broker TLS Portです。

 * `9093`が基本設定で、変更はできません。

⑦ Zookeeper Portです。

   * `2181`が基本設定で、変更はできません。

⑧ CMAK Portです。

   * `9000`が基本設定で、変更はできません。

⑨ CMAKにアクセスするためのIDを入力します。

⑩ CMAKアクセス用Passwordを入力します。

⑪ CMAKアクセス用Passwordをもう一度入力します。

⑫ ノード設定情報を入力するために次へをクリックします。

Cloud Data Streaming Serviceノードを設定する

① OSを選択します。

② VPCを選択します。

③ マネージャーノードサーバのSubnetを選択します。

  • Public subnetを選択できます。

④ マネージャーノードサーバ数の情報です。

  • 1個で、値は変更できません。

⑤ マネージャーノードのサーバタイプを選択します。

  • 選択可能なサーバタイプが表示されます。

⑥ BrokerノードサーバのSubnetを選択します。

  • Private subnetを選択できます。

⑦ Brokerノード数を入力します。

  • 最小3台から最大10台まで作成できます。
  • 基本設定は3台です。

⑧ Brokerノードのサーバタイプを選択します。

  • 選択可能なサーバタイプが表示されます。

⑨ Brokerノードストレージ容量です。

  • OSストレージではない別途のBlockStorageを使用します。
  • BlockStorageは、一つのアカウントにつきNAVERクラウドプラットフォーム全体のサービスで最大2,000GBまで使用できます。

⑩ 最終情報を確認するために次へをクリックします。

入力情報を最終確認する

① 入力した情報が正しいか確認します。

② クラスタ作成のためにクラスタ作成ボタンをクリックします。

Cloud Data Streaming Serviceクラスタ作成の確認

① Cloud Data Streaming Serviceクラスタ作成が始まりました。

  • 作成にかかる時間は数分から数十分です。

Cloud Data Streaming Serviceクラスタを管理する

作成したCloud Data Streaming Serviceの削除、ノード追加、CMAKアクセス、CMAKアクセスパスワードの初期化、CMAKアクセスドメインの設定変更、サービス再起動機能です。

作成したクラスタの確認

① クラスタの作成が完了するとクラスタの要約情報が表示され、当該クラスタを選択すると下段の詳細情報が表示されます。

② クラスタの詳細情報が確認できます。

  • ACG情報とCMAKに移動する、暗号化通信に使用する証明書をダウンロードできます。
  • クラスタが選択されると、上段のメニュー削除/クラスタ管理/クラスタ再起動が有効化します。

③ 選択クラスタ削除メニューです。

④ クラスタ管理メニューです。

⑤ 選択クラスタ再起動メニューです。

クラスタ削除メニュー

① クラスタを選択してから削除メニューを選択すると、選択されたクラスタリストを確認して最終削除を進行するかどうか選択します。

② 実際にクラスタを削除するかどうか選択します。

クラスタ管理メニュー

① クラスタを選択してからクラスタ管理メニューを選択すると、ブローカーノード追加CMAKアクセスCMAKアクセスパスワードの初期化CMAKアクセスドメインの設定変更メニューが表示されます。

ブローカーノード追加

ブローカーノード追加メニューを選択すると、ノードを追加できる画面が表示されます。

② 現在のクラスタのブローカーノード数が表示され、最大10台まで増設を選択できます。

  • 例:追加するノード数1を選択すると、既存の3台に1台のブローカーノードが追加されます。

③ ブローカーノードの増設を実行します。

④ ノードの増設が始まるとサーバ作成が進み、サーバ状態が変更中と表示されます。

CMAKアクセス

① ショートカットボタンをクリックするとCMAKにアクセスできます。

CMAKアクセスパスワードの初期化

① CMAKアクセス用Passwordを失くした場合は、Passwordを初期化します。

② 新しいPasswordを入力して確認を選択すると、新しいPasswordでCMAKにアクセスできます。適用には数分かかります。

CMAKアクセスドメインの設定変更

① CMAKアクセスドメインの設定を変更します。

① 現在、Publicドメインが無効化状態の場合、Publicドメインを有効化できます。

② 現在、Publicドメインが有効化状態の場合、Publicドメインを無効化できます。

  • アクセスドメインを無効化する場合、外部からのCMAKアクセスが遮断されます。

クラスタ再起動

① すべてのサービス、Kafka & Zookeeper、CMAKを再起動できるメニューです。

すべてのサービスを再起動

① すべてのサービス(Kafka & Zookeeper & CMAK)を再起動します。

Kafka and Zookeeper 再起動

① Kafka & Zookeeperを再起動します。

CMAK再起動

① CMAKを再起動します。

Cloud Data Streaming Serviceを活用する

Cloud Data Streaming Serviceを活用する方法を説明します。

CMAKにアクセスしてTopicを作成し、Brokerノードの情報を確認して、Cloud Data Streaming Service Clusterにデータを保存して照会します。

CMAKにアクセスする

CMAKにアクセスするためには、Publicドメインを有効化する必要があります。

① クラスタ管理 -> CMAKアクセスドメインの設定変更ボタンをクリックします。

② [はい]ボタンを押してPublicドメインを有効化します。

① ショートカットボタンを押してCMAKにアクセスします。

① クラスタ作成過程で入力したIDとPasswordを入力します。

① 作成したクラスタに関する情報を確認できます。

① Topicに関する情報を確認できます。

② ブローカーノードに関する情報を確認できます。

Topicを作成する

① Createボタンを押してTopicを作成します。

① Topic名を入力します。

② TopicのPartitionsを入力します。

③ TopicのReplication Factorを入力します。

④ その他Topicの設定を入力します。

⑤ Createボタンを押してTopicを作成します。

作成したTopicを確認する

① Topicの情報を確認できます。

② Topic削除、Partition追加、再分配、設定変更などの操作を実行できます。

③ 各ブローカーノードに関して作成されたPartition情報を確認できます。

④ 各PartitionのLeaderとReplicationがどんなブローカーノードに位置しているのか、確認できます。

Cloud Data Streaming Service使用

上記のような構築でProducer VM、Consumer VMを作成し、先に作成したTopicに関してデータを保存、照会してみます。

Producer VM、Consumer VM作成とセット

VM作成画面

  • CentOS 7.3を基準にガイドを作成しました。

① Cloud Data Streaming Serviceクラスタを作成したVPCと同じVPCを選択します。

② VMにアクセス、インストールのためにPublic Subnetを選択します。

  • このVMにグローバルIPを割り当てるとアクセスできます。

Apache Kafka、Java、Pythonを活用してデータを送信、保存みましょう。

Javaのインストール

yum install java -y

Apache Kafkaインストール

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 解凍してくれます。
tar -zxvf kafka_2.12-2.4.0.tgz

ブローカーノード情報を確認する

① Brokerノード情報 - 詳細]ボタンをクリックします。

① ブローカーノードと暗号化せずに通信をするために使用されます。

② ブローカーノードと暗号化通信に使用されます。

③ 暗号化通信に利用されるhostsファイルを編集するために使用されます。

ブローカーノードのACGを変更します。

① プロトコルを選択します。

② アクセスソース情報にProducer VM、Consumer VMの非公認IPアドレスを入力します。

③ 許可するポートを登録します。

  • 9092-9093を入力します。

④ ACGに関するメモを登録します。

追加]をクリックすると、以下のリストに入力した情報が表示されます。

入力されたルールをもう一度確認して適用をクリックすると、そのルールがACGに適用されます。

Cloud Data Streaming Serviceにデータ転送と再生する

Apache Kafka活用して、データの転送と再生する

以前に作成し、セットしたProducer VMに接続して、次のコマンドを実行します。

cd kafka_2.12-2.4.0
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
# [broker.list]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
# [topic]にCMAKで作成したTopicを入力します。
# 例示) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test

送信したいメッセージを入力すると、ブローカーノードに対応するメッセージが保存されます。 *終了したい場合は、ctrl + cキーを入力します。

以前に作成し、セットしたConsumer VMに接続して、次のコマンドを実行します。

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
# [bootstrap.server]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
# [topic]にCMAKで作成したTopicを入力します。
# 例示) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning

--from-beginningコマンドの使用時には、そのTopicのデータを最初からすべて照会します。 --from-beginningコマンドを使用していない時に、データを照会した瞬間から入力されたデータのみ照会します。

Javaを活用して、データの転送と再生する

Java Applicationを活用して、データを転送します。

  • IntelliJ IDEAを使用して行われました。 File - > New - > Project、Mavenを選択してProjectを作成します。
pom.xmlファイルを変更する

プロジェクトの依存関係は、Javaのバージョンおよびパッケージのメソッドを定義するpom.xmlファイルを変更します。 ユーザー環境に応じてpom.xmlファイルが異なることがあります。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <groupId>org.example</groupId>
    <artifactId>maventest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Apache Kafka version in Cloud Data Streaming Service -->
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>KafkaMain</mainClass>
                        </transformer>
                    </transformers>
                </configuration>

                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
KafkaMain.javaを作成する
  • Java Application実行時、argumentにproduce/ consumeかどうか、Topic、Broker listsを転送します。

    public class KafkaMain {
     public static void main(String[] args) throws IOException {
         String topicName = args[1];
         String brokers = args[2];
    
         switch(args[0]){
             case "produce":
                 Producer.produce(brokers, topicName);
                 break;
             case "consume":
                 Consumer.consume(brokers, topicName);
                 break;
             default:
                 System.out.println("Wrong arguments");
                 break;
         }
         System.exit(0);
     }
    }
    
Producer.javaを作成する

*例として0〜99の数字を送信しました。

public class Producer {
    public static void produce(String brokers, String topicName) throws IOException {
        // Create Producer
        KafkaProducer<String, String> producer;
        // Configure
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(properties);

        for(int i=0;i<100;i++){
            ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
            producer.send(record);
        }
        producer.close();
    }
}
Consumer.javaを作成する
public class Consumer {
    public static int consume(String brokers, String topicName) {
        // Create Consumer
        KafkaConsumer<String, String> consumer;
        // Configure
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("auto.offset.reset", "earliest");

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.value());
            }
        }
    }
}

作成したJava codeをgitに保存した後、VMからgit cloneコマンドを使用してcodeをダウンロードします。

git clone自分のgit repository

このJava Applicationを構築するためにはMavenのインストールが必要です。

yum install maven -y

ダウンロードしたJava codeフォルダに移動した後は、jarファイルを作成します。

cd kafkatest
mvn clean package

jarファイルをビルドした後、targetフォルダとtargetフォルダ内にjarファイルが生成されます。targetフォルダに移動した後は、jarファイルを実行します。

cd target

# データ転送する
java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
# [topic]にCMAKで作成されたTopicを入力します。
# [broker.list]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
例示) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092


# データ照会する
java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
# [topic]にCMAKで作成されたTopicを入力します。
# [broker.list]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
例示) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

Pythonを活用して、データの転送と再生する

  • Python2.7.5バージョンで行われました。
  • PythonでKafkaを活用するためにはkafka-python packageのインストールが必要です。
    #pipインストール
    curl -LO https://bootstrap.pypa.io/get-pip.py 
    python get-pip.py
    # kafka-python package 설치
    pip install kafka-python
    

KafkaMain.pyを作成する

import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time


def produce(topicName, brokerLists):
        producer = KafkaProducer(bootstrap_servers=brokerLists,
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))
        for i in range(100):
                producer.send(topicName, i)

def consume(topicName, brokerLists):
        consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                        group_id="test")

        for msg in consumer:
               print(msg)


action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')

KafkaMain.pyファイルを実行してproduceする *例として0〜99の数字を送信しました。

python KafkaMain.py produce [topic] [broker.list]
# [topic]にCMAKで作成されたTopicを入力します。
# [broker.list]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
# 例示) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

KafkaMain.pyファイルを実行してconsumeする

python KafkaMain.py consume [topic] [broker.list]
# [topic]にCMAKで作成されたTopicを入力します。
# [broker.list]に前に確認したブローカーノード情報のPlainTextコピーを入力します。
# 例示) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

通信区間暗号化

Apache KafkaクライアントとApache Kafka Broker間の通信区間を暗号化する方法です。

全体的なプロセス要約は次のとおりです(① ~③のプロセスはクラスタを作成する際に自動化されるプロセスです)。 ① マネージャーノードで自体証明書を作成します。 ② 各ブローカーノードで証明書を作成し、証明書署名リクエストを作成します。 ③ ②で作成した証明書署名リクエストをマネージャーノードが署名します。 ④ クライアントでは①で作成した証明書をダウンロードして、証明書に関する情報を持つtruststoreを作成します。 ⑤ 暗号化通信に関する設定ファイルを作成します。 ⑥ ローカルDNSファイルを修正する

証明書ダウンロード

① ダウンロードボタンをクリックして証明書をダウンロードします。

① 確認ボタンをクリックして証明書をダウンロードします。

Truststoreを作成する

keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert

① password:passwordを入力します。 ② retype-password:再入力します。 ③ Trust this certificate?:yesを入力します。 ④ lsコマンドでkafka.client.truststore.jksファイルが作成されたことを確認できます。

暗号化設定ファイルを作成する

# /root/kafka_2.12-2.4.0フォルダにファイルを作成します。
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]

上記の内容を入力したclient-auth.propertiesファイルを作成します。

ローカルDNSファイルを修正する

暗号化通信を実行するためにはz、ローカルDNSファイル(/etc/hosts)を修正する必要があります。/etc/hostsファイルは、LinuxでDNSサーバより先にホスト名をIPに変換するファイルです。

vi /etc/hosts

/etc/hostsファイルに[ブローカーノードの内部IPサーバ名]を加えます。例)

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
172.16.1.6 networktest12-d-251
172.16.1.7 networktest12-d-252
172.16.1.8 networktest12-d-253

Cloud Data Streaming Serviceに通信区間暗号化してデータを伝送する

先に作成したProducer VMにアクセスします。

./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]で先に確認したブローカーノードのサーバ名:9093を入力します(必ずサーバ名を入力する必要があります)。
# [topic]にCMAKで作成したTopicを入力します。
# 例)./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties

伝送しようとするメッセージを入力すると、ブローカーノードにこのメッセージが保存されます。

  • 終了する場合は、ctrl + cを入力します。

Cloud Data Streaming Serviceに通信区間暗号化して保存されたデータを照会する

先に作成したConsumer VMにアクセスします。

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]で先に確認したブローカーノードのサーバ名:9093を入力します。
# [topic]に前段階のProducer VMで入力したTopicを入力します。
# 例)./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties

に対する検索結果は~件です。 ""

    に対する検索結果がありません。 ""

    処理中...