技術メモ(仮)

IT系の話や研究,他のことなど話して行けたらいいな~って感じです.ただいまJavaを学習中

KafkaのProducerAPIをJavaから叩いてみた

前回Kafkaをインストールして動かしてみた.
Kafkaを動かしてみた - 技術メモ(仮)を参照.
前回は元から入っているshellscriptから呼び出したけど今回はJavaからKafkaにProduceを行ってみようと思う.
なのでJavaでKafkaProducerのコードを書いてみた.

Producer

コード

このページに載せる予定だったけど想像以上に行数がいったのでgithubのURLを載せる.
https://github.com/fuji-151a/Kafka/blob/master/src/main/java/kafka/api/KafkaProducer.java

これを実行するとpuroduceされる予定.

環境構築

今回,指定した件数をproduceする機能と無限にproduceする機能実装した.
実行環境を構築する際はKafkaをインストールした際に出てくるJarのkafka_2.8.0-0.8.0.jarと
Kafkaのlib以下にあるJarを全部ライブラリとしてインポートしたら使えるはず.

実行方法

実行方法は以下のコマンド(Sample)

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 1 1000
propertiesファイル作成

今回作成したコード実行するには第1引数にpropertiesファイルを読み込ませる形になっている.
形式はこんな感じ.

zk.connect=ZookeeperHost:Port
topic=TopicName
metadata.broker.list=KafkaHost:BrokerPort
serializer.class=kafka.serializer.StringEncoder

zk.connectにはzookeeperのホスト名とそのポートを指定.
topicにはproduceしたいKafkaのトピック名を指定.
metadata.broker.listにはKafkaBrokerのホスト名とそのポートを指定.
serializer.classこれは文字列のエンコード形式であると考えられる.

モード

今回第2引数に0 or 1を指定することで挙動が変わるようにしている.
モードによって第3引数の意味合いがことなる.

  • 0:無限にproduceする
  • 1:件数を指定してproduceする

0の場合第3引数が待ち時間(produceの速度)
1の場合第3引数が件数.
となる.

実践

まずKafkaおよびZookeeperが起動していることを確認.
起動方法はKafkaを動かしてみた - 技術メモ(仮)を参照
Kafka側でconsumerのshellを起動させておく.

sudo /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KafkaTest

Client側
あとはクライアント側でさっきのJavaコマンドを実行
試しに1000件produce.

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 1 1000

Kafka側の出力

[2014/03/19 00:38:53] 1:Test Data
[2014/03/19 00:38:53] 2:Test Data
・・・以下略・・・
[2014/03/19 00:38:53] 998:Test Data
[2014/03/19 00:38:53] 999:Test Data

おお!?ちゃんとproduceされた!!

無限produceを試した.

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 0 1

Kafka側の出力

[2014/03/19 00:41:36] Test Data
・・・以下1秒おきにログが出力・・・

こっちもできた!!

まとめ

Javaを用いてKafkaProducerのAPIを叩いてみた.
あと無事に動くことを確認した.
このプログラムに意味はあるのか?と問われるとそうさ,意味なんかない.
という状態.でも今後何かの役に立つはず.
次はソースに書いてあるKafkaProducerAPIの内容を調査する.
またはConsumerのプログラムをJavaで書いてみるかのどちらかである.