我有一个关于Apache Flink中两个以上流的联合的架构问题。 我们有三个甚至更多的流,它们是某种代码书,我们必须与它们一起丰富主流。代码书流是压缩的Kafka主题。代码本是不会经常更改的东西,例如货币。主流是一个快速的事件流。我们的目标是用代码书丰富主流。 在我看来,有三种可能的方法可以做到这一点: 对所有代码书进行联合,然后将其与主流连接并将丰富数据存储为托管的键控状态(因此当kafka的
我在这里设置了一个最小的示例,其中有N个Kakfa主题的N个流(在下面的示例中为100个)。 我想在每个流看到“EndofStream”消息时完成它。当所有流都完成时,我希望Flink程序能够顺利完成 当parallelism设置为1时,这是正确的,但通常不会发生。 从另一个问题来看,似乎并非Kafka消费群体的所有线索都结束了。 其他人建议抛出异常。但是,程序将在第一个异常时终止,并且不会等待所
我们使用Flink 1.2.1,我们通过联合一个流到另一个流来从2个kafka流中消费,并处理联合流。例如stream1.union(stream 2)但是,stream 2的体积是stream 1的100多倍,我们正在经历的是stream 2有巨大的消耗滞后(超过3天的数据),但stream 1的滞后很少。我们已经有9个分区,但1个作为Parallelism,会增加paralelism解决str
我使用flink动态分析json类型的数据,对keyby和给定的列求和,在我的mapFunction中,我将json转换为case类,但结果流没有在keyby函数中得到编译器,在线程“main”org.apache.flink.api.common.InvalidProgramException中得到错误。我的代码如下所示 我如何将json转换为case类或tuple?
我有一个从CSV文件读取的: 据我所知,是一个POJO: 我还有一个简单的课程: 但当我尝试使用它时,例如从测试中: 我得到以下错误: 我读过这些问题和答案,但运气不好: 任务不可序列化Flink 无法在scala中序列化任务 任务不可序列化:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException
我正在做一个Flink项目,想将源JSON字符串数据解析为Json Object。我正在使用jackson-module-scala进行JSON解析。但是,在Flink API中使用JSON解析器时遇到了一些问题(例如)。 这里有一些代码示例,我无法理解为什么它会这样。 在本例中,我正在执行jackson module scala的官方exmaple代码告诉我的操作: > 创建一个新的 注册 <块
我使用这个代码安装烧瓶 我喜欢这样 当我试着跑的时候 两者都给出了这个错误 尝试所有这些代码 这不能解决我的问题 pip——版本:PIP18.1 from/home/ghost/.local/lib/python2.7/site-packages/pip(python 2.7) pip3--version: pip 9.0.1 from /usr/lib/python3/dist-packages
我试图使用FlinkKafkaProducer010生成元素,但是当我打开消费者控制台窗口时,元素似乎出现了故障。 我使用kafka-topics.bat创建了主题 消费者是使用:kafka console consumer创建的。蝙蝠——zookeeper本地主机:2181——主题mytopic 我使用的Kafka制作人代码是: 当我查看Kafka日志文件时,我看到一个. log文件,其中的元素
一、 我正试图用Kafka信源和信宿测试Flink一次语义: 运行flink应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔20秒 每2秒使用Python脚本生成具有递增整数的消息。 使用read_committed隔离级别的控制台使用者读取输出主题。 手动杀死TaskManager 我希望在输出主题中看到单调递增的整数,而不考虑TaskManager的终止和恢复。 但实际上
我有两个流,流A和流B。两个流都包含具有ID和时间戳的相同类型的事件。现在,我希望闪烁作业所要做的就是在1分钟的窗口内加入具有相同ID的事件。水印是在事件上分配的。 在我的测试中,我尝试发送以下内容: 对于并行度=1,我可以看到从时间0开始的事件连接在一起。 但是,对于parallelism=2,打印不会显示任何正在加入的内容。为了解决这个问题,我尝试在每个流的keyBy之后打印事件,我可以看到它
我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00
我是Flink的新手,所以在定义Flink中的水印时,我面临一些问题。 让我们从Kafka消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,因此没有自定义解析。 如果将接收器应用于此代码,则其工作正常。问题是需要水印来避免无序事件。这就是我写的策略: 在做了一些研究后,我最终得到了这段代码,但这不起作用。这些是我的问题: 在这里使用ObjectNode
在Flink中,我发现了2种设置水印的方法, 第一个是 第二个是 我想知道哪个最终会生效。
我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?
我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。