宝马娱乐官网手机版需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断

当前位置:宝马娱乐官方网站 > 宝马娱乐官网手机版 > 宝马娱乐官网手机版需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断
作者: 宝马娱乐官方网站|来源: http://www.darmini.com|栏目:宝马娱乐官网手机版

文章关键词:宝马娱乐官方网站,上推队列

  先讲了JMS和遵守JMS的ActiveMQ。Java Message Service,JMS,指的是面向消息中间件(MOM),用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

  AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。

  Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

  注:Kafka不遵守JMS协议,所以Kafka实际应用中,很可能会需要ack,然后多个消费者能够会同时消费。。需要具体看。

  实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。

  传统企业型消息队列ActiveMQ遵循了JMS规范,宝马娱乐官网手机版实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ、Kafka并没有遵循JMS规范。

  RabbitMQ实现了AMQP协议,AMQP协议定义了消息路由规则和方式。

  生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。

  RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。

  Kafka只支持消息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会立即删除,会保留历史消息。

  因此支持多订阅时,消息只会存储一份就可以了。但是可能产生重复消费的情况。

  当推送消息的数量到达了perfetch limit规定的数值时,消费者还没有向消息中间件返回ACK,消息中间件将不再继续向消费者推送消息。

  如果消息的数量很少(生产者生产消息的速率不快),但是每条消息 消费者需要很长的时间处理,那么prefetch limit设置为1比较合适。

  这样,消费者每次只会收到一条消息,当它处理完这条消息之后,向消息中间件发送ACK,此时消息中间件再向消费者推送下一条消息。

  另外,对于prefetch模式(,那么消费需要进行响应ACK。因为服务器需要知道consumer消费的情况。

  与之相关的还有针对 消费者以何种方式向消息中间件返回确认ACK(响应):

  比如消费者是每次消费一条消息之后就向消息中间件确认呢?还是采用“延迟确认”---即采用批量确认的方式(消费了若干条消息之后,统一再发ACK)。

  当一定量的消息ACK之后,broker端会继续批量push消息给client端。”

  因为,prefetch limit 等于零 意味着消息中间件不会主动给消费者Push消息,而此时消费者又用MessageListener被动获取消息(不会主动去轮询消息)。

  此外,还有一个要注意的地方,即消费者采用同步获取消息(receive方法) 与 异步获取消息的方法(MessageListener) ,宝马娱乐官网手机版对消息的确认时机是不同的。

  这里提到了这篇文章:文章名《ActiveMQ消息传送机制以及ACK机制详解》

  Producer客户端使用来发送消息的, Consumer客户端用来消费消息; 它们的协同中心就是ActiveMQ broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。 随着ActiveMQ的不断发展,支持了越来越多的特性,也解决开发者在各种场景下使用ActiveMQ的需求。 比如producer支持异步调用; 使用flow control机制让broker协同consumer的消费速率; consumer端可以使用prefetchACK来最大化消息消费的速率; 提供重发策略等来提高消息的安全性等。

  图示中,提及到prefetchAck,以及消息同步、异步发送的基本逻辑;这对你了解下文中的ACK机制将有很大的帮助。

  optimizeAck表示是否开启“优化ACK”,只有在为true的情况下,

  只要此队列有消息,那么receive方法将会立即返回,当一定量的消息ACK之后,broker端会继续批量push消息给client端。 当consumer端使用MessageListener异步获取消息时,这就需要开发设定的prefetch值必须 =1,即至少为1;

  在异步消费消息模式中,设定prefetch=0,是相悖的,也将获得一个Exception。

  broker将会把指令中指定的消息重新添加到pendingQueue(亟待发送给consumer的消息队列)中,直到合适的时机,再次push给client。

  可以让消息在多个consumer间“负载均衡”(即均匀的发送给每个consumer);

  如果较大的prefetchSize,将会导致broker一次性push给client大量的消息,但是这些消息需要很久才能ACK(消息积压),

  这样可以保证每个consumer都能有活干,否则将会出现一个consumer非常忙碌,但是其他consumer几乎收不到消息。 如果消息很重要,特别是不愿意接收到”redelivery“的消息,那么我们需要将optimizeACK=false,prefetchSize=1

  那么这些消息就有可能会被重新发送给其他consumer,那么这种风险就需要client端能够容忍“重复”消息。

  同时如果离上一次ACK的时间间隔,已经超过optimizeAcknowledgeTimout毫秒,也会导致自动进行ACK。 此外简单的补充一下,批量确认消息时,只需要在ACK指令中指明“firstMessageId”和“lastMessageId”即可,即消息区间,

  对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)与Broker之间建立一种简单的“担保”机制.

  在同一个consumer中,我们不能同时使用这2种风格,比如在使用listener的情况下,当调用receive()方法将会获得一个Exception。

  择机确认似乎充满了不确定性,这也意味着,开发者必须明确知道择机确认的具体时机,否则将有可能导致消息的丢失,或者消息的重复接收.

  所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,即潜在的消息丢失;

  此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,

  如果重发次数达到阀值,将会导致发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,

  此消息将会被删除或者迁移到dead letter通道中。 因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,

  如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。

  broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。

  它是一种潜在的AUTO_ACK确认机制,为批量确认而生,而且具有“延迟”确认的特点。

  这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

  如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;

网友评论

我的2016年度评论盘点
还没有评论,快来抢沙发吧!