当前位置: 首页 > 工具软件 > Asyncio Mongo > 使用案例 >

python读文件保存json保存mongo,多进程,协程,计时,redis队列

秦炜
2023-12-01
# -*- coding: utf-8 -*-
"""
 @Time   : 2020/7/10 17:22
 @Author  : LinXiao
 @功能   :
"""
# ------------------------------
import asyncio
import json
import multiprocessing
import os
import platform
import timeit
from copy import deepcopy
from pprint import pprint

from loguru import logger
from motor.motor_asyncio import AsyncIOMotorClient

from redis import Redis

# Destination directory for the cleaned jsonl output (Linux/macOS path).
SAVE_PATH=r"/backup/btc/btc_outputList_tx"
# Inter-process queue — only referenced by the commented-out error-collection
# code at the bottom of the file.
que=multiprocessing.Queue()
# Redis connection (db 2) holding the "json_path" work queue shared by workers.
redis=Redis(db=2)


def file_path(dirname):
    """Walk *dirname* recursively and push every file path onto the Redis
    "json_path" list, replacing any queue left over from a previous run.

    Side effects only (Redis writes + logging); returns None.
    """
    # Clear any stale queue first. DEL is the direct form of the original
    # expire(key, 0) trick (EXPIRE with ttl <= 0 also deletes the key).
    redis.delete("json_path")
    for root, _, files in os.walk(dirname):
        for fname in files:
            # Local renamed: the original shadowed the function's own name.
            path=os.path.join(root, fname)
            # Every json path becomes one work item for the consumer processes.
            redis.lpush("json_path", path)
            logger.info(f"{path}  to  redis!")
    length=redis.llen("json_path")
    logger.info(f"总共有{length} 个文件")


# 清洗新的btc
# Clean a raw btc transaction.
async def get_outputList(tx):
    """Enrich a raw btc transaction dict with aggregate output fields.

    Mutates *tx* in place, adding:
      outputList  - [{"address": str, "value": ...}] for the vouts that do not
                    pay back to a vin address (change outputs are dropped)
      voutValues  - sum of all vout values
      vinValues   - sum of all vin values (0 for coinbase)
      outputValue - sum of the kept output values
      fee         - vinValues - voutValues (coinbase: first output's value)

    Returns the mutated tx, or None when tx["vin"] is empty.
    """
    voutValues=0
    vinValues=0

    outputValue=0
    vout=tx["vout"]  # list of output dicts
    vin=tx["vin"]  # list of input dicts
    for vi in vin:
        outputList=[]
        if vi['id'] == 'coinbase':
            # Coinbase tx: take the first vout with a usable address and stop.
            for vout_data in vout:
                try:
                    if vout_data["address"] != "":
                        dic={
                            "address": vout_data.get("address")[0],
                            "value": vout_data.get("value")
                        }
                        outputValue=dic.get("value")
                        outputList.append(dic)

                        btc_data={

                            "outputList": outputList,
                            "voutValues": outputValue,
                            "vinValues": 0,
                            "outputValue": dic.get("value"),
                            "fee": dic.get("value"),
                        }
                        tx.update(btc_data)
                        return tx
                except Exception as e:
                    logger.error(e)
                    continue

        else:
            # If a vin address re-appears in vout, that vout entry is change
            # sent back to the sender — drop it from the output list.
            try:
                vout1=deepcopy(vout)
                # Renamed loop variable: the original reused `vi` here and
                # shadowed the outer loop variable.
                for vin_item in vin:
                    vinValues+=vin_item.get("value")
                    # Iterate over a snapshot: removing from the list being
                    # iterated would silently skip the element after each hit.
                    for vo in list(vout1):
                        if vin_item["address"] == vo.get("address"):
                            vout1.remove(vo)

                for vo1 in vout1:
                    # vout "address" is usually a list, but block 612001 showed
                    # it can also be an empty string — guard the type here.
                    output_address=str(vo1.get("address")[0]) if isinstance(vo1.get("address"), list) else ""
                    output={
                        "address": output_address,
                        "value": vo1.get("value")
                    }
                    outputList.append(output)
                    if vo1.get("value"):
                        outputValue+=float(vo1.get("value"))

            except Exception as e:
                # Log the whole tx for post-mortem and move on to the next vin.
                logger.error(tx)
                continue

        for vot in vout:
            voutValues+=vot.get("value")

        fee=float(vinValues - voutValues)

        btc_data={

            "outputList": outputList,
            "voutValues": voutValues,
            "vinValues": vinValues,
            "outputValue": outputValue,
            "fee": fee,
        }
        tx.update(btc_data)
        return tx


