当前位置: 首页 > 知识库问答 >
问题:

Spring Websocket与Kafka的集成

西门凯康
2023-03-14

在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。

为了建立服务器和客户端之间的通信,我有以下内容。

客户端(app.js)

function connect() {
    var socket = new SockJS('/kafka-data-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        console.log('Connected: ' + frame);
        stompClient.send("/app/fetchData");
        stompClient.subscribe('/data/records', function (message) {
            console.log(JSON.parse(message.body).content);
        });
    });
}

服务器(KafkaController.java)

@Controller
public class KafkaController {

    @MessageMapping("/fetchData")
    @SendTo("/data/records")
    public String fetchMetrics() {
        //...
    }
}

要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下:

public class KafkaReceiver {
    @KafkaListener(topics = "mytopic")
    public void receive(ConsumerRecord<?, ?> record) {
        MyRecord m = new MyRecord(new Long(record.offset()), record.key().toString(), record.value().toString());
           //...
    }
}

我有一个适当的Kafkanconfig类,包含所有必要的bean(如这里所解释的)。

共有1个答案

公羊浩气
2023-03-14

您应该将SimpMessagingTemboard注入KafkaRecencer并从接收()方法使用它:

 this.template.convertAndSend("/data/records", m);

请参阅《Spring框架参考手册》中的更多信息。

 类似资料:
  • 我试图测试camel与kafka的集成,如这里所述 以下是我的代码 然而,我得到了以下错误 请提出可能遗漏的内容。

  • 我正在尝试连接陋居到AWS MSKKafka。我一直收到下面的消息。我能够连接到MSK从相同的EC2实例以下步骤。但是陋居是不能连接。我们需要指定truststore,我不能在陋居中设置它。如有任何帮助,不胜感激。

  • 我正试图通过带有SASL配置的spring集成模块连接到kafka服务器,但出现错误 JAVAlang.IllegalArgumentException:在JAAS配置中找不到“KafkaClient”条目。系统属性的java。安全auth。登录。未设置“配置” 但当我构建简单的消费者和民意调查消息时,一切都很好。有人能告诉我如何关闭JAAS授权或通过它正确连接吗。这是我的Kafka。java和S

  • 我们有kafka集群,包含3个kafka代理节点和3个zookeepers服务器 Kafka版本- 10.1 ( hortonworks) 根据我的理解,因为所有的元数据都位于zookeeper服务器上,kafka代理正在使用这些数据(kafka通过端口2181与zookeeper服务器对话) 我只是想知道是否每台kafka机器都与集群中的其他kafka交谈,或者kafka可能只在动物园管理员服务

  • 我试图在Spring Integration中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到Kafka队列中。为此,我使用了spring集成kafka。问题是我得到了一个无法解读的错误。 以下是我的XML配置: 当我通过Spring启动运行我的应用程序时,我得到这个异常: 创建名称为org.springframework.integration.kafka.outbound.Kafk

  • 通过和注释,我得到以下错误: 然后添加以下配置: 错误消失了。但是,我想我应该能够按照文档中的建议,使用属性自动配置它。 null