当前位置: 首页 > 知识库问答 >
问题:

在Dataflow上运行的Apache Beam管道无法从Kafkaio读取:SSL握手失败

薛望
2023-03-14

我正在构建一个Apache Beam管道,将Kafka作为一个无界源进行阅读。

我能够使用直接运行器在本地运行它。

然而,当在云上使用Google Cloud Dataflow runner运行时,管道会因附加的异常堆栈跟踪而失败。

似乎最终是Conscrypt Java库抛出了javax.net.ssl.sslException:无法解析TLS数据包头。我真的不知道如何解决这个问题。

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        java.util.concurrent.FutureTask.report(FutureTask.java:122)
        java.util.concurrent.FutureTask.get(FutureTask.java:206)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLException: Unable to parse TLS packet header
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:782)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:723)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:688)
        org.conscrypt.Java8EngineWrapper.unwrap(Java8EngineWrapper.java:236)
        org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:464)
        org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328)
        org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255)
        org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79)
        org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:460)
        org.apache.kafka.common.network.Selector.poll(Selector.java:398)
        org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:468)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:450)
        org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1772)
        org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1411)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(KafkaUnboundedReader.java:641)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.lambda$start$0(KafkaUnboundedReader.java:106)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

共有1个答案

禹昆
2023-03-14

看起来Conscrypt在许多cottext中导致SSL错误,就像这样。BEAM2.9.0中的数据流工作人员有一个禁用此选项。请试试看。--experiment=disable_conscrypt_security_provider。或者,您可以尝试BEAM2.4.x,它不启用conscrypt。

 类似资料:
  • 我试图在Beam管道完成后,在Google DataFlow上运行一个函数(或管道)。 目前,我已经构建了一个hack来运行该函数,方法是使用 ... func在哪里: 但是有更好的方法吗?

  • 我正在学习SSL通信,我遇到了这个问题。我正在编写一个简单的客户端,它试图与本地apache服务器握手。服务器启用https。我将服务器证书添加到所有可能的信任存储(jdk中的一个 注意:我从以下教程中获取了代码: http://docs.oracle.com/javase/7/docs/technotes/guides/security/jsse/JSSERefGuide.html#KRB 停留

  • 我通过受ssl v3保护的cxf使用soap服务。我从服务器下载.cer文件,并通过keytool使用以下指令创建JKS文件: 在java代码中,我将此代码用于客户端配置: 对于调用此代码的服务: 当我运行代码时,会发生此错误: 我搜索此错误,我意识到该错误是针对不良信任存储的。但我不知道如何生成正确的信任库。

  • 从服务器上,我能够连接并从配置了SSL的远程kafka服务器主题获取数据。 如果我指向GCS上的证书,当证书指向Google存储桶时,它会抛出错误。 其次是:Truststore和Google Cloud Dataflow 更新的代码将SSL truststore、keystore位置指向本地机器的/tmp目录认证,以防KafkaIO需要从文件路径读取。它没有抛出FileNotFounderRor

  • 问题内容: 我正在尝试将Jenkins CI配置为对我们的项目执行持续集成,并且无法使其通过https连接到我们的SVN存储库。每当我尝试配置存储库URL并尝试连接时,都会遇到以下异常: 我在tomcat实例上启用了SSL调试(使用),并得到了以下信息: 我尝试按照这篇文章中的说明在tomcat中添加属性,但仍然出现相同的错误。 在这一点上,我对发生的事情完全感到困惑。不幸的是,我不是完全了解SS

  • 问题内容: 我已经将证书导入到信任库中,但是仍然无法成功连接到该URL。我已经尝试了所有方法,任何人都可以看到输出并提供帮助吗? 无法弄清楚这是什么,应用程序正在使用java1.6,但是SSLPoke无法通过两种情况 问题答案: 我发现客户也进行了验证。因此它是2路身份验证。客户还必须将我的公共证书导入其密钥库。