当前位置: 首页 > 知识库问答 >
问题:

使用setStartFromTimestamp设置时,FlinkKafkaConsumer010不起作用

翟承志
2023-03-14

我正在使用flink streaming和flink-connector-kafka处理来自Kafka的数据。当我用setStartFromTimestamp(1586852770000L)配置FlinkKafkaConsumer010时,此时kafka主题A中所有数据的时间都在1586852770000L之前,然后我向主题A的分区-0和分区-4发送一些消息(主题A有6个分区,当前系统时间已经在1586852770000L之后)。但是我的flink程序不使用主题a中的任何数据,所以这是一个问题吗?

如果我停止我的flink程序并重新启动它,它可以使用来自主题A的分区-0和分区-4的数据,但如果我向其他4个分区发送数据,它仍然不会使用来自其他4个分区的任何数据,除非我再次重新启动我的flink程序。

Kafka的日志如下:

2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=
0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1

2020-04-15 11:48:46,481 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]

从日志中,除了分区-0和分区-4,其他4个分区的偏移量是-1。为什么返回偏移量是-1而不是最后的偏移量?

// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
   OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
   timestampOffsetMap.put(topicPartition, offsetData);
}

我的Flink版本是1.9.2,而Flink kafka connertor是

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.9.2</version>
</dependency>

flink kafka连接器的文档如下:

setStartFromTimestamp(long):从指定的时间戳开始。对于每个分区,时间戳大于或等于指定时间戳的记录将用作开始位置。如果分区的最新记录早于时间戳,则只需从最新记录读取该分区。

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test

class TestFlinkKafka {

  @Test
  def testFlinkKafkaDemo: Unit ={
    //1. set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic( TimeCharacteristic.ProcessingTime)
    // To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
    env.enableCheckpointing(60000)
    //2. kafka source
    val topic = "message"
    val schema = new SimpleStringSchema()
    //server1:9092,server2:9092,server3:9092
    val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
    val  consumer = new FlinkKafkaConsumer010(topic, schema, props)
    //consume data from special timestamp's offset
    //2020/4/14 20:0:0
    //consumer.setStartFromTimestamp(1586865600000L)
    //2020/4/15 20:0:0
    consumer.setStartFromTimestamp(1586952000000L)
    consumer.setCommitOffsetsOnCheckpoints(true)

    //3. transform
    val stream = env.addSource(consumer)
      .map(x => x)

    //4. sink
    stream.print()

    //5. execute
    env.execute("testFlinkKafkaConsumer")

  }

  def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", offsetReset)
    props.setProperty("flink.partition-discovery.interval-millis", "30000")
    props
  }

}

log4j.logger.org.apache.kafka=TRACE
kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1
kafka-console-producer --broker-list localhost:9092 --topic message

{"name":"tom"}
{"name":"michael"}

共有1个答案

常乐
2023-03-14

通过将Flink/Kafka连接器升级到较新的通用连接器--flinkkafkaconsumer--可以从flink-connector-kafka2.11获得,解决了这个问题。推荐此版本的连接器适用于1.1.0以前的所有版本的Kafka。对于Kafka 0.10.x或0.11.x,最好使用特定于版本的flink-connector-kafka-0.102.11flink-connector-kafka-0.112.11连接器。(在所有情况下,如果使用的是Scala2.12,请用2.12替换2.11。)

有关Flink的Kafka连接器的更多信息,请参见Flink文档。

 类似资料:
  • 问题内容: 我正在尝试使用Spring Boot和MySQL开发应用程序。正如文档所述,首先,我使用Intelij Idea使用Spring initializr创建了项目,配置了文件,并编写了文件和文件。运行项目后,我发现MySQL数据库中没有表或数据。我的配置有什么问题?请帮忙。 application.properties文件, pom.xml文件中的依赖项, schema-mysql.sq

  • 我有一个php版本为7.0的Linux apache2 Web服务器。22.看起来设置根本不起作用:( 这个ini文件位于以下位置:根据。当我查看文件夹时,我看到三个文件(设置告诉我正在使用的文件),和。 所以我查找了,它是,我将它改为,然后。我对所有的人都这样做了。ini文件保存了它们,重新启动apache2并刷新了phpinfo()页面,但什么都没有发生。我试图在所有窗口中更改其他设置。ini

  • 我尝试将ini文件解析为可以在ant脚本中使用的属性。我有以下几点: 我试图做的是解析所有的name=value对,并将它们放入属性中,如:section。名称=值; 不知何故,“echoMsg”目标中没有记住该部分。我想记住部门名称。 所以 应该成为: 这是我的ant脚本的输出: 如您所见,未设置最后一个“${prevSection}”。我希望它是“全球性的”。 我试着用它来代替财产,但没有区别

  • 我有一个来自3的Vue项目。十、 我在

  • 我不能让谷歌云功能运行超过60秒,即使超时设置为540秒!!有什么建议吗? 我将部署时的超时标志设置为--timeout=540,并且我知道该设置会通过,因为在GCP WEB UI中会显示540秒的超时设置。我还尝试通过GCP WEB UI手动编辑超时至540。但无论如何,我还是在大约62000毫秒后超过了最后期限。 我已经尝试了pub/sub和https方法作为func触发器,但是仍然在60度左

  • 我想就以下问题寻求您的帮助。 不幸的是,Swagger不起作用,当我使用“spring . resources . static-locations”指定ui时,我得到了404 page not found异常(http://localhost:8080/Swagger-UI . html)。 有人面临这个问题吗?你能帮助我处理这个问题吗? 提前感谢您的帮助