Apache Kafka是一個分散式的訊息處理framework,
透過publish來發佈message,以及subscribe來訂閱取得message。
從架構上來看,Kafka可以分為下面幾種角色,
1. producer: 發佈(publish)message to topic
2. consumer: 訂閱(subscribe)topic以取得message
3. broker: 簡單的說就是server,由一台以上的broker組成一個cluster
4. topic: message的分類
5. zookeeper: 嚴格來說,zookeeper不算是Kafka的一部分,但Kafka卻得倚靠zookeeper來做到sync。
了解Kafka的每一個角色以後,
接下來要講怎麼安裝跟使用。
1. Install Kafka首先,先安裝Kafka,這裡選的版本是0.8.2。
Kafka的安裝很簡單,只有一個tarball,解開就好。
wget http://apache.stu.edu.tw/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
tar xvzf kafka_2.10-0.8.2.0.tgz
cd kafka_2.10-0.8.2.0/
2. Start zookeeper安裝完以後,在開始之前,
要先啟動zookeeper,原因是前面有說過,
Kafka倚靠zookeeper做message的sync。
bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動以後,可以透過下面指令去觀察一下,
應該會看到2隻java在LISTEN,
有一隻的port預設一定是2181,預設值放在config/zookeeper.properties裡面。
這個port是給等等的其他broker連上來用的。
netstat -tnlp
3. Start Kafka server(broker)接著啟動三個kafka server,讓這三台server變成一個cluster,
為什麼要跑三台?因為Kafka還有replication的功能,所以順便玩一下。
然後我只有一台機器,所以我會讓這三台通通run在同一台上面。
在啟動之前,要先“複製“and“編輯”一個config檔案,
首先,先複製config,因為有三台,但是預設的config只有一個,所以要多複製二個出來。
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
然後去編輯config/server-2.properties以及server-3.properties這二個檔案,
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9092
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
只要改broker.id, port, log.dirs這三個key,
這三個都要是unique的value,所以就都往上+1吧,
例如server-2.properties的例子就是,
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-2
編輯完以後就分別啟動這三台broker吧。
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
4. Create a topic有了server以後,接著要create一個topic,
你可以試著把replication的value改成4,你應該會失敗了,
且error msg應該是【4 larger than available brokers: 3】,因為我們剛剛只有start三台broker,所以無法複製四份。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
接著你可以去查詢這個topic的資訊,
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
應該會看到下面的資訊,
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
ReplicationFactor是3份,
Replicate在0, 1, 2這三台broker上面,
topic的leader是broker 2,leader負責partition的read and write。
Isr的意思是有哪些broker正在sync,簡單的說可以知道哪些broker是活著的。
5. Publish: send message to topic接著就要開始寫訊息到topic裡面,
透過kafka提供的shell可以進行測試,執行以後,就可以直接輸入你要輸入的訊息,
輸入完以後,按下ctrl+C就可以離開。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
...
test message 1
test message 2
^C
6. Subscribe: get message from topic接著就要讀取訊息,
kafka一樣有提供shell script讓我們使用,
應該就可以看到上一步驟所輸入的訊息了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
7. Advance這一步只是要觀察一下replication有無成功,
我們可以去stop該topic的leader,
以我的例子來說,test leader是broker 2,
所以我就去把broker 2關掉,
$ jobs
[1] Running bin/zookeeper-server-start.sh config/zookeeper.properties &
[2] Running bin/kafka-server-start.sh config/server.properties &
[3]- Running bin/kafka-server-start.sh config/server-1.properties &
[4]+ Running bin/kafka-server-start.sh config/server-2.properties &
$ fg 4
$ ^C
接著我們可以先去看該topic的leader會有什麼變化,應該會發現leader變別台broker了,
而且Isr會只剩下0,1。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
此時我們一樣在去consume message,message應該還是會保存的完整無缺。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
下一篇應該寫kafka+spark streaming吧!?