我使用带有幂等生产者配置的spring kafka:
这是我的配置道具:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
我的Kafka制作人抛出OutOfOrderSequence异常:
2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。这表示代理上的数据丢失,应该进行调查。2019-03-06 21:25:47TransactionManager[INFO][producer clientId=producer-1]producerId设置为-1,纪元-1 2019-03-06 21:25:47producerKafka[ERROR]我们在发送到kafka时遇到错误,请重试工作啊
我不知道为什么会抛出这个异常。我找不到具体的答案。例外情况的官方javadoc声明如下:
此异常表示代理从生产者处收到意外的序列号,这意味着数据可能已丢失。如果生产者仅配置为幂等(即,如果设置了enable.idemponence,但未配置transactional.id),则可以使用同一生产者实例继续发送,但这样做可能会导致发送记录的重新排序。对于事务性生产者,这是一个致命错误,您应该关闭生产者。
这是否意味着我需要使用事务生产者来避免这个问题?
KafkaProducer doc声明了一些使上述语句含糊不清的内容:https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
要启用幂等性,请启用。幂等配置必须设置为true。如果设置,重试配置将默认为整数。最大值,最大值。航班请求。每连接配置将默认为1,acks配置将默认为all。幂等生产者的API没有变化,所以现有的应用程序不需要修改就可以利用这个特性。
为了利用幂等生产者,必须避免应用程序级别的重发,因为这些重发不能去重复。因此,如果应用程序启用幂等性,建议保留重试配置未设置,因为它将默认为整数。MAX_VALUE。此外,如果发送(生产者记录)返回一个错误,即使有无限次重试(例如,如果消息在发送前在缓冲区中过期),那么建议关闭生产者并检查最后产生的消息的内容,以确保它不是复制的。最后,生产者只能保证单个会话内发送的消息的幂等性。
上面的语句清楚地指出,幂等生产者只需要使用enable.idempotence
属性。但是,异常声明我必须使用该transactional.id
属性。
创建幂等异步生产者的正确方法是什么,而不必处理致命的OutOfOrderSequenceException
。
如果显式设置重试,则必须设置
max.in.flight.requests.per.connection=1
为了避免无序问题
该文件非常清楚地指出:
将值设置为大于零将导致客户端重新发送发送失败并可能出现瞬时错误的任何记录。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION设置为1可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一批失败并重试,但第二批成功,那么第二批中的记录可能会首先出现。
对我来说似乎很清楚;从你的第二句话来看...
为了利用幂等生产者,必须避免应用程序级别的重发,因为这些重发不能去重复。因此,如果应用程序启用幂等性,建议保留重试配置未设置,因为它将默认为整数。MAX_VALUE。
而且你有
props.put(ProducerConfig.RETRIES_CONFIG, 5);
Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?
我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了: 然而,当我在另一个项目(数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误: 在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至