您现在的位置是:首页 > 网络趣梗网络趣梗

kafka 集群配置(kafka消息大小配置)

2022-09-07 14:22:04网络趣梗0人已围观

简介  kafka 集群配置(kafka消息大小配置),新营销网红网本栏目通过数据整理汇集了kafka 集群配置(kafka消息大小配置)相关信息,下面一起看看

  kafka 集群配置(kafka消息大小配置),新营销网红网本栏目通过数据整理汇集了kafka 集群配置(kafka消息大小配置)相关信息,下面一起看看。

   Apache Kafka原理分区复制机制Kafka有三层结构:Kafka有多个主题,每个主题有多个分区,每个分区有多个消息。

  分区机制:主要解决单台服务器存储容量有限,单台服务器并发有限的问题。一个片段的不同副本不能放在同一个代理上。

  当主题数据量非常大,无法存储在一台服务器上时,数据会被分成两个或更多部分,存储在多台服务器上。每台服务器上的数据称为一个切片。

  分区Kafka集群的优点是:负载均衡、高存储容量、高可扩展性。对于分区消费者来说,可以提高并发性和效率。

  副本:副本备份机制解决了数据存储的高可用性问题。

  如果只保存一份数据拷贝,就会有丢失的风险。为了更好地容错和容灾,可以将数据复制几份,保存在不同的机器上。

  多个跟随者副本通常存储在与领导者副本不同的代理中。通过这种机制,实现了高可用性。当某一台机器挂机后,其他跟随者副本可以迅速“转正”,开始对外提供服务。

   kafka copy的作用:在kafka中,实现copy的目的是冗余备份,而且只是冗余备份。所有读写请求都由主副本处理。跟随者副本只有一个功能,就是从领导者副本中拉消息,并尽量与领导者副本的内容保持一致。

  跟随者副本不对外提供服务,可以防止一些类似数据库事务的魔读和脏读的问题。为了提高一些性能而造成数据不一致,显然是不值得的。

   Kafka保证数据不丢失的机制大致可以分为数据生产者、Kafka集群和消费者。要保证数据不丢失,也要考虑这三个角度。

  消息生产者消息生产者保证数据不会丢失——消息确认机制(ACK机制)。有三个参考值:0,1,-1。

   //生成器继续发送下一批消息,而不等待broker的确认。//这种情况下,数据传输效率最高,但数据可靠性最低。properties.put(ProducerConfig。ACKS _ CONFIG, 0 );//只要生产者收到分区副本已成功写入的通知,它就认为推送消息成功。//这里有一点需要注意。此副本必须是前导副本。//只有成功写入了前导副本,producer才会认为消息发送成功。properties.put(ProducerConfig。ACKS _ CONFIG, 1 );//ack=-1。简单地说,生产者只有在收到分区中所有副本成功写入的通知时,才认为推送消息成功。properties.put(ProducerConfig。ACKS _ CONFIG,-1 );卡夫卡的信息消费者模型:

  消息:0 1 2 3 4 5 6 7 8 9 10 11 12生产者WirtsConsumer A (offset=9)读取消费者B (offset=11)读取消费者丢失数据:由于Kafka消费者默认自动提交位移(先更新位移,再消费消息),如果消费者程序失败,没有完成消费,消息就会丢失。这个时候经纪人也不知道。

  解决方案:

   Enable.auto.commit=false关闭置换的自动提交。

  在完全处理完消息后,手动提交位移。

   properties.put(消费者配置。ENABLE_AUTO_COMMIT_CONFIG, false );消息存储和查询机制Kafka使用日志文件来存储生产者消息,每个消息都有一个偏移值来指示它在分区中的偏移。

  一般海量的消息数据都存储在Kafka中。为了避免日志文件太大,碎片不直接对应磁盘上的日志文件,而是对应磁盘上的目录。这个目录的命名规则是topic_name _ partition_id。

   Kafka容器数据目录:/kafka/kafka-logs-kafka1

  消息存储机制Kafka作为消息中间件,只负责消息的临时存储,不负责永久存储,所以需要删除过期的数据。如果所有的数据都存储在一个文件中,删除过期的数据会很麻烦。如果要在将过期数据切割成多个文件后删除,可以根据文件的日期属性进行删除。默认情况下,仅保留168小时(即7天)内的数据。所以卡夫卡的数据存储方案是多文件存储。

  日志分段:

  在每个碎片目录中,kafka通过分段将数据分成多个LogSegment。

  日志段对应于磁盘上的一个日志文件(000000000000000.log)和一个索引文件(0000000000000.index)。

  日志文件用于记录消息,索引文件用于存储消息的索引。

  可以在server.properties中的log.segment.bytes=107370(设置段大小,默认为1 GB)选项中设置每个日志段的大小

  当日志文件等于1时

  G 时,新的会写入到下一个 segment 中。

  timeindex 文件,是 kafka 的具体时间日志。

  通过 offset 查找 message

  存储的结构:

  一个主题 – 多个分区 – 多个日志段(多个文件)。

  第一步 – 查询 segment file:

  segment file 命名规则跟 offset 有关,根据 segment file 可以知道它的起始偏移量,因为 Segment file 的命名规则是上一个 segment 文件最后一条消息的 offset 值。所以只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。

  比如,第一个 segment file 是 00000000000000000000.index 表示最开始的文件,起始偏移量 (offset) 为 0。第二个是 00000000000000091932.index – 代表消息量起始偏移量为 91933 = 91932 + 1。那么 offset=5000 时应该定位 00000000000000000000.index。

  第二步 – 通过 segment file 查找 message:

  通过第一步定位到 segment file,当 offset=5000 时,依次定位到 00000000000000000000.index 的元数据物理位置和 00000000000000000000.log 的物理偏移地址,然后再通过 00000000000000000000.log 顺序查找直到 offset=5000 为止。

  生产者消息分发策略

  Kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 DefaultPartitioner.class 类。

  这个类中就定义数据分发的策略:

  public interface Partitioner extends Configurable, Closeable {/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes The serialized key to partition on( or null if no key)* @param value The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.*/public void close();}

  默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  1) 如果是用户指定了 partition,生产就不会调用 DefaultPartitioner.partition() 。

  数据分发策略的时候,可以指定数据发往哪个partition。

  当 ProducerRecord 的构造参数中有 partition 的时候,就可以发送到对应 partition 上。

  /*** Creates a record to be sent to a specified topic and partition** @param topic The topic the record will be appended to* @param partition The partition to which the record should be sent* @param key The key that will be included in the record* @param value The record contents* @param headers The headers that will be included in the record*/public ProducerRecord(String topic, Integer partition, K key, V value, Iterable Header headers) {this(topic, partition, null, key, value, headers);}

  2) DefaultPartitioner 源码

  如果指定 key,是取决于 key 的 hash 值。

  如果不指定 key,轮询分发。

  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 的分区列表List PartitionInfo partitions = cluster.partitionsForTopic(topic);// 获得分区的个数int numPartitions = partitions.size();// 如果 key 值为 null; 如果没有指定 key,那么就是轮询if (keyBytes == null) {// 维护一个 key 为 topic 的 ConcurrentHashMap,并通过 CAS 操作的方式对 value 值执行递增 +1 操作int nextValue = nextValue(topic);// 获取该 topic 的可用分区列表List PartitionInfo availablePartitions = cluster.availablePartitionsForTopic(topic);// 如果可用分区大于 0if (availablePartitions.size() 0) {// 执行求余操作,保证消息落在可用分区上int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// 指定了 key,key 肯定就不为 null// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}消费者负载均衡机制

  同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0 分区中的数据不能被 Consumer Group A 中 C1 与 C2 同时消费。

  消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG, “groupName”);

  如果该消费组有四个消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。

  如果有 3 个 Partition,p0/p1/p2,同一个消费组有 3 个消费者,c0/c1/c2,则为一一对应关系。如果有 3 个 Partition,p0/p1/p2,同一个消费组有 2 个消费者,c0/c1,则其中一个消费者消费 2 个分区的数据,另一个消费者消费一个分区的数据。如果有 2 个 Partition, p0/p1,同一个消费组有 3 个消费者,c0/c1/c3,则其中有一个消费者空闲,另外 2 个消费者消费分别各自消费一个分区的数据。Kakfa 配置文件说明

  server.properties

  1、broker.id = 0

  Kafka 集群是由多个节点组成的,每个节点称为一个 broker,中文翻译是代理。每个 broker 都有一个不同的 brokerId,由 broker.id 指定,是一个不小于 0 的整数,各 brokerId 必须不同,但不必连续。如果想扩展 kafka 集群,只需引入新节点,分配一个不同的 broker.id 即可。

  启动 kafka 集群时,每一个 broker 都会实例化并启动一个 kafkaController,并将该 broker 的 brokerId 注册到 zooKeeper 的相应节点中。集群各 broker 会根据选举机制选出其中一个 broker 作为 leader,即 leader kafkaController。Leader kafkaController 负责主题的创建与删除、分区和副本的管理等。当 leader kafkaController 宕机后,其他 broker 会再次选举出新的 leader kafkaController。

  2、log.dir = /export/data/kafka/

  Broker 持久化消息到哪里,数据目录。

  3、log.retention.hours = 168

  Log 文件最小存活时间,默认是 168h,即 7 天。相同作用的还有 log.retention.minutes、log.retention.ms。

  数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据,也就是消费端能够多久去消费数据。

  log.retention.bytes 和 log.retention.hours 任意一个达到要求,都会执行删除,会被 topic 创建时的指定参数覆盖。

  4、log.retention.check.interval.ms

  多长时间检查一次是否有 log 文件要删除。默认是 300000ms,即 5 分钟。

  5、log.retention.bytes

  限制单个分区的 log 文件的最大值,超过这个值,将删除旧的 log,以满足 log 文件不超过这个值。默认是 -1,即不限制。

  6、log.roll.hours

  多少时间会生成一个新的 log segment,默认是 168h,即 7 天。相同作用的还有 log.roll.ms、segment.ms。

  7、log.segment.bytes

  Log segment 多大之后会生成一个新的 log segment,默认是 1073741824,即 1G。

  8、log.flush.interval.messages

  指定 broker 每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是 9223372036854775807。

  Kafka 官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的 topic 设置不同的值,即在创建 topic 的时候设置值。

  在 Linux 操作系统中,把数据写入到文件系统之后,数据其实在操作系统的 page cache 里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。1、kafka 是多副本的,当配置了同步复制之后。多个副本的数据都在 page cache 里面,出现多个副本同时挂掉的概率比 1 个副本挂掉,概率就小很多了。2、操作系统有后台线程,定期刷盘。如果应用程序每写入 1 次数据,都调用一次 fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。

  9、log.flush.interval.ms

  指定 broker 每隔多少毫秒就把消息从内存刷到硬盘。默认值同 log.flush.interval.messages 一样, 9223372036854775807。

  同 log.flush.interval.messages 一样,kafka 官方不建议使用这个配置。

  10、delete.topic.enable=true

  是否允许从物理上删除 topic。

  Kafka 监控与运维kafka-eagle 概述

  在生产环境下,在 Kafka 集群中,消息数据变化是被关注的问题,当业务前提不复杂时,可以使用 Kafka 命令提供带有 Zookeeper 客户端工具的工具,可以轻松完成工作。随着业务的复杂性,增加 Group 和 Topic,那么使用 Kafka 提供命令工具,已经感到无能为力,那么 Kafka 监控系统目前尤为重要,需要观察消费者应用的细节。

  为了简化开发者和服务工程师维护 Kafka 集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些 topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。

  搭建安装 kafka-eagle

  Kafka-eagle 在 Docker 中没有镜像。

  环境要求:需要安装 jdk,启动 zk 以及 kafka 的服务。

  # 启动 ZookeeperzkServer.sh start# 启动 Kafkacd /export/servers/kafka/binnohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2 1

  Windows host 文件:

  192.168.186.20 kafka1192.168.186.20 kafka2192.168.186.20 kafka3192.168.186.11 node1192.168.186.12 node2192.168.186.13 node3

  搭建步骤:

  1) 下载 kafka-eagle 的源码包:

  Kafka-eagle 官网 – http://download.kafka-eagle.org/

  可以从官网上面直接下载最新的安装包即可 kafka-eagle-bin-1.3.2.tar.gz 这个版本即可。

  代码托管地址 – https://github.com/ artloli/kafka-eagle/releases

  2) 上传安装包并解压:

  这里选择将 kafak-eagle 安装在 node3 服务器。

  如果要解压的是 zip 格式,需要先安装命令支持。

  yum install unzip

  unzip xxxx.zip

  # 将安装包上传至 node01 服务器的 /export/softwares 路径下, 然后解压cd /export/software/unzip kafka-eagle.zipcd kafka-eagle/kafka-eagle-web/target/tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /export/servers

  3) 准备数据库:

  Kafka-eagle 需要使用一个数据库来保存一些元数据信息,这里直接使用 MySQL 数据库来保存即可,在本地执行以下命令创建一个 MySQL 数据库即可。

  可以使用 SQLite 或者 MySQL 数据库。

  -- 进入 mysql 客户端create database if not exists eagle character set utf8mb4;

  4) 修改 kafka-eagle 配置文件:

  cd /export/servers/kafka-eagle-web-1.3.2/confvi system-config.properties# 修改内容如下####################################### multi zookeeper kafka cluster list######################################kafka.eagle.zk.cluster.alias=cluster1cluster1.zk.list=node1:2181,node2:2181,node3:2181####################################### kafka offset storage######################################cluster1.kafka.eagle.offset.storage=kafka# cluster2.kafka.eagle.offset.storage=zk####################################### kafka sqlite jdbc driver address####################################### kafka.eagle.driver=org.sqlite.JDBC# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db# kafka.eagle.username=root# kafka.eagle.password=www.kafka-eagle.org####################################### kafka mysql jdbc driver address######################################kafka.eagle.driver=com.mysql.jdbc.Driverkafka.eagle.url=jdbc:mysql://192.168.1.116:3306/eagle?useUnicode=true characterEncoding=UTF-8 zeroDateTimeBehavior=convertToNullkafka.eagle.username=rootkafka.eagle.password=password

  默认情况下 MySQL 只允许本机连接到 MySQL 实例中,所以如果要远程访问,必须开放权限。

  # 修改权限update user set host = % where user =root;# 刷新配置flush privileges;

  5) 配置环境变量:

  Kafka-eagle 必须配置环境变量,node03 服务器执行以下命令来进行配置环境变量。

  vi /etc/profile# 内容如下:export KE_HOME=/export/servers/kafka-eagle-web-1.3.2export PATH=:$KE_HOME/bin:$PATH# 让修改立即生效,执行source /etc/profile

  6) 启动 kakfa-eagle:

  cd /export/servers/kafka-eagle-web-1.3.2/binchmod u+x ke.sh./ke.sh start 相关文章

  粤语歌曲网(经典粤语歌曲汇总)

  北京奥特莱斯(来北京必逛的12个商场)

  豪杰超级解霸(还记得豪杰超级解霸吗?)

  龚自珍的资料(清代诗人龚自珍一生功绩简介)

  象牙塔是什么意思(“象牙塔”里怎么了?)

  水色风信子(水培风信子容易养)

  婧字怎么读

  虞姬是哪里人(古代著名美人虞姬到底是哪里人?)

  公办三本院校(我们还有哪些“三本”?)

  电子酒柜(酒柜最全选购指南)

  德州景点(德州市景区景点62个)

  玉兰油官方网(OLAY发布高端臻粹系列)

  更多kafka 集群配置(kafka消息大小配置)相关信息请关注本文章,本文仅仅做为展示!

Tags: 网络趣事  

很赞哦! ()

留言与评论 (共有 条评论)
验证码:

本栏推荐