Camel组件之Idempotent Consumer

Camel组件之Idempotent ConsumerThe nbsp IdempotentCo nbsp fromthe nbsp EIPpatterns nbsp isusedtofilt Thispatterni nbsp IdempotentCo nbsp class Thisusesan nbsp Expression nbsp tocalculatea

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.

This pattern is implemented using the IdempotentConsumer class. This uses an Expression to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the IdempotentRepository to see if it has been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.

Idempotent Conumser用来处理重复的Message。它使用IdempotentConsumer进行Message的重复检测。检测原理很简单:每一个Message都有一个唯一的ID,在Message被处理之前,先在IdempotentRepository 里搜索ID,看看是否已经处理了该条Message,如果没有的,处理该条Message,然后将ID存到IdempotentRepository 中。

The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.

Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.

On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.

Camel 提供的几个已有的IdempotentRepository:

  • MemoryIdempotentRepository
  • FileIdempotentRepository
  • HazelcastIdempotentRepository (Available as of Camel 2.8)
  • JdbcMessageIdRepository (Available as of Camel 2.7)
  • JpaMessageIdRepository

 

看看几个主要的属性:

Option Default Description
eager true

Camel 2.0: Eager controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled before then Camel will be able to detect duplicate messages even when messages are currently in progress. By disabling Camel will only detect duplicates when a message has successfully been processed.

这个主要用来配置ID重复检测与Message处理的关系,如果启用,在消息在处理的时候,就可以检测ID是否重复,否则就等到消息处理完毕再检测。

messageIdRepositoryRef null

A reference to a IdempotentRepository to lookup in the registry. This option is mandatory when using XML DSL.

存放ID和进行ID重复检测的IdempotentRepository

skipDuplicate true

Camel 2.8: Sets whether to skip duplicate messages. If set to false then the message will be continued. However the Exchange has been marked as a duplicate by having the Exchange.DUPLICATE_MESSAG exchange property set to a Boolean.TRUE value.

设置是否处理重复的消息,如果设为false,消息会继续执行,但是Exchange.DUPLICATE_MESSAG属性将被设为TRUE

removeOnFailure true

Camel 2.9: Sets whether to remove the id of an Exchange that failed.

是否删除执行失败的消息ID

注意:消息的ID是由一个表达式提供的,比如Header(“someHeader”)

使用MemoryIdempotentRepository:

 

RouteBuilder builder = new RouteBuilder() { public void configure() { errorHandler(deadLetterChannel("mock:error")); from("seda:a") .idempotentConsumer(header("myMessageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)) .to("seda:b"); } };

 这里使用myMessageId作为消息的ID

使用jpaMessageIdRepository:

from("direct:start").idempotentConsumer( header("messageId"), jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME) ).to("mock:result");

 这里使用myMessageId作为消息的ID,而PROCESSOR_NAME则作为消息ID存储的集合的标识(内部也是一个key-value结构?可能考虑到要支持多个Repository公用)。

下面看一个如何处理重复消息的示例:

from("direct:start") // instruct idempotent consumer to not skip duplicates as we will filter then our self .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) // filter out duplicate messages by sending them to someplace else and then stop .to("mock:duplicate") .stop() .end() // and here we process only new messages (no duplicates) .to("mock:result");

 在处理完以后,对于Exchange.DUPLICATE_MESSAGE为ture的消息,转发到mock:duplicate。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/225876.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午8:20
下一篇 2026年3月17日 上午8:21


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号