当前位置: 首页 > 知识库问答 >
问题:

从Redis读取数据到Flink

陶高峻
2023-03-14

我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?

共有3个答案

严瀚昂
2023-03-14

关于为ApacheFlink提供流式redis源代码连接器(请参阅Flink-3033),已经有过一些讨论,但目前还没有可用的。然而,实现一个应该并不困难。

燕璞
2023-03-14

我们正在生产中运行一个大致看起来像这样的

class RedisSource extends RichSourceFunction[SomeDataType] {

  var client: RedisClient = _

  override def open(parameters: Configuration) = {
    client = RedisClient() // init connection etc
  }

  @volatile var isRunning = true

  override def cancel(): Unit = {
    isRunning = false
    client.close()
  }

  override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
      for {
        data <- ??? // get some data from the redis client
      } yield ctx.collect(SomeDataType(data))

  }
} 

我认为这真的取决于你需要从redis获取什么。上面的方法可以用来从列表/队列中获取消息,转换/推送,然后从队列中删除它。Redis还支持Pub/Sub,因此可以订阅、获取SourceConext并向下游推送消息。

公羊涛
2023-03-14

目前,Flink Redis连接器不可用,但可以通过扩展RichSinkFunction/SinkFunction类来实现。

public class RedisSink extends RichSinkFunction<String> {

  @Override
  public void open(Configuration parameters) throws Exception {
      //open redis connection
  }

  @Override
  public void invoke(String map) throws Exception {
     //sink data to redis
  }

  @Override
  public void close() throws Exception {
     super.close();
  }

}
 类似资料:
  • Redis 服务器是 logstash 官方推荐的 broker 选择。Broker 角色也就意味着会同时存在输入和输出俩个插件。这里我们先学习输入插件。 LogStash::Inputs::Redis 支持三种 data_type(实际上是redis_type),不同的数据类型会导致实际采用不同的 Redis 命令操作: list => BLPOP channel => SUBSCRIBE pa

  • 问题内容: 我正在查询一个SQL数据库,我想使用熊猫来处理数据。但是,我不确定如何移动数据。以下是我的输入和输出。 问题答案: 答案更简短

  • 我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。

  • 我试图从Firebase数据库中读取数据,我已经到处阅读和查找,但我已经走到了死胡同。 这就是我所做的一切。 依赖项: 实现'com.google.firebase: Firebase存储: 9.2.1' 实现'com。谷歌。firebase:firebase数据库:9.2。1' 实现'com。谷歌。firebase:firebase授权:9.2。1' 实现'com。谷歌。火基:火基核心:9.2。

  • 我已经看到了一些关于数据库到雪花的问题,但我的问题是如何将表格从雪花到数据库。 到目前为止我所做的:创建了一个集群并将集群附加到我的笔记本上(我正在使用Python) 然后我试着用spark.read读取雪花中的FBK _视频表: 我也尝试过:选项(“dbtable”,“从FBK_VIDEOS中选择*”).load() 但我看到的以下错误: net.snowflake.client.jdbc.Sn

  • 问题内容: 我正在尝试从golang中的telnet会话读取数据。为了达到这个目的,我编写了以下函数。 最初,我遇到一个问题,即我从没有数据的套接字读取数据,因此它将锁定并且永远不会返回。BufferSocketData是我尝试解决此问题的方法,因为我不知道是否有要读取的数据。这个想法是它将等待1秒钟,然后确定套接字中没有数据并返回一个空字符串。 GetData似乎第一次在缓冲区中有新数据时起作用