Question:

Cannot connect to the Schema Registry from a Dataflow job when the Schema Registry requires TLS client authentication

罗智志
2023-03-14

I am working on a GCP Cloud Dataflow job that uses a Kafka broker and a Schema Registry. Our Kafka brokers and the Schema Registry require TLS client certificates, and the deployed job fails to connect to the Schema Registry. Any suggestions are very welcome.

Here is what I do in the Dataflow job. I create consumer properties for the TLS configuration:

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc"));
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);

Then I apply them to the pipeline via updateConsumerProperties:

Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
... 
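For reference, a minimal sketch of how such a properties map is typically attached to a KafkaIO source; the bootstrap servers, topic, and byte-array deserializers below are placeholders I chose for illustration, not values from the actual job:

PCollection<KafkaRecord<byte[], byte[]>> records = p.apply(
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker-host:9093")            // placeholder
        .withTopic("my-topic")                                // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)     // placeholder
        .withValueDeserializer(ByteArrayDeserializer.class)   // placeholder
        .updateConsumerProperties(properties));               // the TLS/Schema Registry properties above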

As suggested by the Stack Overflow answer below, I also download the keystore and truststore to a local directory and set the truststore/keystore locations on the consumer properties inside a ConsumerFactory:

Truststore and Google Cloud Dataflow

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...

In the consumer factory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config)  {
  // download keyStore and trustStore from GCS bucket 
  config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
  config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
  new KafkaConsumer<byte[], byte[]>(config);
}

With this code the job deploys successfully, but it then fails with a TLS server certificate validation error:

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        sun.security.validator.Validator.validate(Validator.java:260)
        sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Then I found that the Schema Registry client loads its TLS configuration from system properties: https://github.com/confluentinc/schema-registry/issues/943

I tested a plain Kafka consumer with the same configuration and confirmed that it works:

props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));

Next, I applied the same approach to the Dataflow job code, i.e. I applied the same TLS configuration both to the system properties and to the consumer properties.

I pass the passwords as system properties when launching the application:

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \

Note: I set the system properties for the truststore and keystore locations inside the consumer factory, because those files are downloaded to a local temporary directory:

config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)

But with this change, even the deployment itself fails, with the error below.

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp. 
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
...
Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:780)
Caused by: java.security.UnrecoverableKeyException: Password verification failed
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:778)

Am I missing something?

2 Answers

壤驷心思
2023-03-14

I got a reply from GCP support: Apache Beam does not seem to support the Schema Registry.

Hello, the Dataflow specialists have gotten back to me, and I will now relay what they told me.

The answer to your question is no, Apache Beam does not support the Schema Registry. However, they told me that you can implement the calls to the Schema Registry yourself, since Beam only consumes raw messages and it is the user's responsibility to do whatever they need with the data.

This is based on our understanding of your case: you want to publish messages to Kafka, have Dataflow consume them, and parse them according to the schema from the registry.

I hope this information is useful to you; let me know if I can be of further help.

However, the Dataflow job can still receive the binary Avro messages, so you can call the Schema Registry REST API yourself internally, as described here: https://stackoverflow.com/a/55917157
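One way to follow that suggestion without relying on the javax.net.ssl.* system properties is to construct the Schema Registry client yourself with an explicit SSLContext and wrap it in a custom value deserializer. The sketch below is only an illustration: the class name, the identity-map capacity of 100, and the assumption that your version of the Confluent client exposes RestService.setSslSocketFactory are mine, not something confirmed in this thread.

import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.Map;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch of a value deserializer that configures the Schema Registry client explicitly
// instead of relying on javax.net.ssl.* system properties.
public class TlsAwareAvroDeserializer implements Deserializer<Object> {

  private KafkaAvroDeserializer delegate;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    try {
      // The keystore/truststore paths and passwords are assumed to be present
      // in the consumer config map, as in the question.
      SSLContext sslContext = buildSslContext(
          (String) configs.get("ssl.keystore.location"),
          (String) configs.get("ssl.keystore.password"),
          (String) configs.get("ssl.key.password"),
          (String) configs.get("ssl.truststore.location"),
          (String) configs.get("ssl.truststore.password"));

      RestService restService = new RestService((String) configs.get("schema.registry.url"));
      restService.setSslSocketFactory(sslContext.getSocketFactory());

      // The pre-built client is reused by the delegate instead of it creating its own.
      delegate = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(restService, 100));
      delegate.configure(configs, isKey);
    } catch (Exception e) {
      throw new RuntimeException("Failed to configure TLS for the Schema Registry client", e);
    }
  }

  private static SSLContext buildSslContext(String keyStorePath, String keyStorePassword,
      String keyPassword, String trustStorePath, String trustStorePassword) throws Exception {
    // Client certificate for mutual TLS.
    KeyStore keyStore = KeyStore.getInstance("JKS");
    try (FileInputStream in = new FileInputStream(keyStorePath)) {
      keyStore.load(in, keyStorePassword.toCharArray());
    }
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, keyPassword.toCharArray());

    // Trust anchors for the Schema Registry server certificate.
    KeyStore trustStore = KeyStore.getInstance("JKS");
    try (FileInputStream in = new FileInputStream(trustStorePath)) {
      trustStore.load(in, trustStorePassword.toCharArray());
    }
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);

    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    return sslContext;
  }

  @Override
  public Object deserialize(String topic, byte[] data) {
    return delegate.deserialize(topic, data);
  }

  @Override
  public void close() {
    if (delegate != null) {
      delegate.close();
    }
  }
}

As an alternative, newer versions of the Confluent serializers are reported to accept schema.registry.ssl.*-prefixed settings (for example schema.registry.ssl.truststore.location) directly in the consumer configuration; if the client version you run supports them, that may remove the need for a wrapper like this.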

程城
2023-03-14

在< code>ConsumerFactoryFn中,您需要将证书从某个位置(如GCS)复制到机器上的本地文件路径。

In Truststore and Google Cloud Dataflow, the user-written ConsumerFactoryFn contains a snippet that fetches the truststore from GCS:

            Storage storage = StorageOptions.newBuilder()
                    .setProjectId("prj-id-of-your-bucket")
                    .setCredentials(GoogleCredentials.getApplicationDefault())
                    .build()
                    .getService();
            Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
            ReadChannel readChannel = blob.reader();
            FileOutputStream fileOutputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
            fileOutputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
            fileOutputStream.close();
            File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
            if (f.exists())
            {
                LOG.debug("key exists");

            }
            else
            {
                LOG.error("key does not exist");

            }

You need to do something similar (it does not necessarily have to be GCS, but the location does need to be accessible from all the VMs executing the pipeline on Google Cloud Dataflow).
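For completeness, a sketch of what such a factory might look like once both stores are copied and wired into the consumer config. The bucket and object names, the local /tmp paths, and the downloadFromGcs helper are placeholders I introduced following the snippet above, and the constructor arguments used in the question are omitted:

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: combines the GCS download above with the consumer construction
// from the question. Bucket/object names and local paths are placeholders.
public class MyConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    String localTrustStore = "/tmp/kafka.client.truststore.jks";  // placeholder
    String localKeyStore = "/tmp/kafka.client.keystore.jks";      // placeholder

    // Copy both stores from GCS onto this worker, as in the snippet above.
    downloadFromGcs("your-bucket-name", "path/to/kafka.client.truststore.jks", localTrustStore);
    downloadFromGcs("your-bucket-name", "path/to/kafka.client.keystore.jks", localKeyStore);

    // Point the Kafka consumer at the local copies.
    config.put("ssl.truststore.location", localTrustStore);
    config.put("ssl.keystore.location", localKeyStore);

    return new KafkaConsumer<byte[], byte[]>(config);
  }

  // Hypothetical helper wrapping the GCS read shown above.
  private void downloadFromGcs(String bucket, String object, String localPath) {
    try {
      Storage storage = StorageOptions.newBuilder()
          .setCredentials(GoogleCredentials.getApplicationDefault())
          .build()
          .getService();
      Blob blob = storage.get(bucket, object);
      try (ReadChannel readChannel = blob.reader();
           FileOutputStream out = new FileOutputStream(localPath)) {
        out.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed to copy " + object + " from GCS", e);
    }
  }
}

Note that this only covers the Kafka broker connection; for the Schema Registry call made by the Avro deserializer you would still need either the system-property approach from the question or an explicitly configured client along the lines of the sketch in the first answer.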
