Kafka介绍

Kafka 的设计

设计的动机: 能够作为一个统一的可以处理一家大公司可能有的所有实时数据的平台。

要做到这一点,我们需要考虑一个相当广泛的使用场景:

1 它必须具有高吞吐量,以支持大容量的事件流,如实时日志聚合。
2 优雅地处理大量数据积压,要能够支持从离线系统(offline system)中定期数据装载。
3 该系统需要实现低延迟交付,以处理更传统的消息传递任务。

根据这些需求和目标,形成了 kafka 的分区模式和消费者模式, 同时加入了一些新的元素。因此,比起传统的消息系统,kafka 更像一个数据库日志。

Topic和分区

  • Topic是一类消息的集合,通常这些消息比较大,因此一个Topic会分成多个分区,数据按照某种规则送到各个分区上。
  • 分区Partition相当与Topic的其中一个队列,到达同一个分区上的消息有一个偏移值,这个值会一次递增。
  • 分区可以均衡各个broker之间的数据和请求压力;分摊处理不同消费者进程,使消费速度更快;每个Partition内可以保证严格的时序。
  • 消息以顺序写磁盘的形式写入磁盘中,速度很快,甚至比随机写内存还快,这是kafka高吞吐的重要原因。
  • 每个分区在磁盘中都是一个目录,目录下包含*.index, *.log, *.timeindex三类文件。*就是给文件第一条消息的offset。
  • 由于具有持久化的功能,众多的数据不能放在一个文件中,因此每个分区的消息会分成块存成不同的文件,可以设置每个文件的大小,块文件大小到了就重新生成新的文件。
  • 旧的数据可以被删除,可以配置成按照过期时间和数据大小删除。
  • 创建Topic时,可以指定分成多少Partition,每个分区的复制因子是多少(不同复本在不同的broker上,因此复制因子不能超过broker个数)。
  • 每个Topic的所有Partitions会分散在不同的broker上,如果是单个broker,那所有都分布在这一个broker上。
  • Partition所有副本集中有一个是leader,其他是follower。follower像消费者一样从leader上拉取数据作为备份。
  • Partition的复制因子为N,则可以容错N-1个broker挂掉而不丢失数据。
  • 通常客户端无法控制是否新建topic,kafka查看其设置auto.create.topics.enable和num.partitions来自行处理是否新建topic以及topic的分区数目。

生产者和消费者

  • 生产者直接将消息发送分区的leader broker,不经过任何路由层,包括zookeeper。
  • 生产者确定每条消息送给那个partition,方式有round-robin,也可以根据指定的key计算哈希值让相同key的消息发送固定的分区(这适合消费着对消息位置比较敏感的场合)。
  • 为了方便producer做到上面的功能,所有的kafka brokers都会回复Producer对分区元数据的请求。Producer得到分区的元数据就知道直接将数据送给那个broker了。
  • 可以选择数据批处理功能,这是kafka高性能的一个原因,这样kafka可以将数据放在内存中,等数据累积的大小或者时间足够大再一起打包发往kafka,这样可以大大的减小服务器的I/O操作。因此,需要再节省服务器IO操作和实时性之间做出选择。
  • 生产者使用push方式主动向kafka推送数据,而消费者以Pull的方式根据自己的消费能力向broker拉取数据。

Kafka可以扮演的三种角色

Messaging System

  • 传统的消息系统有两种模式:队列和Publish/Subscribe,前者多个消费者抢一个列队的消息,只能由一个消费者消费,消费完即删除;后者会将消息分发给所有需要消费该消息的消费者。前者属于单播,后者属于广播。
  • Kafka提出消费组的概念可以实现上面两类,一个消费组的消费者只能由一个能消费一条消息,属于单播,但和传统消息系统不一样的是消费完后消息不会被删除;如果每个消费者做一个组,那就都会消费每一条消息,属于广播。
  • 传统消费者可以保证消息到服务器的时序,但是如果不同进程抢同一个队列的消息,处理的顺序就可能不一样,因此,传统的队列有一个"exclusive consumer"的概念,即排外的消费者,也就是只有一个进程去消费一个队列的消息,这样就保证了时序,kakfa由于是一个分区只能被一个consumer消费,所有同一个分区的消息在消费的时候也能保证严格的时序,但是多个分区的消息就不能保证时序了。

