当前位置: 首页 > 知识库问答 >
问题:

使用KAFKA主题中的数据并从中提取字段并使用python存储在MySQL中

马天逸
2023-03-14

我想使用以下命令使用来自 Kafka 主题的数据,如下所示:

kafka-console-consumer.sh引导服务器localhost:9092

然后,这将输出以下内容(只粘贴前2行输出,但它将是许多行...):

&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&
&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&

我想使用输出中的以下内容:

>

  • 房地产经纪人

    家庭标识

    消息

    唯一标识

    您可以看到,每个元素都用一个&符号('

    描述测试表;

    +----------+--------------+------+-----+---------+-------+
    | Field    | Type         | Null | Key | Default | Extra |
    +----------+--------------+------+-----+---------+-------+
    | realTs   | bigint(20)   | YES  |     | NULL    |       |
    | familyId | varchar(255) | YES  |     | NULL    |       |
    | msg      | text         | YES  |     | NULL    |       |
    | uniqueId | varchar(255) | YES  |     | NULL    |       |
    +----------+--------------+------+-----+---------+-------+
    4 rows in set (0.00 sec)
    

    SELECT*FROM测试表;

    +---------------+-------------------+-----------------------------------------------------------+-------------------------+
    | realTs        | familyId          | msg                                                       | uniqueId                |
    +---------------+-------------------+-----------------------------------------------------------+-------------------------+
    | 1561768319000 | 000-222-115-11119 | SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes. | 2518Z-0892A-0030O-16H70 |
    | 1571988989000 | 401-222-115-89387 | This-is+a+reminder.                                       | 872541806296826880      |
    +---------------+-------------------+-----------------------------------------------------------+-------------------------+
    

    到目前为止,我有什么?我有一个带有 python 的 mysql 连接器,我可以在其中连接到本地 mysql 等,但我正在努力解析并插入它......

  • 共有1个答案

    卢承弼
    2023-03-14

    使用Python,您可以使用urllib.parse.parse_qs检索Python字典中的URL查询字符串组件,您可以稍后迭代这些组件以将数据插入MySQL数据库。

    例如:

    from urllib.parse import parse_qs
    line = "&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&"
    
    o = parse_qs(line)
    
    print(o)
    

    结果:

    {'time': ['1561768216000', '1571988786000'], 'gameCategory': ['PINPOINT', 'NOTIFY'], 'category': ['ONE', 'TWO'], 'uniqueId': ['2518Z-0892A-0030O-16H70', '872541806296826880'], 'transactionType': ['CRD', 'CRD'], 'familyId': ['000-222-115-11119', '401-222-115-89387'], 'realTs': ['1561768319000', '1571988989000'], 'sortId': ['1', '1'], 'msg': ['SET-UP PRAYER & intercession begins in just 30 minutes.', 'This-is a reminder.'], 'remoteIpAddress': ['127.0.0.1', '127.0.0.1'], 'userAgent': ['HTTP', 'HTTPS']}
    
     类似资料:
    • 我正在尝试编写一个python代码来使用来自Confluent Kafka主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消费过程处于无限循环中,如果循环读取所有消息,则寻找退出的决策点。 参见下面的示例代码 请建议决定是否阅读所有消息的最佳方法,以便我可以退出循环并关闭消费者。

    • 嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写

    • 我们需要从Kafka主题导出生产数据以用于测试目的:数据用Avro编写,模式放在模式注册表中。 我们尝试了以下策略: 使用和或。我们无法获得可以用Java解析的文件:解析时总是出现异常,这表明文件格式错误。 使用:它生成一个还包括一些字节的json,例如在反序列化BigDecimal时。我们甚至不知道要选择哪个解析选项(不是avro,也不是json) null 使用Kafka连接接收器。我们没有找

    • 问题内容: 我是Python,Twitter和Tweepy的新手。我设法从Twitter提取数据,但是现在我想将其存储到CSV文件中。 我的代码是: 这会在CLI上打印出数据,但我希望将其输出到CSV文件。我尝试了几种不同的选项,但它只输出第一个推文,而不是所有推文。 问题答案: 这样就可以了! 我建议您使用Python的csv。打开一个文件并在循环期间将其写入,如下所示:

    • 我有一个Excel(.xlsx)文件,大约有40张工作表。每个工作表具有相同的结构,但包含不同的数据。我想从每张表中提取信息并将其合并为一张,将每张表中的信息一张叠在另一张上。我需要从中提取的信息有两个: 表名,总是在单元格E3中找到 感兴趣的单元格区域,总是在行72-85和列E-V之间找到 提取的信息将粘贴在合并工作表的第2-15行中,工作表名称在一列中,所有其他信息在其旁边的列中。然后,从提取

    • 需要以下步骤的帮助,了解如何使用Kafka主题中的消息并将其存储在/tmp/kakfa messages目录中 问题陈述: 创建Kafka使用者以使用主题“Multibrokerapplication”中的消息,并将其存储在“/tmp/kafka messages”中 步骤1:我能够使用发布到主题“Multibrokerapplication”的消息,如下所示。 bin/Kafka控制台消费者。s