当前位置: 首页 > 知识库问答 >
问题:

ReactorKafka制作人-无法重试

沈成天
2023-03-14

我已经使用reactor-kafka(kafka的一个功能性Java API)创建了一个KafkaProducer(reactor.kafka.sender.KafkaSender)。使用以下生产者配置,

max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all

当我试图发布一个记录到一个无效的主题我得到超时异常

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms

正如所料。但我已经为没有发生的重试进行了配置。我假设在max.block之后。ms/请求。暂停。ms已过期,每次重试后都会重试。退后。ms直到元数据。最大年龄。ms重试次数已用尽。仅供参考,代码:

    String topic = "order/";
    int count = 1;
    Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
      Event event = new Event();
      return SenderRecord.create(
            new ProducerRecord<String, Event>(topic, event.getX(),
                event), event.getEvent());
    });
    kafkaSender.send(source).subscribe(x -> System.out.println(x));
    kafkaSender.close();
  • 启用重试的配置是否正确

共有1个答案

万俟渝
2023-03-14

我相信你也应该设置"delivery.timeout.ms"

请参阅此处的文档:https://docs.confluent.io/current/installation/configuration/producer-configs.html#retries

将值设置为大于零将导致客户端重新发送发送失败并可能出现瞬时错误的任何记录。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一批失败并重试,但第二批成功,那么第二批中的记录可能会首先出现。另外请注意,如果delivery.timeout.ms配置的超时在成功确认之前首先过期,那么在重试次数用尽之前,产生请求将失败。用户通常应该倾向于不设置此配置,而是使用delivery.timeout.ms来控制重试行为。

 类似资料:
  • 我试图实现这里提到的生产者(https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-java/blob/master/src/main/demo/com/amazonaws/kinesisvideo/demoapp/putmediademo.java)。 我有一个mkv文件,我想上传在循环中作为制片人在Kinesis

  • 我正在使用Spring-Cloud-sleuth-stream和Spring-Cloud-starter-stream-kafka发送跨度到kafka,异常发生在连接中。SR1 192.168.1.177是我的localhost 初始化连接 重试例外 我不明白为什么,主机更改为,以及如何修复它

  • 问题内容: 我已经建立了俄罗斯方块游戏。现在,我已经使用JPanel来显示内容和块(使用paintComponents()方法)。 问题是,当我尝试从另一个JFrame调用tetris程序时,它根本无法绘制。 我的俄罗斯方块主菜单的代码是: 当调用MatrixBoard的构造函数时,俄罗斯方块游戏会在新窗口中开始。但是,这些块在屏幕上不可见。MatrixBoard的代码是: 请帮忙。我怀疑问题出在

  • 我已经在覆盆子pi 3上安装了node-red来从传感器收集数据,然后将它们存储在kafka中,但是现在我对kafka生产者节点有一些问题。我在笔记本电脑上安装了一个kafka服务器,它可以在控制台上正确工作:如果我在kafka生产者控制台上发送消息,我可以在消费者控制台上正确接收。不幸的是,当我试图在覆盆子上的node-red中注入kafka生产者的时间戳时,服务器没有响应。 node red的

  • 我正在尝试将Spring Cloud合同合并到现有项目中。我在REST方面取得了一些成功,但我正在努力设置消息端。 到目前为止,我已经在producer上建立了一个契约,它确实在target/generated test sources/contracts中生成了一个测试。我还为测试设置了一个基类。 我无法克服这个错误: 2017-09-08 17:10:51.759错误 - --[]- [ 主]

  • 我有一个应用程序,可以更改某些元素的字体。这对大多数人来说都很好,但可能有0.5%的人在尝试更改字体时会出现异常。堆栈跟踪的重要部分是: 正如我所说,它适用于大多数人,所以我认为这不是字体文件或我的代码的问题。关于如何解决这个问题,有什么建议吗? 编辑:这是我的代码: