我有一个消费转换产品应用程序,在Kafka中有一次精确的权杖。(事务性)生成阶段在同一主题上生成新消息,然后使用该消息(事务性=read_committed)。只有一个线程执行此操作,并确保消费者轮询在生产者的事务提交之后发生。现在我每轮只有一份民意调查报告。 当我运行我的测试用例时,有时可能会有其他生产者在我的生产者的事务提交之前发送的消息。然后我经历了以下情况: 我的单个poll语句只返回这个
我什么都试过了,但似乎什么都不起作用。如果有人面临同样的问题,请让我知道。 我的配置包括以下一行:-
我正在努力实现以下目标: 使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。 我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程
我正在开发REST服务,反过来,它将查询缓慢的遗留系统,因此响应时间将以秒为单位。我们还期望有大量的负载,所以我考虑了异步/非阻塞方法,以避免数百个“servlet”线程在调用慢速系统时被阻塞。 正如我所看到的,这可以使用AsyncContext实现,它出现在新的servlet API规范中。我甚至开发了小的原型,它似乎是工作的。 另一方面,它看起来像我可以实现同样的使用spring WebFlu
我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例
使用spring集成Kafka(2.1),我能够成功地发送消息到Kafka的一个主题。 本机Kafka客户端API提供了一个在成功发送时回调的选项。我怎样才能做到与spring一样--融合--Kafka。下面我的配置和代码供你参考。 XML配置 发送消息的Java码
我想从flink读一个Kafka的题目 此代码成功运行: 但是,当我尝试使用from flink: 我得到一个错误:
我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht
我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?
我想产生一个Kafka主题的信息。该消息应该具有以下模式: 我知道这是一个json模式,那么如何将json转换成字符串呢?
创建了一个群集,其中有两个代理使用相同的动物园管理员,并试图为主题生成消息,其详细信息如下。 当生产者设置或-1时,,它应该接收代理(领导者和副本)的确认,但当一个代理在制作时手动关闭时,即使在acks=“all”有人能解释这种奇怪行为的原因时,对Kafka制作人也没有任何影响? 经纪人在9091,9092。 下面是Kafka制作人的源代码
下午好,我最近才开始与Kafka合作,我有一个关于制作人与模式的问题。 最初,我尝试在C#中构建一个没有模式的简单生产者。到目前为止,这是可行的,代码也在一个简短的版本中给出。 无模式生产者代码: 但是模式会给我带来问题(请参阅下一节)。 假设我给了一个消费者,比如说Python中的消费者,他使用以下方案来接收整数: 我现在想创建一个使用此方案并向Python消费者发送消息的C#生产者。根据该方案
我使用的是spring kafka 2.2。我们使用confluent schema registry进行模式验证,因为我们发布的是avro消息。 下面是如何在my producer configs中设置架构注册表URL。 配置。put(KafkaAvroSerializerConfig.SCHEMA\u REGISTRY\u URL\u CONFIG,schemaRegistryURL); 现在
据我所知,Kafka使用者将在第一次从代理反序列化avro数据时,前往模式注册中心获取和缓存数据模式,以备将来使用。 在所有这些开始之前,消费者进程是否有可能首先检查架构注册表是否可达(如网络问题或注册表意外关闭或其他什么)?
我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能