当前位置: 首页 > 知识库问答 >
问题:

Apache Flink Kafka集成

长孙朝明
2023-03-14

我正在尝试将Apache Kafka 2.11-0.10.0.0与Apache Flink 1.1.2集成。我正在使用scalashell来测试它,我得到了以下错误。

类别组织。阿帕奇。Flink。流式处理。api。检查点。未找到检查点通知程序

我已经添加了组织。阿帕奇。Flink。将jar流式传输到类路径,但这没有帮助。我一直导入到org。阿帕奇。Flink。流式处理。api。检查点。\u。这仍然没有帮助。下面是我在shell中运行的代码

 import org.apache.flink.streaming.connectors.kafka._
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink._
 import java.util._
 val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("zookeeper.connect", "localhost:2181")
 properties.setProperty("group.id", "test")
 val myFetcher = FlinkKafkaConsumer.FetcherType.NEW_HIGH_LEVEL
 val myHandler = FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER
 senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties, myHandler, myFetcher)).print

我也尝试过使用FlinkKafkaConsumer081和FlinkKafkaConsumer082方法,而不是FlinkKafkaConsumer方法,但仍然出现相同的错误。

我注意到flink连接器kafka jar自2月16日以来一直没有更新过。这个罐子用错了吗?我确实在maven central Flink找到了Kafka基地2\U 11连接器。我应该改用那个罐子吗?

请帮忙!

共有1个答案

孔砚
2023-03-14

版本冲突。我相信,这个类在Flink 1.0中被删除了。所以您可能有一个旧版本的jar。检查以确保所有内容都已更新到Flink 1.1.2

 类似资料:
  • 我是新点燃的。 步骤1:我在两个VM(ubuntu)中安装了Ignite 2.6.0,在一个VM中启动了节点。下面有COMAND。bin/ignite.sh examples/config/example-ignite.xml 步骤2:我的所有配置都在example-default.xml中 步骤3:在其他VM中执行包含datagrid逻辑的client.jar(该VM既是客户机也是节点)。 步骤

  • 我创建了一个新示例,并将代码分为客户端和服务器端。 完整的代码可以在这里找到。 服务器端有3个版本。 服务器无Spring Boot应用程序,使用Spring Integration RSocket InboundGateway 服务器引导重用Spring RSocket autconfiguration,并通过serverrsocketmessagehandler创建ServerRSocketC

  • 可运行和可调用 如果你在Runnable或Callable中包含你的逻辑,就可以将这些类包装在他们的Sleuth代表中。 Runnable的示例: Runnable runnable = new Runnable() { @Override public void run() { // do some work } @Override public String toString()

  • Jinja2 提供了一些代码来继承到其它工具,诸如框架、 Babel 库或你偏好的编辑器 的奇特的代码高亮。这里是包含的这些的简要介绍。 帮助继承的文件在 这里 可 用。 Babel 集成 Jinja 提供了用 Babel 抽取器从模板中抽取 gettext 消息的支持,抽取器的接入点 名为 jinja2.ext.babel_extract 。 Babel 支持的被作为 i18n 扩展 的 一部分

  • Jinja2 提供了一些代码来继承到其它工具,诸如框架、 Babel 库或你偏好的编辑器 的奇特的代码高亮。这里是包含的这些的简要介绍。 帮助继承的文件在 这里 可 用。 Babel 集成 Jinja 提供了用 Babel 抽取器从模板中抽取 gettext 消息的支持,抽取器的接入点 名为 jinja2.ext.babel_extract 。 Babel 支持的被作为 i18n 扩展 的 一部分

  • 我有一个redis集群,有主服务器、从服务器和3个哨兵服务器。主从映射到dns名称node1-redis-dev.com、node2-redis-dev.com。redis服务器版本为2.8 我在application.properties文件中包含以下内容。 但是,当我检查StringRedisTemplate时,在JedisConnectionFactory的hostName属性下,我看到的是

  • sdiff key1 key2...keyN 返回所有给定key的差集 sdiffstore dstkey key1...keyN 同sdiff,并同时保存差集到dstkey下

  • sunion key1 key2...keyN 返回所有给定key的并集 sunionstore dstkey key1...keyN 同sunion,并同时保存并集到dstkey下