Kafka介绍与实践

kafka安装

安装java

1
2
yum install java-1.8.0-openjdk
java -version

安装zookeeper

  1. 获取zookeeperwget http://mirror.bit.edu.cn/apache/zookeeper/current/zookeeper-3.4.12.tar.gz
  2. 解压tar zxvf zookeeper-3.4.12.tar.gz
  3. 移动解压文件到安装路径下

    1
    2
    3
    mkdir /data/zookeeper -p
    mv zookeeper-3.4.12 /data/zookeeper/
    cd /date/zookeeper/zookeeper-3.4.12
  4. 创建配置文件

    1
    2
    3
    4
    5
    6
    7
    [root@VM_187_252_centos zookeeper-3.4.12]# vim conf/zoo.cfg
    [root@VM_187_252_centos zookeeper-3.4.12]# cat conf/zoo.cfg
    tickTime = 2000
    dataDir = /data/zookeeper/data
    clientPort = 2181
    initLimit = 5
    syncLimit = 2
  5. 启动zookeeper服务

    1
    2
    3
    4
    [root@VM_187_252_centos zookeeper-3.4.12]# bin/zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /data/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
  6. 使用客户端连接

    1
    2
    3
    [root@VM_187_252_centos zookeeper-3.4.12]# bin/zkCli.sh
    ... ...
    [zk: localhost:2181(CONNECTED) 0]

安装kafka

  1. 获取kafkawget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
  2. 解压tar zxf kafka_2.12-2.0.0.tgz
  3. 移动解压文件到安装路径下

    1
    2
    3
    mkdir /data/kafka -p
    mv kafka_2.12-2.0.0 /data/kafka/
    cd /data/kafka/kafka_2.12-2.0.0/
  4. 启动kafka

    1
    [root@VM_187_252_centos kafka_2.12-2.0.0]# nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

kafka manager安装

1
2
3
4
cd ~
mkdir .sbt
cd .sbt
vim repositories

在repositories中加入如下内容,使用国内阿里云资源加快下载速度

