kafka启动后自动关闭_kafka自动退出

记录一次kafka的报错

可以通过分区策略体现消息顺序性。分区策略有轮询策略、随Kafka动态维护了一个同步状态的副本的(a set of in-sync replicas),简称ISR,在这个中的都是和leader保持高度一致的,任何一条消息必须被这个中的每个读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个中的任何一个随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个,就可以允许在f个down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。机策略、按消息键保序策略。

个问题ja日志中报错连接不上kafka

kafka启动后自动关闭_kafka自动退出kafka启动后自动关闭_kafka自动退出


kafka启动后自动关闭_kafka自动退出


查看了一下防火墙 selinux都已经关闭,net kafka的端口也是通的

去看kafka的日志

org.apache.kafkmonwork.InvalidReceiveException: Invalid receive (size = 369296128 larger than 1048生产者和消费者各开一个终端,分别执行创建生产者和消费者的命令,执行生产者命令后,发送一个消息:kafkatest,消费者终端可以收到这个消息,即为正常57600)

这是因为发送的消息超过了kafka默认的大小 默认是100m

在配置文件server.properties中修改socket.request.max.bytes的值,修改之后不再报错

第二个问题 kafka报错oom

找到kafka的启动脚本kafka-server-start.sh 将启动命令中-Xmx1G -Xms1G改成合适的值(默认1G)

Linux版kafka&zookeeper启动方式

最少1次:可能会重传数据,有可能出现数据被重复处理的情况;

Linux版kafka启动方式

异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。

方法一:case1:如果提交的偏移量小于客户端处理的一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

在bin的上一级目录执行命令:

加守护进程启动

方法二:

在bin的上一级目录执行命令:

通过后台来启动

ZooKeeper服务命令:

在准备好相应的配置之后,可以直接通过zk.sh 这个脚本进行服务的相关作

什么是kafka

Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为开源项目。

消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。

基本工作流程如上图所示,其中:

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

熟悉负载均衡的朋友应该知道,当我们向某个发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 、 1 、 all 。

要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:

图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致 !

kafka使用文件存储消息(append only log),这就直接决定kafka在性能上依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.

除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。

kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者ck的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

最多1次:可能会出现数据丢失情况;

恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".

at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".

"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

最少1次+消费者的输出中额外增加已处理消息编号:由于已处理消息编号的存在,不会出现重复处理消息的情况。

kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.

选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

获取消息时,需要指定offset和ck尺寸,offset用来表示消息的起始位置,ck size用来表示获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取值,得到它在file中的相对位置,直接读取输出即可.

kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

Consumer id Registry: 每个consumer都有一个的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中的offset.此znode为持久,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

当consumer启动时,所触发的作:

B) 然后在"Consumer id Registry"册一个watch用来当前group中其他consumer的"lee"和"join";只要此znode path下列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"下,注册一个watch用来broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

总结:

Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

一个邪恶的想法:如果所有都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个是存活的,一旦所有都down了,这个就不能保证了。

实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

这是一个在可用性和连续性之间的权衡。如果等待ISR中的恢复,一旦ISR中的起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的恢复,这个的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它使用 monorepo,我们的项目结构如下:可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。

这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分跟踪数据概率抽样布到集群所有的上而不是集中在某些上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.

优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个作为“controller”,当发现有down掉的时候它负责在游泳分区的所有中选择新的leader,这使得Kafka可以批量的高效的管理所有分区的主从关系。如果controller down掉了,活着的中的一个会备切换为新的controller.

对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

许多分布式的消息系统自动的处理失败的请求,它们对一个是否着(alive)”有着清晰的定义。Kafka判断一个是否活着有两个条件:

符合以上条件的准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会所有“同步中”的,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。

只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。

Kafka保证只要有一个“同步中”的,“committed”的消息就不会丢失。

docker启动kafka

OpenTelemetry 收集器端点

记录下自己的作,避免下次用的时候又去踩坑

docker pull wurstmeister/zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

docker exec -it zookeeper /bin/sh

这时查看zk只有一个zookeeper

docker退出当前容器 快捷键:

