当前位置: 首页 > 知识库问答 >
问题:

Quarkus有什么功能可以向Kafka发送消息吗

林英锐
2023-03-14

我是Kafka和quarkus的新手,我想在处理用户请求后向Kafka主题发送消息。

我已经浏览了Quarkus-快速入门中提供的kafka示例。我已经尝试使用KafkaMessage

// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
    generateSingle();
    return "hello";
}

@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
    return KafkaMessage.of(1, "value");
};

但我得到了一个结果,那就是不断地向Kafka主题发送消息。

我想知道是否有其他方法或我的代码是否有任何问题。

帮助感谢

共有1个答案

秦新立
2023-03-14

目前关于该主题的文档简洁且不完整(Quarkus 0.25.0)。我成功地做到了这一点,但这需要大量的实验,我相信这是一种黑客行为,有望在Quarkus的后续版本中得到修复。

其原理是,@Outgoing方法必须产生一个由外部控制的流。这是通过Flowable创建流来实现的。在@PostConstruct方法中创建(),并向类成员公开发射器。@Outgoing方法只返回已经构造好的流。

以下组件公开了一个公共方法,product(String message),该方法将向Kafka发送该文本消息:

package ...

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;

@ApplicationScoped
public class KafkaController {

    private FlowableEmitter<KafkaMessage<String, String>> emitter;

    private Flowable<KafkaMessage<String, String>> outgoingStream;

    @PostConstruct
    void init() {
        outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
    }

    public void produce(String message) {
        emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
    }

    @PreDestroy
    void dispose() {
        emitter.onComplete();
    }

    @Outgoing("internal")
    Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
        return outgoingStream;
    }

    @Incoming("internal")
    @Outgoing("kafka-test")
    KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
        return arg.getPayload();
    }
}

我在生成的Quarkus应用程序中创建了这个类,如下所述:

mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"

并配置(application.properties)如下:

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Kafka实例完全按照快速入门中的描述启动。您可以通过控制台侦听器观看测试主题,如下所示:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning --group test-console.consumer

要测试它,您可以创建一个JAX-RS资源来调用产生()

package ...

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/control")
public class KafkaProduceControlResource {

    @Inject
    KafkaController kafkaController;

    @POST
    @Path("/produce")
    public void produceMessage(String message) {
        kafkaController.produce(message);
    }
}

从命令行调用它,如下所示,并观察控制台使用者:

curl -i -s -X POST -d "A text message" \
    http://localhost:8080/control/produce

黑客:似乎用@Outgoing(“Kafka测试”)注释produceKafkaMessage()失败,因为Quarkus不理解KafkaMessage是一条消息,并将其包装成一条,导致序列化错误。我用“internal”流绕过它。

 类似资料:
  • 所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 我正在尝试使用一个websocket发送音频消息,我应该将音频流改为什么类型的消息,以便我可以使用一个socket发送?如果我直接使用一个WebSocket.send(音频),我会得到一个错误“DOMException”,我应该把它改为二进制数据吗?怎么做的?我是一个全新的程序,所以请帮助!!!

  • 我有一个有状态的应用程序,它维护与用户的会话。此应用程序有 5 个实例。 以下是主题: 所有主题都有5个分区。 Topic1和topic2分别用于建立州商店和全球故事。这两个主题都使用用户名作为消息键。这些主题中的数据由应用程序实例本身生成。 现在,另一个应用程序使用与消息键相同的用户名向topic3生成数据。 我的期望是它将进入同一个分区,该分区由在其本地状态存储中拥有该用户的实例使用。这是对的

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)