1
2
3
4
[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly

返回安装目录

1
2
3
git clone https://github.com/yahoo/kafka-manager.git
cd kafka-manager
./sbt clean dist

这个过程比较慢,需要耐心等待,完成之后在target/universal目录下会生产一个zip压缩包kafka-manager-*.zip,将其拷贝到要部署的目录下解压。

1
2
3
4
5
cd target/universal
cp kafka-manager-1.3.3.21.zip /data/kafka/
cd /data/kafka/
unzip kafka-manager-1.3.3.21.zip
cd kafka-manager-1.3.3.21/

修改 conf/application.confkafka-manager.zkhosts为正确的zookeeper地址

1
bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=9000

正常启动后即可登陆进行相关操作

kafka概述

kafka专为分布式高吞吐量系统而设计,kafka相比其它消息传递系统具有更好的吞吐量,内置分区,复制和固有的容错能力,非常适合大规模消息处理应用程序.
kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点.
kafka适合离线和在线消息消费.
Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失.
Kafka构建在ZooKeeper同步服务之上.它与Apache Storm和Spark非常好地集成,用于实时流式数据分析.

kafka的好处

  1. 可靠性 kafka是分布式,分区,复制和容错的
  2. 可扩展性 kafka消息传递系统轻松缩放,无需停机
  3. 耐用性 kafka使用分布式提交日志,消息尽可能快的保存在磁盘上
  4. 性能 kafka对于发布和订阅都具有高吞吐量

kafka非常快,并保证零停机和零数据丢失

kafka的用例

  1. 指标 kafka通常用来操作监控数据,涉及聚合来自分布式应用程序的统计信息,以产生操作数据和集中馈送
  2. 日志聚合解决方案 kafka用于跨组织来从多个服务收集日志,并提供标准格式给多个服务器
  3. 流处理 流行的框架从主题中读取数据,对其处理,并将处理后的数据写入新主题,供用户和应用程序使用.kafka的强耐久性在流处理的上下文中非常有用

Kafka框架图

kafka基础

  1. Topics 主题
    属于特定类别的消息流称为主题.数据存储在主题中.主题被拆分成分区.对于每个主题,Kafka保存一个分区的数据.每个这样的分区包含不可变有序序列的消息.分区被实现为具有相等大小的一组分段文件
  2. Partition分区
    主题可能有许多分区,因此它可以处理任意数量的数据.
  3. Partition offset 分区偏移
    每个分区消息具有称为offset的唯一序列标识.
  4. Replicas of partition 分区备份
    副本只是一个分区的备份.副本从不读取或写入数据.它们用于防止数据丢失.
  5. Brokers 经纪人
    • 代理是负责维护发布数据的简单系统.每个代理中的每个主题可以具有零个或多个分区 假设,如果在一个主题和N个代理中有N个分区,每个代理将有一个分区.
    • 假设在一个主题中有N个分区并且多于N个代理(n+m),则第一个N代理将具有一个分区,并且下一个M代理将不具有用于该特定主题的任何分区。
    • 假设在一个主题中有N个分区并且小于N个代理(n-m),每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。
  6. Kafka Cluster Kafka集群
    Kafka有多个代理被称为Kafka集群.可以扩展Kafka集群,无需停机.集群用于管理消息数据的持久性和复制.
  7. Producers 生产者
    生产者是发送给一个或多个Kafka主题的消息的发布者.生产者向Kafka经纪人发送数据.每当生产者将消息发布给代理时,代理只需将消息附加到最后一个段文件.实际上,该消息将被附加到分区.生产者还可以向他们选择的分区发送消息.
  8. Consumers 消费者
    Consumers从经纪人处读取数据.消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息.
  9. Leader 领导者
    Leader是负责给定分区的所有读取和写入的节点. 每个分区都有一个服务器充当Leader
  10. Follower 追随者
    跟随领导者指令的节点被称为Follower.如果领导失败,一个追随者将自动成为新的领导者.跟随者作为正常消费者,拉取消息并更新其自己的数据存储.

kafka集群架构

  1. Broker代理
    Kafka集群通常由多个代理组成以保持负载平衡.Kafka代理是无状态的,使用ZooKeeper来维护它们的集群状态.一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响.Kafka经纪人领导选举可以由ZooKeeper完成.
  2. ZooKeeper
    ZooKeeper用于管理和协调Kafka代理.ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败.根据Zookeeper接收到关于代理的存在或失败的通知,然后产品和消费者采取决定并开始与某些其他代理协调他们的任务.
  3. Producers 生产者
    生产者将数据推送给经纪人.当新代理启动时,所有生产者搜索它并自动向该新代理发送消息.Kafka生产者不等待来自代理的确认,并且发送消息的速度与代理可以处理的一样快.
  4. Consumers 消费者
    因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息.如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息.消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区.消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点.消费者偏移值由ZooKeeper通知.

Kafka工作流程

Kafka只是分为一个或多个分区的主题的集合.
Kafka分区是消息的线性有序序列,其中每个消息由它们的偏移来标识.Kafka集群中的所有数据都是不相连的分区联合.传入消息写在分区的末尾,消息由消费者顺序读取.通过将消息复制到不同的代理提供持久性.
Kafka以快速,可靠,持久,容错和零停机的方式提供基于pub-sub和队列的消息系统.
生产者只需将消息发送到主题,消费者可以根据自己的需要选择任何一种类型的消息传递系统

发布订阅工作流程

  1. 生产者向主题发送消息。
  2. Kafka代理存储为该特定主题配置的分区中的所有消息.它确保消息在分区之间平等共享.如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息.
  3. 消费者订阅特定主题.
  4. 一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper中.
  5. 消费者将定期请求Kafka新消息.
  6. 一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者.
  7. 消费者将收到消息并进行处理.
  8. 一旦消息被处理,消费者将向Kafka代理发送确认.
  9. 一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它.由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间.
  10. 以上流程将重复,直到消费者停止请求.

消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息.

队列消息/用户组的工作流

在队列消息传递系统而不是单个消费者中,具有相同”组ID “的一组消费者将订阅主题。 简单来说,订阅具有相同” Group ID “的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。

  1. 生产者以固定间隔向某个主题发送消息.
  2. Kafka存储在为该特定主题配置的分区中的所有消息.
  3. 单个消费者订阅特定主题,假设Topic-01的Group ID为Group-1。
  4. Kafka以与发布-订阅消息相同的方式与消费者交互,直到新消费者以相同的组ID订阅相同主题Topic-01
  5. 一旦新消费者到达,Kafka将其操作切换到共享模式,并在两个消费者之间共享数据.此共享将继续,直到用户数达到为该特定主题配置的分区数。
  6. 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者.出现这种情况是因为Kafka中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待

此功能也称为”使用者组”。 同样,Kafka将以非常简单和高效的方式提供两个系统中最好的

ZooKeeper的作用

Apache Kafka的一个关键依赖是Apache Zookeeper, Zookeeper是一个分布式配置和同步服务.
Zookeeper是Kafka代理和消费者之间的协调接口.
Kafka服务器通过Zookeeper集群共享信息.
Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息.

由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/Zookeeper的故障不会影响Kafka集群的状态.Kafka将恢复状态,一旦Zookeeper重新启动.这为Kafka带来了零停机时间.Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成.

Kafka Leader的选举

Kafka Partition的Leader选举是在所有Broker中选出一个Controller(管理员),所有Partition的Leader选举都由Controller决定,Controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需要为此做出响应的Broker.
Kafka Controller的选举过程:

  1. 每个Broker都会在Zookeeper中Controller Path上注册一个Watch(监听者)
  2. 当前Controller由于某些原因和Zookeeper断开时,对应的Controller Path会自动消失,此时会触发Watch,所有活着的Broker都回去竞争成为新的Controller(创建新的Controller Path),但只有一个会竞争成功(Zookeeper保证).
  3. 竞争成功的Broker会成功新的Controller,竞争失败的Broker会重新在新的Controller Path上注册Watch.

Kafka Partition Leader的选举由Controller执行:

  1. 从Zookeeper中读取当前分区的所有ISR集合
  2. 调用配置的分区选择算法选择分区的Leader

Kafka C++实践

librdkafka安装

1
2
3
4
5
6
wget https://github.com/edenhill/librdkafka/archive/v0.11.6.tar.gz
tar zxvf v0.11.6.tar.gz
cd librdkafka-0.11.6
./configure
make
make install

参考文献