次尝试启动容器是用的种方式,后面发现在容器外面连接不了kafka,可能是因为生成的kafka地址是容器内地址。所以换了第二种方式启动。

{"datas":[{"name":"jianshu","【value":"10"}],"ver":"1.0"}

发现和图3相比 创建了很多关于kafka的

我们可分布式跟踪可让您深入了解特定服务在分布式软件系统中作为整体的一部分是如何执行的。它跟踪和记录从起点到目的地的请求以及它们经过的系统。以看到我们创建的主题及其信息

kafka在启动时连接zookeeper失败,报错信息如下

贴一下别人的解决方案吧:

kafka——消费者原理解析

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

kafka采用发布模式:一对多。发布模式又分两种:

Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 如所有的消费者都在一个组中,那么这就变成了队列模型。 如所有的消费者都在不同的组中,那么就完全变成了发布-模型。 一个消费者组中消费者同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。

注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。

在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。把消费者的逻辑封装在自己的对象中,然后使用ja的ExecutorServ启动多个线程,使每个消费者运行在自己的线程上,可参考

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。

关于如何设置partition值需要考Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子作来处理,同时成功或失败,即使该生产或消费会跨多个分区。虑的因素

Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。

以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。

再均衡的优点 :

再均衡的缺点 :

RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。

但是,当消费者组内不同主题时,可能造成消费混乱,如下图所示,Consumer0 主题 A,Consumer1 主题 B。

Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。

但是,如下图所示,Consumer0、Consumer1 同时了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内的主题越多,分区分配可能越不均衡。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

consumer group +topic + partition 确定一个offest

Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,

你如果特别好奇,实在想看看offset什运行docker-come up -d以调出所有九个容器:么的,也可以执行下面作:

修改配置文件 consumer.properties

再启动一个消费者

当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition一次提交的偏移量,然后从偏移量指定的地方继续处理。

case2:如果提交的偏移量大于客户端处理的一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

自动提交的优点是方便,但是可能会重复处理消息

不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量。

因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。

ConsumerRebalanceListener需要实现的两个方法

下面的例子演示如何在失去partition的所有权之前通过onPartitionRevoked()方法来提交偏移量。

Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。

之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。

下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。

Tips :图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用。

参考:

OpenTelemetry、Spring Cloud Sleuth、Kafka、Jager实现分布式跟踪

consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

我们先来看看分布式中的一些基本术语。

跨度:表示系统内的单个工作单元。跨度可以相互嵌套以模拟工作的分解。例如,一个跨度可能正在调用一个 REST 端点,然后另一个子跨度可能是该端点调用另一个,等等在不同的服务中。

Trace:所有共享相同根跨度的跨度,或者更简单地说,将所有跨度创建为原始请求的直接结果。跨度的层次结构(每个跨度在根跨度旁边都有自己的父跨度)可用于形成有向无环图,显示请求在通过各种组件时的路径。

OpenTelemetry ,也简称为 OTel,是一个供应商中立的开源 Observability 框架,用于检测、生成、收集和导出遥测数据,例如 跟踪 、 指标 和 日志 。作为 云原生 计算基金会 (CNCF) 的孵化项目,OTel 旨在提供与供应商无关的统一库和 API 集——主要用于收集数据并将其传输到某处。OTel 正在成为生成和管理遥测数据的世界标准,并被广泛采用。

Sleuth 是一个由 Spring Cloud 团队管理和维护的项目,旨在将分布式跟踪功能集成到 Spring Boot 应用程序中。它作为一个典型Spring Starter的 . 以下是一些开箱即用的 Sleuth 工具:

Sleuth 能够跟踪您的请求和消息,以便您可以将该通信与相应的日志条目相关联。您还可以将跟踪信息导出到外部系统以可视化延迟。

与 Jaeger 类似,Zipkin 在其架构中也提供了相同的组件集。尽管 Zipkin 是一个较老的项目,但 Jaeger 具有更现代和可扩展的设计。对于此示例,我们选择 Jaeger 作为后端。

让我们设计三个 Spring Boot 微服务:

这三个微服务旨在:

