kafka 相关概念和使用
介绍
kafka 是一个开源消息系统,使用 scala 编写。类似于 JMS , 但不是 JMS 的规范实现。
相关概念
Producer
Consumer
Topic
一个特定的主题,或者理解为存储相同消息类型的一个或多个队列
Consumer Group
一个 topic 可以有多个 group, 每个 group 可以有一到多个 consumer. topic 的消息在逻辑上会复制到多个 consumer group 上。如果一个 group 有多个 consumer. 每条消息只会被其中一个 consumer 消费。如果一个 topic 有多个 group, 每个 group 都会得到相同的消息。
Broker 一台 kafka 服务器。
Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。kafka只保证一个partition中的消息顺序发给consumer,不保证多个partition消息有序。
consumer / partition / topic 的关系
- 每个group中可以有多个consumer, 每个consumer属于一个consumer group
- 每个 topic 中的一条消息只会被 consumer group 中的一个消费者消费。
- 一个 topic 可以有多个 partition . 实际消费时是一个 partition 对应一个 consumer.
- 一个 partition 对应一个 consumer . 但是一个 consumer 可以消费一个 topic 的多个 partition.
- 同一个group中不能有多于partitions个数的consumer同时消费,否则某些consumer将无法得到消息.
安装与使用
解压,配置环境变量
修改配置文件
# 这个在集群中的每个节点都不能重复。 broker.id=0 # zookeeper 的地址也要修改 zookeeper.connect=192.168.192.132:2181
启动 zookeeper
# 可以启动自己配置的 zookeeper 也可以使用如下命令启动 kafka 自带的 zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties
启动 kafka
# 在每台机器上执行 bin/kafka-server-start.sh [-daemon] config/server.properties
命令行使用 kafka
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
列出 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看 topic 详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
通过命令行生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
删除 topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Paritition机制
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡
Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口
默认算法是 hashcode % partitions,也可以自己实现。
消息应答机制
通过参数 request.required.acks 可以设置 producer 接收消息ack 的机制。默认为0.
# 0: producer不会等待broker发送ack
# 1: 当leader接收到消息之后发送ack
# 2: 当所有的follower都同步消息成功后发送ack.
acks=0
Consumer的负载均衡
当一个group中consumer加入或者离开时,会触发负载均衡。
# 计算公式
# 可能出现 consumer 空闲的情况
每个 consumer 分得的 partition = ceil(partitions.len/consumers.len)
文件存储
在 kafka 数据存储目录下的 partition 以 topic + partition_id 命名。在 partition 文件夹下有一个 xxxx.index 和 xxxx.log 两种类型的文件。这些文件就是 Segment file
每个 partition 会被分成多个 Segment,默认会保留7天的 segment.
segment文件名为上一个segment文件最后一条消息的offset值. 索引文件存储元数据,索引文件中元数据指向对应数据文件中message的物理偏移地址.
配置
Boker 配置参数
Property | Description |
---|---|
broker.id | 每个 broker 唯一的整数ID |
zookeeper.connect | ip:port |
listeners=PLAINTEXT://host:9092 | 监听地址 |
advertised.listeners | |
advertised.port | |
num.partitions | topic的默认partitions数目 |
auto.create.topics.enable = true | |
delete.topic.enable = false | |
log.dirs | 数据存放目录 |
log.flush.interval.messages | |
log.flush.interval.ms | |
log.retention.bytes | 保留的日志大小 |
log.retention.hours | 保留的日志时间 |
log.roll.hours | (默认 7 * 24 ) 强制Kafka在达到该时间后生成一个新的log文件,而不管log.segment.bytes是否达到指定的大小 |
log.segment.bytes | topic的partition存储是以目录中的segment files存在的,该值控制segment file的最大大小,如果文件达到该值后,会重新生成一个新的日志文件 |
log.cleanup.policy | delete/compact |
log.retention.check.interval.ms=60000 | 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes) |
log.cleaner.enable=false | 是否开启压缩 |
log.cleaner.delete.retention.ms = 1 day | 对于压缩的日志保留的最长时间 |
message.max.bytes | 消息体最大值 |
num.io.threads | 用来执行 io 请求的线程数,应等于磁盘数 |
queued.max.requests | 在I/O线程队列中的请求达到多少多少个之后,network线程停止接收新的请求 |
socket.send.buffer.bytes | 发送缓冲 |
socket.receive.buffer.bytes | socket的接收缓冲区,SO_RCVBUFF |
replica.lag.max.messages = 4000 | 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值. |
replica.socket.timeout.ms= 30 * 1000 | leader与relicas的socket超时时间 |
replica.socket.receive.buffer.bytes=64 * 1024 | leader复制的socket缓存大小 |
replica.fetch.max.bytes = 1024 * 1024 | replicas每次获取数据的最大字节数 |
replica.fetch.wait.max.ms = 500 | replicas同leader之间通信的最大等待时间,失败了会重试 |
replica.fetch.min.bytes =1 | 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小 |
num.replica.fetchers = 1 | leader中进行复制的线程数,增大这个数值会增加relipca的IO |
Producer 配置参数
Property | Description |
---|---|
acks = 0 | 0,1,-1 |
bootstrap.servers | host:port,… |
key.serializer | key的序列化方式,若是没有设置,同serializer.class |
value.serializer | |
buffer.memory | |
retries | 重试次数 |
batch.size = 16384 | 聚合多长请求到一定大小然后一起发送 |
linger.ms | batch 延迟的上限 |
client.id | string |
connections.max.idle.ms = 540000 | 多长时间以后关闭空闲连接 |
client.id=”” | 用户随意指定,但是不能重复,主要用于跟踪记录消息 |
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner | 分区的策略,默认是取模 |
request.timeout.ms = 10000 | 消息发送的最长等待时间 |
send.buffer.bytes=100*1024 | socket的缓存大小 |
retry.backoff.ms = 100 | 每次失败后的间隔时间 |
topic.metadata.refresh.interval.ms = 600 * 1000 | 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据 |
queue.buffering.max.ms = 5000 | 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时 |
queue.buffering.max.messages = 10000 | 异步模式下缓冲的最大消息数,同上 |
queue.enqueue.timeout.ms = -1 | 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃 |
batch.num.messages=200 |
Consumer 配置参数
Property | Description |
---|---|
client.id | |
bootstrap.servers | host:port,… |
key.serializer | key 的序列化方式 |
value.serializer | value 的序列化方式 |
group.id | 决定该Consumer归属的唯一组ID |
fetch.min.bytes = 1 | server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。 |
heartbeat.interval.ms | 用于确认 consumer 活着,在 consumer group 协调的时候会用到,值等于 session.timeout.ms 的三分之一 |
session.timeout.ms = 10000 | 用于检测 consumer 是否发生故障 |
max.poll.interval.ms = 300000 | 两次 poll 之间的最大间隔 |
max.poll.records = 500 | 一次获取的最大记录跳数 |
request.timeout.ms = 305000 | 等待超时时间 |
retry.backoff.ms | |
reconnect.backoff.ms | |
auto.offset.reset | earliest/latest/none(throw exception) |
enable.auto.commit = true | 自动提交 |
kafka java api
注意如果 bootstrap.servers 写成 ip 形式,最好在 config/server.properties 中把 listeners=PLAINTEXT://192.168.192.132:9092 或者 advertised.listeners=PLAINTEXT://192.168.192.132:9092 改成 ip 物理网卡的 ip 地址,否则可能会发生连不上 Broker 的情况。
producer
public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.192.132:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<Integer, String>("test",i,"test_"+i));
}
producer.close();
}
}
consumer
参看 http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.192.132:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
for (ConsumerRecord<Integer, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
stream
有点类似 strom 的功能了,使 kafka 具有了流式处理的能力.
参看 http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
HA
参考:
Kafka设计解析(二)- Kafka High Availability (上)
Kafka设计解析(三)- Kafka High Availability (下)
Leader 选举
// 待补充
备份策略
// 待补充