技術メモ(仮)

IT系の話や研究,他のことなど話して行けたらいいな~って感じです.ただいまJavaを学習中

KafkaのCosumerAPIをJavaから叩いてみた

前回KafkaのProducerAPIをJavaで書いて動かしてみた.
KafkaのProducerAPIをJavaから叩いてみた - 技術メモ(仮)を参照
今回はJavaでKafkaConsumerのコードを書いてみた.

Consumer

コード

前回同様にコードはgithubにアップした.長すぎるからである.
https://github.com/fuji-151a/Kafka/blob/master/src/main/java/kafka/api/KafkaConsumer.java
これを実行するconsumeされる予定.

環境構築

前回のプロジェクトを拡張しただけなのでJarは前回と同じ.
KafkaのProducerAPIをJavaから叩いてみた - 技術メモ(仮)を参照

実行方法

実行方法は以下のコマンド(Sample)

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaConsumer consumeConf.properties 1

propertiesファイル作成

前回同様に今回作成したコードを実行するには第1引数にpropertiesファイルを読み込ませる形になっている.
形式はこんな感じ.

group.id=groupid
zookeeper.connect=ZookeeperHost:Port
topic=TopicName

group.idにはgroupidを指定するらしいがよくわかっていないので今度調べる.
zookeeper.connectは接続するzookeeperのホスト名とport番号を指定.
topicにはconsumeしたいトピックを指定する.

スレッド

今回は第2引数にスレッド数を指定できるようにしている.
これはconsumeの際に接続するパーティションの数である.
例えばパーティションの数が10であったとするとスレッド数は10を指定しないと全てのデータをconsumeすることができないような仕組みになっている.

実践

まずKafkaおよびZookeeperが起動していることを確認.
起動方法はKafkaを動かしてみた - 技術メモ(仮)を参照
事前にKafkaにProduceを行う.
せっかくなので前回Javaで作成したProducerAPIを使ってみる.

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 0 0

ちなみにデータは以下の形式を1秒ずつproduceする.

[yyyy/MM/dd HH:mm:SS] Test Data
Client側

あとはcosumeしたいサーバから以下のJavaコマンドを実行
今回はパーティションの数を1でproduceしているのでスレッド数も1で指定

java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaConsumer consumeConf.properties 1
consume結果
[2014/04/07 00:32:43] Test Data
[2014/04/07 00:32:44] Test Data
[2014/04/07 00:32:45] Test Data
・・・以下略・・・
[2014/04/07 00:33:41] Test Data
[2014/04/07 00:33:42] Test Data
[2014/04/07 00:33:43] Test Data
[2014/04/07 00:33:44] Test Data
・・・以下無限・・・

これも無事にconsumeされている.
これで自在にJavaからKafkaにデータをProduce,Consumeできるようになった.

まとめ

Javaを用いてKafkaConsumerのAPIを叩いてみた.
無事にConsumeできることも確認した.
今後はこのConsumerとProducerを用いてなんかシステムを作ってみたい.
でも,もう少しこのコードが何をしているか調査する必要がある.