我正在试用Kafka和Flink:
如果我创建一个基本的RESTWebServices,我想我会失去流媒体的兴趣,对吗?
我应该向我的网络应用程序提供flink数据,还是应该将其发送到另一个Kafka主题,以便将其提供给网络应用程序?
非常感谢。
安托万
我应该向我的网络应用程序提供flink数据,还是应该将其发送到另一个Kafka主题,以便将其提供给网络应用程序?
这不会改变什么,你会失去对网络服务的流媒体兴趣吗?你想过使用websocket引擎吗?为了以流的形式向客户端发送消息?
取决于你的后端技术:-https://github.com/socketio/engine.io-https://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-server-...
例如,我想用和测试Kafka/Flink的集成。 该过程将是: 与Flink一起阅读Kafka主题 用Flink进行一些操作 和Flink一起写另一个Kafka主题 以字符串为例,从输入主题中读取字符串,转换为大写,写入新主题。 问题是如何测试流量? 当我说测试时,这是单元/集成测试。 谢谢!
我试图将数据从kafka主题读入DataStream并注册DataStream,然后使用tableEnvironment.sqlQuery(“sql”)查询数据,当tableEnvironment.execute()没有错误也没有输出时。 依赖关系: flink-streaming-java2.11版本:1.9.0-csa1.0.0.0; Flink-Streaming-Scala2.11版本:1
如果是,请把我放在轨道上实现。
我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:
我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中
不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?