目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子:
我创建了一个接口IWebsocketClientEndpoint
public interface IWebsocketClientEndpoint {
IWebsocketClientEndpoint Connect() ;
void Disconnect() throws IOException;
IWebsocketClientEndpoint addMessageHandler(IMessageHandler msgHandler);
void SendMessage(String message) throws Exception;
void SendMessage(ByteBuffer message) throws Exception;
void SendMessage(Object message) throws Exception;
boolean isOpen();
void Dispose()throws IOException;
}
以及实现上述接口的类:
@ClientEndpoint
public class WebsocketClientEndpoint implements IWebsocketClientEndpoint {
private WebSocketContainer _container;
private Session _userSession = null;
private IMessageHandler _messageHandler;
private URI _endpointURI;
private WebsocketClientEndpoint(URI endpointURI) {
try {
_endpointURI = endpointURI;
_container = ContainerProvider.getWebSocketContainer();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private WebsocketClientEndpoint(URI endpointURI, int bufferSize) {
try {
_endpointURI = endpointURI;
_container = ContainerProvider.getWebSocketContainer();
_container.setDefaultMaxBinaryMessageBufferSize(bufferSize);
_container.setDefaultMaxTextMessageBufferSize(bufferSize);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static IWebsocketClientEndpoint Create(URI endpointURI){
return new WebsocketClientEndpoint(endpointURI);
}
public static IWebsocketClientEndpoint Create(URI endpointURI,int bufferSize){
return new WebsocketClientEndpoint(endpointURI,bufferSize);
}
public IWebsocketClientEndpoint Connect() {
try {
_container.connectToServer(this, _endpointURI);
return this;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@OnOpen
public void onOpen(Session userSession) {
this._userSession = userSession;
if (this._messageHandler != null) {
this._messageHandler.handleOpen("Web socket "+ _endpointURI +" opened");}
}
@OnClose
public void onClose(Session userSession, CloseReason reason) {
this._userSession = null;
if (this._messageHandler != null) {
this._messageHandler.handleClose("Web socket "+ _endpointURI +" closed. Reason: " + reason.getReasonPhrase());}
}
public void Disconnect() throws IOException {
CloseReason reason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"Web socket closed by user");
this._userSession.close(reason);
this._userSession = null;
//close notification to caller
if (this._messageHandler != null) {
this._messageHandler.handleClose("Web socket "+ _endpointURI +" closed. Reason: " + reason.getReasonPhrase());}
}
@Override
public IWebsocketClientEndpoint addMessageHandler(IMessageHandler msgHandler) {
this._messageHandler = msgHandler;
return this;
}
@OnMessage
public void onMessage(String message) {
if (this._messageHandler != null) {
this._messageHandler.handleMessage(message);
}
}
@OnMessage
public void onMessage(ByteBuffer bytes) {
if (this._messageHandler != null) {
this._messageHandler.handleMessage(bytes);
}
}
public void SendMessage(String message) throws Exception {
try{
this._userSession.getAsyncRemote().sendText(message);
}catch (Exception ex){
throw ex;
}
}
public void SendMessage(ByteBuffer message) throws Exception {
try{
this._userSession.getAsyncRemote().sendBinary(message);
}catch (Exception ex){
throw ex;
}
}
public void SendMessage(Object message) throws Exception {
this._userSession.getAsyncRemote().sendObject(message);
}catch (Exception ex){
throw ex;
}
}
@Override
public boolean isOpen() {
if (this._userSession != null){
return this._userSession.isOpen();
}
return false;
}
}
WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。
目标是:如何在Kafka连接结构中调整我的websocket结构?我可以将从套接字接收到的消息(“public void handleMessage(String s))排入ConcurrentLinkedQueue,然后在kafka连接循环方法中,将其解列。但这是最好的解决方案吗?
下面,我的Kafka自定义连接器的实现
My kafka Connector
public class MySourceTask extends SourceTask {
IWebsocketClientEndpoint _clientEndPoint;
@Override
public void start(Map<String, String> props) {
_clientEndPoint = WebsocketClientEndpoint
.Create(new URI(socket))
.Connect();
_clientEndPoint.addMessageHandler(new IMessageHandler() {
@Override
public void handleMessage(String s) {
}
@Override
public void handleMessage(ByteBuffer byteBuffer) {
}
@Override
public void handleClose(String s) {
}
@Override
public void handleOpen(String s) {
}
});
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
return null;
}
@Override
public void stop() {
_clientEndPoint.Dispose();
}
}
提前感谢所有人
我建议将接口添加到类中
extends SourceTask implements IMessageHandler
然后呢
_clientEndPoint.addMessageHandler(this);
当您实现handleMessage时,将这些字符串添加到一些队列中。在poll方法中,您可以从该队列中弹出数据来创建要返回的SourceRecord对象。
在停止内,调用此。关闭并清理其他资源。
我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。
我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“
问题内容: 我正在尝试测试安全的网络套接字,但是遇到了麻烦。这是我的测试: 创建后,这是“ ws”的日志: 我没有从打开回来的日志。我正在本地运行该项目,并且当我使用Chrome Advanced Rest Client工具时,可以正常连接。 我想念什么吗?请帮忙。 编辑: 我添加并注销, 我也尝试遵循此代码,但得到相同的错误。 问题答案: 该模块正在拒绝您的自签名证书(正如人们希望的那样)。您可
我创建了自己的库(com.custom.mylib),它返回一个字符串,如下所示。 我创建了一个将使用上述库的项目。我已将lib作为pom依赖项包含在内。但是当我尝试从我的应用程序调用库方法时。我得到了下面的错误。如何解决它? 请考虑在您的配置中定义一个“com.custom.mylog.MyLibrary”类型的bean。 我在application.properties文件中也有下面的内容,这
对于我的项目,我们被要求实现我们自己的连接池。我们不允许使用来自jdbc的PGPoolingDataSource。当我使用jdbc池时,我的程序运行得非常快,而我自己的连接池运行得不可预测,速度也慢得多。我的连接使一些任务等待了很长时间,这是jdbc池所没有的。 我正在使用Arrayblockingqueue实现连接池,我只是创建一个预先说过的连接数,然后让客户机借用并放回。 我是说这对我来说似乎
关闭连接标志着服务器和客户端之间的通信结束。使用事件可以关闭连接,标记通信结束后,服务器和客户端之间无法进一步传输消息。由于连接不良,也可能发生事件。 方法代表再见握手。它终止连接,除非连接再次打开,否则不能交换任何数据。 与前面的示例类似,当用户单击第二个按钮时,调用方法。 也可以传递前面提到的代码和原因说明参数,如下所示。 以下代码完整概述了如何关闭或断开Web Socket连接 - 在浏览器