这是为了观察 OpenTelemetry 如何结合 Spring Cloud Sleuth 处理代码的自动检测以及生成和传输跟踪数据。上面的虚线捕获了微服务导出的跟踪数据的路径,通过OTLP(OpenTelemetry Protocol)传输到OpenTelemetry Collector,收集器依次处理并将跟踪数据导出到后端Jaeger进行存储和查询。

第 1 步:添加 POM 依赖项

这是使用 OTel 和 Spring Cloud Sleuth 实现分布式跟踪的关键。我们的目标是不必手动检测我们的代码,因此我们依靠这些依赖项来完成它们设计的工作——自动检测我们的代码,除了跟踪实现、将遥测数据导出到 OTel 收集器等。

第 2 步:OpenTelemetry 配置

对于每个微服务,我们需要在其中添加以下配置application.yml(请参阅下面部分中的示例片段)。spring.sleuth.o.exporter.otlp.endpoint主要是配置OTel Collector端点。它告诉导出器,在我们的例子中是 Sleuth,通过 OTLP 将跟踪数据发送到指定的收集器端点://o-collector:4317。注意o-collector端点 URL 来自o-collector图像的 docker-come 服务。

spring.sleuth.o.config.trace-id-ratio-based属性定义了跟踪数据的采样概率。它根据提供给采样器的分数对一部分迹线进行采样。概率抽样允许 OpenTelemetry 跟踪用户通过使用随机抽样技术降低跨度收集成本。如果该比率小于 1.0,则某些迹线将不会被导出。对于此示例,我们将采样配置为 1.0、。

OpenTelemetry 配置文件

我们需要项目根目录下的 OTel 配置文件o-config.yaml。内容如下。此配置文件定义了 OTel 接收器、处理器和导出器的行为。正如我们所看到的,我们定义了我们的接收器来 gRPC 和 HTTP,处理器使用批处理和导出器作为 jaeger 和日志记录。

第 3 步:docker-come 将所有内容串在一起

让我们看看我们需要启动哪些 docker 容器来运行这三个微服务并观察它们的分布式跟踪,前三个微服务在上面的部分中进行了解释。

第 4ja.lang.OutOfMemoryError: Ja heap space很明显的oom内存溢出的报错 步:数据Sleuth 添加了一个,以确保在请求中传递所有跟踪信息。每次调用时,都会创建一个新的 Span。它在收到响应后关闭。在行动

快乐之路

