当前位置: 首页 > 知识库问答 >
问题:

GCP数据流是否支持python中的kafka IO?

井昊乾
2023-03-14

我正在尝试使用Kafka从Kafka主题中读取数据。python代码中的ReadFromKafka()方法。我的代码如下所示:

from apache_beam.io.external import kafka
import apache_beam as beam

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
           plants = (
      p
        |       'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'public_ip:9092'}, ['topic1']))

但下面是错误消息。

错误:apache\u beam。跑步者。运行程序:访问读取回溯时出错(上次调用):文件“test_File.py”,第16行,在

是因为apache光束数据流转轮不支持kafkaIO吗?


共有2个答案

郭凯
2023-03-14

现在数据流支持多语言数据流Pipelinne检查此:链接

慕乐池
2023-03-14

beam的python SDK不支持连接到Kafka。下面是一段代码片段

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

kafka_topic = "notifications"
kafka_config = {"topic": kafka_topic,
                "bootstrap_servers": "localhost:9092",
                "group_id": "notification_consumer_group"}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    notifications | 'Writing to stdout' >> beam.Map(print)

bootstrap_服务器是一种逗号分隔的主机和端口配置,在这里部署代理。您将从Kafka群集配置中获得此信息。

 类似资料:
  • 问题内容: Python是否支持短路? 问题答案: 是的操作员都短路了-请参阅docs。

  • 问题内容: 如果我有参数,那么我的程序只需使用函数中的一个即可。有没有一种方法可以组合起来,以便程序仅接受或? 编辑: 添加一个简单的程序以更清楚: 然后只能被调用。是否可以将argparse组排除在组之外,以便仅被调用? 问题答案: 编辑 :没关系。因为调用时必须创建一个选项,这是一个可怕的选择。那不是我的设计选择。如果您迫切需要此功能,可以尝试使用ConflictsOptionParser来实

  • 1.16.1. 什么是 PyMySQL? PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2中则使用mysqldb。 PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。 1.16.2. PyMySQL安装 PyMySQL下载地址:https://github.com/PyMy

  • 使用标准的GCP提供的存储/文本文件来发布Sub数据流模板,但是尽管我已经设置了#workernodes eq 1,但是对于下游组件来说,处理的消息吞吐量“太高”。 在 Pub/Sub 中的消息事件上运行的 Cloud 函数会命中 GCP 配额,并且使用 CloudRun,我在开始时收到一堆 500、429 和 503 个错误(由于步进突发率)。 有没有办法控制数据流的处理速率?需要获得更软/更慢

  • 其他流式框架(如Apache Samza、Storm或Nifi)是否可以实现这一点? 我们非常期待得到答复。

  • 问题内容: 根据这些评论,JSONKit不支持ARC,甚至在ARC环境中都不使用fobjc-no- arc设置运行:https : //github.com/johnezang/JSONKit/issues/37 问题答案: 您仍然可以在ARC应用程序中使用JSONKit。 我自己用的。 在XCode 5中选择项目根目录,在“ 选择应用程序”下,然后选择“ 选项卡”。在JSONKit.m 下,双击