Storage System

  • Kafka解耦了消息发布和消费,这样就像一个数据库一样,其实也是一个数据存储系统。
  • 到Kafka的数据被写进磁盘,并有副本来容错。Kafka允许producer等待ack确认消息已经写到kafka并已经有一定数量的副本。
  • Kafka使用顺序写磁盘,复杂度一样O(1),不管服务器保存50KB还是50TB,性能都一样。
  • 可以追踪消费者消费分区数据的偏移,并可以手动修改偏移值,重新消费某个偏移值开始的数据。

Stream Processing

  • 仅仅读写存数据还不够,流处理还可以实时处理数据, 即从输入topic中提取连续的数据,处理之后放到另外一个topics。
  • 这类运用可以使用producer和consumer来实现,也就是消费者消费topic-in的数据之后,处理之后产生新的数据送到topic-out中。
  • Kafka针对该功能有独立的API可以使用。

配置

Kafka Broker的配置

Producer 和 Consumer 的配置

消息的保证

  • 和mqtt一样,消息有三种保证:最多投递一次(可能丢失,不会重新投递);至少投递一次(不会丢失,且有可能重复);有且仅有一次(不会丢失且只有一次).
  • Publish的消息有committed的概念,committed的消息是只要复制该分区的broker有一个alive(broker alive的定义在后面),这条消息就不会被丢失, 也可以说是这些消息已经到达了所有的ISR。
  • 虽然定义的committed消息是所有ISR都复制了这条消息,但是考虑实时性等性能问题,producer往往设置只要收到几个ISR的ack就算这条消息已经圆满到达了。默认是ack=all。
  • 想想一种场景:消息发往kafka时发生网络问题;消息committed之后发生网络问题。这样都不确定是否执行成功。
  • 消息保证通用的逻辑是:要确保这个需要接收者给发送这一个确认信息。qos=1的逻辑是发送者暂时保留消息如果一定时间内没有收到接收者确认信息就重发; qos=2的逻辑是发送者收到接受者将消息投递给下一级用户之后的回复再删除。
  • [对于Producer]:这种保证带来了很大的延时和性能问题,不是特别严格的场合不需要使用。因此需要producer指定是否需要这种保证,和等待确认的时间。
  • [对于Consumer]:消费者有消费的偏移值,消费者正常时将这个值保存内存中,消费者挂了后,如果另起一个进程消费该如何确定消费的偏移?该怎么做呢?有一下选择:
  • 选择1:消费者读取数据之后,将消费的offset保存在log里再处理数据,后面启动的进程读取该log,接着上次消费的数据消费,这是at-most-once清醒,因为没有处理就已经把offset保存到log了,如果消息处理失败就不会重复消费这条消息了。所有启动Kafka的时候我们会看到自动创建的topic:__consumer_offsets。
  • 选择2:消费者读取数据之后,处理完数据之后再将将消费的offset保存在log,这样确保保存offset的消息一定是被处理过的。
  • qos=2的情况,kafka还没有实现。
  • 消息的offset,之前保存在zookeeper中,现在建议保存在kafka中。java的consumer API可以设置offsets.storage指定。

