我正在一个POC中工作,我想消费来自Kafka主题“用户”的消息。尝试实现一旦spring boot scheduler在预定时间或cron时间触发,消费者应该从Kafka主题读取消息,然后我们应该开始从Kafka主题逐个消费现有消息并处理这些消息,当所有的消息被消费,那么Kafka消费者应该停止。调度程序应在cron时间触发并再次启动进程。 我已经尝试了以下方法来实现这一点,尽管我正在努力确定如
我很难在Kafka主题的消费者中找到处理异常的简单模式。场景如下:在消费者中,我调用一个外部服务。如果服务不可用,我想重试几次,然后停止消费。 最简单的模式似乎是一种处理它的阻塞同步方式,在Java中如下所示: 但是,我觉得必须有一种更简单的方法(不使用第三方库),并且避免阻塞线程。 这似乎是我们想要的一种常见的东西,但我找不到一个简单的例子来说明这种模式。
最近,在一次采访中,我被问到一个关于Kafka流的问题,更具体地说,面试官想知道为什么/什么时候您会使用Kafka流DSL而不是普通的Kafka消费者API来读取和处理消息流?我不能给出一个令人信服的答案,我想知道使用这两种流处理风格的其他人是否可以分享他们的想法/意见。多谢了。
对于这个用例,我应该使用Kafka Consumer API还是Kafka Streams API?我有一个话题与一些消费群体消费它。本主题包含一种类型的事件,它是一个内部埋藏了一个类型字段的JSON消息。一些信息会被一些消费者群体消费,而另一些消费者群体不会消费,一个消费者群体可能根本不会消费很多信息。 我的问题是:我是否应该使用消费者API,然后在每个事件上读取type字段,并删除或处理基于t
所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?
我已经做了一些Kafka流应用程序和Kafka消费者应用程序。最后,Kafka流不是什么,而是消费来自Kafka的实时事件的消费者。所以我不知道什么时候使用Kafka流,或者为什么我们应该使用Kafka流,因为我们可以在消费者端执行所有转换。
我正在使用Apache Kafka 0.8.2.1,计划升级应用程序以使用Apache Kafka 1.0.0。当我考察Kafka流的时候,我得到了一些关于Kafka流和Kafka流的区别的问题。 我知道KafkaConsumer基本上用于字面上,从broker和KafkaStreams可以做各种事情,如或与数据库交互,甚至重新生成到其他kafka或任何其他系统。 所以,这是我的问题。KafkaC
null 我注意到前面也有人问过类似的问题。但没有什么能结束这个问题。我在这里提到这些链接 Kafka-节点突然从偏移量0消耗 Kafka Consumer:从一开始就开始读取分区,即使有提交的偏移量 Kafka崩溃后,偏移量丢失 Kafka分区和偏移量消失
我已经用实现了一个Kafka消费者。我的使用者应该使用事件,然后为每个事件向其他服务发送REST请求。只有当REST服务关闭时,我才想重试。否则,我可以忽略失败的事件。 我的容器工厂配置如下: 我使用来设置异常和相应的重试策略。 如果我有机会了解中的哪个记录失败了,那么我将创建的自定义实现,以检查失败的消息是否可重试(通过使用字段)。如果它是不可重试的,那么我将从列表中删除它以重新查找。 关于如何
例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?
我正在使用KafkaIO API https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/Kafka/kafkao.html流式传输来自Kafka主题的消息 管道流程如下: 根据documenattion窗口是必要的,如果我们正在进行任何计算,如GroupByKey等。因为我只是解码数组字节消息
我正在使用SmallRye与Kafka的反应式消息传递,以及Confluent Registry和AVRO。正如在本博客中所解释的,它运行良好https://quarkus.io/blog/kafka-avro/但在与博客相关的源代码中,它在本机编译中似乎不起作用:https://github.com/cescoffier/quarkus-kafka-and-avro 我的环境(Avro 1.10
我们在Docker容器中使用Kafka。如果在生成或消费消息时主题不存在,我们会自动创建主题。我们想要3个主题分区,所以设置 在文件/etc/kafka/server中。Kafka容器中的属性。然而,它并没有生效。完成设置并重新启动容器后,尝试订阅或发布一些不存在的主题,这些主题将被创建,但只使用一个分区。 我们在ImageConfluentInc/cp kafka:5.1.0创建的容器和Imag
我有一个kafka消费java应用程序。我正在阅读该应用程序中两个独立的Kafka主题。 对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。消息是avro格式的。Pojo1是使用avro模式构建的pojo类。 对于使用来自topic1的消息,我们有以下方法
我使用Kafka作为队列,节点服务使用Kafka节点生成和使用Kafka主题的消息。 我一直在使用自制的分布式跟踪解决方案,但现在我们正在转向弹性APM。 这似乎是为HTTP服务器量身定制的,但如何配置它以与Kafka一起使用呢? 我希望能够像下面这样跟踪事务:服务A向服务B发送一个HTTP请求,服务B向Kafka Topic C生成该请求,服务D从中使用该请求,服务D将一些数据放入Kafka T