当前位置: 首页 > 知识库问答 >
问题:

如何使用Python在kafka使用者中聚合json数据?

都浩淼
2023-03-14

我的数据在Kafka事务处理主题中是这样的:

ConsumerRecord(Topic='transactions',partition=0,offset=4,timestamp=1591277946736,timestamp_type=0,key=none,value={'transaction_id':'4952940859','account_number':14,'transaction_reference':'44291','transaction_datetime':'2020-06-04T19:09:06.736128','amount':2.82},headers=[],checksum=none,

ConsumerRecord(Topic='transactions',partition=0,offset=5,timestamp=1591277946737,timestamp_type=0,key=none,value={'transaction_id':'0193362270','account_number':12,'transaction_reference':'96312','transaction_datetime':'2020-06-04T19:09:06.736128','amount':766.95},headers=[],checksum=none,

到目前为止编写的消费者守则是:

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
            print (message)

我希望输出像元组(account_number,sum(amount)),我如何实现这一点?

共有1个答案

松刚豪
2023-03-14

我认为字典可能比元组更有用。DefaultDict将非常适合此过程

from collections import defaultdict

accounts = defaultdict(int)

for message in consumer:
    payload = message.value
    account = payload['account_number']
    amount = payload['amount']

    accounts[account] += amount


print(accounts)

defaultdict(<class 'int'>,{
  "14": 263.75,
  "12": 766.95
})

要获得可能要查找的元组,可以在循环之后遍历accounts.items()

for info in accounts.items():
    print(info)

("14", 263.75)
("12", 766.95)
 类似资料:
  • 我有一套文件。每个文档有两个字段—“代码”和“状态”。我的mongodb集合包含以下文档: 我想按每个代码的状态查找计数。我想要的输出如下所示: 如何使用spring data mongodb实现这一点?我对mongodb很陌生。 更新我已成功编写mongodb查询。这是: 有人能帮助您在spring data mongodb中编写此查询吗?

  • 问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题

  • 我使用的是MongoDB,我必须在spring数据mongo DB中使用$date的聚合查询。这是我的用户收藏。 现在我必须使用MongoTemplate在spring data mongodb中编写这个查询。我是使用聚合的新手。他们是任何简单的方法来使用它。请帮忙 谢谢你。

  • 我有以下内容:我注意到在运行代码的末尾,如果我打印出aggregations.asMap(). get('subject');我得到:org.elasticsearch.search.aggregations.bucket.terms.StringTerms@6cff59fa 打印出“聚合”给了我:org.elasticsearch.search.aggregations.InternalAggr

  • 我需要汇总以下记录中的所有标记: https://gist.github.com/sbassi/5642925 (这个片段中有2个样本记录)并按大小对它们进行排序(首先是出现频率更高的标记)。但是我不想考虑具有特定“user_id”的数据(比方说,2,3,6和12)。 以下是我的尝试(只是聚合,没有过滤和排序): db。用户库。聚合({$unwind:“$annotations.data.tags