我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。
我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段,
创建的流如下所示。
DataStream<String> ms1=env.addSource(new FlinkKafkaConsumer("top1",new SimpleStringSchema(),prop))
.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new AscendingTimestampsWatermarks<>();
}
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (event, timestamp) -> System.currentTimeMillis();
}
});
DataStream<String> ms2=env.addSource(new FlinkKafkaConsumer("top2",new SimpleStringSchema(),prop))
.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new AscendingTimestampsWatermarks<>();
}
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (event, timestamp) -> System.currentTimeMillis();
}
});
流连接使用等于的连接执行,如下所示。
DataStream joinedStreams = ms1.join(ms2)
.where(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
.equalTo(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
.window(EventTimeSessionWindows.withGap(Time.seconds(60)))
.apply(new JoinFunction<String, String, CountryData>() {
@Override
public CountryData join(String o, String o2) throws Exception {
String[] tokens1 = o.split("::");
String[] tokens2 = o2.split("::");
CountryData countryData = new CountryData(tokens1[0], tokens1[1], tokens1[2], Long.parseLong(tokens1[3])+Long.parseLong(tokens2[3]));
return countryData;
}});
如下所述,
joinedStreams.addSink(new FlinkKafkaProducer<CountryData>("localhost:9095","flink-output", new CustomSchema()));
dataStreamSink.setParallelism(1);
dataStreamSink.name("KAFKA-TOPIC");
有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢
我认为这两个<code>FlinkkaFaConsumer</code>实例缺少时间提取器和水印配置。
由于代码使用的是事件-时间窗口连接,因此它需要与在Kafka中找到的数据相关联的某种时间信息,以便知道每个事件对应于哪个时间窗口。
如果没有这一点,两个流中的事件在事件时间上可能永远不会足够接近EventTimeSessionWindows.withGap(time.seconds(60))
定义的60秒窗口。
您还需要设置水印参数,以告诉 Flink 何时停止等待新数据并具体化输出 s.t.,您可以看到连接结果。
查看Kafka连接器时间和水印配置,了解您拥有的各种时间提取和水印可能性。
最后,确保将足够长时间内的测试数据发送到您的应用程序中。对于事件时间处理,只有“足够老”的数据才能输出,年轻的数据总是“在传输中被卡住”。例如,对于60秒的时间窗口和30秒的水位线,您至少需要90秒的数据才能在输出中看到任何内容。
我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2
交互式应用程序通常要分别用类 istream 和 ostream 输入和输出数据。当提示信息出现在屏幕上时,用户输入一个数据来响应。显然,提示信息必须在执行输入操作前出现。在有输出缓冲区的情况下,只有在缓冲区已满时、在程序中明确地刷新输出缓冲区时或因程序结束而自动刷新输出缓冲区时,输出信息才会显示到屏幕上。为保证输出要在下一个输入前显示,C++ 提供了成员函数tie,该函数可以实现输入/输出操作的
我试图通过在Local上连接两个数据流来运行Flink上的基本连接。源流的数据类型是相同的(Tuple4(String,String,Long,Long))。在多次运行下面提到的函数后,我随机收到了两个不同的输出(存储在下面的变量CollectTuple2Sink中,下面提到了相同的调试日志)。我尝试保持并行度1和最大并行度1,但问题仍然存在。 源函数和其他定义都来自本教程。还从Flink官方文件
Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?
我想在提取一些数据时使用外部工具(循环通过行)。为此,我首先使用了Runtime.getRuntime()。exec()执行它。但后来我的提取变得很慢。所以我在寻找一种可能性,在循环的每个实例中,使用shell的同一个实例来执行外部工具。 我发现,我应该使用ProcessBuilder。但是现在还不行。 这是我测试执行的代码(已经从论坛中的答案输入): 我想在另一个类中调用它,例如 Testcla
我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?