分区的broker角色和副本

  • 分区的副本因此在创建topic的时候指定。
  • 保存同一分区的broker集合有不同的角色,leader,replicas和isr。
  • leader是分区负责读写的broker,每个broker都可能是某些分区的leader,也可能是某些分区的副本broker。
  • replicas是复制该分区数据的所有broker,不管这个broker是否alive,也包括这个分区的leader。
  • isr(in-sync)是当前活着并且数据跟上leader的所有节点。复制的数据是否跟上leader。
  • alive broker要满足两个条件:broker必须和zookeeper建立session,进而实现心跳机制;如果是slave,复制leader的数据不能差的太远,通过replica.lag.time.max.ms配置。满足这两个条件的就是in-snyc的node(ISR),这样区别alive/failed的模糊概念。
  • leader会跟踪Partition的ISR集合,如果ISR里面某个挂了,或者数据复制落后太多,leader会将其从ISR列表中删除。
  • 配置brokers优雅关闭,一个broker fail之后需要选出以该broker作为leader的分区的新leader,优雅的关闭有这些好处:同步所有数据到disk,避免重启之后进行任何log恢复,log恢复会花费很长的时间,所以同步数据可以使重启更块;关闭之前会迁移作为leader的分区的数据到其他replicas,这样能加快新的leader转移, 减小分区不可用的时间到几ms。前者会再服务给stop(不是强制关闭)前自动执行,后者由controlled.shutdown.enable=true(默认就是true)设置。
  • 有一个preferred replicas(优先副本)的概念, ISR里的第一个就是优先副本。一个服务器挂了,新的leader会选出来,挂了的服务器恢复之后也只能做follower,不能进行读写了,这样会造成imbalance,可以通过执行下面的脚本重新恢复preferred replicas: bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot 执行上述指令很麻烦,可以设置服务器参数auto.leader.rebalance.enable=true实现自动平衡。

权限控制

使用SASL认证

有以下步骤:

配置zk集群

zk集群可以不配置auth。

配置kafka服务器

  • vim config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required # 指定认证的插件。
username="admin"
password="admin"
user_admin="admin"
user_manager="manager"
user_producer="producer"
user_consumer="consumer";
};
  • 修改kafka配置文件config/server.properties
# 指定acl的插件。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://192.168.1.100:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
allow.everyone.if.no.acl.found=false
  • 修改kafka-server-start.sh最后一行, 加上auth的jaas文件
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
  • 启动zk
./bin/zkServer.sh start
  • 启动kafka
./bin/kafka-server-start.sh ./config/server.properties
  • acl:
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181  --add --allow-principal User:lixuancong1  --operation Read 
--topic device_data_bd601ae2abdcd8c6bd4d22970629830b --group intoyun-data-group
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181  --add --allow-principal User:lixuancong1  --operation Read 
--topic device_data_default --group intoyun-data-group
  • 使用elb负载均衡访问

https://stackoverflow.com/questions/38666795/does-kafka-support-elb-in-front-of-broker-cluster

使用elb的时候,也要确保各个kafka server能被client访问,因为client首先从elb上获取topic和分区的metadata,包括分区的leader server,后面就直接访问 该服务器。所以必须要把通过advertised把外网放出去。

# 指定acl的插件。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://外网IP:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
allow.everyone.if.no.acl.found=false
  • 安装mvn编译工具:

http://blinkfox.com/linux-debianxia-mavende-an-zhuang-he-shi-yong/

  1. wget http://mirror.nus.edu.sg/apache/maven/maven-3/3.5.0/binaries/apache-maven-3.5.0-bin.tar.gz
  2. sudo mkdir /usr/lib/maven
  3. sudo tar -zxf apache-maven-3.5.0-bin.tar.gz -C /usr/lib/maven
  4. .zshrc文件添加:
export M2_HOME=/usr/lib/maven/apache-maven-3.5.0  
export M2=$M2_HOME/bin  
export PATH=$M2:$PATH   
  1. source ~/.zshrc
  2. 测试是否安装成功: mvn -version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00)
Maven home: /usr/lib/maven/apache-maven-3.5.0
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: /usr/local/jdk1.8.0_131/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-85-generic", arch: "amd64", family: "unix"

LoginModule的initialize这个方法的目的就是用有关的信息去实例化这个LoginModule。如果login成功, 在这个方法里的Subject就被用在存储Principals和Credentials. 注意这个方法有一个能被用作输入认证 信息的CallbackHandler。在这个例子里,我没有用CallbackHandler. CallbackHandler是有用的,因为它 从被用作特定输入设备里分离了服务提供者。

SASL:

  1. 创建服务端机制(客户端其实也需要,但再kafka中我们需要的是服务机制)

public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh)

  1. ACLs

list:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic 

add:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

remove:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

Comments

comments powered by Disqus