当前位置: 首页 > 知识库问答 >
问题:

如何在夸尔库斯注射Kafka模板

隆飞驰
2023-03-14

我正在尝试注入一个KafkaTemplate来发送一条消息。我正在开发一个位于反应式方法之外的小函数。

我只能找到使用@Ingoing的例子,@Outgoing来自斯莫利,但我不需要KafkaStream

我尝试使用Kafka-CDI,但无法注入SimpleKafkaProducer

有什么想法吗?

对于克莱门特的回答

这似乎是正确的方向,但执行订单。.send(“你好”);我收到此错误:

(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected

我通过命令行使用我的主题,Kafka已经启动并运行,如果我手动生成,我可以看到使用的消息。

好像是相对于doc的这句话:

要将Emitter用于流hello,您需要在代码(或配置)中的某个位置使用@Incoming(“hello”)。

我班上有这样的代码:

    @Incoming("orders")
    public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
        log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
        return msg.ack();
    }

也许我忘记了一些配置?

共有2个答案

东方辉
2023-03-14

自从Clement的回答之后,< code>@Stream注释就被弃用了。必须改用< code>@Channel批注。

您可以使用由< code > quar kus-small rye-reactive-messaging-kafka 依赖项提供的< code >发射器来生成针对Kafka主题的消息。

一个简单的Kafka生产者实现:

public class MyKafkaProducer {

    @Inject
    @Channel("my-topic")
    Emitter<String> myEmitter;

    public void produce(String message) {
      myEmitter.send(message);
    }
}

并且必须将以下配置添加到应用程序中。属性文件:

mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer

这将向名为my topic的kafka主题生成字符串序列化消息。

请注意,默认情况下,通道的名称也是将在其中生成数据的kafka主题的名称。这种行为可以通过配置进行更改。在反应式消息传递文档中描述了支持的配置属性

李光华
2023-03-14

因此,您只需要使用发射器

@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;

// ...
orders.send("hello");

在你的application.properties中,声明:

## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1

要避免“流尚未连接”异常,如文档所示:”

要将Emitter用于流hello,您需要在代码(或配置)中的某个位置使用@Incoming(“hello”)。

假设您的应用程序属性中有类似的内容:

# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id

添加类似如下的内容:

@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
    log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
    return msg.ack();
}
 类似资料:
  • 我希望设置所有RESTEasy资源的基路径,而不需要包含扩展的类。 基本上我想摆脱: 我怎样才能做到呢?

  • 我想测试一个简单的轮询示例https://smallrye.io/smallrye-mutiny/guides/polling并将服务的数据轮询到Kafka流中。 这是我要测试的一个类的简化示例: 下面是测试类: 我的实际示例的错误日志是: 我试图依靠Quarkus测试容器来提供Kafka的一个实例

  • 我想使用利用的新错误处理,但我还想使用使用注释。 问题是,附带了的包,这与冲突,因为它们都有JAXRSecurityConfig。denyJaxRs,导致以下错误: 有人能够使用这两个库吗?

  • 我正在分布式模式下使用 cp-kafka-connect Helm chart 在 Google Kubernetes Engine (GKE) 上部署 Kafka-connect。 一个工作的Kafka集群与代理和动物园管理员已经在同一个GKE集群上运行。我知道我可以通过发送帖子请求到endpoint来创建连接器,一旦它可用。但是,Kafka连接容器进入运行状态,然后开始加载jar文件,直到所有

  • :101:22:ERROR:•在表达式“count words”的第一个参数中的“hello”中,即表达式:countWords[“hello”,“hello”,“world”]中的“[”hello“,”hello“,”world“]”中,无法将预期类型“Char”与实际类型“[Char]”匹配• :101:31:error:•在表达式“count words”的第一个参数中的“world”中,即

  • 我试图用axios从VueJS到Laravel,这是我的API。 我得到了这个错误: