> ethereumetl export_blocks_and_transactions --start-block 0 --end-block 500000 \
--blocks-output blocks.csv --transactions-output transactions.csv \
--provider-uri https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c
def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, max_workers, blocks_output,
transactions_output, chain='ethereum'):
"""Exports blocks and transactions."""
provider_uri = check_classic_provider_uri(chain, provider_uri)
if blocks_output is None and transactions_output is None:
raise ValueError('Either --blocks-output or --transactions-output options must be provided')
job = ExportBlocksJob(
start_block=start_block,
end_block=end_block,
batch_size=batch_size,
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
max_workers=max_workers,
item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output),
export_blocks=blocks_output is not None,
export_transactions=transactions_output is not None)
job.run()
导出以太坊链上start到end所有块的信息,输出块信息blocks_output和交易信息transactions_output(可指定为csv文件)
判断在以太坊链中是否存在一个有效的provider_uri
判断输入块号和输出块号是否为同时为空
构建一个挖掘信息的job–ExportBlocksJob
class ExportBlocksJob(BaseJob)
batch_web3_provider 通过一个代理工厂的方式获取一个web3的连接对象
batch_work_executor 构造批处理的执行者
item_exporter 设置挖掘匹配的字段,以及存储文件
transaction_mapper 实现transaction对象和json的转换解析
block_mapper 实现block和json之间的转换,可以传入transaction_mapper
def _start(self):
self.item_exporter.open()
准备存储block和transaction的文件
def _export(self):
self.batch_work_executor.execute(
range(self.start_block, self.end_block + 1),
self._export_batch,
total_items=self.end_block - self.start_block + 1
)
调用batch_work_executor的execute方法
_export_batch批次获取以太坊的块内容以及交易信息
def _end(self):
self.batch_work_executor.shutdown()
self.item_exporter.close()
释放batch_work_executor对象,以及关闭存储block和transaction信息的文件
class BatchWorkExecutor:
executor 属于FailSafeExecutor对象(嵌套一个BoundedExecutor对象),执行主要工作的
progress_logger 线程安全的自定义程序日志统计(运行时间等)
logger 通过工厂方法获取一个logger实例
def execute(self, work_iterable, work_handler, total_items=None):
self.progress_logger.start(total_items=total_items)
for batch in dynamic_batch_iterator(work_iterable, lambda: self.batch_size):
self.executor.submit(self._fail_safe_execute, work_handler, batch)
1.启动progress_logger 开始计时以及打印日志
2.对于每个start_block块号到end_block之间的获取一个块号,通过动态调节(当前存储在内存的batch数量小于等于batch_size,否则释放之前的batch)的批处理函数获取一个当前读取区块链号
3.调用executor 的submit方法
class BoundedExecutor:
_delegate 通过ThreadPoolExecutor构造一个线程池
_semaphore 通过BoundedSemaphore得到一个有界信号量对象
class FailSafeExecutor:
_delegate 引用BoundedExecutor_delegate
_futures 保存多线程运行的一组对象,通过调用done函数判断是否完成
def submit(self, fn, *args, **kwargs):
self._check_completed_futures()
future = self._delegate.submit(fn, *args, **kwargs)
self._futures.append(future)
return future
1.检查所有之前的线程有没有完成,如果完成则释放线程锁,可以让线程池空出一个位置
2.提交一个新的任务,返回一个线程运行的对象
3.添加到_futures 中方便下次检查
export_blocks_and_transactions这个命令调用时,首先通过一系列的数据校验(包括起始块和目标块的参数,请求地址是否在链上等),然后通过创建ExportBlocksJob对象,把请求的任务进行一个封装,通过BatchWorkExecutor来进行执行。而BatchWorkExecutor中也定义了一个批次处理任务的线程池以及信号量用于管理最多运行的线程数目,同时负责任务的执行。整个链的获取来看,整体并没有考虑到区块链的回滚,以及错误数据获取的情况,缺乏安全性。