Kafkaを動かしてみた
Apache Kafkaを実際に動かしてみた.
Kafkaって何?って方はApache Kafkaについて - 技術メモ(仮)を参照してくれると嬉しい.
今回はproduceとconsumeの挙動を見てみた.
Kafkaを利用するには以下の環境が必要.
- Java(JDK6と7で動くことは確認)
- Zookeeper(3.3.4以降ならOKらしい)
今回仮想で以下の2台のサーバを作成した.
- Kafka(ホスト名:kafka)
- Zookeeper(ホスト名:zookeeper)
これらを利用しKafkaを動かしてみる.
Zookeeper
以下の処理はホスト名:zookeeperのサーバで行う.
インストール
まずZookeeperをzookeeperサーバにインストールする.
// JDKインストール yum install java-1.7.0-openjdk-devel // Zookeeperインストール wget http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/cdh4-repository-1-0.noarch.rpm rpm -ivh cdh4-repository-1-0.noarch.rpm yum install zookeeper-server
これでひとまずZookeeperをインストールすることができた.
設定
/usr/lib/zookeeper/conf/zoo.cfgをvimとかで以下のように編集する.
# the port at which the clients will connect clientPort=2181 server.1=<zookeeperサーバのIP>:2182:2183 # 追加(インストール先のサーバアドレス)
上記の設定は別にホスト名でもいい.
起動
// Zookeeper起動 service zookeeper-server start
これでZookeeperの設定は完了.
次にKafkano設定を行う.
Kafka
以下の処理はホスト名:kafkaで行う.
今回は0.8系を試してみた.
インストール
// JDKインストール yum install -y java-1.7.0-openjdk-devel // KafkaをDLして展開 cd /opt wget http://ftp.riken.jp/net/apache/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz tar xvzf kafka_2.8.0-0.8.0.tar.gz mv kafka_2.8.0-0.8.0 kafka
設定
/opt/kafka/config/server.properties
# Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName(). host.name=kafka # 変更:ホスト名設定 # The directory under which to store log files log.dir=/home/kafka/kafka-logs #変更:/tmp/kafka-logs → /home/kafka/kafka-logs # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=<zookeeperのIPアドレス>:2181 #変更:localhost → zookeeperのIPアドレス
起動
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
これでKafkaの準備ができたので次に実際にProduceしてみる.
Producer
ProduceするためにTopicの作成を行う.
// トピック作成 cd /opt/kafka bin/kafka-create-topic.sh --zookeeper 192.168.100.210:2181 --replica 1 --partition 1 --topic KafkaTest // トピック確認 bin/kafka-list-topic.sh --zookeeper 192.168.100.210:2181 topic: KafkaTest partition: 0 leader: 0 replicas: 0 isr: 0
作成できたのを確認したのでMessageのProduceを行う.
// メッセージ投入 /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic KafkaTest Test01 Test02 Test03
これで/home/kafka-logs/以下に作成したTopicのフォルダとlogが作成されているはずである.一応確認.
ls -l /home/kafka/kafka-logs/ 合計16 drwxr-xr-x. 2 root root 4096 2月 23 23:07 2014 KafkaTest-0 ・・・以下略・・・
これでProduce完了である.
Topic名の後ろ数字(今回の場合0)であるが,これはパーティションの数を示している.
今回は0のみであるが,もしパーティションが2であるならKafkaTest-0とKafkaTest-1が生成される.つまりパーティションがNであるならKafkaTestN-1までフォルダが作られる.
Consumer
最後にConsumerを用いてProduceしたデータをConsumeしてみる.
別ウィンドウを開いて以下のコマンドを実行
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KafkaTest --from-beginning // 以下出力 Test01 Test02 Test03
ProducerがProduceしたデータをConsumeすることができた.
-
- from-beginningは言葉どおりはじめからconsumeするオプション.
常時Consume,Produceしている様をみたい場合にはそのオプションを外してconsumerを起動.
Producerで常時書き込んでいけばリアルタイムでconsumeされる.
これで無事ProduceもConsumeも動きKafkaを利用することができた.
次回はプログラムから動かしてみる予定である.
トラブルシューティング
もしProduceやConsumeなどで「ホストへの経路がありません」などのエラーがでてしまった場合,それはiptablesで弾かれてしまっている.なので両サーバのiptablesをoffにする必要がある.(これは仮想なのでoffでいいが実サービスなどは正しく設定する必要がある)
どちらのサーバでも以下のコマンドを打つ.
/etc/rc.d/init.d/iptables stop chkconfig iptables off