# -*- coding: utf-8 -*-
# @Time : 2019-02-13 10:44
# @Author : cxa
# @File : mongohelper.py
# @Software: PyCharm
import asyncio
from logger.log import storage
import pathlib
import datetime
from motor.motor_asyncio import AsyncIOMotorClient
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
db_configs = {
'type': 'mongo',
'host': '121.7.0.170',
'port': '27017',
'user': 'admin',
'passwd': '1234qwer',
'db_name': 'spider_data'
}
class MotorOperation():
def __init__(self):
self.__dict__.update(**db_configs)
if self.user:
self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}"
else:
self.motor_uri = f"mongodb://{self.host}:{self.port}/{self.db_name}"
self.client = AsyncIOMotorClient(self.motor_uri)
self.mb = self.client[self.db_name]
def get_use_list(self):
with open("tangxinqun_details.json", "r", encoding="utf-8") as fs:
data = fs.read()
return data
async def save_file_to_mongo(self):
items = self.get_use_list()
await self.mb.tangxinqun_details.insert_many(eval(items))
async def save_data(self, items):
storage.info(f"此时的items:{items}")
if isinstance(items, list):
for item in items:
try:
item["primary_key"] = item["primary_key"]
await self.mb.tangxinqun_data.update_one({
'primary_key': item.get("primary_key")},
{'$set': item},
upsert=True)
except Exception as e:
storage.error(f"数据插入出错:{e.args}此时的item是:{item}")
elif isinstance(items, dict):
try:
items["primary_key"] = items["primary_key"]
await self.mb.tangxinqun_data.update_one({
'primary_key': items.get("primary_key")},
{'$set': items},
upsert=True)
except Exception as e:
storage.error(f"数据插入出错:{e.args}此时的item是:{items}")
async def change_status(self, condition, status_code=0):
# status_code 0:初始,1:开始下载,2下载完了
try:
item = {}
item["status"] = status_code
storage.info(f"修改状态,此时的数据是:{item}")
await self.mb.tangxinqun_details.update_one(condition, {'$set': item})
except Exception as e:
storage.error(f"修改状态出错:{e.args}此时的数据是:{item}")
async def get_detail_datas(self):
data = self.mb.tangxinqun_details.find({'status': 0})
async for item in data:
print(item)
return data
async def reset_status(self):
await self.mb.tangxinqun_details.update_many({'status': 1}, {'$set': {"status": 0}})
async def reset_all_status(self):
await self.mb.tangxinqun_details.update_many({}, {'$set': {"status": 0}})
async def find_data(self):
curosr = self.mb.tangxinqun_details.find({'status': 0}, {"_id": 0})
async_gen = (item async for item in curosr)
return async_gen
async def do_delete_many(self):
await self.mb.tangxinqun_data.delete_many({"flag": 0})
if __name__ == '__main__':
m = MotorOperation()
loop = asyncio.get_event_loop()
loop.run_until_complete(m.save_file_to_mongo())