KafkaのProducerAPIをJavaから叩いてみた
前回Kafkaをインストールして動かしてみた.
Kafkaを動かしてみた - 技術メモ(仮)を参照.
前回は元から入っているshellscriptから呼び出したけど今回はJavaからKafkaにProduceを行ってみようと思う.
なのでJavaでKafkaProducerのコードを書いてみた.
Producer
コード
このページに載せる予定だったけど想像以上に行数がいったのでgithubのURLを載せる.
https://github.com/fuji-151a/Kafka/blob/master/src/main/java/kafka/api/KafkaProducer.java
これを実行するとpuroduceされる予定.
環境構築
今回,指定した件数をproduceする機能と無限にproduceする機能実装した.
実行環境を構築する際はKafkaをインストールした際に出てくるJarのkafka_2.8.0-0.8.0.jarと
Kafkaのlib以下にあるJarを全部ライブラリとしてインポートしたら使えるはず.
実行方法
実行方法は以下のコマンド(Sample)
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 1 1000
propertiesファイル作成
今回作成したコード実行するには第1引数にpropertiesファイルを読み込ませる形になっている.
形式はこんな感じ.
zk.connect=ZookeeperHost:Port topic=TopicName metadata.broker.list=KafkaHost:BrokerPort serializer.class=kafka.serializer.StringEncoder
zk.connectにはzookeeperのホスト名とそのポートを指定.
topicにはproduceしたいKafkaのトピック名を指定.
metadata.broker.listにはKafkaBrokerのホスト名とそのポートを指定.
serializer.classこれは文字列のエンコード形式であると考えられる.
モード
今回第2引数に0 or 1を指定することで挙動が変わるようにしている.
モードによって第3引数の意味合いがことなる.
- 0:無限にproduceする
- 1:件数を指定してproduceする
0の場合第3引数が待ち時間(produceの速度)
1の場合第3引数が件数.
となる.
実践
まずKafkaおよびZookeeperが起動していることを確認.
起動方法はKafkaを動かしてみた - 技術メモ(仮)を参照
Kafka側でconsumerのshellを起動させておく.
sudo /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KafkaTest
Client側
あとはクライアント側でさっきのJavaコマンドを実行
試しに1000件produce.
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 1 1000
Kafka側の出力
[2014/03/19 00:38:53] 1:Test Data [2014/03/19 00:38:53] 2:Test Data ・・・以下略・・・ [2014/03/19 00:38:53] 998:Test Data [2014/03/19 00:38:53] 999:Test Data
おお!?ちゃんとproduceされた!!
無限produceを試した.
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 0 1
Kafka側の出力
[2014/03/19 00:41:36] Test Data ・・・以下1秒おきにログが出力・・・
こっちもできた!!