当前位置: 首页 > 知识库问答 >
问题:

在pyflink中访问kafka时间戳

陈富
2023-03-14

我正在尝试编写一个用于测量延迟和吞吐量的Pyflink应用程序。我的数据作为来自kafka主题的json对象,并使用SimpleStringSchema类加载到Datastream中进行反序列化。下面是对这篇文章的回答(如何在Kafka和Flink环境中测试性能?)我让Kafka的制作人在事件中加上时间戳,但我很难理解我现在如何才能访问这些时间戳。我知道上面提到的文章为这个问题提供了一个解决方案,但是我很难将这个例子转移到python,因为文档/例子很少。

class MyProcessFunction():

    def process_element(self, value, ctx):
        result = value.get_time_stamp()
        yield result

在此处执行value.get_time_stamp()的正确方法是什么?或者有没有一个更简单的方法来解决我不知道的问题?

谢谢!

共有1个答案

孟昊空
2023-03-14

设置由Kafka主题支持的表时,可以为Kafka时间戳声明一个虚拟列,如本例中的event_time列:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

有关使用Kafka头中的元数据的更多信息,请参阅Flink的Kafka表连接器的文档。

 类似资料:
  • 我有两个集群, 一个集群安装了我的应用程序微服务,另一个集群安装了Strimzi kafka。两者都是私有的GKE集群。 我的挑战是如何从我的应用程序连接到这个 kafka。大约有 10 个微服务正在运行,每个微服务都必须连接到 kafka。 我现在有一种方法,将Strimzi kafka作为Nodeport服务,并在应用程序代码中提供Ip和nodeIp。 这种方法的问题在于,如果 GKE 节点自

  • HttpSession 接口的 getLastAccessedTime 方法允许 servlet 确定在当前请求之前的会话的最后访问时间。当会话中的请求是 servlet 容器第一个处理时该会话被认为是访问了。

  • 我们是否可以在系统中使用java7访问安装在其他机器上的Kafka?我必须通过编写消费者代码来获取主题,但我的系统中有java7。我知道安装我们需要最少的Java8。

  • 我试图理解Java8中引入的新日期和时间API。 我在日志文件中有一个unix时间戳,我需要对它进行处理,以确定它属于今天或昨天的哪个小时。 我在Android Studio中遇到了一个不寻常的错误,我想更好地理解它。

  • 我正在使用部署在Kafka Connect中的Debezium MySQL连接器,将MySQL更改流式传输到Kafka主题,并从中获取这些消息,从而丰富数据并将数据推送到另一个MySQL。 源和接收器都是MySQL。 我的源表中有几个列,列数据类型为TIMESTAMP。 创建时间:2021-10-06 09:32:46 我可以在Kafka的信息中看到上述数据,如下所示 “创建时间”:“2021-1

  • 我有几个问题。1)我们能不能在k8集群之外以上述方式访问k8集群之外的Kafka?如果是这样,我在某种程度上做错了吗?2)如果端口是打开的,那不是意味着代理是可用的吗? 如有任何帮助,不胜感激。谢谢