一,问题描述

Kafka是一种高吞吐量的分布式发布订阅的消息队列系统,原本开发自LinkedIn,用作LinkedIn的活动流(ActivityStream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。

搭建了一个用来测试的单节点Kafka集群(Zookeeper和Kafka Broker都在同一台Ubuntu上),在命令行下使用:

 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topicForTest

创建了一个3个分区的Topic(名称为topicForTest)。

1 Kafka消息队列简介

1.1 基本术语

  • Broker

    Kafka集群包含一个或多个服务器,这种服务器被称为broker[5] 

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储;逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据,而不必关心数据存于何处。)

  • Partition

    Partition是物理上的概念,每个Topic包含一个或多个Partition(一般可设为Kafka集群中各节点CPU核数之和)。

  • Producer

    负责发布消息到Kafka broker

  • Consumer

    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。


1.2 消息队列

图片 1

1.2.1 基本特性

  1. 可扩展

    • 在不需要下线的情况下进行扩容
    • 数据流分区(partition)存储在多个机器上
  2. 高性能

    • 单个broker就能服务上千客户端
    • 单个broker每秒读/写可达几百兆字节
    • 多个broker组成的集群可以达到非常高的吞吐能力
    • 无论数据量多大,性能都保持稳定
    • Kafka在底层摒弃了Java堆缓存机制,采用操作系统级别的页缓存,同时将随机写改为顺序写,再结合Zero-Copy的特性,极大地改善了IO性能
  3. 持久存储

    • 存储在磁盘上
    • 冗余备份到其他服务器上以防止丢失

 

1.2.2 消息格式

  1. 一个topic对应一种消息格式,因此消息用topic分类
  2. 一个topic代表的消息由1个或者多个partition(s)组成
  3. 一个partition中
    • 一个partition应该存放在一到多个server上
      • 如果只有一个server,就没有冗余备份,是单机而不是集群
      • 如果有多个server
        • 一个server为leader,其他server为follower;leader负责接受读写请求,follower仅作冗余备份;leader出现故障时,会自动选举一个follower作为新的leader,保证服务不中断;每个server都可能扮演一些partition的leader和其它partition的follower角色,这样整个集群就会达到负载均衡的效果
    • 消息按顺序存放,顺序不可变
    • 只能追加消息,不能插入
    • 每个消息都有一个offset,用作消息ID,在一个partition中唯一
    • offset由consumer保存和管理,因此读取顺序实际上是完全由consumer决定的,不一定是线性的(消费端控制offset的示意代码见本节列表之后)
    • 消息有超时日期,过期则删除
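下面给出一段基于新版KafkaConsumer(0.10)API的消费示意代码(非本文原有示例,broker地址、offset取值等均为假设),演示offset由consumer自行控制时,可以通过assign+seek从任意位置开始读取:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
        props.put("group.id", "seek-demo");                    // 假设的消费组
        props.put("enable.auto.commit", "false");              // offset完全由应用自己管理
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("topicForTest", 0);
        consumer.assign(Collections.singletonList(tp)); // 手动指定分区,不走group的自动分配
        consumer.seek(tp, 100L);                        // 假设从offset=100开始读,体现读取位置由consumer决定

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("offset=" + record.offset() + ", value=" + record.value());
        }
        consumer.close();
    }
}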

使用Console producer/consumer都能够正常地向topicForTest发送和接收消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicForTest
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicForTest --from-beginning

1.2.3 生产者 producer

  • producer将消息写入kafka
  • 写入要指定topic和partition
  • 消息如何分到不同的partition,算法由producer指定(一个基于新版API的示意见下方代码)
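下面是一段基于新版KafkaProducer(0.10)API的示意代码(非本文原有示例,broker地址等均为假设),演示两种常见做法:指定key、由默认分区器按key的hash选择分区,或在ProducerRecord中直接显式指定分区号:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionRoutingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 方式一:带key,默认分区器按key的hash决定写入哪个分区,相同key总是落到同一分区
        producer.send(new ProducerRecord<String, String>("topicForTest", "user-1", "message with key"));

        // 方式二:显式指定分区号(这里写到0号分区),完全由producer端代码决定
        producer.send(new ProducerRecord<String, String>("topicForTest", 0, "user-1", "message to partition 0"));

        producer.close();
    }
}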

1.2.4 消费者 consumer

  • consumer读取消息并作处理
  • consumer group

    • 这个概念的引入是为了支持两种场景:每条消息只分发给一个消费者,或每条消息广播给所有消费者
    • 多个consumer group订阅同一个topic时,该topic的消息会广播给所有订阅它的group
    • 一条消息发送到一个consumer group后,只能由该group中的一个consumer接收和使用
    • 一个group中的每个consumer对应一个partition可以带来如下好处
      • 可以按照partition的数目进行并发处理
      • 每个partition都只有一个consumer读取,因而保证了消息被处理的顺序是按照partition内的存放顺序进行,注意这个顺序受到producer存放消息的算法影响
  •  一个Consumer可以有多个线程进行消费,线程数应不多于topic的partition数,因为对于一个包含一或多个消费线程的consumer group来说,一个partition只能分给其中的一个消费线程消费,且会让尽可能多的线程分配到partition(不过实际上真正去消费的线程及线程数还是由线程池的调度机制来决定)。这样,如果线程数比partition数多,按一一对应分配后就会有多出的线程,它们不会消费到任何一个partition的数据而空转耗费资源(多线程消费的示意代码见本节列表之后)

  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
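针对上面“线程数不应多于partition数”的讨论,下面是一段多线程消费的示意代码(非本文原有示例,broker地址、group.id、线程数均为假设):每个线程持有自己的KafkaConsumer实例,同属一个group,由Kafka把分区分摊给各个实例:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MultiConsumerDemo {
    public static void main(String[] args) {
        int threadNum = 3; // 假设topicForTest有3个分区,线程数与分区数相同
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
                    props.put("group.id", "multi-thread-demo");            // 同一个消费组
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    // KafkaConsumer不是线程安全的,每个线程各建一个实例
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                    consumer.subscribe(Arrays.asList("topicForTest"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(Thread.currentThread().getName()
                                    + " partition=" + record.partition() + " offset=" + record.offset());
                        }
                    }
                }
            });
        }
    }
}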

 

2. 安装和使用

以kafka_2.11-0.10.0.0为例。

下载解压后,进入kafka_2.11-0.10.0.0/

但是在自己的windows机器的开发环境下,使用kafka client JAVA API(0.10版本)中的KafkaConsumer却无法接收消息,表现为:在poll()方法中阻塞了。

2.1 启动Zookeeper

测试时可以使用Kafka附带的Zookeeper:

启动: ./bin/zookeeper-server-start.sh config/zookeeper.properties & ,其中config/zookeeper.properties是Zookeeper的配置文件。

结束: ./bin/zookeeper-server-stop.sh 

不过最好自己搭建一个Zookeeper集群,提高可用性和可靠性。详见:Zookeeper的安装和使用——MarchOn

更具体一点地,是在:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient类的awaitMetadataUpdate方法中长时间阻塞了。类似问题可参考:这里

2.2 启动Kafka服务器

然而,在windows机器上,使用telnet client能够连接到kafka broker的9092默认端口。

2.2.1 配置文件

配置config/server.properties文件,一般需要配置如下字段,其他按默认即可:


broker.id:          每一个broker在集群中的唯一标识,要求是正数
listeners(效果同之前版本的host.name及port):注意绑定host.name,否则可能出现莫名其妙的错误,如consumer找不到broker。这个host.name是Kafka server所在机器的名字,会注册到Zookeeper中
log.dirs:           kafka数据的存放地址,多个地址用逗号分隔;多个目录分布在不同磁盘上可以提高读写性能
log.retention.hours:    数据文件保留多长时间,超过该时间会按照log.cleanup.policy设置的清除策略处理数据
zookeeper.connect:     指定ZooKeeper的connect string,以hostname:port的形式,可有多个,以逗号分隔,如hostname1:port1,hostname2:port2,hostname3:port3;还可以带路径,如:hostname1:port1,hostname2:port2,hostname3:port3/kafka,注意要事先在zk中创建/kafka节点,否则会报错:java.lang.IllegalArgumentException: Path length must be > 0


所有参数的含义及配置可参考官方文档。

 一个配置示例如下:


  1 # Licensed to the Apache Software Foundation (ASF) under one or more
  2 # contributor license agreements.  See the NOTICE file distributed with
  3 # this work for additional information regarding copyright ownership.
  4 # The ASF licenses this file to You under the Apache License, Version 2.0
  5 # (the "License"); you may not use this file except in compliance with
  6 # the License.  You may obtain a copy of the License at
  7 #
  8 #    http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 # Unless required by applicable law or agreed to in writing, software
 11 # distributed under the License is distributed on an "AS IS" BASIS,
 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 # See the License for the specific language governing permissions and
 14 # limitations under the License.
 15 # see kafka.server.KafkaConfig for additional details and defaults
 16 
 17 ############################# Server Basics #############################
 18 
 19 # The id of the broker. This must be set to a unique integer for each broker.
 20 broker.id=1
 21 
 22 ############################# Socket Server Settings #############################
 23 
 24 # The address the socket server listens on. It will get the value returned from 
 25 # java.net.InetAddress.getCanonicalHostName() if not configured.
 26 #   FORMAT:
 27 #     listeners = security_protocol://host_name:port
 28 #   EXAMPLE:
 29 #     listeners = PLAINTEXT://your.host.name:9092
 30 listeners=PLAINTEXT://192.168.6.128:9092
 31 
 32 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 33 # it uses the value for "listeners" if configured.  Otherwise, it will use the value
 34 # returned from java.net.InetAddress.getCanonicalHostName().
 35 #advertised.listeners=PLAINTEXT://your.host.name:9092
 36 
 37 # The number of threads handling network requests
 38 num.network.threads=3
 39 
 40 # The number of threads doing disk I/O
 41 num.io.threads=8
 42 
 43 # The send buffer (SO_SNDBUF) used by the socket server
 44 socket.send.buffer.bytes=102400
 45 
 46 # The receive buffer (SO_RCVBUF) used by the socket server
 47 socket.receive.buffer.bytes=102400
 48 
 49 # The maximum size of a request that the socket server will accept (protection against OOM)
 50 socket.request.max.bytes=104857600
 51 
 52 
 53 ############################# Log Basics #############################
 54 
 55 # A comma seperated list of directories under which to store log files
 56 log.dirs=/usr/local/kafka/kafka_2.11-0.10.0.0/kfk_data/
 57 
 58 # The default number of log partitions per topic. More partitions allow greater
 59 # parallelism for consumption, but this will also result in more files across
 60 # the brokers.
 61 num.partitions=2
 62 auto.create.topics.enable=false
 63 
 64 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
 65 # This value is recommended to be increased for installations with data dirs located in RAID array.
 66 num.recovery.threads.per.data.dir=1
 67 
 68 ############################# Log Flush Policy #############################
 69 
 70 # Messages are immediately written to the filesystem but by default we only fsync() to sync
 71 # the OS cache lazily. The following configurations control the flush of data to disk.
 72 # There are a few important trade-offs here:
 73 #    1. Durability: Unflushed data may be lost if you are not using replication.
 74 #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
 75 #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
 76 # The settings below allow one to configure the flush policy to flush data after a period of time or
 77 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 78 
 79 # The number of messages to accept before forcing a flush of data to disk
 80 #log.flush.interval.messages=10000
 81 
 82 # The maximum amount of time a message can sit in a log before we force a flush
 83 #log.flush.interval.ms=1000
 84 
 85 ############################# Log Retention Policy #############################
 86 
 87 # The following configurations control the disposal of log segments. The policy can
 88 # be set to delete segments after a period of time, or after a given size has accumulated.
 89 # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
 90 # from the end of the log.
 91 
 92 # The minimum age of a log file to be eligible for deletion
 93 log.retention.hours=4
 94 
 95 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
 96 # segments don't drop below log.retention.bytes.
 97 #log.retention.bytes=1073741824
 98 
 99 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
100 log.segment.bytes=1073741824
101 
102 # The interval at which log segments are checked to see if they can be deleted according
103 # to the retention policies
104 log.retention.check.interval.ms=300000
105 
106 ############################# Zookeeper #############################
107 
108 # Zookeeper connection string (see zookeeper docs for details).
109 # This is a comma separated host:port pairs, each corresponding to a zk
110 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
111 # You can also append an optional chroot string to the urls to specify the
112 # root directory for all kafka znodes.
113 zookeeper.connect=192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181
114 
115 # Timeout in ms for connecting to zookeeper
116 zookeeper.connection.timeout.ms=6000


注意auto.create.topics.enable字段,若为true则当producer写入某个不存在的topic时会自动创建该topic;若为false则需要事先创建,否则会报错:failed after 3 retries。

后面发现是Kafka server的配置文件config/server.properties中没有配置advertised.host.name或者listeners参数。官网查了下这两个参数的解释如下:

advertised.host.name
Hostname to publish to ZooKeeper for clients to use. If this is not set, it will use the value for `host.name` if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners
Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. If this is not set, the value for `listeners` will be used.

2.2.2 命令

启动: bin/kafka-server-start.sh config/server.properties ,生产环境最好以守护进程方式启动: nohup bin/kafka-server-start.sh config/server.properties & 

结束: bin/kafka-server-stop.sh 


2.2.3 Kafka在Zookeeper中的存储结构

若上述zookeeper.connect的值没有带路径,则Kafka使用Zookeeper的根路径。启动Zookeeper和Kafka后,用命令行连接Zookeeper,执行 ls / 命令可发现有 consumers、config、controller、admin、brokers、zookeeper、controller_epoch 这几个节点。

其结构如下(具体可参考:apache kafka系列之在zookeeper中存储结构),一段用Java API查看broker注册信息的示意代码见图后:

图片 7
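作为补充,下面是一段用ZooKeeper原生Java API查看broker注册信息的示意代码(非本文原有示例,Zookeeper地址为假设),效果与在zk命令行里执行 ls /brokers/ids 类似:

import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkBrokerList {
    public static void main(String[] args) throws Exception {
        // 假设的Zookeeper地址
        ZooKeeper zk = new ZooKeeper("192.168.6.131:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
        // /brokers/ids 下每个子节点对应一个存活的broker
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println("broker " + id + " -> " + new String(data)); // JSON形式的host、port等注册信息
        }
        zk.close();
    }
}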

 

 

2.3 使用

kafka本身是和zookeeper相连的,而对应producer和consumer的状态保存也都是通过zookeeper完成的。对Kafka的各种操作通过其所连接的Zookeeper完成。

这里的原因是:JAVA API中的KafkaConsumer拿不到可用的broker元数据信息(0.10的新版consumer不再通过Zookeeper,而是直接向broker请求元数据)。

2.3.1 命令行客户端

创建topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

列出所有topic: bin/kafka-topics.sh --list --zookeeper localhost:2181 

查看topic信息(包括分区、副本情况等): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ,会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点

往某topic生产消息: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

从某topic消费消息: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (默认用一个线程消费指定topic的所有分区的数据)

删除某个Kafka group id:连接Zookeeper后用rmr命令,如删除名为JSI的消费组: rmr /consumers/JSI 

查看消费进度:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2
    各参数:
    --group指MirrorMaker消费源集群时指定的group.id
    --zkconnect指源集群的zookeeper地址
    --topic指定查的topic,没指定则返回所有topic的消费情况
The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, 
joining the consumer group and receiving a partition assignment.

2.3.2 Java客户端

1、Topic操作:


 1 import kafka.admin.DeleteTopicCommand;
 2 import kafka.admin.TopicCommand;
 3 
 4 /**
 5  * @author zsm
 6  * @date 2016年9月27日 上午10:26:42
 7  * @version 1.0
 8  * @parameter
 9  * @since
10  * @return
11  */
12 public class JTopic {
13     public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
14         String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName, "--partitions",
15                 partition + "", "--replication-factor", replication + "" };
16         TopicCommand.main(options);
17     }
18 
19     public static void listTopic(String zkAddr) {
20         String[] options = new String[] { "--list", "--zookeeper", zkAddr };
21         TopicCommand.main(options);
22     }
23 
24     public static void describeTopic(String zkAddr, String topicName) {
25         String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName, };
26         TopicCommand.main(options);
27     }
28 
29     public static void alterTopic(String zkAddr, String topicName) {
30         String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
31         TopicCommand.main(options);
32     }
33 
34     // 通过删除zk里面对应的路径来实现删除topic的功能,只会删除zk里面的信息,Kafka上真实的数据并没有删除
35     public static void deleteTopic(String zkAddr, String topicName) {
36         String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
37         DeleteTopicCommand.main(options);
38     }
39 
40     public static void main(String[] args) {
41         // TODO Auto-generated method stub
42 
43         String myTestTopic = "ZsmTestTopic";
44         int myPartition = 4;
45         int myreplication = 1;
46 
47         //createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myreplication);
48         // listTopic(ConfigureAPI.KafkaProperties.ZK);
49         describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
50         // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
51         // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
52     }
53 
54 }


2、写:(写时可以指定key以供Kafka根据key将数据写入某个分区,若无指定,则几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时))


  1 package com.zsm.kfkdemo;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.Properties;
  6 
  7 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
  8 
  9 import kafka.javaapi.producer.Producer;
 10 import kafka.producer.KeyedMessage;
 11 import kafka.producer.ProducerConfig;
 12 
 13 /**
 14  * 可以指定规则(key和分区函数)以让消息写到特定分区:
 15  * <p>
 16  * 1、若发送的消息没有指定key则Kafka会随机选择一个分区
 17  * </p>
 18  * <p>
 19  * 2、否则,若指定了分区函数(通过partitioner.class)则该函数以key为参数确定写到哪个分区
 20  * </p>
 21  * <p>
 22  * 3、否则,Kafka根据hash(key)%partitionNum确定写到哪个分区
 23  * </p>
 24  * 
 25  * @author zsm
 26  * @date 2016年9月27日 上午10:26:42
 27  * @version 1.0
 28  * @parameter
 29  * @since
 30  * @return
 31  */
 32 public class JProducer extends Thread {
 33     private Producer<String, String> producer;
 34     private String topic;
 35     private final int SLEEP = 10;
 36     private final int msgNum = 1000;
 37 
 38     public JProducer(String topic) {
 39         Properties props = new Properties();
 40         props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);// 如192.168.6.127:9092,192.168.6.128:9092
 41         // request.required.acks
 42         // 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees
 43         // (some data will be lost when a server fails).
 44         // 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server
 45         // acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
 46         // -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be
 47         // lost as long as at least one in sync replica remains.
 48         props.put("request.required.acks", "-1");
 49         // 配置value的序列化类
 50         props.put("serializer.class", "kafka.serializer.StringEncoder");
 51         // 配置key的序列化类
 52         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
 53         // 提供自定义的分区函数将消息写到分区上,未指定的话Kafka根据hash(messageKey)%partitionNum确定写到哪个分区
 54         props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
 55         producer = new Producer<String, String>(new ProducerConfig(props));
 56         this.topic = topic;
 57     }
 58 
 59     @Override
 60     public void run() {
 61         boolean isBatchWriteMode = true;
 62         System.out.println("isBatchWriteMode: " + isBatchWriteMode);
 63         if (isBatchWriteMode) {
 64             // 批量发送
 65             int batchSize = 100;
 66             List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
 67             for (int i = 0; i < msgNum; i++) {
 68                 String msg = "Message_" + i;
 69                 msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
 70                 // msgList.add(new KeyedMessage<String, String>(topic, msg));//未指定key,Kafka会自动选择一个分区
 71                 if (i % batchSize == 0) {
 72                     producer.send(msgList);
 73                     System.out.println("Send->[" + msgList + "]");
 74                     msgList.clear();
 75                     try {
 76                         sleep(SLEEP);
 77                     } catch (Exception ex) {
 78                         ex.printStackTrace();
 79                     }
 80                 }
 81             }
 82             producer.send(msgList);
 83         } else {
 84             // 单个发送
 85             for (int i = 0; i < msgNum; i++) {
 86                 KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
 87                 // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i);//未指定key,Kafka会自动选择一个分区
 88                 producer.send(msg);
 89                 System.out.println("Send->[" + msg + "]");
 90                 try {
 91                     sleep(SLEEP);
 92                 } catch (Exception ex) {
 93                     ex.printStackTrace();
 94                 }
 95             }
 96         }
 97 
 98         System.out.println("send done");
 99     }
100 
101     public static void main(String[] args) {
102         JProducer pro = new JProducer(KafkaProperties.TOPIC);
103         pro.start();
104     }
105 }

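上面JProducer通过partitioner.class指定了com.zsm.kfkdemo.MyPartitioner,原文未给出其实现。下面是一种可能的实现示意(基于0.8.x旧版producer API,按key的hash取模选择分区,仅供参考):

package com.zsm.kfkdemo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class MyPartitioner implements Partitioner {

    // 旧版API要求分区器提供接收VerifiableProperties的构造函数
    public MyPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // 按key的hash取模,保证相同key总是写到同一个分区(& 0x7fffffff避免出现负数)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}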

3、读:(对于Consumer,需要注意 auto.commit.enable 和 auto.offset.reset 这两个字段)


 1 package com.zsm.kfkdemo;
 2 
 3 import java.text.MessageFormat;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 import java.util.Properties;
 8 
 9 import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;
10 
11 import kafka.consumer.Consumer;
12 import kafka.consumer.ConsumerConfig;
13 import kafka.consumer.ConsumerIterator;
14 import kafka.consumer.KafkaStream;
15 import kafka.javaapi.consumer.ConsumerConnector;
16 import kafka.message.MessageAndMetadata;
17 
18 /**
19  * 同一consumer group的多线程消费可以两种方法实现:
20  * <p>
21  * 1、实现单线程客户端,启动多个去消费
22  * </p>
23  * <p>
24  * 2、在客户端的createMessageStreams里为topic指定大于1的线程数,再启动多个线程处理每个stream
25  * </p>
26  * 
27  * @author zsm
28  * @date 2016年9月27日 上午10:26:42
29  * @version 1.0
30  * @parameter
31  * @since
32  * @return
33  */
34 public class JConsumer extends Thread {
35 
36     private ConsumerConnector consumer;
37     private String topic;
38     private final int SLEEP = 20;
39 
40     public JConsumer(String topic) {
41         consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
42         this.topic = topic;
43     }
44 
45     private ConsumerConfig consumerConfig() {
46         Properties props = new Properties();
47         props.put("zookeeper.connect", KafkaProperties.ZK);
48         props.put("group.id", KafkaProperties.GROUP_ID);
49         props.put("auto.commit.enable", "true");// 默认为true,让consumer定期commit offset,zookeeper会将offset持久化,否则只在内存,若故障则再消费时会从最后一次保存的offset开始
50         props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + "");// 经过INTERVAL时间提交一次offset
51         props.put("auto.offset.reset", "largest");// What to do when there is no initial offset in ZooKeeper or if an offset is out of range
52         props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
53         props.put("zookeeper.sync.time.ms", "200");
54         return new ConsumerConfig(props);
55     }
56 
57     @Override
58     public void run() {
59         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
60         topicCountMap.put(topic, new Integer(1));// 线程数
61         Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
62         KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);// 若上面设了多个线程去消费,则这里需为每个stream开个线程做如下的处理
63 
64         ConsumerIterator<byte[], byte[]> it = stream.iterator();
65         MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
66         while (it.hasNext()) {
67             messageAndMetaData = it.next();
68             System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
69                     new String(messageAndMetaData.message()), new String(messageAndMetaData.key()),
70                     messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
71             try {
72                 sleep(SLEEP);
73             } catch (Exception ex) {
74                 ex.printStackTrace();
75             }
76         }
77     }
78 
79     public static void main(String[] args) {
80         JConsumer con = new JConsumer(KafkaProperties.TOPIC);
81         con.start();
82     }
83 }

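作为对照,若使用0.10的新版KafkaConsumer,可以关闭自动提交、在处理完一批消息后手动提交offset。下面是一段示意代码(非本文原有示例,broker地址、group.id均为假设):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
        props.put("group.id", "manual-commit-demo");           // 假设的消费组
        props.put("enable.auto.commit", "false");              // 关闭自动提交
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("topicForTest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ", value=" + record.value());
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // 处理完这一批后再提交,避免消息还没处理就被标记为已消费
            }
        }
    }
}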

与Kafka相关的Maven依赖:


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

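上面的依赖对应0.8.x的旧版客户端。若只使用本文后面基于0.10新版API的KafkaConsumer/KafkaProducer,可以改为依赖kafka-clients(版本号按实际集群调整):

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>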

 

 

3 MirrorMaker

Kafka自身提供的MirrorMaker工具用于把一个集群的数据同步到另一集群,其原理就是对源集群消费、对目标集群生产。

运行时需要指定源集群的Zookeeper地址(消费端,pull)和目标集群的Broker列表(生产端,push)。

使用 bin/kafka-verifiable-producer.sh --topic topicForTest --max-messages 200 --broker-list localhost:9092 向该Topic中写入200条消息。启动下面的程序测试:

3.1 使用

运行 ./kafka-run-class.sh kafka.tools.MirrorMaker --help 查看使用说明,如下:


Option                                  Description
------                                  -----------
--blacklist <Java regex (String)>       Blacklist of topics to mirror.
--consumer.config <config file>         Consumer config to consume from a
                                          source cluster. You may specify
                                          multiple of these.
--help                                  Print this message.
--num.producers <Integer: Number of     Number of producer instances (default:
  producers>                              1)
--num.streams <Integer: Number of       Number of consumption streams.
  threads>                                (default: 1)
--producer.config <config file>         Embedded producer config.
--queue.size <Integer: Queue size in    Number of messages that are buffered
  terms of number of messages>            between the consumer and producer
                                          (default: 10000)
--whitelist <Java regex (String)>       Whitelist of topics to mirror.


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.121.34:9092");
        props.put("group.id", "mygroup");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        kafkaConsumer.subscribe(Arrays.asList("topicForTest"));

        while(true)
        {
            System.out.println("nothing available...");
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for(ConsumerRecord<String, String> record : records)
            {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}

3.2 启动

./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config  zsmSourceClusterConsumer.config  --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"
    --consumer.config所指定的文件里至少需要有zookeeper.connect、group.id两字段
    --producer.config至少需要有metadata.broker.list字段,指定目标集群的broker列表
    --whitelist指定要同步的topic
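一组最简的配置文件示意如下(非本文原有内容,主机名均为假设,仅包含上面提到的必需字段):

# zsmSourceClusterConsumer.config:消费源集群
zookeeper.connect=source-zk1:2181,source-zk2:2181
group.id=test-mirror-consumer-zsm

# zsmTargetClusterProducer.config:写入目标集群
metadata.broker.list=target-broker1:9092,target-broker2:9092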

可以用2.3.1所说的查看消费进度的方法来查看对源集群的同步状况(即消费状况)。

 

 

4 Kafka监控工具(KafkaOffsetMonitor)

可以借助KafkaOffsetMonitor来图形化展示Kafka的broker节点、topic、consumer及offset等信息。

以KafkaOffsetMonitor-assembly-0.2.0.jar为例,下载后执行:


#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk  192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181 \
     --port 8087 \
     --refresh 10.seconds \
     --retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &


其中,zk按照host1:port1,host2:port2…的格式去写即可;port为开启web界面的端口号;refresh为刷新时间;retain为数据保留时间(单位可为seconds、minutes、hours、days)。

 

程序抛出的DEBUG异常如下:

5 Kafka集群管理工具(Kafka Manager)

kafka-manager是Yahoo开源的项目,用Scala编写,是一款达到商用级别的集群管理工具。

这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者分区在整个集群中分布不均匀的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,它也是一个可以快速浏览集群状态的好工具。

此工具以集群的方式运行,需要Zookeeper。


2017-08-17 18:14:48.210 [main] INFO  o.a.kafka.common.utils.AppInfoParser SEQ - Kafka version : 0.10.1.0
2017-08-17 18:14:48.210 [main] INFO  o.a.kafka.common.utils.AppInfoParser SEQ - Kafka commitId : 3402a74efb23d1d4
2017-08-17 18:14:48.211 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Kafka consumer created
2017-08-17 18:14:48.212 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Subscribed to topic(s): topicForTest
2017-08-17 18:14:48.212 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Sending coordinator request for group group_test109 to broker xxx:9092 (id: -1 rack: null)
.....
.....
2017-08-17 18:14:48.274 [main] DEBUG o.a.kafka.common.network.Selector SEQ - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2017-08-17 18:14:48.275 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Completed connection to node -1
2017-08-17 18:14:48.337 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Sending metadata request {topics=[topicForTest]} to node -1
2017-08-17 18:14:48.396 [main] DEBUG org.apache.kafka.clients.Metadata SEQ - Updated cluster metadata version 2 to Cluster(id = xgdvTIvHTn2dL3cnEm-dRQ, nodes = [ubuntu:9092 (id: 0 rack: null)], partitions = [Partition(topic = topicForTest,partition = 0, leader = 0, replicas = [0,], isr = [0,])])
2017-08-17 18:14:48.398 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Received group coordinator response ClientResponse(receivedTimeMs=1502964888398, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@144d0b84, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=group_test109}), createdTimeMs=1502964888230, sendTimeMs=1502964888338), responseBody={error_code=0,coordinator={node_id=0,host=ubuntu,port=9092}})
2017-08-17 18:14:48.399 [main] INFO  o.a.k.c.c.i.AbstractCoordinator SEQ - Discovered coordinator ubuntu:9092 (id: 2147483647 rack: null) for group group_test109.
2017-08-17 18:14:48.399 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Initiating connection to node 2147483647 at ubuntu:9092.
2017-08-17 18:14:51.127 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Error connecting to node 2147483647 at ubuntu:9092:
java.io.IOException: Can't resolve address: ubuntu:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:180) ~[kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:454) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:556) [kafka-clients-0.10.1.0.jar:na]

....[main] INFO o.a.k.c.c.i.AbstractCoordinator SEQ - Marking the coordinator ubuntu:9092 (id: 2147483647 rack: null) dead for group xxx

5.1 安装

需要从Github下载源码并安装sbt工具编译生成安装包,生成的时间很长且不知为何一直出错,所以这里用网友已编译好的包 (备份链接)。

包为kafka-manager-1.0-SNAPSHOT.zip

>解压:

 unzip kafka-manager-1.0-SNAPSHOT.zip 

>配置conf/application.conf里的kafka-manager.zkhosts:

 kafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

>启动:

 ./bin/kafka-manager -Dconfig.file=conf/application.conf (启动后在Zookeeper根目录下可发现增加了kafka-manager目录)

默认是9000端口,要使用其他端口可以在命令行指定http.port,此外kafka-manager.zkhosts也可以在命令行指定,如:

 ./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181" 

 

5.2 使用

访问web页面,在Cluster->Add Cluster中,输入要监控的Kafka集群的Zookeeper即可。

再来看ubuntu上的/etc/hosts文件:

127.0.0.1 ubuntu localhost
127.0.1.1 ubuntu localhost

也就是说,broker向客户端通告的主机名是ubuntu,而windows开发机无法解析ubuntu这个主机名(前面日志中的"Can't resolve address: ubuntu:9092"正是这个原因)。因此,只需要在config/server.properties里面配置 listeners 参数(使用客户端可解析的IP或主机名)就可以了:

listeners=PLAINTEXT://your.host.name:9092

6 进阶

  • 在当前的kafka版本实现中,对于zookeeper的所有操作都是由kafka controller来完成的(以串行serially的方式)
  • offset管理:kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2版本的kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group、topic与partition的组合作为key直接提交到compacted topic中。同时Kafka又在内存中维护了三元组来保存最新的offset信息,consumer获取最新offset信息时直接从内存拿即可。当然,kafka也允许你快速checkpoint最新的offset信息到磁盘上。
  • 如何确定分区数:分区数的确定与硬件、软件、负载情况等都有关,要视具体情况而定,不过依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位是MB/s。然后假设总的目标吞吐量是Tt,那么分区数至少为 max(Tt/Tp, Tt/Tc)(下面给出一个简单的计算示例)
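例如(数值纯属假设):若测得单分区的producer吞吐量Tp=10MB/s、consumer吞吐量Tc=20MB/s,目标总吞吐量Tt=100MB/s,则分区数至少为 max(100/10, 100/20) = max(10, 5) = 10。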

 


 

二,关于Kafka的一些简单理解

①目录结构

前面topicForTest一共有三个分区,因此在log.dirs目录下关于该Topic一共有三个目录,每个目录下内容如下:

图片 25

 

使用命令:./bin/kafka-topics.sh --list --zookeeper localhost:2181 可以查看当前Topic信息。

使用命令:./bin/kafka-consumer-groups.sh --list --bootstrap-server YOUR_IP_ADDRESS:9092 可以查看consumer group的信息

(如果提示:Error: Executing consumer group command failed due to Request METADATA failed on brokers List(ubuntu:9092 (id: -1 rack: null)))(ip地址/主机名/localhost 试试?)

使用命令:./bin/kafka-consumer-groups.sh --bootstrap-server YOUR_IP_ADDRESS:9092 --describe --group groupName  查看某个具体的group的情况

图片 26

 


②Topic、Partition、ConsumerGroup、Consumer之间的一些关系

一个Topic一般会分为多个分区(Partition),生产者可以同时向这个Topic的多个分区写入消息,而消费者则以组为单位订阅这个Topic,消费者组里面的某个消费者负责消费某个Partition。感觉Topic像是逻辑上的概念。

订阅了同一Topic的若干个Consumer一般属于某个ConsumerGroup。对于一个ConsumerGroup而言,其中的某个Consumer负责消费某个Partition,则该Partition中的消息就不会被其他的Consumer消费了。如下图:

图片 27

ⓐTopic T1有四个分区,即Topic T1中的消息存储在这四个分区中,它被ConsumerGroup1这个组中的消费者订阅,其中Consumer1负责消费Partition0和2,Consumer2负责消费Partition1和3。正常情况下,Topic T1中的每条消息只会被ConsumerGroup1中的消费者消费一次,也即:Topic T1中的某条消息被Consumer1消费了,就不会被Consumer2消费;对于ConsumerGroup组内成员而言,Consumer1消费了消息A,Consumer2就不会消费消息A了。

若想让Topic T1中的消息被多个消费者组消费,可以再创建一个消费者组ConsumerGroup2,让ConsumerGroup2中的消费者去订阅Topic T1即可。如下图:Topic T1中的消息都会被消费2次,一次是被ConsumerGroup1中的消费者消费;另一次是被ConsumerGroup2中的消费者消费。

图片 28

每个ConsumerGroup里面有个group leader。group leader一般是最先加入到该消费者组的消费者。group leader从group coordinator那里拿到组内所有消费者的列表,并负责把分区分配给各个consumer。

 

When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the
group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator and it is responsible for assigning a subset of
partitions to each consumer

 

ⓑ若ConsumerGroup中消费者数量大于Topic中的分区数量,则某个消费者将没有Partition可消费。如下图:Consumer5消费不到任何消息。

图片 29

 

Partition rebalance:

从上面图片中可看出,消息的消费是以Partition为单位的。若ConsumerGroup新增了几个消费者,或者减少了几个消费者,那么Kafka Broker就会重新分配Partition给Consumer,这个重新分配的过程就是rebalance。比如说,ConsumerA正在消费PartitionA,由于某个原因ConsumerA挂了,PartitionA中的消息就没有Consumer消费了。因此Broker发现ConsumerA挂了之后,就要把PartitionA交给另外还存活的Consumer去消费。

The event in which partition ownership is moved from one consumer to another is called a rebalance

rebalance过程会有很多问题,比如:1,在rebalance这个过程中,Consumer是不能消费消息的。

During a rebalance, consumers can’t consume messages, so a rebalance is in effect a short window of unavailability on the entire consumer group

 

2,会造成消息被重复消费。比如ConsumerA得到了PartitionA的几条消息,进行了一定的处理,然后还未来得及向Broker确认它消费完了这几条消息(未commit),它就挂了。Broker rebalance之后,把PartitionA交给了ConsumerB订阅,那么ConsumerB也会得到ConsumerA处理了但未提交的那几条消息。那这几条消息就被重复消费了。
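要减少这类重复消费,常见的做法是在rebalance的回调里先把已处理的offset提交掉。下面是一段基于新版API的示意代码(非本文原有示例,broker地址、group.id均为假设):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRebalanceDemo {
    private static final Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            new HashMap<TopicPartition, OffsetAndMetadata>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
        props.put("group.id", "rebalance-commit-demo");        // 假设的消费组
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("topicForTest"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // 分区被收回前,把已处理到的位置提交掉,减少新consumer重复消费
                consumer.commitSync(currentOffsets);
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息......然后记录下一条要消费的offset
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
        }
    }
}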

3,Broker是如何发现Consumer挂了的呢?

这是通过KafkaConsumer中的poll(long)方法实现的。

 

③KafkaConsumer的poll(long)方法

poll方法干了哪些事儿?coordination、分区平衡、consumer与broker之间的心跳包keep alive、获取消息…

Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats and data fetching

消费者必须不停地执行poll方法,一是不断地从kafka那里获得消息,另一个是告诉kafka自己没有发生故障,与broker保持keep alive。

consumers must keep polling Kafka or they will be considered dead and the partitions they are consuming will be handed to another
consumer in the group to continue consuming.

 

poll(long)方法有一个long类型的超时参数,它的取值受consumer参数配置的影响,也与具体的应用如何处理消息有关。

This specifies how long it will take poll to return, with or without data. The value is
typically driven by application needs for quick responses - how fast do you want
to return control to the thread that does the polling?

 

消费者消费完消息、不再继续消费时,要记得关闭。因为consumer要离开消费组,就会造成rebalance;调用consumer.close()可以让consumer主动通知Group Coordinator进行rebalance,而不是靠GroupCoordinator等待一段时间后才发现Consumer离开了(Consumer不再执行poll方法了)再进行rebalance。

consumer.close();
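一种常见的优雅退出写法(示意,非本文原有示例)是:在另一个线程(如ShutdownHook)里调用consumer.wakeup(),让阻塞中的poll()抛出WakeupException跳出循环,再在finally中执行close():

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulCloseDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.128:9092"); // 假设的broker地址
        props.put("group.id", "close-demo");                    // 假设的消费组
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("topicForTest"));

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.wakeup(); // wakeup是唯一可以跨线程安全调用的方法,会让poll()抛出WakeupException
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                }
            }
        });

        try {
            while (true) {
                consumer.poll(1000); // 正常消费......
            }
        } catch (WakeupException e) {
            // 收到退出信号
        } finally {
            consumer.close(); // 主动通知GroupCoordinator离开,立即触发rebalance
        }
    }
}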

 

④Kafka中的一些配置参数

Broker的配置参数;Producer的配置参数;Consumer的配置参数

auto.commit.interval.ms:The frequency in ms that the consumer offsets are committed to zookeeper.(consumer每隔多久提交一次offset,即消费指针)

group.id:A unique string that identifies the consumer group this consumer belongs to.(标识该consumer所属的消费组)

heartbeat.interval.ms:consumer向group coordinator发送心跳的间隔。

session.timeout.ms:超过该时间没有收到consumer的心跳,coordinator就认为它已失效,并触发rebalance。

这些参数的设置与具体的应用相关,也会影响rebalance时机,具体不是太了解。

具体的配置参数可参考:Kafka配置参数解释。

参考文献:

书籍:Kafka: The Definitive Guide

Zookeeper中Kafka元数据解释

 
