分享一个项目 flink-redis-connector:它支持在 Flink SQL 中把 Redis 用作维表(lookup)和结果表(sink),用法示例如下:
-- Redis-backed dimension (lookup) table: pairs a key column with an array-valued column.
CREATE TABLE histalarmDim (
    -- STRING instead of bare VARCHAR: in Flink SQL a VARCHAR with no length is VARCHAR(1).
    metricKey STRING,
    histalarmData ARRAY<STRING>
) WITH (
    'connector' = 'redis',
    -- Example endpoint: a single Redis instance on the local machine.
    'host' = '127.0.0.1',
    'port' = '6379',
    'redis-mode' = 'single',
    -- These two options name the key and value columns declared in the schema above.
    'key-column' = 'metricKey',
    'value-column' = 'histalarmData',
    'lookup.hash.enable' = 'false',
    -- The value column is an ARRAY, so the actual Redis data type is stated explicitly.
    'lookup.redis.datatype' = 'list'
);
注意:
'lookup.redis.datatype' = 'list' -- 作为维表使用时,如果 value 列是数组类型,需要通过该参数指定 Redis 中实际的数据类型(LIST、SET 或 SORTED_SET);其他情况下可以忽略该参数
缓存参数:
cache.type -- 缓存方式:heap(堆内缓存)或 off-heap(堆外缓存)
cache.max-rows -- 缓存的最大数据行数
cache.ttl -- 缓存数据的失效时间
-- Redis sink table: rows are written using the Redis command named by the 'command' option.
CREATE TABLE sink_redis (
    -- STRING instead of bare VARCHAR: in Flink SQL a VARCHAR with no length is VARCHAR(1),
    -- which can truncate values when sink type-length enforcement is enabled.
    username STRING,
    passport STRING
) WITH (
    'connector' = 'redis',
    -- Example endpoint: a single Redis instance on the local machine.
    'host' = '127.0.0.1',
    'port' = '6379',
    'redis-mode' = 'single',
    -- These two options name the key and value columns declared in the schema above.
    'key-column' = 'username',
    'value-column' = 'passport',
    'command' = 'set'
);
注意:
command 参数用于指定写入 Redis 时使用的操作命令,可选的命令定义如下:
// Constants of the connector's Redis command enum (the enum declaration itself is not part of
// this excerpt). Each constant is tagged with the RedisDataType it is declared against.
/**
* Insert the specified value at the head of the list stored at key.
* If key does not exist, it is created as an empty list before performing the push operation.
*/
LPUSH(RedisDataType.LIST),
/**
* Insert the specified value at the tail of the list stored at key.
* If key does not exist, it is created as an empty list before performing the push operation.
*/
RPUSH(RedisDataType.LIST),
/**
* Add the specified member to the set stored at key.
* A specified member that is already a member of this set is ignored.
*/
SADD(RedisDataType.SET),
/**
* Set key to hold the string value. If key already holds a value,
* it is overwritten, regardless of its type.
*/
SET(RedisDataType.STRING),
/**
* Set key to hold the string value, with a time to live (TTL). If key already holds a value,
* it is overwritten, regardless of its type.
*/
SETEX(RedisDataType.STRING),
/**
* Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
*/
PFADD(RedisDataType.HYPER_LOG_LOG),
/**
* Posts a message to the given channel.
*/
PUBLISH(RedisDataType.PUBSUB),
/**
* Adds the specified members with the specified score to the sorted set stored at key.
*/
ZADD(RedisDataType.SORTED_SET),
/**
* NOTE(review): undocumented in the original. Presumably increments the score of a member of
* the sorted set stored at key (Redis ZINCRBY) - confirm against the connector source.
*/
ZINCRBY(RedisDataType.SORTED_SET),
/**
* Removes the specified members from the sorted set stored at key.
*/
ZREM(RedisDataType.SORTED_SET),
/**
* Sets field in the hash stored at key to value. If key does not exist,
* a new key holding a hash is created. If field already exists in the hash, it is overwritten.
*/
HSET(RedisDataType.HASH),
/**
* NOTE(review): undocumented in the original. Presumably reads a field of the hash stored at
* key (Redis HGET) - confirm how this read command is used by the connector.
*/
HGET(RedisDataType.HASH),
/**
* NOTE(review): undocumented in the original. Presumably increments a hash field by a delta
* (Redis HINCRBY). It is tagged RedisDataType.HINCRBY, whereas HSET and HGET are tagged
* RedisDataType.HASH - confirm that this data type exists and is intended.
*/
HINCRBY(RedisDataType.HINCRBY),
/**
* Increases the value of the specified key by a delta.
*/
INCRBY(RedisDataType.STRING),
/**
* Increases the value of the specified key by a delta and expires the key after a fixed time.
*/
INCRBY_EX(RedisDataType.STRING),
/**
* Decreases the value of the specified key by a fixed number.
*/
DECRBY(RedisDataType.STRING),
/**
* Decreases the value of the specified key by a fixed number and expires the key after a fixed time.
* NOTE(review): the constant is spelled DESCRBY_EX while its sibling is DECRBY - possibly a
* typo, but the name is part of the public command set, so confirm before renaming.
*/
DESCRBY_EX(RedisDataType.STRING);
redis-mode 参数用于指定 Redis 的部署模式,可选值如下:
single
sentinel
cluster
更多实现细节和参数说明,请阅读项目源码。