当前位置: 首页 > 知识库问答 >
问题:

在Google Dataflow上使用KafkaIO连接到带有SSL的Kafka

龙德义
2023-03-14

从服务器上,我能够连接并从配置了SSL的远程kafka服务器主题获取数据

如果我指向GCS上的证书,当证书指向Google存储桶时,它会抛出错误。


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Caused by: org.apache.kafka.common.KafkaException:
 java.io.FileNotFoundException: 
gs:/bucket/folder/truststore-client.jks (No such file or directory)

其次是:Truststore和Google Cloud Dataflow

更新的代码将SSL truststore、keystore位置指向本地机器的/tmp目录认证,以防KafkaIO需要从文件路径读取。它没有抛出FileNotFounderRor。

尝试从GCP帐户运行服务器Java客户机代码,并使用Dataflow-Beam Java管道,我得到以下错误。


ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message 
java.io.IOException: Broken pipe

org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
    at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at 

org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

任何建议或例子赞赏。

共有1个答案

仇征
2023-03-14

Git将Java Maven项目从本地机器克隆或上传到GCP Cloud Shell主目录。在Cloud Shell终端上使用Dataflow runner命令编译项目。

mvn -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=com.packagename.JavaClass \
      -Dexec.args="--project=PROJECT_ID \
      --stagingLocation=gs://BUCKET/PATH/ \
      --tempLocation=gs://BUCKET/temp/ \
      --output=gs://BUCKET/PATH/output \
      --runner=DataflowRunner"

确保运行程序设置为dataflowrunnner.class,并且在云上运行作业时,可以在Dataflow控制台上看到作业。DirectRunner执行不会显示在云数据流控制台上。

将证书放在Maven项目中的resources文件夹中,并使用ClassLoader读取文件。

ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());    
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());

按照https://stackoverflow.com/a/53549757/4250322中的描述,编写一个ConsumerFactoryFn()以复制DataFlow的“/tmp/”目录中的证书

将KafkaIO与资源路径属性一起使用。

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");    
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD); 
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);

//other properties
...

PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(TOPIC)                                
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)                
                .updateConsumerProperties(props)
                .withConsumerFactoryFn(new ConsumerFactoryFn())
                .withMaxNumRecords(50)
                .withoutMetadata()
        ).apply(Values.<String>create());

// Apply Beam transformations and write to output.

 类似资料:
  • 我想让SSL和Kafka一起运行,让它更安全。我下载了Kafka并安装了它。我按照说明为SSL创建证书和信任库,没有任何问题。我将以下内容添加到我的config/server.properties中 启动Zookeeper后,我在启动kafak时收到此错误:[2017-12-07 16:02:52,155]ERROR[Controller id=0, targetBrokerId=0]连接到节点0

  • 我有简单的Spring启动应用程序和具有工作SSL连接的Kafka(其他应用程序,不是Spring启动,已成功连接)。我无法访问Kafka经纪人的属性。我的应用是Kafka的客户端。这个应用程序在库伯内特斯内部的容器中运行。我的Spring启动可以访问密钥库.p12,ca-cert,Kafka佩姆,Kafka.key文件(它位于容器内的目录中)。 在配置中我使用 每次我收到错误 我尝试了不同的变化

  • 问题内容: 我的意思是说非常简单。我想通过安全连接从PHP脚本连接到外部MS SQL数据库。然而,事实证明这是有问题的,到目前为止,花了三个小时进行研究,我很茫然。 客户端的平台是Ubuntu,这意味着我无法使用SQLSRV。安全连接已与其他客户端进行了测试,并且工作正常。我目前正在使用PDO和DBlib连接到数据库,这也可以正常工作。 我找不到能强制建立安全连接的任何方法。我尝试了多种其他驱动程

  • 我有一个Spring Boot应用程序(版本2.1.1),使用Postgresql 9.6作为数据库。我必须使用sslmode=verify ca通过SSL连接到db。到目前为止,我所做的是在申请表中设置。属性文件属性 有没有办法在其他一些Spring属性中指定ssl属性而不是在连接url中? 此外,还可以为证书指定相对路径,而不是使用绝对路径?

  • 我们所面临的问题已在许多文件中得到充分证明https://stackoverflow.com/questions/34189756/warning-about-ssl-connection-when-connecting-to-mysql-database. 从过渡到时,我们就开始面临这个问题。建议的修复方法对我们有效,但我们有一个问题,我们不想更新Java源文件以进行更改,例如从 到 正如在ht

  • 我在一台主机上安装了一台mysql服务器,我希望通过加密连接从另一台主机连接到mysql服务器(两者都运行Ubuntu12.04)。我已经创建并传输了证书,并手动测试了设置。我可以从Linux命令行使用密钥和证书连接到远程mysql服务器,因此服务器配置似乎是正确的: 我不知道怎么了,有什么想法吗?谢了!