我是Kafka和quarkus的新手,我想在处理用户请求后向Kafka主题发送消息。
我已经浏览了Quarkus-快速入门中提供的kafka示例。我已经尝试使用KafkaMessage
// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
generateSingle();
return "hello";
}
@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
return KafkaMessage.of(1, "value");
};
但我得到了一个结果,那就是不断地向Kafka主题发送消息。
我想知道是否有其他方法或我的代码是否有任何问题。
帮助感谢
目前关于该主题的文档简洁且不完整(Quarkus 0.25.0)。我成功地做到了这一点,但这需要大量的实验,我相信这是一种黑客行为,有望在Quarkus的后续版本中得到修复。
其原理是,@Outgoing
方法必须产生一个由外部控制的流。这是通过Flowable创建流来实现的。在
,并向类成员公开发射器。@PostConstruct
方法中创建()@Outgoing
方法只返回已经构造好的流。
以下组件公开了一个公共方法,product(String message)
,该方法将向Kafka发送该文本消息:
package ...
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
@ApplicationScoped
public class KafkaController {
private FlowableEmitter<KafkaMessage<String, String>> emitter;
private Flowable<KafkaMessage<String, String>> outgoingStream;
@PostConstruct
void init() {
outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
}
public void produce(String message) {
emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
}
@PreDestroy
void dispose() {
emitter.onComplete();
}
@Outgoing("internal")
Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
return outgoingStream;
}
@Incoming("internal")
@Outgoing("kafka-test")
KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
return arg.getPayload();
}
}
我在生成的Quarkus应用程序中创建了这个类,如下所述:
mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="kafka"
并配置(application.properties
)如下:
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka实例完全按照快速入门中的描述启动。您可以通过控制台侦听器观看测试
主题,如下所示:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test --from-beginning --group test-console.consumer
要测试它,您可以创建一个JAX-RS资源来调用产生()
:
package ...
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/control")
public class KafkaProduceControlResource {
@Inject
KafkaController kafkaController;
@POST
@Path("/produce")
public void produceMessage(String message) {
kafkaController.produce(message);
}
}
从命令行调用它,如下所示,并观察控制台使用者:
curl -i -s -X POST -d "A text message" \
http://localhost:8080/control/produce
黑客:似乎用@Outgoing(“Kafka测试”)
注释produceKafkaMessage()
失败,因为Quarkus不理解KafkaMessage
是一条消息,并将其包装成一条,导致序列化错误。我用
“internal”
流绕过它。
所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?
我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送
我正在尝试使用一个websocket发送音频消息,我应该将音频流改为什么类型的消息,以便我可以使用一个socket发送?如果我直接使用一个WebSocket.send(音频),我会得到一个错误“DOMException”,我应该把它改为二进制数据吗?怎么做的?我是一个全新的程序,所以请帮助!!!
我有一个有状态的应用程序,它维护与用户的会话。此应用程序有 5 个实例。 以下是主题: 所有主题都有5个分区。 Topic1和topic2分别用于建立州商店和全球故事。这两个主题都使用用户名作为消息键。这些主题中的数据由应用程序实例本身生成。 现在,另一个应用程序使用与消息键相同的用户名向topic3生成数据。 我的期望是它将进入同一个分区,该分区由在其本地状态存储中拥有该用户的实例使用。这是对的
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)