启动 Jaeger UI, [= Traces按钮,这是我们看到的创建客户跟踪:它跨越三个服务,总共跨越六个,持续时间 82.35 毫秒。

除了 Trace Timeline 视图(上面的屏幕截图),Jaeger 还提供了一个图形视图(Trace Graph在右上角的下拉菜单中选择):

识别长期运行的跨度

Jaeger UI 允许我们搜索超过指定持续时间的跟踪。例如,我们可以搜索所有耗时超过 1000 毫秒的跟踪。然后,我们可以深入研究长期运行的跟踪以调查其根本原因。

在这个故事中,我们从 OpenTelemetry、Spring Cloud Sleuth 和 Jaeger 的角度解压了分布式跟踪,验证了 REST API 调用和 Kafka pub/sub 中分布式跟踪的自动检测。我希望这个故事能让你更好地理解这些跟踪框架和工具,尤其是 OpenTelemetry,以及它如何从根本上改变我们在 分布式系统 中进行可观察性的方式。

kafka 如何脱离 zookeeper 单独使用?

log.dirs可以根据自己的实际情况填写,也可以直接用默认值根据实际情况填写,也可以直接用默认值

Kafka 不能脱离 Zookeeper 单独使用,因为 Kafka 使用 Zookeeper其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式 管理和协调 Kafka 的。但是,的 3.0 版本中,Kafka 依然兼容 zookeeper Controller,但 Kafka Raft 元数据模式,已经可以在不依赖 zookeeper 的情况下启动 Kafka 了。如果您想尝试使用 Kafka Raft 启动 Kafka,可以在 config 目录下找到 kraft 目录,里面包含着一套新的配置文件,可以直接摒弃对 ZK 的依赖。

kafka单机版部署

Kafka 中有多种延时作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。

kafka安装需要zookeeper,但是kafka集成了zookeeper,单机部署时可以直接使用,配置kafka_2.11.X/config下的zookeeper.preperties即可

customer-serv让我们在 docker 中暂停我们的PostgreSQL 数据库,然后重复从customer-serv-bff. 500 internal server error正如预期的那样,我们得到了。检查 Jaeger,我们看到以下跟踪,异常堆栈跟踪抱怨SocketTimeoutException,再次如预期的那样。

tar -xvf kafka_2.11-2.3.0.tgz

修改dataDir和clientPort两个配置项,前者是快照的存放地址,后者是客户端连接zookeeper服务的端口

修改log.dirs和zookeeper.connect两个配置选项

zookeeper.connect要和zookeeper.preperties文件中的clientPort保持一致

做完以上作,单机版部署就完成了

nohup ./zookeeper-server-start.sh ../config/zookeeper.properties >../zookeeper.log &

后台启动zooke延时作创建之后会被加入延时作管理器(DelayedOperationPurgatory)来做专门的处理。延时作有可能会超时,每个延时作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。eper,启动日志写入zookeeper.log

启动后查看zookeeper.log,看zookeeper是否启动成功

nohup ./kafka-server-start.sh ../config/server.properties > ../kafka.log&

后台启动kafkaserver,启动日志写入kafka.log

启动后查看kafka.log,看是否启动成功

./kafka-topics.sh --create --zookeeper IP:22 --replication-factor 1 --partitions 1 --topic test&

./kafka-console-producer.sh --broker-list IP:9092 --topic test&

./kafka-console-consumer.sh --bootstrap-server IP:22 --topic test --from-beginning

检查生产者、消费者是否正常的方法:

Kafka总结

内网IP只能在内网局域网访问连接,在外网是不能认识内网IP不能访问的。

consumer 采用 pull(拉)模式从 broker 中读取数据。

三个微服务在 docker 中的日志输出显示相同的跟踪 id,以红色突出显示,并根据其应用程序名称显示不同的跨度 id(应用程序名称及其对应的跨度 id 以匹配的颜色突出显示)。在 的情况下customer-serv,相同的 span id 从 REST API 请求传递到 Kafka 发布者请求。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能 分配到 Consumer0 中。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数

据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有

数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 是 AR 中的一个子集。

处理顺序 :->序列化器->分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费主题的所有分区。

当前消费者需要提交的消费位移是offset+1

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。

Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留一个版本。

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 发生变化时,由负责通知所有broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由负责分区的重新分配。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在

初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

生产者必须提供的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

每次发送数据给前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该存于__transaction_state内,并将其状态置为BEGIN。

在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

一旦上述数据写入作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所的还是真实的主题。

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所

有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

请教kafka启动包错 求指教

现在,让我们启动customer-serv-bff流程的入口点,以创建新客户。

启动kafka报错,应该怎么解决

Jaeger 最初由 Uber 的团队构建,然后于 2015 年开源。它于 2017 年被接受为云原生孵化项目,并于 2019 年毕业。作为 CNCF 的一部分,Jaeger 是云原生 架构 中公认的项目。它的源代码主要是用 Go 编写的。Jaeger 的架构包括:

必须使用base 包中的build.prop,但是必须在build.prop 任意位置加入如下几行(对比了百度和联想的build.prop,发现百度修改和添加了一下prop,移植时如果base的build.prop有这个属性,替换,没有则增加即可):

ro.baidu.build.hardware=c8813(以c8813为例,可选择自己适配的机型)

ro.baidu.build.hardware.version=1.0

ro.baidu在本文中,我们将使用 OpenTelemetry、Spring Cloud Sleuth、Kafka 和 Jaeger 在三个 Spring Boot 微服务 中实现分布式跟踪。.build.version.release=2.1

ro.product.manufacturer=Baidu

persist.sys.emmc=/mnt/sdcard2

ro.config.notification_sound=Ding.mp3

ro.config.ringtone=Echo.mp3

ro.config.alarm_alert=alarm.mp3

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 836084111@qq.com,本站将立刻删除。