我使用了spring io文档中列出的示例配置,它运行良好。 然而,当我用下游应用程序测试它时,我从Kafka那里消费并将其发布到下游。如果下游关闭,则消息仍在使用中,不会重播。 或者说,在使用kafka主题后,如果我在service activator中发现一些异常,我还想抛出一些异常,这些异常应该回滚事务,以便可以重播kafka消息。 简而言之,如果消费应用程序有一些问题,那么我想回滚事务,这
我使用以下代码消费来自JMS ActiveMQ的消息: 我的要求是从这里消费并将其发布到Kafka出站适配器。使用以下配置: 以下是我想实现的目标: > 我注意到我的消息会立即出队,如果处理遇到一些异常,我无法重新处理它。我不希望这种情况发生。 我真的很难让它发挥作用。有人能帮帮我吗?
我用的是Kafka的高级消费者。因为我使用Kafka作为我的应用程序的“事务队列”,所以我需要绝对确保不会错过或重读任何消息。关于这一点,我有两个问题: > 如何将偏移量提交给zoomaster?我将在每条消息成功消费后关闭自动提交和提交偏移量。我似乎找不到如何使用高级消费者执行此操作的实际代码示例。有人能帮我吗? 另一方面,我听说promisezooeger可能会很慢,所以另一种方法可能是在本地
已使用生产者推送消息。它向主题推送了100000条消息。 使用命令:bin/kafka producer perf test。sh--代理列表localhost:9092--消息100000--主题perfAtlasTopic获取以下生产者指标。 开始时间,结束。时间、压缩、消息。大小,批次。大小,总计。数据发送。在里面MB,MB。秒,总计。数据发送。在里面nMsg,nMsg。第[2015-02-
我在kafka connect连接器中安装了运行confluent hub安装的JDBC连接器——没有提示confluent Inc/kafka connect JDBC:10.2.5,但当我尝试使用实现新接收器时,出现以下错误:找不到任何实现连接器且名称与io匹配的类。汇合的。连接jdbc。JdbcSinkConnector 我想用的水槽 我正在使用confluentinc/cp kafka c
我需要帮助,因为我对Kafka和mqtt一无所知。我正在使用docker compose创建Mosquitto broker和Kakfa之间通信的架构 我还使用confluent Kafka Connect MQTT(https://www.confluent.io/hub/confluentinc/kafka-connect-mqtt) 它对于1883端口上的通信工作得很好,但当我尝试在8883
我有一个现有的2个kafka服务器加载了mysql连接器。它起作用了。此外,我需要添加MongoDB连接器。我已经在我的Kafka服务器(Centos7)上安装了confluent schema registry,它可以工作,我停止/启动/重新启动,看起来没有什么问题。我在这里下载并提取了debezium Mongo插件/usr/连接器/插件/debezium连接器mongodb/ 我编辑了 /e
我正在使用ConFluent的Kafka s3连接将数据从apache Kafka复制到AWS S3。 问题是,我有AVRO格式的Kafka数据,它没有使用Confluent Schema Registry的AVRO序列化程序,并且我无法更改Kafka生产者。因此,我需要反序列化来自Kafka的现有Avro数据,然后在AWS S3中以拼花格式保存相同的数据。我尝试使用confluent的AvroC
我有一个带有KafkaListener方法的Spring组件: 现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建: 但我不知道接下来会发生什么。如何测试此方法?
我想为整个Kafka流编写一个集成测试。 在我的生产代码中,我有: 在我的测试代码中,我使用KafkaProducer 我希望有一个钩子,表明调用了KafkaListener。我可以在测试中加入一些延迟,但这是一个糟糕的做法,我想避免它。 有没有更好的方法来等待正在处理的?
我正在尝试使用嵌入式kafka进行我的测试。我使用Spring引导和Junit5,如下所示 然而,我的测试失败了,因为引导服务器中没有给出可解析的引导URL 我也在使用一个测试配置文件,在yml文件中 请帮忙。
要配置,它需要,而BrokerHosts又需要一个动物园管理员主机。 问题是,如果我有一个zookeeper仲裁(由3个zk服务器组成的集群),我如何配置KafkaSpout以获取仲裁的所有成员,而不是仅1个。因为一个zookeeper服务器可能会停机,整个拓扑将不可用。
下面是我成功运行两三天后持续获得的异常的详细信息。有人能指导我吗?
我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行
我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。