flink-redis-connector Implementation

欧阳向文
2023-12-01

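This post shares a project, flink-redis-connector, which provides the following features:

  1. Writing to Redis from Flink SQL
  2. Reading Redis as a dimension (lookup) table from Flink SQL (low latency, with caching; this is not the "all cache" approach that periodically reloads the full data set)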

I: Dimension table mode

create table histalarmDim
(
    metricKey     varchar,
    histalarmData ARRAY< varchar >
) with (
      'connector' = 'redis',
      'host' = '127.0.0.1',
      'port' = '6379',
      'redis-mode' = 'single',
      'key-column' = 'metricKey',
      'value-column' = 'histalarmData',
      'lookup.hash.enable' = 'false',
      'lookup.redis.datatype' = 'list' 
      );

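Note:

'lookup.redis.datatype' = 'list' -- When the table is used as a dimension table and the value column is an array, specify the actual Redis data type: LIST, SET, or SORTED_SET. For other column types this option is ignored.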
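
A dimension table like this is normally queried through a Flink SQL lookup join. The sketch below assumes a hypothetical `metrics` probe table with a processing-time attribute; it is backed by Flink's built-in `datagen` connector only to keep the example self-contained, and neither the table nor its columns come from the project.

```sql
-- Hypothetical probe-side table (not part of the project).
CREATE TABLE metrics (
    metricKey VARCHAR,
    proctime AS PROCTIME()   -- processing-time attribute required by a lookup join
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- For each incoming row, look up the Redis list stored under metricKey.
SELECT m.metricKey, d.histalarmData
FROM metrics AS m
LEFT JOIN histalarmDim FOR SYSTEM_TIME AS OF m.proctime AS d
    ON m.metricKey = d.metricKey;
```

Because `datagen` produces random keys, most lookups in this sketch will find nothing; in a real job the probe table would carry keys that exist in Redis.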

Cache parameters:
cache.type          -- heap or off-heap; selects on-heap or off-heap caching
cache.max-rows      -- maximum number of rows held in the cache
cache.ttl           -- expiration time of cached entries
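
As a sketch of how these options might be combined, the definition below adds them to the dimension table shown earlier. The table name `histalarmDimCached` and all three values are placeholders; in particular, the accepted format and unit of `cache.ttl` are assumptions and should be checked against the connector source.

```sql
CREATE TABLE histalarmDimCached
(
    metricKey     VARCHAR,
    histalarmData ARRAY<VARCHAR>
) WITH (
      'connector' = 'redis',
      'host' = '127.0.0.1',
      'port' = '6379',
      'redis-mode' = 'single',
      'key-column' = 'metricKey',
      'value-column' = 'histalarmData',
      'lookup.hash.enable' = 'false',
      'lookup.redis.datatype' = 'list',
      'cache.type' = 'heap',          -- or 'off-heap'
      'cache.max-rows' = '10000',     -- placeholder value
      'cache.ttl' = '60'              -- placeholder value; unit is an assumption
      );
```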

II: Sink table mode

create table sink_redis
(
    username VARCHAR,
    passport VARCHAR
) with (
      'connector' = 'redis',
      'host' = '127.0.0.1',
      'port' = '6379',
      'redis-mode' = 'single',
      'key-column' = 'username',
      'value-column' = 'passport',
      'command' = 'set');
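
With the sink table defined, rows are written with an ordinary INSERT statement. The values below are made up for illustration; with 'command' = 'set', each row is presumably written as a Redis SET with username as the key and passport as the value.

```sql
-- Illustrative rows only; in practice the SELECT side would be a streaming source.
INSERT INTO sink_redis
VALUES ('alice', 'passport-001'),
       ('bob', 'passport-002');
```

If that assumption holds, running `GET alice` in redis-cli after the job finishes should return the stored passport value.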

Note:
The command option specifies the Redis command used to write each row. The supported commands are:

    /**
     * Insert the specified value at the head of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operations.
     */
    LPUSH(RedisDataType.LIST),

    /**
     * Insert the specified value at the tail of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operation.
     */
    RPUSH(RedisDataType.LIST),

    /**
     * Add the specified member to the set stored at key.
     * Specified member that is already a member of this set is ignored.
     */
    SADD(RedisDataType.SET),

    /**
     * Set key to hold the string value. If key already holds a value,
     * it is overwritten, regardless of its type.
     */
    SET(RedisDataType.STRING),

    /**
     * Set key to hold the string value, with a time to live (TTL). If key already holds a value,
     * it is overwritten, regardless of its type.
     */
    SETEX(RedisDataType.STRING),

    /**
     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
     */
    PFADD(RedisDataType.HYPER_LOG_LOG),

    /**
     * Posts a message to the given channel.
     */
    PUBLISH(RedisDataType.PUBSUB),

    /**
     * Adds the specified members with the specified score to the sorted set stored at key.
     */
    ZADD(RedisDataType.SORTED_SET),

    /**
     * Increments the score of the specified member in the sorted set stored at key.
     */
    ZINCRBY(RedisDataType.SORTED_SET),

    /**
     * Removes the specified members from the sorted set stored at key.
     */
    ZREM(RedisDataType.SORTED_SET),

    /**
     * Sets field in the hash stored at key to value. If key does not exist,
     * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
     */
    HSET(RedisDataType.HASH),

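    /**
     * Returns the value associated with field in the hash stored at key.
     */
    HGET(RedisDataType.HASH),

    /**
     * Increments the number stored at field in the hash stored at key.
     */
    HINCRBY(RedisDataType.HINCRBY),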

    /**
     * Increments the number stored at key by the given delta.
     */
    INCRBY(RedisDataType.STRING),

    /**
     * Increments the number stored at key by the given delta and sets a fixed expiration time on the key.
     */
    INCRBY_EX(RedisDataType.STRING),

    /**
     * Decrements the number stored at key by the given amount.
     */
    DECRBY(RedisDataType.STRING),

    /**
     * Decrements the number stored at key by the given amount and sets a fixed expiration time on the key.
     */
    DESCRBY_EX(RedisDataType.STRING);
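
To illustrate choosing a different command, the sketch below defines a sink that appends values to a Redis list. It assumes that command names are written in lowercase, as in the 'set' example, and that RPUSH needs only a key column and a value column; the table and column names are hypothetical.

```sql
CREATE TABLE sink_redis_list
(
    metricKey VARCHAR,
    alarm     VARCHAR
) WITH (
      'connector' = 'redis',
      'host' = '127.0.0.1',
      'port' = '6379',
      'redis-mode' = 'single',
      'key-column' = 'metricKey',
      'value-column' = 'alarm',
      'command' = 'rpush');   -- assumed spelling, mirroring 'set'
```

A list written this way would match the LIST data type that the dimension table example reads back.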

redis-mode specifies the Redis deployment mode:

  single
  sentinel
  cluster

For further details, read the source code.
