> ethereumetl stream --start-block 500000 -e block,transaction,log,token_transfer --log-file log.txt \
--provider-uri https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c
实现区块、交易、日志、货币不断地传输到控制台
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
entity_types = parse_entity_types(entity_types)
validate_entity_types(entity_types, output)
from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
from blockchainetl.streaming.streamer import Streamer
# TODO: Implement fallback mechanism for provider uris instead of picking randomly
provider_uri = pick_random_provider_uri(provider_uri)
logging.info('Using ' + provider_uri)
streamer_adapter = EthStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=create_item_exporters(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
last_synced_block_file=last_synced_block_file,
lag=lag,
start_block=start_block,
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file
)
streamer.stream()
1.日志的配置,以及实体类型的校验
2.随机选取一个提供的uri,构建EthStreamerAdapter对象用于封装Streamer对象
3.调用Streamer的stream方法
class EthStreamerAdapter:
item_id_calculator 实例化一个EthItemIdCalculator对象,主要函数
def calculate(self, item):
对各个实体对象(block,transaction,log,token_transfer,trace,contract,token等)进行数据内容的拼接
item_timestamp_calculator 实例化一个EthItemTimestampCalculator对象,主要函数
def calculate(self, item):
不同类型的实体对应的时间戳转换为标准时间
def get_current_block_number(self):
w3 = build_web3(self.batch_web3_provider)
return int(w3.eth.getBlock("latest").number)
获取当前最新的代码块号
def export_all(self, start_block, end_block):
通过调用前面两篇博客学习的方法获取内容输出到目标位置
class Streamer:
blockchain_streamer_adapter 对应之前构建的EthStreamerAdapter
last_synced_block 最后一次同步的区块,从默认文件读取
def stream(self):
try:
if self.pid_file is not None:
logging.info('Creating pid file {}'.format(self.pid_file))
write_to_file(self.pid_file, str(os.getpid()))
self.blockchain_streamer_adapter.open()
self._do_stream()
finally:
self.blockchain_streamer_adapter.close()
if self.pid_file is not None:
logging.info('Deleting pid file {}'.format(self.pid_file))
delete_file(self.pid_file)
如果用户指定了pid_file,那么需要写入文件程序运行的pid。然后通过调用适配器的open方法,open方法将打开输出数据的位置(如postgre,kafka以及其他)的写功能,默认是命令行。然后调用_do_stream方法,最后是关闭输出流,删除对应文件。
def _do_stream(self):
while True and (self.end_block is None or self.last_synced_block < self.end_block):
synced_blocks = 0
try:
synced_blocks = self._sync_cycle()
except Exception as e:
# https://stackoverflow.com/a/4992124/1580227
logging.exception('An exception occurred while syncing block data.')
if not self.retry_errors:
raise e
if synced_blocks <= 0:
logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))
time.sleep(self.period_seconds)
一个如果末尾块号为空或者最后一个同步块号小于最后一个块号时的while循环,循环内是一个获取当前需要同步的块数,如果需要同步的块数小于等于0,则休眠period_seconds(默认为10s,因为以太坊平均15秒出块)
def _sync_cycle(self):
current_block = self.blockchain_streamer_adapter.get_current_block_number()
target_block = self._calculate_target_block(current_block, self.last_synced_block)
blocks_to_sync = max(target_block - self.last_synced_block, 0)
logging.info('Current block {}, target block {}, last synced block {}, blocks to sync {}'.format(
current_block, target_block, self.last_synced_block, blocks_to_sync))
if blocks_to_sync != 0:
self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)
logging.info('Writing last synced block {}'.format(target_block))
write_last_synced_block(self.last_synced_block_file, target_block)
self.last_synced_block = target_block
return blocks_to_sync
通过适配器获取当前的块号current_block,然后通过计算获得下一个应该获取的目标块号,然后得到当前应该要同步的块数目。如果需要同步的块数目不为0,则通过适配器函数获取需要同步块内的数据。