当前位置: 首页 > 知识库问答 >
问题:

以Kafka为来源的Flink

侯池暝
2023-03-14

我试图从Kafka主题中读取数据,在Flink流媒体。我试图运行以下示例代码,在APACHE Flink 1.1.3文档页面上作为示例:Apache kafka连接器,

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class stock_streaming_kafka {

    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);

    DataStream<String> stream = env
        .addSource(myConsumer)
        .print();
}

}

我有以下错误:

Exception in thread "main" java.lang.Error: Unresolved compilation problems: 
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)

at stock_streaming_kafka.main(stock_streaming_kafka.java:25)

你能指导我修理这个吗?Kafka连接器是否存在依赖性问题。我的版本是:

  1. Flink 1.1.3

共有3个答案

令狐泓
2023-03-14

因为答案还没有被接受,这里有一个完整的Maven代码示例,可以使用Flink从Kafka读取数据。

您可能需要调整pom。xml以匹配Kafka和Scala版本的设置。

希望这有帮助。

巴洲
2023-03-14

Flink和Flink连接器的版本必须匹配。将flink连接器的依赖项更新为1.1.3。

云景焕
2023-03-14

请使用以下版本。它将与您的Kafka版本一起使用。

   <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.1.4</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
            <version>1.1.3</version>
        </dependency>

我在代码中看到编译问题。

更改此项:

DataStream<String> stream = env
        .addSource(myConsumer)
        .print();

致:

DataStream<String> stream = env
        .addSource(myConsumer);
stream.print();

如果它仍然不为你工作,那么请让我知道,我会分享工作代码。

 类似资料:
  • 哪个图表:图表/比特纳米/Kafka/https://github.com/bitnami/charts/tree/master/bitnami/kafka 描述我正在遵循的教程bug使用Apache Kafka和MongoDB在库伯内特斯上构建可扩展的容错消息群集 为了解决外部署问题,我遵循了外部署不呈现留档#5649中的示例。问题已解决,我的工作配置如下: Dockerfile文件 然后我跑了

  • 我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink

  • 我认为我对Flink窗口的理解可能是错误的,因为它们没有像我期望的那样从文档或Flink书中进行评估。目标是将具有相当静态数据的Kafka主题与具有不断传入数据的Kafka主题连接起来。 返回FlinkKafkaConsumer 是我的键选择器的占位符。 我的关键问题: 这里到底发生了什么?是否在窗口完成处理后发出记录?我希望有一个实时输出到水槽,但这将解释很多。 与此相关的是:我可以用onEle

  • 我使用水槽代理通过水槽代理收集外部数据。外部数据批次几乎是每 10 秒 1MB。我按如下方式配置了水槽代理。 我按以下方式激活了代理。 可惜后来发现netcat source运行良好,channel或者sink出了问题。从Ubuntu的资源监视器,我可以看到以下性能。网络性能。蓝色曲线表示输入,而红色曲线表示在没有其他应用程序运行网络io的情况下的输出,我确信这个图展示了我的Flume代理发生了什

  • Kafka可以配置为使用几种身份验证机制:明文用户名/密码、Kerberos或SSL。前2个使用SASL,其中需要JAAS配置文件。 对于纯文本auth方法,配置如下(取自文档): 如果可能的话,我想使用LDAP进行身份验证。我的问题是:如果我用一个实现LoginMoules的类替换,并将该类放在代理的类路径中,我是否可以以我希望的任何方式实现身份验证(即:LDAP)? 我不能以合理的方式使用Ke

  • 我使用的是kafka connect支持的以下mongo源代码。我发现mongo源代码的一个配置(从这里)是tasks.max。 这意味着我可以提供连接器tasks.max这是 如果它将创建多个连接器来侦听mongoDb更改流,那么我将最终得到重复的消息。那么,mongo真的具有并行性并作为集群工作吗?如果它有超过1个tasks.max?