I am trying to read records from a Kafka topic that is populated by the Kafka Connect JDBC source connector. With "topic.prefix" set to "_" and "table.whitelist" set to "customers", the connector writes to the topic "_customers", matching the partition "_customers-0" in the stack trace below. Here is the connector configuration:
"name": "customers",
"config": {
"poll.interval.ms": "3000",
"table.poll.interval.ms": "10000",
"errors.log.enable": "false",
"errors.log.include.messages": "false",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "_",
"mode": "incrementing",
"validate.non.null": "true",
"table.whitelist": "customers",
"incrementing.column.name": "customer_id",
"connection.url": "jdbc:sqlserver://demo-sqlserver:1433;databaseName=DemoData",
"connection.user": "sa",
"connection.password": "password",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"tasks.max": "2"
}
The Flink services run from the following docker-compose file:

services:
  jobmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "28081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net
  taskmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net

networks:
  flink-proxy-net:
    driver: bridge  # the Kafka broker runs from a separate compose file and is not on this network
Both images are built from this Dockerfile:

FROM apache/flink:1.12-scala_2.11-java11

# Install Python 3 and PyFlink on top of the Flink base image
RUN set -ex; \
    apt-get update; \
    apt-get install -y python3 python3-pip python3-dev; \
    ln -s /usr/bin/python3 /usr/bin/python; \
    ln -s /usr/bin/pip3 /usr/bin/pip; \
    python -m pip install --upgrade pip; \
    pip install apache-flink

# Download the format and connector jars into Flink's lib directory
ARG FLINK_VERSION=1.12.0
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar && \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar && \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/${FLINK_VERSION}/flink-sql-connector-kafka_2.12-${FLINK_VERSION}.jar && \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/${FLINK_VERSION}/flink-sql-avro-${FLINK_VERSION}.jar

RUN mkdir -p /opt/data/stream

WORKDIR /opt/flink
The PyFlink job reads from the source topic and writes a projection to a sink topic:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Set up a streaming table environment on the Blink planner
b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_env.set_parallelism(1)
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)

# Source table backed by the Kafka topic written by the connector
src_kafka_ddl = """
CREATE TABLE SourceTable (
    `age` INT,
    `customer_id` INT,
    `name` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dj_customers',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.zookeeper.connect' = 'zoo1:2181',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
)
"""

# Sink table backed by a second Kafka topic
sink_kafka_ddl = """
CREATE TABLE SinkTable (
    `customer_id` INT,
    `name` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'sink_test',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.zookeeper.connect' = 'zoo1:2181',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
)
"""

b_s_t_env.execute_sql(src_kafka_ddl)
b_s_t_env.execute_sql(sink_kafka_ddl)

# Project two columns from the source and insert them into the sink
orders = b_s_t_env.from_path("SourceTable")
orders.select(orders.customer_id, orders.name).execute_insert("SinkTable").wait()

The job fails with the following exception:
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined.
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined
You need to put the Flink services on the same Docker network as the Kafka broker. The simplest way to do that is to use a single compose file for everything.
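If the Kafka broker has to stay in its own compose file, a variant of the same fix is to attach the Flink services to the broker's network. This is only a sketch: "kafka-net" is a placeholder for whatever network the Kafka compose file actually creates (docker network ls will show its real name), not a name taken from the setup above.

services:
  jobmanager:
    # ... build, volumes, ports, command, environment as in the file above ...
    networks:
      - kafka-net
  taskmanager:
    # ... likewise unchanged ...
    networks:
      - kafka-net

networks:
  kafka-net:
    external: true  # reuse the network created by the Kafka compose file

Once the containers share a network, the Flink job can reach the broker by its service name (kafka1:9092 in the DDL above), assuming the broker advertises a listener on that hostname; the "position for partition could not be determined" timeout is a typical symptom of the advertised listener being unreachable from the consumer.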