KafkaのCosumerAPIをJavaから叩いてみた
前回KafkaのProducerAPIをJavaで書いて動かしてみた.
KafkaのProducerAPIをJavaから叩いてみた - 技術メモ(仮)を参照
今回はJavaでKafkaConsumerのコードを書いてみた.
Consumer
コード
前回同様にコードはgithubにアップした.長すぎるからである.
https://github.com/fuji-151a/Kafka/blob/master/src/main/java/kafka/api/KafkaConsumer.java
これを実行するconsumeされる予定.
環境構築
前回のプロジェクトを拡張しただけなのでJarは前回と同じ.
KafkaのProducerAPIをJavaから叩いてみた - 技術メモ(仮)を参照
実行方法
実行方法は以下のコマンド(Sample)
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaConsumer consumeConf.properties 1
propertiesファイル作成
前回同様に今回作成したコードを実行するには第1引数にpropertiesファイルを読み込ませる形になっている.
形式はこんな感じ.
group.id=groupid zookeeper.connect=ZookeeperHost:Port topic=TopicName
group.idにはgroupidを指定するらしいがよくわかっていないので今度調べる.
zookeeper.connectは接続するzookeeperのホスト名とport番号を指定.
topicにはconsumeしたいトピックを指定する.
スレッド
今回は第2引数にスレッド数を指定できるようにしている.
これはconsumeの際に接続するパーティションの数である.
例えばパーティションの数が10であったとするとスレッド数は10を指定しないと全てのデータをconsumeすることができないような仕組みになっている.
実践
まずKafkaおよびZookeeperが起動していることを確認.
起動方法はKafkaを動かしてみた - 技術メモ(仮)を参照
事前にKafkaにProduceを行う.
せっかくなので前回Javaで作成したProducerAPIを使ってみる.
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaProducer prodConf.properties 0 0
ちなみにデータは以下の形式を1秒ずつproduceする.
[yyyy/MM/dd HH:mm:SS] Test Data
Client側
あとはcosumeしたいサーバから以下のJavaコマンドを実行
今回はパーティションの数を1でproduceしているのでスレッド数も1で指定
java -cp Kafka-jar-with-dependencies.jar kafka.api.KafkaConsumer consumeConf.properties 1
consume結果
[2014/04/07 00:32:43] Test Data [2014/04/07 00:32:44] Test Data [2014/04/07 00:32:45] Test Data ・・・以下略・・・ [2014/04/07 00:33:41] Test Data [2014/04/07 00:33:42] Test Data [2014/04/07 00:33:43] Test Data [2014/04/07 00:33:44] Test Data ・・・以下無限・・・
これも無事にconsumeされている.
これで自在にJavaからKafkaにデータをProduce,Consumeできるようになった.