我如何处理quarkus kafka smallrye流媒体处理的异常? 我的代码与quarkus指南上的命令生产者示例非常相似(https://quarkus.io/guides/kafka#imperative-用法) 我想要类似于香草Kafka库的东西,它提供了处理请求发送的每条记录的回调的选项。 Tks
我有一个将数据推送到kafka的endpoint。现在,我想分别在kafka写入成功或失败的情况下使用近似的状态代码2xx或5xx响应调用。代码片段是 现在的问题是,在执行ack或nack回调之前,endpoint正在使用状态代码进行响应。还尝试了的方法,但该方法返回void。因此,无法知道该消息是已确认还是未确认。
所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?
我有一个Quarkus项目,基于Kafka的Smallrye反应式消息传递。因为我想使用“复杂的pojo”,所以我需要一个定制的反序列化程序。 我想制作这两个类CDI bean,这样我就可以注入和使用我的自定义记录器,它是一个CDI bean。有没有办法实现这一点? 现在,我的注入记录器对象只是空的:
我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话
假设kafka消息生产者向一个主题发送一条事件消息。然后一个消费者处理这个事件消息。但是,这个消费者进程因为业务错误而抛出异常,所以他想让消息生产者知道它并再次怨恨。 有什么解决办法吗?
好的,目标是:我有一个应该发送邮件的服务,如果失败,我的Kafka制作人将把这封邮件发送到Kafka主题。第二个程序每两分钟查看一次主题,应该只使用一条消息(最早的一条),然后重试发送,如果失败,程序应该将此消息返回主题。 我已经有了一个消费者,但问题是,它会消耗我直到现在还没有使用消费者的所有消息。但我希望他只吃最老的,他以前从未吃过。 这是我的实际消费者: “CustMessage”是我为测试
我有一个网页,需要发送Kafka信息到一个主题。网络正在使用vuejs。我尝试使用npm“Kafka节点”和“Kafka”,它们在建立Kafka连接时都有错误。也许它们都是服务器端npm? 是否有任何js软件包支持网页扮演Kafka制作人的角色。我不想设置其他中间服务器(比如kafka http proxy)。我希望网页直接发送信息到主题。可行吗
我只是Kafka的新手,我有个问题: 我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。 当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我正在使用Kafka客户端1.0.0库中的KafkaProducer,根据文档,该方法是
我在一个视频教程中看到,当制作人发布消息时,Kafka Broker支持3种类型的确认。 0-开火并忘记 1-领导确认 2-确认所有经纪人 我正在使用Kafka的Java API发布消息。这是必须为每个使用服务器的代理设置的吗。每个经纪人的特定属性,还是必须由制作人设定?如果必须由制作人设置,请解释如何使用Java API设置。
在Kafka关于重试的记录中,它说: 允许在不设置最大值的情况下重试。航班请求。每连接到1可能会改变记录的顺序,因为如果两个批发送到单个分区,第一个失败并重试,但第二个成功,那么第二个批中的记录可能会首先出现。 根据该段,如果两个批被发送到同一个分区,Kafka可以提交第二批,而第一批失败<但这似乎与Kafka关于在同一分区内订购的保证背道而驰。因为通常情况下,如果一个批次失败,所有后续批次都应该
在具有定义配额的Kafka集群中,为了控制客户端使用的代理资源,代理计算将违反配额的客户端置于其配额之下所需的延迟量,并立即返回延迟的响应Source from Kafka apache留档。 在这个话题中,我想知道延迟指的是什么(请用具体例子)? 在我的知识,当我们定义例如,代理监视生产者不覆盖1024o/sec,如果生产者覆盖1024o/sec,代理将通过节流为该客户保留的管道来关闭其配额下的
我们现在正在将我们的一项服务从通过传统通信技术推送数据转移到Apache Kafka 当前的逻辑是向IBM MQ发送消息,并在出现错误时重试。我想重复一遍,但我不知道经纪人在这种情况下提供了什么样的担保 假设我通过producer通过Java客户端库批量发送100条消息。假设它到达集群,是否有可能只接受其中的一部分(例如,磁盘已满,或者我在写操作中接触到的某些分区未被复制)?我能从我的制作人那里检