Question:

GCP Dataflow Kafka (as Azure Event Hub) -> BigQuery

孟嘉歆
2023-03-14

I have a Kafka-enabled Azure Event Hub that I am trying to connect to from Google Cloud's Dataflow service in order to stream the data into Google BigQuery. I can successfully talk to the Azure Event Hub using the Kafka CLI. However, with GCP, after 5 minutes I get a timeout error in the GCP Dataflow job window.

Azure Event Hub with Kafka enabled -

To set up the Kafka-enabled Event Hub, I followed the details on this GitHub page. It has the developer add a jaas.conf and a client_common.properties. The jaas.conf includes a reference to the login module along with a username/password. The username for Event Hubs with Kafka is $ConnectionString, and the password is the connection string copied from the CLI. The client_common.properties contains two flags: security.protocol=SASL_SSL and sasl.mechanism=PLAIN. With these files configured, I am able to send and receive data with the Kafka CLI tools and the Azure Event Hub. I can see data flowing from the producer to the consumer through the Azure Event Hub.
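
For reference, the two files described above typically look like the following. This is a minimal sketch based on the Azure Event Hubs for Kafka quickstart; the connection string is a placeholder.

jaas.conf:

    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="$ConnectionString"
        password="<connection string copied from the Azure portal/CLI>";
    };

client_common.properties:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN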

export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"

(echo -n "1|"; cat message.json | jq . -c) | kafka-console-producer.sh --topic test-event-hub --broker-list test-eh-namespace.servicebus.windows.net:9093 --producer.config client_common.properties --property "parse.key=true" --property "key.separator=|"

kafka-console-consumer.sh --topic test-event-hub --bootstrap-server test-eh-namespace.servicebus.windows.net:9093 --consumer.config client_common.properties --property "print.key=true"
# prints: 1 { "transaction_time": "2020-07-20 15:14:54", "first_name": "Joe", "last_name": "Smith" }

I modified Google's Dataflow template for Kafka -

gcloud dataflow jobs run kafka-test --gcs-location=<removed> --region=us-east1 --worker-zone=us-east4-a --parameters bootstrapServers=test-eh-namespace.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test --service-account-email my-service-account.iam.gserviceaccount.com
# these errors show up in the worker logs
Operation ongoing in step ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds for at least 05m00s without outputting or completing in state process
  at java.lang.Thread.sleep(Native Method)
  at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45)
  at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1481)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.updatedSpecWithAssignedPartitions(KafkaUnboundedSource.java:85)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:125)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
  at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReader.iterator(WorkerCustomSources.java:433)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:186)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Execution of work for computation 'S4' on key '0000000000000001' failed with uncaught exception. Work will be retried locally.

# this error shows up in the Job log
Error message from worker: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Map<String, Object> props = new HashMap<>();
// azure event hub authentication
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<removed>\";");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 15000);
props.put("max.poll.interval.ms", 30000);
props.put("offset.metadata.max.bytes", 1024);
props.put("connections.max.idle.ms", 180000);
props.put("metadata.max.age.ms", 180000);
    PCollectionTuple convertedTableRows =
                pipeline
                        /*
                         * Step #1: Read messages in from Kafka
                         */
                        .apply(
                                "ReadFromKafka",
                                KafkaIO.<String, String>read()
                                        .withConsumerConfigUpdates(ImmutableMap.copyOf(props))
                                        .withBootstrapServers(options.getBootstrapServers())
                                        .withTopics(topicsList)
                                        .withKeyDeserializerAndCoder(
                                                StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                                        .withValueDeserializerAndCoder(
                                                StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                                        .withoutMetadata())

                        /*
                         * Step #2: Transform the Kafka Messages into TableRows
                         */
                        .apply("ConvertMessageToTableRow", new MessageToTableRow(options));
  • Can I consume from an Azure Event Hub topic from Apache Beam / Google Cloud Dataflow?
  • Kafka to Google Cloud Platform Dataflow ingestion
  • Kafka send to Azure Event Hubs
  • Writing to Confluent Cloud from Apache Beam (GCP Dataflow)

1 answer in total

百里君博
2023-03-14
  1. Configure environment variables
  2. Modify and build

This application has a complex build process that was ported over from the GCP Dataflow templates. The build process pulls in the GCP Dataflow Docker image build and deployment scripts as dependencies. Simply clone the repo to get started.

  • Install the Google Cloud CLI tools
  • (Optional, but if not done, the managed service account must be removed from the build/deploy commands) Managed service account (see the sketch after this list)
    • Set up a GCP managed service account
    • Create a static key
    • Configure the GCR credential helper with the managed service account so the local machine can authenticate to GCR
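
    A minimal sketch of this optional service-account setup, assuming a project named test-project and a service account named dataflow-deployer (both placeholders):

    # create the service account and a static key
    gcloud iam service-accounts create dataflow-deployer --project=test-project
    gcloud iam service-accounts keys create key.json \
        --iam-account=dataflow-deployer@test-project.iam.gserviceaccount.com
    # authenticate the local machine as the service account and register the
    # gcloud Docker credential helper so pushes to gcr.io are authorized
    gcloud auth activate-service-account --key-file=key.json
    gcloud auth configure-docker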

    The first step is to set the environment variables that configure the build and deployment scripts for this application.

    export PROJECT=test-project
    export IMAGE_NAME=test-project
    export BUCKET_NAME=gs://test-project
    export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
    export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
    export BASE_CONTAINER_IMAGE_VERSION=latest
    export TEMPLATE_MODULE=kafka-to-bigquery
    export APP_ROOT=/template/${TEMPLATE_MODULE}
    export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_MODULE}-command-spec.json
    export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json
    
    export BOOTSTRAP=<event_grid_name>.servicebus.windows.net:9093
    export TOPICS=<event_grid_topic_name>
    export OUTPUT_TABLE=test-project:<schema>.test
    export AUTHENTICATION_STRING="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"<EVENT_GRID_TOPIC_APP_SECRET>\";"
    

    Before building, you need to update the kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java file with the additions for handling the authentication string:

    public class KafkaToBigQuery {
    
        public interface Options extends PipelineOptions {
    
            @Description("Kafka Authentication String")
            @Required
            String getAuthenticationString();
    
            void setAuthenticationString(String authenticationString);
        }
    
        public static PipelineResult run(Options options) {
    
            Map<String, Object> props = new HashMap<>();
            props.put("sasl.mechanism", "PLAIN");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.jaas.config", options.getAuthenticationString());
    
    //      https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
            props.put("request.timeout.ms", 60000);
            props.put("session.timeout.ms", 15000);
            props.put("max.poll.interval.ms", 30000);
            props.put("offset.metadata.max.bytes", 1024);
    
            props.put("connections.max.idle.ms", 180000);
            props.put("metadata.max.age.ms", 180000);
    
    
            PCollectionTuple convertedTableRows =
                    pipeline
                            /*
                             * Step #1: Read messages in from Kafka
                             */
                            .apply(
                                    "ReadFromKafka",
                                    KafkaIO.<String, String>read()
                                            .withConsumerConfigUpdates(props)
                                            .withBootstrapServers(options.getBootstrapServers())
                                            .withTopics(topicsList)
                                            .withKeyDeserializerAndCoder(
                                                    StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                                            .withValueDeserializerAndCoder(
                                                    StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
                                            .withoutMetadata())
    
        }
    }
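
    The excerpt above leaves the rest of the template unchanged, so pipeline, topicsList, the BigQuery write steps, and the final return are omitted. In the stock template the topic list is typically derived from the inputTopics option roughly as follows (a sketch, not the exact template code):

    // split the comma-separated inputTopics parameter into the list handed to KafkaIO
    List<String> topicsList = new ArrayList<>(Arrays.asList(options.getInputTopics().split(",")));
    Pipeline pipeline = Pipeline.create(options);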
    

    Once you have set up the project and changed the file, the next stage is to build the Docker image to upload to Google's Container Registry. This command also builds the common files for interacting with other Google services. If the build succeeds, the container is pushed to Google Container Registry (GCR). From GCR you can deploy into Google Dataflow.

    mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
        -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
        -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
        -Dapp-root=${APP_ROOT} \
        -Dcommand-spec=${COMMAND_SPEC} \
        -am -pl ${TEMPLATE_MODULE}
    

    Before launching the project in Dataflow, the Dataflow runner needs a Flex Template to know how to execute the project. The Flex Template is a JSON metadata file containing the parameters and instructions for building the GCP Dataflow application. The Flex Template must be uploaded to Google Cloud Storage (GCS), into the bucket set by the environment variables; this step must match the TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json variable (see the upload sketch after the JSON spec below).

    {
      "image": "gcr.io/<my-project-url>:latest",
      "metadata": {
        "name": "Streaming data generator",
        "description": "Generates Synthetic data as per user specified schema at a fixed QPS and writes to Sink of user choice.",
        "parameters": [
          {
            "name": "authenticationString",
            "label": "Kafka Event Hub Authentication String",
            "helpText": "The authentication string for the Azure Event Hub",
            "is_optional": false,
            "regexes": [
              ".+"
            ],
            "paramType": "TEXT"
          },
          {
            "name": "bootstrapServers",
            "label": "Kafka Broker IP",
            "helpText": "The Kafka broker IP",
            "is_optional": false,
            "regexes": [
              ".+"
            ],
            "paramType": "TEXT"
          },
          {
            "name": "inputTopics",
            "label": "PubSub Topic name",
            "helpText": "The name of the topic to which the pipeline should publish data. For example, projects/<project-id>/topics/<topic-name> - should match the Event Grid Topic",
            "is_optional": false,
            "regexes": [
              ".+"
            ],
            "paramType": "PUBSUB_TOPIC"
          },
          {
            "name": "outputTableSpec",
            "label": "Output BigQuery table",
            "helpText": "Output BigQuery table. For example, <project>:<dataset>.<table_name>. Mandatory when sinkType is BIGQUERY.",
            "isOptional": false,
            "regexes": [
              ".+:.+\\..+"
            ],
            "paramType": "TEXT"
          },
          {
            "name": "outputDeadletterTable",
            "label": "Output Deadletter table",
            "helpText": "Output Deadletter table. For example, <project>:<dataset>.<table_name>",
            "isOptional": true,
            "regexes": [
              ".+:.+\\..+"
            ],
            "paramType": "TEXT"
          }
        ]
      },
      "sdk_info": {
        "language": "JAVA"
      }
    }
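
    One way to upload the spec, assuming it has been saved locally as kafka-to-bigquery-image-spec.json (a placeholder filename), is with gsutil:

    gsutil cp kafka-to-bigquery-image-spec.json ${TEMPLATE_IMAGE_SPEC}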
    

    Once the image has been uploaded to GCP and the Flex Template has been uploaded, you can launch the Dataflow application. The parameters must match the ones contained in the metadata section of the Flex Template.

    export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
    gcloud beta dataflow flex-template run ${JOB_NAME} \
            --project=${PROJECT} --region=us-east1 \
            --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
            --parameters ^~^outputTableSpec=${OUTPUT_TABLE}~inputTopics=${TOPICS}~bootstrapServers=${BOOTSTRAP}~authenticationString="${AUTHENTICATION_STRING}" \
            --verbosity=info \
            --service-account-email=<service_account_to_execute_service>
    

    After running this command, check the GCP Cloud Console to see the status. At this point the Dataflow job should be able to successfully pull messages from the Azure Event Hub and insert them into Google BigQuery.

    The GCP repo assumes that Google BigQuery/Dataflow will dynamically create the table with the correct schema, but I found this to be a problem. The workaround is to create the schema in Google BigQuery before running the Dataflow job.
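
    For example, the target table could be created up front with the bq CLI. This is a sketch only: the dataset/table name and the field types are assumptions based on the sample message shown earlier in the question.

    bq mk --table test-project:Kafka_Test.test \
        transaction_time:TIMESTAMP,first_name:STRING,last_name:STRING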
