设计思路:
1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来
2.然后实例化redis类,将数据简单解析后逐条传入redis队列
3.定时器设计每天凌晨12点开始跑
ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。
# -*- coding:utf-8 -*- import MySQLdb import schedule import time import datetime import random import string import redis # get the data from mysql class FromSql(object): def __init__(self, conn): self.conn = conn def acquire(self): cursor = self.conn.cursor() try: sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1" cursor.execute(sql) rs = cursor.fetchall() #print (rs) for eve in rs: print('%s, %s, %s, %s' % eve) copy_rs = rs cursor.close() return copy_rs except Exception as e: print("The error: %s" % e) class RedisQueue(object): def __init__(self, name, namespace='queue', **redis_kwargs): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db= redis.Redis(**redis_kwargs) self.key = '%s:%s' %(namespace, name) def qsize(self): return self.__db.llen(self.key) def put(self, item): self.__db.rpush(self.key, item) def get(self, block=True, timeout=None): if block: item = self.__db.blpop(self.key, timeout=timeout) else: item = self.__db.lpop(self.key) if item: item = item[1] return item def get_nowait(self): return self.get(False) if __name__ == "__main__": # connect mysqldb conn_sql = MySQLdb.connect( host = '127.0.0.1', port = 3306, user = 'root', passwd = '', db = 'test', charset = 'utf8' ) def job_for_redis(): get_data = FromSql(conn_sql) data = get_data.acquire() q = RedisQueue('test',host='localhost', port=6379, db=0) for single_data in data: for meta_data in single_data: q.put(meta_data) print(meta_data) print("All data had been inserted.") """ try: schedule.every().day.at("00:00").do(job_for_redis) except Exception as e: print('Error: %s'% e) # finally: # conn.close() while True: schedule.run_pending() time.sleep(1) """
补充知识:python定时获取汇率存入数据库
python定时任务:
我们可以使用 轻量级的第三方模块schedule。首先先安装:pip install schedule
定时任务的的小测试:
import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) # 每隔10分钟执行一次任务 schedule.every().hour.do(job) # 每隔一小时执行一次任务 schedule.every().day.at("10:30").do(job) # 每天10:30执行一次任务 schedule.every(5).to(10).days.do(job) # 每5-10天执行一次任务 schedule.every().monday.do(job) # 每周一的这个时候执行一次任务 schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行一次任务 while True: schedule.run_pending()
获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)
import pymysql import schedule import time import requests import pandas from sqlalchemy import create_engine #获取美元的所有外汇 def job(): content = '美元' url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外汇数据地址 html = requests.get(url).content.decode('utf-8') index = html.index('<td>' + content + '</td>') str = html[index:index+300] result = re.findall('<td>(.*?)</td>',str) print("币种:" + result[0]) print("现汇买入价:" + result[1]) print("现钞买入价:" + result[2]) print("现汇卖出价:" + result[3]) print("现钞卖出价:" + result[4]) print("中行结算价:" + result[5]) print("发布时间:" + result[6] + ' ' + result[7]) #本地地址 数据库账号 密码 数据库名 db = pymysql.connect('localhost','root','root','pinyougoudb') cursor = db.cursor() #sql语句 sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0]) cursor.execute(sql) db.commit() print('success') # 查询语句,将存入的数据查出来 # sqlalchemy 进行数据库初始化 engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb') sql = '''select * from tb_money''' # pandas 进行数据库读写 df = pandas.read_sql_query(sql,engine) print(df) db.commit() # 每隔几分中刷新一次 #schedule.every(0.1).minutes.do(job) #每天什么时候刷新 schedule.every().day.at("09:29").do(job) schedule.every().day.at("09:30").do(job) #一直循环 知道满足条件执行 while True: schedule.run_pending()
以上这篇Python定时从Mysql提取数据存入Redis的实现就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持小牛知识库。
本文向大家介绍Python爬取数据并写入MySQL数据库的实例,包括了Python爬取数据并写入MySQL数据库的实例的使用技巧和注意事项,需要的朋友参考一下 首先我们来爬取 http://html-color-codes.info/color-names/ 的一些数据。 按 F12 或 ctrl+u 审查元素,结果如下: 结构很清晰简单,我们就是要爬 tr 标签里面的 style 和 tr 下几
本文向大家介绍python实现定时提取实时日志程序,包括了python实现定时提取实时日志程序的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了python定时提取实时日志的具体代码,供大家参考,具体内容如下 这是一个定时读取 实时日志文件的程序。目标文件是target_file. 它是应用程序实时写入的。 我要做的是,每个5秒钟,提取一次该日志文件中的内容,然后生成另一个文件,最后把
本文向大家介绍python实现提取COCO,VOC数据集中特定的类,包括了python实现提取COCO,VOC数据集中特定的类的使用技巧和注意事项,需要的朋友参考一下 1.python提取COCO数据集中特定的类 安装pycocotools github地址:https://github.com/philferriere/cocoapi pip install git+https://github
问题内容: 我的Python代码处理了以下文本: 您能建议我如何从内部提取数据吗?我的想法是将其放入具有以下格式的CSV文件中:。 我希望没有正则表达式会很困难,但实际上我仍然在反对正则表达式。 我或多或少地通过以下方式使用了代码: 理想情况下是将每个td竞争以某个数组进行竞争。上面的HTML是python的结果。 问题答案: 获取BeautifulSoup并使用它。这很棒。
问题内容: 我试图提取具有特定文本文件之间的文本: 然后将其转储到文本文件中,以便 谢谢您的帮助。 问题答案: 这对我来说足够好了。您的样本数据在一个名为“ data.txt”的文件中,输出将进入“ result.txt”
我是编程新手,我开始使用Ionic框架构建应用程序作为体验。我目前正在学习构建一个基本的社交媒体应用程序,匿名用户可以在该平台上发帖。 我现在处于用户可以评论帖子的阶段。但是,我很难在视图上显示每个帖子的实时评论数。 我计算评论的方法是从Firebase获取帖子数据,计算评论数量,然后显示给视图。 我尝试过使用间隔来每隔x秒获取post数据,以刷新post数据。这有点可行,但问题是离子内容(视图)