当前位置: 首页 > 知识库问答 >
问题:

从Kafka流向带有Quarkus的WebSocket发送消息

华建同
2023-03-14

所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。

@ServerEndpoint("/validatedmessages")
@ApplicationScoped
public class WebSocket {

@Inject @Channel("post-final-check") Flowable<CustomMessage> finalizedMessages;
//private Jsonb jsonb;
ObjectMapper obj = new ObjectMapper(); 

private static final Logger LOGGER = Logger.getLogger(WebSocket.class);

private List<Session> sessions = new CopyOnWriteArrayList<>();
private Disposable subscription;

@OnOpen
public void onOpen(Session session) {
    sessions.add(session);
}

@OnClose
public void onClose(Session session) {
    sessions.remove(session);
}

@PostConstruct
public void subscribe() {
    subscription = finalizedMessages.subscribe(message -> sessions.forEach(session -> {
        try {
            write(session, message);
        } catch (JsonProcessingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }));
}

@PreDestroy
public void cleanup() throws Exception {
    subscription.dispose();
    //jsonb.close();
}

Kafka流有可能做到这一点吗?

共有1个答案

程胡非
2023-03-14

您将Kafka Streams的输出存储在哪里?在主题还是KTable中?

如果是第一个,那么您可以注入一个Publisher或使用@传入(主题)。否则您需要使用InteractiveQueries,请参阅此处的示例:

https://quarkus.io/guides/kafka-streams#interactive-询问

 类似资料:
  • 我是Kafka和quarkus的新手,我想在处理用户请求后向Kafka主题发送消息。 我已经浏览了Quarkus-快速入门中提供的kafka示例。我已经尝试使用KafkaMessage 但我得到了一个结果,那就是不断地向Kafka主题发送消息。 我想知道是否有其他方法或我的代码是否有任何问题。 帮助感谢

  • 问题内容: 这段代码一切正常(将其缩短以便更好地阅读)。 当向服务器发送请求时,服务器会立即响应他。但是,其他客户端看不到响应消息。 因此,我想进一步说明:当客户端向服务器发送请求时,服务器将响应所有客户端,以便所有客户端都能看到消息。 我怎样才能做到这一点?有任何示例或不错的入门教程吗? 提前致谢! 服务器: 问题答案: 您必须使用连接池将消息广播到所有连接。您可以将其用作教程/示例http:/

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我是新手。NET(C#)和WebSocket。。。我已经安装了VS2012和Windows Server 2012,并且已经启动并运行了WebSocket。我似乎无法从一个套接字接收消息并将其发送到另一个特定的套接字。唯一的选择似乎是向所有套接字广播消息。有没有一种方法可以将消息只发送给特定的用户?我希望聊天室主持人有机会拒绝不适当的帖子。

  • 我很清楚,我必须使用完整功能的broker支持,将STOMP作为子协议。我们已经使用rabbit进行通信,所以它是显而易见的选择。 文档说明“STOMP broker中继还为每个连接的WebSocket客户端创建一个单独的TCP连接。”它是每个用户会话的单独连接,而不是每个用户。此外,当发送端位于同一应用程序(负责处理用户会话的应用程序)内时,每个会话都创建一个单独的队列。我认为,希望发送消息的其