Question:

TimeoutException: Timeout of 60000ms expired before the partition position could be determined, when trying to read from Kafka into Flink

方献
2023-03-14

I am trying to read records from a Kafka topic that is populated by the Kafka Connect JDBC source connector. This is the connector configuration:

"name": "customers",
"config": {
    "poll.interval.ms": "3000",
    "table.poll.interval.ms": "10000",
    "errors.log.enable": "false",
    "errors.log.include.messages": "false",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "topic.prefix": "_",
    "mode": "incrementing",
    "validate.non.null": "true",
    "table.whitelist": "customers",
    "incrementing.column.name": "customer_id",
    "connection.url": "jdbc:sqlserver://demo-sqlserver:1433;databaseName=DemoData",
    "connection.user": "sa",
    "connection.password": "password",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "tasks.max": "2"
}
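(For context: a connector with this configuration is registered by POSTing the JSON to the Kafka Connect REST API. A minimal sketch follows; the Connect endpoint kafka-connect:8083 is an assumption, not something from the question.)

# Minimal sketch: register the connector through the Kafka Connect REST API.
# The endpoint (kafka-connect:8083) is an assumption; adjust to your setup.
import requests

connector = {
    "name": "customers",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:sqlserver://demo-sqlserver:1433;databaseName=DemoData",
        "table.whitelist": "customers",
        "mode": "incrementing",
        "incrementing.column.name": "customer_id",
        "topic.prefix": "_",
        # ... plus the remaining keys exactly as listed above ...
    },
}

resp = requests.post("http://kafka-connect:8083/connectors", json=connector)
resp.raise_for_status()  # 201 Created on success
print(resp.json())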
The Flink cluster runs under Docker Compose; these are the Flink services (the Kafka broker, ZooKeeper, and Schema Registry are defined elsewhere):

services:

  jobmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "28081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net
  taskmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net
The jobmanager and taskmanager image is built from this Dockerfile:

FROM apache/flink:1.12-scala_2.11-java11

RUN set -ex; \
    apt-get update; \
    apt-get -y install python3 python3-pip python3-dev; \
    ln -s /usr/bin/python3 /usr/bin/python; \
    ln -s /usr/bin/pip3 /usr/bin/pip

RUN set -ex; \
    python -m pip install --upgrade pip; \
    pip install apache-flink

ARG FLINK_VERSION=1.12.0

RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/${FLINK_VERSION}/flink-sql-connector-kafka_2.12-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/${FLINK_VERSION}/flink-sql-avro-${FLINK_VERSION}.jar

RUN mkdir -p /opt/data; \
    mkdir -p /opt/data/stream

WORKDIR /opt/flink
The PyFlink job reads from the source topic and writes a two-column projection to a sink topic:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Streaming TableEnvironment on the Blink planner (the default in Flink 1.12).
b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_env.set_parallelism(1)
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
src_kafka_ddl="""
  CREATE TABLE SourceTable (
    `age` INT,
    `customer_id` INT,
    `name` STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dj_customers',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.zookeeper.connect'='zoo1:2181',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
  )
"""
sink_kafka_ddl="""
  CREATE TABLE SinkTable (
    `customer_id` INT,
    `name` STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'sink_test',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.zookeeper.connect'='zoo1:2181',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
  )
"""

b_s_t_env.execute_sql(src_kafka_ddl)
b_s_t_env.execute_sql(sink_kafka_ddl)

# Select customer_id and name from the source table and insert into the sink.
orders = b_s_t_env.from_path("SourceTable")
orders.select(orders.customer_id, orders.name).execute_insert("SinkTable").wait()
Submitting the job fails with the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined.

    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
        ... 18 more
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
        at jdk.internal.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined
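(For reference, the 60000 ms in this message is the Kafka consumer's default.api.timeout.ms. The Flink Kafka SQL connector forwards any 'properties.*' option to the client, so the timeout can be raised as in the sketch below, though if the broker is unreachable this only postpones the same failure.)

# Sketch: the same source DDL with the consumer's API timeout raised via the
# connector's 'properties.*' passthrough. This delays, but does not fix,
# the failure when the broker cannot be reached at all.
src_kafka_ddl_longer_timeout = """
  CREATE TABLE SourceTable (
    `age` INT,
    `customer_id` INT,
    `name` STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dj_customers',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.default.api.timeout.ms' = '120000',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
  )
"""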

1 Answer

车明贤
2023-03-14

You need to put the Flink services on the same Docker network as the Kafka broker; otherwise the Flink task managers cannot reach the broker, and the consumer times out before it can determine partition positions.

The simplest way to do that is to use a single compose file for everything.
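A minimal sketch of what that single compose file might look like, keeping the service names from the question (kafka1, zoo1, jobmanager, taskmanager); the ZooKeeper/Kafka images and their environment values are assumptions, not taken from the question:

services:
  zoo1:
    image: zookeeper:3.6                 # assumed image/version
    networks:
      - flink-proxy-net

  kafka1:
    image: confluentinc/cp-kafka:6.0.1   # assumed image/version
    depends_on:
      - zoo1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181
      # Advertise the compose-network hostname, so that clients inside the
      # network (the Flink task managers) can actually reach the broker; an
      # unreachable advertised listener produces exactly this timeout.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - flink-proxy-net

  jobmanager:
    # ... definition as in the question ...
    networks:
      - flink-proxy-net

  taskmanager:
    # ... definition as in the question ...
    networks:
      - flink-proxy-net

networks:
  flink-proxy-net:
    driver: bridge

If the broker has to stay in its own compose file instead, the same effect can be achieved by declaring its network as external in the Flink compose file and attaching jobmanager and taskmanager to it.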

  • 问题内容: 我正在使用Hibernate,试图模拟2个并发更新到数据库中的同一行。 编辑:我将em1.getTransaction()。commit移到em1.flush()之后;我没有收到任何StaleObjectException,两个事务已成功提交。 我在上遇到以下异常。为什么? 问题答案: 好吧,您正试图陷入僵局,并且成功了:-) Transaction1开始,与您的实体更新(和锁定)行。