首先安装模块 pip install redis
import redis
redis_con = redis.Redis(host='192.168.163.100', port=6379, db=0) # 与redis建立连接
redis_con.set("api_key", "api_value") # 将字符串传入数据库
print(redis_con.get("api_key")) # 打印字符串
import redis
redis_con = redis.Redis(host='192.168.163.100', port=6379, db=0) # 与redis建立连接
## 字符串
redis_con.mset({'api_key1': 'value1', 'api_key2': 'value2'}) # 传入多个字符键值对
print(redis_con.mget(['api_key1', 'api_key2'])) # 打印输出
## 列表
redis_con.lpush('api_key_list', 1) # 向列表中添加元素
redis_con.lpush('api_key_list', 2) # 向列表中添加元素
print(redis_con.lrange('api_key_list', 0, -1)) # 打印输出
## set
redis_con.sadd('api_key_set', 1) # 添加set元素
redis_con.sadd('api_key_set', 2) # 添加set元素
print(redis_con.smembers('api_key_set')) # 打印输出
## hash
redis_con.hset('api_key_hash', 'api_key1', 'value1') # 添加hash元素
redis_con.hset('api_key_hash', 'api_key2', 'value2') # 添加hash元素
print(redis_con.hgetall('api_key_hash')) # 打印输出
## zset
redis_con.zadd('api_key_zset', {'zset1': 100}) # 向zset中添加元素 评分在后
redis_con.zadd('api_key_zset', {'zset2': 200}) # 向zset中添加元素
print(redis_con.zrange('api_key_zset', 0, 150, byscore=True)) # 打印输出分数在0-150之间的值
# 需求: # 大数据领域经常需要做计算 如果使用Spark做数据计算,数据计算是要获取资源,通常是在Yarn平台获取资源 # 在Yarn资源平台中会存在一个队列 ,那么如果当有一个计算任务提交了 ,那么一个计算任务可以是一个SQL, # 那么该SQL可以认为是一个JOB,当任务,Job提交时,会获取计算资源(包括有 CPU 内存等), # 当提交时,任务会先提交至资源队列中进行排队,当第一个任务执行后,再去执行第二个任务,如果资源获取不到, # 那么会进入等待状态。
from redis import Redis
import json
redis_con = Redis(host='192.168.163.100', port=6379, db=0)
# 把多条任务放入redis中
for i in range(1, 10):
job = {"SQL": "select * from emp", "cpu": 4, "Memory": 8, "job_id": f"job_00{i}"}
job_json = json.dumps(job) # 列表元素只支持字符串 int float 故需要先将字典格式转为字符串
redis_con.lpush('job', job_json)
import json
from redis import Redis
import time
# 与redis建立连接
redis_conn = Redis(host='192.168.163.100', port=6379, db=0)
while True:
# 每2秒取出一条数据
time.sleep(2)
redis_job = redis_conn.rpop('job')
#另一种方式
#redis_job = redis_conn.brpop("job", 5) 每五秒取出一个job
if not redis_job:
print('Job not found')
else:
redis_job_str = redis_job.decode(encoding='utf8') #取出来的是bytes类型,需要转换成string类型
# print(redis_job_str)
# print(type(redis_job_str))
job = json.loads(redis_job) #将字典类型的字符串转换为json
cpu=int(job['cpu'])
memory=int(job['Memory'])
#判断资源是否充足,充足则执行
if cpu<=int(redis_conn.get('cpu')) and memory <=int(redis_conn.get('memory')):
print(redis_job_str+'正在执行')
redis_conn.decrby('cpu',cpu)
redis_conn.decrby('memory',memory)
time.sleep(2)
print(redis_job_str+'执行完成')
redis_conn.incrby('cpu',cpu) #执行完毕释放资源
redis_conn.incrby('memory',memory)
else:
print('资源不足,请等待')
redis.close()
# Redis 是一个缓存数据库 # 通常的使用方式为: 对于一个业务系统, # 经常存在前端、后端(用于在数据库中查询数据,并将其包装成一个API) # 关系型数据库,如果当业务系统中的用户访问量比较大,那么在数据库中查询SQL的频率也会比较高 # 那么对于数据库来说,其访问压力较大,性能在短时间内会急剧下降,那么可以通过将经常查询的信息, # 缓存到 Redis当中 ,提升整体查询的效率
需求: 对于一个业务系统,模拟有国家信息查询,如果查询到以后,那么会将国家信息缓存至Redis当中
import pymysql
from redis import Redis
import json
#连接数据库
conn = pymysql.connect(host='192.168.163.100', port=3306, user='root', passwd='123456', db='bigdata22')
cursor = conn.cursor()
#连接redis
redis_conn = Redis(host='192.168.163.100', port=6379, db=0)
#先在redis中查找,若找不到,再去MySQL中寻找
def getNationInfo(id):
if redis_conn.exists(f'nation_{id}'):
print('已在redis中找到信息')
return json.loads(redis_conn.get(f'nation_{id}'))
else:
sql=f'select * from nations where nid={id}'
cursor.execute(sql)
info=cursor.fetchone()
#在mysql中查找,若找到,则写入redis中,并返回值,若未找到,则返回None
if info:
info_str=f'nid:{info[0]},nation_name:{info[1]},nation_score:{info[2]}'
redis_conn.set(f'nation_{id}',json.dumps(info_str))
return info_str
else:
print('数据库中没有该信息')
return None
if __name__ == '__main__':
print("getNationinfo",getNationInfo(22))