当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka连接器至eventhub

夏令秋
2023-03-14

我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)所述的flink功能“setStartFromTimestamp(...)”。当我尝试从时间戳获取一些消息时,Kafka说代理端的消息格式是0.10.0之前的。有人面临这个吗?Apache Kafka客户端版本是2.0.1 Apache Flink版本是1.7.2

更新:尝试使用Azure事件中心快速启动示例(https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java)在消费者软件包中,添加了代码以获得带有时间戳的偏移量,如果消息版本低于0.10.0 kafka版本,它将按预期返回null。

        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);

共有1个答案

寿亦
2023-03-14

很抱歉我们错过了这个。EH现在支持Kafka offsetsForTimes()(以前不支持)。

以后可以针对我们的Github提出问题。https://github.com/Azure/azure-event-hubs-for-kafka

 类似资料:
  • 我已经使用kafka source connector将文档从Couchbase传输到kafka。这些文档然后被复制到Mongo DB。 沙发底座 -- 如果源连接器关闭,那么如何再次将所有文档同步到 Kafka? 有没有什么get和touch功能可以将kafka主题在关闭期间所做的所有更改都显示出来?

  • 如何找到对等节点(Peers) Geth不断尝试连接到网络上的其他节点,直到它找到peers。如果您在路由器上启用了UPnP,或者在面向Internet的服务器上运行其他方式,则它也将接受其他节点的连接。 Geth通过称为发现协议的东西找到peers。在发现协议中,节点相互通信以了解网络上的其他节点。为了一开始就可以执行,geth使用一组引导节点,其被记录在源代码中。 想要在启动时更改bootno

  • 当您将 M600 连上 Wi-Fi 网络后,您就可以使用 M600 上的 Google 应用程式商店来更新 Polar 应用程式并下载更多应用程式到您的 Wear OS by Google 智能手表上。 仅适用于与 Android 手机配对的 Polar M600 如果您的 M600 与手机蓝牙的连接断开,则它将自动连接到保存的 Wi-Fi 网络上。当 Android 手机已连接至 Wi-Fi 或

  • PS Vita的系统软件可通过升级,追加各式各样的功能或强化安全性。使用时请随时升级为最新版本。 若要进行系统升级,需事先将连接的电脑设为以下状态。 连接互联网 安装/下载PlayStation®内容管理助手 可在以下网站进行下载。 http://cma.dl.playstation.net/cma/ 1. 在电脑确认内容管理助手是否已启动。 可从电脑的工作列确认。 2. 使用USB连接线连接PS

  • PS Vita的系统软件可通过升级,追加各式各样的功能或强化安全性。使用时请随时升级为最新版本。 若要进行升级,需事先将连接的PS3™设为以下状态。 连接互联网 系统软件升级为版本4.00以上 关闭所有使用中的应用程序,显示(用户) 1. 使用USB连接线连接PS Vita和PS3™。 2. 操作PS Vita,轻触[系统升级]>[连接至PS3™进行升级]。 可通过PS3™的网络功能,经由互联网下

  • 在办公室或公共住宅等地方使用PlayStation®Network时,可能会因防火墙等加密功能,而出现连接遭阻挡的情形。此时请参考以下信息。 进入PlayStation®Network后,会与网络上的PlayStation®Network服务器连接并与其它PS Vita直接连接。此时使用的PlayStation®Network服务器Port码如下。 TCP : 80, 443, 465, 993,