<!-- Pulsar connector for Flink; provides the FlinkPulsarSource and PulsarOptions classes used by the job below. -->
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector_2.11</artifactId>
<version>1.12.4.0</version>
</dependency>
<!--
  SASL authentication plugin for the Pulsar client (org.apache.pulsar.client.impl.auth.AuthenticationSasl).
  NOTE(review): this is declared at 2.7.0, while the stack trace in this report shows the admin client
  classes being loaded from pulsar-client-all-2.7.2.jar and AuthenticationSasl from the shaded job jar.
  That version mismatch looks like the cause of "NoSuchFieldError: INIT_AUTH_DATA"; aligning this
  version with the pulsar-client-all version on the runtime classpath is the presumable fix. Confirm.
-->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-auth-sasl</artifactId>
<version>2.7.0</version>
</dependency>
在 Flink 上提交如下代码时，会出错：
package org.happy.test.streaming;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import java.util.Properties;
/**
 * Minimal Flink streaming job that reads string records from a Pulsar topic through
 * {@link FlinkPulsarSource}, authenticating with the Pulsar SASL plugin, and prints every
 * record to standard output.
 *
 * <p>The job starts from the earliest available message and runs with parallelism 1.
 */
public class AuthTest1 {
    /** Path of the Kerberos configuration; only referenced by the disabled setup in {@code main}. */
    private static final String KRB5_CONF = "/etc/krb5.conf";
    /** Path of the JAAS login configuration; only referenced by the disabled setup in {@code main}. */
    private static final String JAAS_CONF = "/home/stream/keytab/pulsar_client_jaas.conf";
    /** Pulsar binary-protocol endpoints handed to the source as the service URL. */
    private static final String SERVICE_URL = "pulsar://host1:6650,host2:6650,host3:6650";
    /** Pulsar HTTP endpoints handed to the source as the admin URL. */
    private static final String ADMIN_URL = "http://host1:8080,host2:8080,host3:8080";

    /**
     * Builds the Pulsar source, wires it to a print sink and executes the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails; the failure is propagated
     *     instead of being printed and swallowed, so the caller sees that the job failed
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): the JAAS/krb5 system properties were already disabled in the original
        // program; presumably they are supplied to the JVMs by other means. Confirm before
        // re-enabling.
        // System.setProperty("java.security.auth.login.config", JAAS_CONF);
        // System.setProperty("java.security.krb5.conf", KRB5_CONF);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        String topic = "persistent://public/default/auth-test1";
        Properties props = new Properties();
        props.setProperty("topic", topic);
        props.setProperty("partition.discovery.interval-millis", "5000");
        // Select the SASL authentication plugin and pass its JSON parameters.
        props.setProperty(
                PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY,
                "org.apache.pulsar.client.impl.auth.AuthenticationSasl");
        props.setProperty(
                PulsarOptions.AUTH_PARAMS_KEY,
                "{\"saslJaasClientSectionName\":\"PulsarClient\", \"serverType\":\"broker\"}");

        FlinkPulsarSource<String> source =
                new FlinkPulsarSource<>(SERVICE_URL, ADMIN_URL, new SimpleStringSchema(), props);
        source.setStartFromEarliest();

        DataStream<String> sourceStream = see.setParallelism(1).addSource(source);
        System.out.println("begin test---------" + topic);
        sourceStream.print();

        // Let execution failures propagate: catching them and only printing the stack trace
        // would let main return normally even though the job failed.
        see.execute("source-with-auth1");
    }
}
错误如下:
2021-10-25 14:48:28,086 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Print to Std. Out (1/1) (06b8ec87b0d46f663e4cf71d3eb8af76) switched from RUNNING to FAILED on container_e62_1635127811924_0011_01_000002 @ host16.rscp.ccdc (dataPort=40906).
org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:234) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl$7.failed(TopicsImpl.java:457) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:167) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_271]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_271]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
... 16 more
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:111) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
... 16 more
Caused by: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestHeader(AuthenticationSasl.java:264) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestBuilder(AuthenticationSasl.java:198) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.authenticationStage(AuthenticationSasl.java:307) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:82) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
... 16 more
2021-10-25 14:48:28,116 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-10-25 14:48:28,117 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-10-25 14:48:28,118 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job source-with-auth1 (f0487cef8297918c0f5a5d48b4b612b7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_271]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_271]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.4.jar:1.12.4]
Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:234) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl$7.failed(TopicsImpl.java:457) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:167) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_271]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_271]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:111) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestHeader(AuthenticationSasl.java:264) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestBuilder(AuthenticationSasl.java:198) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.authenticationStage(AuthenticationSasl.java:307) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:82) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
2021-10-25 14:48:28,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job source-with-auth1 (f0487cef8297918c0f5a5d48b4b612b7) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_271]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_271]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.4.jar:1.12.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.4.jar:1.12.4]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.4.jar:1.12.4]
Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:234) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl$7.failed(TopicsImpl.java:457) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:167) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_271]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_271]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$GettingAuthenticationDataException: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:111) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.lang.NoSuchFieldError: INIT_AUTH_DATA
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestHeader(AuthenticationSasl.java:264) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.newRequestBuilder(AuthenticationSasl.java:198) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.impl.auth.AuthenticationSasl.authenticationStage(AuthenticationSasl.java:307) ~[quickstart-1.0-shaded.jar:?]
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:82) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:68) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:165) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadataAsync(TopicsImpl.java:447) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.pulsar.client.admin.internal.TopicsImpl.getPartitionedTopicMetadata(TopicsImpl.java:431) ~[pulsar-client-all-2.7.2.jar:2.7.2]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitionsAll(PulsarMetadataReader.java:364) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.getTopicPartitions(PulsarMetadataReader.java:354) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader.discoverTopicChanges(PulsarMetadataReader.java:135) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource.open(FlinkPulsarSource.java:453) ~[quickstart-1.0-shaded.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
2021-10-25 14:48:28,129 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job f0487cef8297918c0f5a5d48b4b612b7.
2021-10-25 14:48:28,132 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
堆栈中实际加载的是 pulsar-client 的 2.7.2 版本(pulsar-client-all-2.7.2.jar),而部署的集群版本是 2.7.0。
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.api;
import static java.nio.charset.StandardCharsets.UTF_8;
import lombok.Data;
/**
* Authentication data: a holder for a raw {@code byte[]} payload.
*
* <p>The Lombok annotation {@code @Data(staticConstructor = "of")} is expected to generate the
* accessor, {@code equals}/{@code hashCode}/{@code toString} and a static factory named
* {@code of}; none of that generated code is visible in this listing.
*
* <p>In this listing {@link #INIT_AUTH_DATA} is declared with type {@code byte[]}.
*/
@Data(staticConstructor = "of")
public final class AuthData {
// CHECKSTYLE.OFF: StaticVariableName
// UTF-8 encoding of the literal "PulsarAuthInit"; a public, non-final byte array.
public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
// CHECKSTYLE.ON: StaticVariableName
// Payload wrapped by this instance; isComplete() below checks it against null.
private final byte[] bytes;
/**
* Reports whether this instance carries no payload.
*
* @return {@code true} when {@code bytes} is {@code null}, {@code false} otherwise
*/
public boolean isComplete() {
return bytes == null;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.api;
import static java.nio.charset.StandardCharsets.UTF_8;
import lombok.Data;
/**
* Authentication data: a holder for a raw {@code byte[]} payload.
*
* <p>The Lombok annotation {@code @Data(staticConstructor = "of")} is expected to generate the
* accessor, {@code equals}/{@code hashCode}/{@code toString} and the static factory
* {@code of(byte[])} that the constants below call; the generated code itself is not visible
* in this listing.
*
* <p>In this listing {@link #INIT_AUTH_DATA} is declared with type {@code AuthData}; the raw
* bytes are exposed separately as {@link #INIT_AUTH_DATA_BYTES}.
*/
@Data(staticConstructor = "of")
public final class AuthData {
// CHECKSTYLE.OFF: StaticVariableName
// UTF-8 encodings of the two marker literals; public, non-final byte arrays.
public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8);
public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8);
// Wrapped forms of the arrays above. Static initializers run in textual order, so the
// *_BYTES fields must stay declared before these two.
public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES);
public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES);
// CHECKSTYLE.ON: StaticVariableName
// Payload wrapped by this instance; isComplete() below checks it against null.
private final byte[] bytes;
/**
* Reports whether this instance carries no payload.
*
* @return {@code true} when {@code bytes} is {@code null}, {@code false} otherwise
*/
public boolean isComplete() {
return bytes == null;
}
}
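4. 两个版本的 AuthData 类定义不一致:字段 INIT_AUTH_DATA 的类型由 byte[] 变成了 AuthData。JVM 解析字段时同时匹配字段名和字段类型,所以按旧定义编译的 AuthenticationSasl 在新版本的 AuthData 上找不到该字段,抛出 NoSuchFieldError: INIT_AUTH_DATA。因此,2.7.2 的 Client 也应该使用 2.7.2 的 pulsar-client-auth-sasl,更新该 Jar 包的版本后,即可解决问题。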
在 pom 文件中,将 pulsar-client-auth-sasl 的版本改为 2.7.2 即可。