技術メモ(仮)

IT系の話や研究,他のことなど話して行けたらいいな~って感じです.ただいまJavaを学習中

Kafkaを動かしてみた

Apache Kafkaを実際に動かしてみた.
Kafkaって何?って方はApache Kafkaについて - 技術メモ(仮)を参照してくれると嬉しい.
今回はproduceとconsumeの挙動を見てみた.
Kafkaを利用するには以下の環境が必要.

  • Java(JDK6と7で動くことは確認)
  • Zookeeper(3.3.4以降ならOKらしい)

今回仮想で以下の2台のサーバを作成した.

  • Kafka(ホスト名:kafka)
  • Zookeeper(ホスト名:zookeeper)

これらを利用しKafkaを動かしてみる.

Zookeeper

以下の処理はホスト名:zookeeperのサーバで行う.

インストール

まずZookeeperをzookeeperサーバにインストールする.

// JDKインストール
yum install java-1.7.0-openjdk-devel

// Zookeeperインストール
wget http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/cdh4-repository-1-0.noarch.rpm
rpm -ivh cdh4-repository-1-0.noarch.rpm
yum install zookeeper-server

これでひとまずZookeeperをインストールすることができた.

設定

/usr/lib/zookeeper/conf/zoo.cfgをvimとかで以下のように編集する.

# the port at which the clients will connect
clientPort=2181
server.1=<zookeeperサーバのIP>:2182:2183 # 追加(インストール先のサーバアドレス)

上記の設定は別にホスト名でもいい.

起動

// Zookeeper起動
service zookeeper-server start

これでZookeeperの設定は完了.
次にKafkano設定を行う.

Kafka

以下の処理はホスト名:kafkaで行う.
今回は0.8系を試してみた.

インストール

// JDKインストール
yum install -y java-1.7.0-openjdk-devel

// KafkaをDLして展開
cd /opt
wget http://ftp.riken.jp/net/apache/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
tar xvzf kafka_2.8.0-0.8.0.tar.gz
mv kafka_2.8.0-0.8.0 kafka

設定

/opt/kafka/config/server.properties

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
host.name=kafka # 変更:ホスト名設定

# The directory under which to store log files
log.dir=/home/kafka/kafka-logs #変更:/tmp/kafka-logs → /home/kafka/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=<zookeeperのIPアドレス>:2181 #変更:localhost → zookeeperのIPアドレス

起動

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &

これでKafkaの準備ができたので次に実際にProduceしてみる.

Producer

ProduceするためにTopicの作成を行う.

// トピック作成
cd /opt/kafka
bin/kafka-create-topic.sh --zookeeper 192.168.100.210:2181 --replica 1 --partition 1 --topic KafkaTest
// トピック確認
bin/kafka-list-topic.sh --zookeeper 192.168.100.210:2181
topic: KafkaTest        partition: 0    leader: 0       replicas: 0     isr: 0

作成できたのを確認したのでMessageのProduceを行う.

// メッセージ投入
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic KafkaTest

Test01
Test02
Test03

これで/home/kafka-logs/以下に作成したTopicのフォルダとlogが作成されているはずである.一応確認.

ls -l /home/kafka/kafka-logs/
合計16
drwxr-xr-x. 2 root root 4096  2月 23 23:07 2014 KafkaTest-0
・・・以下略・・・

これでProduce完了である.
Topic名の後ろ数字(今回の場合0)であるが,これはパーティションの数を示している.
今回は0のみであるが,もしパーティションが2であるならKafkaTest-0とKafkaTest-1が生成される.つまりパーティションがNであるならKafkaTestN-1までフォルダが作られる.

Consumer

最後にConsumerを用いてProduceしたデータをConsumeしてみる.
別ウィンドウを開いて以下のコマンドを実行

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KafkaTest --from-beginning

// 以下出力
Test01
Test02
Test03

ProducerがProduceしたデータをConsumeすることができた.

    • from-beginningは言葉どおりはじめからconsumeするオプション.

常時Consume,Produceしている様をみたい場合にはそのオプションを外してconsumerを起動.
Producerで常時書き込んでいけばリアルタイムでconsumeされる.
これで無事ProduceもConsumeも動きKafkaを利用することができた.
次回はプログラムから動かしてみる予定である.

トラブルシューティング

もしProduceやConsumeなどで「ホストへの経路がありません」などのエラーがでてしまった場合,それはiptablesで弾かれてしまっている.なので両サーバのiptablesをoffにする必要がある.(これは仮想なのでoffでいいが実サービスなどは正しく設定する必要がある)
どちらのサーバでも以下のコマンドを打つ.

/etc/rc.d/init.d/iptables stop
chkconfig iptables off

参考にしたサイト

ペンギン様:http://d.hatena.ne.jp/kimutansk/20130804/1375614109