当前位置: 首页 > 工具软件 > redis_admin > 使用案例 >

Flink1.9系列-connnector-redis篇

燕正德
2023-12-01

首先,预祝大家2020年多福多寿,少宰少难!!!
作为2020年的第一篇博文,再不写的话就对不起大家了!!!


好,废话少说,今天这篇文章主要是解决你在做实时计算的时候,将数据sink到redis的种种问题
实时计算流程框架其实比较简单,目前比较流行的也就是kafka+flink+redis或者kafka+flink+hbase

前面kafka+flink的流程稍后会专门来写,本篇主要写flink sink redis技术

1. Redis数据结构

Redis的数据结构常用的无非就是两种:

  1. SET
  2. HSET

这里并不展开说这两种数据结构的具体定义和使用场景,主要针对这两种数据类型具体说明flink sink的不同用法;

2.引包(特别注意)

目前市场上的私服绝大部分都是这个包

org.apache.flink:flink-connector-redis_2.11:1.1.5

如果你使用的SET结构类型,那这个包完全可以满足你的需求;
如果你使用的是HSET类型,并且不需要从数据中提取additionalKey,那这个也可以满足,例如:

hget 1000038 20200216:$action:exchange:311::

你的1000038是固定的,不随数据变化而变化,那就不用接着往下看了

但是,如果redisKey是需要从数据里面提取的话,那就有必要看一下官方文档了~~~

3.下载并编译org.apache.bahir提供的flink-connector-redis包

参考flink的官方文档:Flink Redis Connector

<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

可是你是不是在哪个私服都找不到这个jar包,恭喜你,请继续往下看:
打眼一看,这个jar包是SNAPSHOT的,所以一般仓库也没有,需要从源码编译了

  1. 下载源码 源码github链接
  2. 按照github的说明编译源码

4.上传jar包到自己的私服上

/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots

/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dclassifier=sources -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT-sources.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots

/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dclassifier=test-sources -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT-test-sources.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots

将命令中的路径修改为你自己环境的路径,并依次执行完成上传动作;

如果出现下面的错误:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.7:deploy-file (default-cli) on project standalone-pom: Failed to deploy artifacts: Could not transfer artifact org.apache.bahir:flink-connector-redis_2.11:jar:1.1-20200216.012547-1 from/to snapshots (http://hadoop03:8081/nexus/content/repositories/snapshots/): Failed to transfer file: http://hadoop03:8081/nexus/content/repositories/snapshots/org/apache/bahir/flink-connector-redis_2.11/1.1-SNAPSHOT/flink-connector-redis_2.11-1.1-20200216.012547-1.jar. Return code is: 401, ReasonPhrase: Unauthorized. -> [Help 1]
[ERROR]

解决方案
在maven的setting文件中添加下面的代码即可,注意将密码修改为私服的密码,再次执行上面的命令即可

<server>
  <id>snapshots</id>
  <username>admin</username>
  <password>123</password>
</server>

5.使用flink-connector-redis,将数据sink到redis中

5.1 导包

compile 'org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT'
compile 'redis.clients:jedis:3.1.0'

5.2 继承RedisMapper类

/**
  * @ClassName Redis
  * @Description
  * @Author HuZhongJin
  * @Date 2020/2/15 14:09
  * @Version 1.0
  */
class RedisHSetSink extends RedisMapper[(String, String, Long, Long, String)] {

  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "enbrands_")
  }

  override def getKeyFromData(data: (String, String, Long, Long, String)): String = {
    data._2
  }

  override def getValueFromData(data: (String, String, Long, Long, String)): String = {
    val json = new JSONObject()
    json.put("pv", String.valueOf(data._3))
    json.put("uv", String.valueOf(data._4))
    json.put("date", data._5)
    json.toJSONString
  }

  override def getAdditionalKey(data: (String, String, Long, Long, String)): util.Optional[String] = {
    util.Optional.of("enbrands_" + data._1)
  }
}

注意:

  1. getAdditionalKey 用于从数据中提取redis key
  2. getKeyFromData 用于从数据中提取hkey
  3. getValueFromData 用于从数据中提取hvalue

6. org.apache.flink和org.apache.bahir提供的jar包的差别

其实这个看源码比较方便,我大概列一下差别吧,主要说org.apache.bahir有的,而org.apache.flink没有的

/**
     * Extracts the additional key from data as an {@link Optional<String>}.
     * The default implementation returns an empty Optional.
     *
     * @param data
     * @return Optional
     */
    default Optional<String> getAdditionalKey(T data) {
        return Optional.empty();
    }

    /**
     * Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
     * The default implementation returns an empty Optional.
     *
     * @param data
     * @return Optional
     */
    default Optional<Integer> getAdditionalTTL(T data) {
        return Optional.empty();
    }

其实就多了上面的两个方法,一个是为了提取redis key,一个是为了指定ttl,具体使用的时候大家再仔细领会

 类似资料: