我试图从Kafka主题中读取数据,在Flink流媒体。我试图运行以下示例代码,在APACHE Flink 1.1.3文档页面上作为示例:Apache kafka连接器,
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class stock_streaming_kafka {
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);
DataStream<String> stream = env
.addSource(myConsumer)
.print();
}
}
我有以下错误:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)
at stock_streaming_kafka.main(stock_streaming_kafka.java:25)
你能指导我修理这个吗?Kafka连接器是否存在依赖性问题。我的版本是:
因为答案还没有被接受,这里有一个完整的Maven代码示例,可以使用Flink从Kafka读取数据。
您可能需要调整pom。xml以匹配Kafka和Scala版本的设置。
希望这有帮助。
Flink和Flink连接器的版本必须匹配。将flink连接器的依赖项更新为1.1.3。
请使用以下版本。它将与您的Kafka版本一起使用。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.3</version>
</dependency>
我在代码中看到编译问题。
更改此项:
DataStream<String> stream = env
.addSource(myConsumer)
.print();
致:
DataStream<String> stream = env
.addSource(myConsumer);
stream.print();
如果它仍然不为你工作,那么请让我知道,我会分享工作代码。
哪个图表:图表/比特纳米/Kafka/https://github.com/bitnami/charts/tree/master/bitnami/kafka 描述我正在遵循的教程bug使用Apache Kafka和MongoDB在库伯内特斯上构建可扩展的容错消息群集 为了解决外部署问题,我遵循了外部署不呈现留档#5649中的示例。问题已解决,我的工作配置如下: Dockerfile文件 然后我跑了
我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink
我认为我对Flink窗口的理解可能是错误的,因为它们没有像我期望的那样从文档或Flink书中进行评估。目标是将具有相当静态数据的Kafka主题与具有不断传入数据的Kafka主题连接起来。 返回FlinkKafkaConsumer 是我的键选择器的占位符。 我的关键问题: 这里到底发生了什么?是否在窗口完成处理后发出记录?我希望有一个实时输出到水槽,但这将解释很多。 与此相关的是:我可以用onEle
我使用水槽代理通过水槽代理收集外部数据。外部数据批次几乎是每 10 秒 1MB。我按如下方式配置了水槽代理。 我按以下方式激活了代理。 可惜后来发现netcat source运行良好,channel或者sink出了问题。从Ubuntu的资源监视器,我可以看到以下性能。网络性能。蓝色曲线表示输入,而红色曲线表示在没有其他应用程序运行网络io的情况下的输出,我确信这个图展示了我的Flume代理发生了什
Kafka可以配置为使用几种身份验证机制:明文用户名/密码、Kerberos或SSL。前2个使用SASL,其中需要JAAS配置文件。 对于纯文本auth方法,配置如下(取自文档): 如果可能的话,我想使用LDAP进行身份验证。我的问题是:如果我用一个实现LoginMoules的类替换,并将该类放在代理的类路径中,我是否可以以我希望的任何方式实现身份验证(即:LDAP)? 我不能以合理的方式使用Ke
我使用的是kafka connect支持的以下mongo源代码。我发现mongo源代码的一个配置(从这里)是tasks.max。 这意味着我可以提供连接器tasks.max这是 如果它将创建多个连接器来侦听mongoDb更改流,那么我将最终得到重复的消息。那么,mongo真的具有并行性并作为集群工作吗?如果它有超过1个tasks.max?