我试图发送消息到Kafka,这是安装在一个Ubuntu虚拟机。 有3个Kafka经纪人已经在VM中启动,一个消费者也在VM中监听这个主题。这一切都很好。 在Windows7上的Intellij中,我为KafkaProducer编写了一个小的演示应用程序, 连接被拒绝 下面是Ubuntu的一些输出 更新从ppatierno回答(我不能添加图片到评论,所以在这里添加更新) 谢谢你的评论。下面是来自 1
我试图在循环中加载一个数据文件(以检查统计数据),而不是在Kafka中的标准输入。下载Kafka后,我执行了以下步骤: 启动动物园管理员:
我正在使用以下客户端代码创建远程KafkaProducer 一旦我创建了生产者,我就可以运行下面的行并返回有效的主题信息,假设strTopic是一个现有的主题名称。 当我尝试发送消息时,我会执行以下操作:
当我将有效负载发布到一个kafka主题时,如果一个特定的字段有一个空值或空字符串,那么这个字段根本不会出现在kafka主题的有效负载中。我希望字段仍然显示,字符串值为null或空。
我使用的是Scala中的0.9Kafka Java客户机。 在中更改参数的方差可以解决这个问题吗?
我正在尝试使用Kafka将数据从oracle迁移到mongodb。我取了一个1000万的样本记录集,列长为90,每行为5KB 我将数据分成10个线程,但其中一个线程不是每次都在运行....当我检查数据时,我看到MongoDB中丢失了100万条记录。 主类: Kafka制作人线程类: JSONObject obj=new JSONObject(); while(rs.next()){ 消费者类:
我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,则应停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应开始接受来自其他 Kafka 主题的消息。 有没有一种方法可以在Spring BootKafka Streams中实现这一点?
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
我正在使用 Spring靴:2.3.5.释放Spring云:Hoxton.SR8 我正在尝试Spring云流Kafka流应用程序。一切都运行良好,直到出现反序列化异常。应用程序每次都会关闭。 我想跳过不良记录,在Kafka主题中前进。但我无法实现这一点。配置: 我得到的错误是 现在我正在使用这个设置。它仍然没有效果。根据留档,它应该简单地记录错误并继续处理。即它应该跳过不良记录。但这并没有发生。看
我编写了一个Spring Cloud Streams Kafka Streams Binder应用程序,它将多个Kafka输入主题多路复用到一个流中: (来源:https://spring . io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-Apache-Kafka-streams-part-2-programmi
我有一个 Kafka 应用程序,我一直在使用它 kafka-console-consumer.sh 使用消息,如下所示: 它提供了我通过Kafka消费者给Kafka经纪人写的所有消息,没有任何遗漏。 最近,我将该应用程序部署在另一个环境中,因为某些原因,zookeperhost无法访问。所以我使用的是kafka简单的消费者外壳。sh,如下所示: 但是有了这个,我看到很少的消息(大约5000个中有2
我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。
我有以下Spring Cloud Stream Kafka Streams Binder 3. x应用程序: 当我通过这个应用程序运行X消息时,通过使用和从联调将它们发布到,点1和点2的消息计数是相等的,正如我所期望的那样。 当我使用连接到Kafka代理的实时应用程序做同样的事情时,点1和点2的计数仍然显着不同: 消费者在< code >主题2上有很大的滞后,并且该滞后保持不变(在我停止发布消息后
我们正在使用Avro输入/输出记录通过Spring Cloud Stream功能支持测试Kafka Streams的使用,但设置和,以便在执行Avro转换的地方使用自定义的。 默认的 和值的 。 当我们只使用KStream到KStream函数时,一切都没问题,例如: 但是当我们尝试一个稍微复杂一点的例子,涉及一个像这样的输入KTable: ( 类只有两个成员:) 收到第一条记录时,将引发此异常:
我正在使用一个与kafka集成的spring boot应用程序,我想实现一个endpoint来停止和启动kafka发布消息。消息由另一个endpoint以异步方式触发。 豆子卡