当前位置: 首页 > 知识库问答 >
问题:

Apache kafka connect-自定义websocket连接器

顾泰平
2023-03-14

目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子:

我创建了一个接口IWebsocketClientEndpoint

    public interface IWebsocketClientEndpoint {
    IWebsocketClientEndpoint Connect() ;
    void Disconnect() throws IOException;
    IWebsocketClientEndpoint addMessageHandler(IMessageHandler msgHandler);
    void SendMessage(String message) throws Exception;
    void SendMessage(ByteBuffer message) throws Exception;
    void SendMessage(Object message) throws Exception;
    boolean isOpen();
    void Dispose()throws IOException;
}

以及实现上述接口的类:

@ClientEndpoint
public class WebsocketClientEndpoint implements IWebsocketClientEndpoint {

    private WebSocketContainer _container;
    private Session _userSession = null;
    private IMessageHandler _messageHandler;
    private URI _endpointURI;

    private WebsocketClientEndpoint(URI endpointURI) {
        try {
            _endpointURI = endpointURI;
            _container = ContainerProvider.getWebSocketContainer();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private WebsocketClientEndpoint(URI endpointURI, int bufferSize) {
        try {
            _endpointURI = endpointURI;
            _container = ContainerProvider.getWebSocketContainer();
            _container.setDefaultMaxBinaryMessageBufferSize(bufferSize);
            _container.setDefaultMaxTextMessageBufferSize(bufferSize);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

  
    public static IWebsocketClientEndpoint Create(URI endpointURI){
        return new WebsocketClientEndpoint(endpointURI);
    }

    public static IWebsocketClientEndpoint Create(URI endpointURI,int bufferSize){
        return new WebsocketClientEndpoint(endpointURI,bufferSize);
    }

    public IWebsocketClientEndpoint Connect()  {
        try {
                      _container.connectToServer(this, _endpointURI);
            return this;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    @OnOpen
    public void onOpen(Session userSession) {
        this._userSession = userSession;

       if (this._messageHandler != null) {
            this._messageHandler.handleOpen("Web socket "+ _endpointURI +" opened");}
    }


    @OnClose
    public void onClose(Session userSession, CloseReason reason) {

        this._userSession = null;

        if (this._messageHandler != null) {
            this._messageHandler.handleClose("Web socket "+ _endpointURI +" closed. Reason: " + reason.getReasonPhrase());}
    }

    public void Disconnect() throws IOException {
        CloseReason reason  = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"Web socket closed by user");

        this._userSession.close(reason);
        this._userSession = null;
        //close notification to caller
        if (this._messageHandler != null) {
            this._messageHandler.handleClose("Web socket "+ _endpointURI +" closed. Reason: " + reason.getReasonPhrase());}
    }

    @Override
    public IWebsocketClientEndpoint addMessageHandler(IMessageHandler msgHandler) {
        this._messageHandler = msgHandler;
        return  this;
    }

    @OnMessage
    public void onMessage(String message) {
        if (this._messageHandler != null) {
            this._messageHandler.handleMessage(message);
        }
    }

    @OnMessage
    public void onMessage(ByteBuffer bytes) {

        if (this._messageHandler != null) {
            this._messageHandler.handleMessage(bytes);
        }
    }

    public void SendMessage(String message) throws Exception {
        try{
           this._userSession.getAsyncRemote().sendText(message);
        }catch (Exception ex){
            throw ex;
        }
    }

    public void SendMessage(ByteBuffer message) throws Exception {
        try{
            this._userSession.getAsyncRemote().sendBinary(message);
        }catch (Exception ex){
            throw ex;
        }
    }


    public void SendMessage(Object message) throws Exception {
            this._userSession.getAsyncRemote().sendObject(message);
        }catch (Exception ex){
            throw ex;
        }
    }

    @Override
    public boolean isOpen() {
        if (this._userSession != null){
            return  this._userSession.isOpen();
        }
        return false;
    }
}

WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。

目标是:如何在Kafka连接结构中调整我的websocket结构?我可以将从套接字接收到的消息(“public void handleMessage(String s))排入ConcurrentLinkedQueue,然后在kafka连接循环方法中,将其解列。但这是最好的解决方案吗?

下面,我的Kafka自定义连接器的实现

My kafka Connector

public class MySourceTask extends SourceTask {
IWebsocketClientEndpoint _clientEndPoint;

    @Override
    public void start(Map<String, String> props) {

                _clientEndPoint = WebsocketClientEndpoint
                        .Create(new URI(socket))
                        .Connect();

            _clientEndPoint.addMessageHandler(new IMessageHandler() {
                @Override
                public void handleMessage(String s) {

                }

                @Override
                public void handleMessage(ByteBuffer byteBuffer) {

                }

                @Override
                public void handleClose(String s) {

                }

                @Override
                public void handleOpen(String s) {

                }
            });
     
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null;
    }

    @Override
    public void stop() {
            _clientEndPoint.Dispose();
    }
}

提前感谢所有人

共有1个答案

后焕
2023-03-14

我建议将接口添加到类中

extends SourceTask implements IMessageHandler

然后呢

_clientEndPoint.addMessageHandler(this);

当您实现handleMessage时,将这些字符串添加到一些队列中。在poll方法中,您可以从该队列中弹出数据来创建要返回的SourceRecord对象。

在停止内,调用此。关闭并清理其他资源。

 类似资料:
  • 我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。

  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 问题内容: 我正在尝试测试安全的网络套接字,但是遇到了麻烦。这是我的测试: 创建后,这是“ ws”的日志: 我没有从打开回来的日志。我正在本地运行该项目,并且当我使用Chrome Advanced Rest Client工具时,可以正常连接。 我想念什么吗?请帮忙。 编辑: 我添加并注销, 我也尝试遵循此代码,但得到相同的错误。 问题答案: 该模块正在拒绝您的自签名证书(正如人们希望的那样)。您可

  • 我创建了自己的库(com.custom.mylib),它返回一个字符串,如下所示。 我创建了一个将使用上述库的项目。我已将lib作为pom依赖项包含在内。但是当我尝试从我的应用程序调用库方法时。我得到了下面的错误。如何解决它? 请考虑在您的配置中定义一个“com.custom.mylog.MyLibrary”类型的bean。 我在application.properties文件中也有下面的内容,这

  • 对于我的项目,我们被要求实现我们自己的连接池。我们不允许使用来自jdbc的PGPoolingDataSource。当我使用jdbc池时,我的程序运行得非常快,而我自己的连接池运行得不可预测,速度也慢得多。我的连接使一些任务等待了很长时间,这是jdbc池所没有的。 我正在使用Arrayblockingqueue实现连接池,我只是创建一个预先说过的连接数,然后让客户机借用并放回。 我是说这对我来说似乎

  • 关闭连接标志着服务器和客户端之间的通信结束。使用事件可以关闭连接,标记通信结束后,服务器和客户端之间无法进一步传输消息。由于连接不良,也可能发生事件。 方法代表再见握手。它终止连接,除非连接再次打开,否则不能交换任何数据。 与前面的示例类似,当用户单击第二个按钮时,调用方法。 也可以传递前面提到的代码和原因说明参数,如下所示。 以下代码完整概述了如何关闭或断开Web Socket连接 - 在浏览器