# 读文件
# Read a file line by line.
async def read_in_line(file_path):
    """Async generator yielding *file_path* one line at a time (newline kept)."""
    with open(file_path, "r") as f:
        # Let the file object drive the iteration; it stops cleanly at EOF,
        # which matches the original readline()-until-empty loop exactly.
        for line in f:
            yield line


# 存为json文件
# Save as a json file (one document per line).
async def save_json(tx, path):
    """Append *tx* to *path* as one JSON document per line (jsonl).

    No "," line separator — mongoimport rejects trailing commas.
    Errors are logged and swallowed (best-effort, a single bad record must
    not kill the whole run).
    """
    # Serialise first so a failure never leaves a half-written line behind.
    item=json.dumps(tx)
    try:
        # "a" creates the file when missing and appends otherwise, so the
        # original os.path.exists() / "w" branch was redundant.
        with open(path, "a", encoding='utf-8') as f:
            f.write(item + "\n")
    except Exception as e:
        print(e)


# 存入mongo库
def save_to_mongo(data):
    # 保存到mongo数据库中
    db_url='39.99.160.162'
    db_port=27017
    db_name="btc_tx_new"
    db_collection="transaction"
    # 建立连接
    client=AsyncIOMotorClient(db_url, db_port)

    # 连接某个库名字
    db=client[db_name][db_collection]

    # db.insert_many([i for i in data])
    db.insert_one(data)


async def get_btc_data():
    """Worker loop: pop json file paths off the Redis "json_path" list,
    enrich every transaction line via get_outputList(), and append the
    results to a new jsonl file next to SAVE_PATH.

    Runs until the Redis queue is empty, then returns.
    """
    process=os.getpid()

    while True:
        # Each worker process atomically pops its next file from redis.
        json_path=redis.rpop("json_path")

        if json_path is None:
            break  # queue drained — this worker is done
        path=bytes.decode(json_path)

        logger.info(f"process_{process} start handle | {path} ")
        start=timeit.default_timer()

        # Hoisted out of the per-line loop: destination dir and file name are
        # per-file invariants.  This also guarantees `name` is bound even for
        # an empty input file (the original raised NameError on the final
        # log line in that case).
        if platform.system() == 'Linux' or platform.system() == 'Darwin':
            save_path=SAVE_PATH
            name=path.split("/")[-1]  # Linux/macOS
        else:
            save_path=r"D:\Project\etc\Chain-Storage\src\core\test"
            name=path.split("\\")[-1]  # Windows
        new_path=f"{save_path}/new_{name}"

        with open(path, 'r') as f:
            while True:
                data=f.readline()
                if not data:
                    break
                tx=json.loads(data)
                btc_data=await get_outputList(tx)

                # Save the cleaned record as a new jsonl line.
                logger.info(f"save {btc_data}")
                await save_json(btc_data, new_path)

                # TODO: inserting straight into mongo is too slow
                # (~200 docs/s; 5e8 docs would take ~722 h)
                # save_to_mongo(btc_data)

            elapsed=(timeit.default_timer() - start)
        logger.info(f"process_{process} write {name} success! | used {elapsed} s")


def main():
    """Entry point for one worker process: run the async consumer to completion."""
    # asyncio.run() creates, runs and closes a fresh event loop — safer than
    # the manual get_event_loop()/run_until_complete()/close() sequence,
    # especially in child processes that might inherit a parent loop.
    asyncio.run(get_btc_data())


if __name__ == '__main__':
    # Choose the input directory for the current OS.
    system=platform.system()
    if system == 'Linux' or system == 'Darwin':
        dirname=r"/backup/btc/btc_new_tx11"
    else:
        dirname=r"D:\Project\etc\Chain-Storage\src\core\test"

    # Fill the redis "json_path" queue with every file under dirname.
    file_path(dirname)

    # Fan out: each worker process runs its own event loop (main) and keeps
    # popping file paths from redis until the queue is drained.
    process_count=3
    with multiprocessing.Pool(process_count) as pool:
        for _ in range(process_count):
            pool.apply_async(main)

        pool.close()
        pool.join()
 类似资料: