我正在使用Flink从Apache Pulsar读取数据。我在pulsar中有一个分区主题,有8个分区。在本主题中,我生成了1000条消息,分布在8个分区中。我的笔记本电脑中有8个内核,因此我有8个子任务(默认情况下,并行度=#个内核)。在执行Eclipse中的代码后,我打开了Flink UI,发现一些子任务没有收到任何记录(空闲)。我希望所有8个子任务都能得到利用(我希望每个子任务都映射到我的主
我写了一份使用番石榴缓存的Flink作业。缓存对象是在main()函数中调用的run()函数中创建和使用的。 它类似于: 如果我以某种程度的并行性运行这个Flink作业,所有并行任务是否都将使用相同的缓存对象?如果没有,如何使它们都使用单个缓存? 缓存用于流的进程()函数内部。所以这就像 您可以将我的用例视为基于缓存的重复数据消除,因此我希望所有并行任务都指向单个缓存对象
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我使用Elasticsearch Connector作为Sink将数据插入到Elasticsearch中(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html)。 但是,我并没有找到任何从Elasticsearch获取数据作为源的连接器。 在Flink pip
使用ApacheFlink版本1.3.2和Cassandra3.11,我编写了一个简单的代码,使用ApacheFlink-Cassandra连接器将数据写入Cassandra。代码如下: 尝试使用ApacheFlink 1.4.2(1.4.x)运行相同的代码时,出现错误: 在线 我认为ApacheFlink1.4.2中存在一些依赖性更改,这导致了问题。 我在代码中使用以下导入的依赖项: 如何解决A
我试图从Kafka主题中读取数据,在Flink流媒体。我试图运行以下示例代码,在APACHE Flink 1.1.3文档页面上作为示例:Apache kafka连接器, } 我有以下错误: 你能指导我修理这个吗?Kafka连接器是否存在依赖性问题。我的版本是: Flink 1.1.3
我在同一份flink jobs中读了两个Kafka主题。 :来自第一个主题的消息被保存到rocksdb,然后它将与Stream2联合。 :来自第二个主题的消息被Stream1保存的状态所丰富,然后它将与Stream1联合。 主题1和主题2是不同的来源,但两个来源的输出基本相同。我必须用topic1的数据来充实topic2的数据。 这里是流动; 这里是问题; 那个流量好吗? 可以访问由保存的相同的状
我是Flink的新手,我试图理解Flink是如何在其的并行抽象中命令调用。考虑这个产生部分和的流的例子: 我希望它的输出是流:。事实上,就在这里。 是否可以安全地假设这种情况始终存在,尤其是在从具有大量并行性的源读取数据时?
我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码 但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?
我有一个Flink作业,我正在使用这里描述的方法进行集成测试:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing 作业从两个源获取输入,这两个源组合在一个中。在测试环境中,我目前使用两个简单的SourceFunction来发出值,但是这不提供对事件发出顺
我想得到输入流作为JSON数组从一个网址。如何设置源代码,以便在apache flink中使用datastream连续获得输入。简而言之,我想从一个url连续获得json数据,而不会关闭flink作业。
我正在尝试为Flink流媒体作业创建JUnit测试,该作业将数据写入kafka主题,并分别使用和从同一kafka主题读取数据。我正在通过生产中的测试数据: 以及检查来自消费者的数据是否与以下数据相同: 使用。 通过打印流,我能够看到来自消费者的数据。但无法获得Junit测试结果,因为即使消息完成,使用者仍将继续运行。所以它并没有来测试这个部件。 在或中是否有任何方法停止进程或运行特定时间?
我最近遇到了一些关于开发flink作业的问题,它引入了Spring和hibernate,并且作业将在flink集群上运行。所以我需要在运行任务管理器而不是作业管理器上的flink操作符之前初始化Spring资源。但是我找不到任何合适的StreamExecttion环境方法来做到这一点。 我尝试了以下一些方法: 但是,当并行性不止一个的flink作业执行时,spring初始化不会出现在每个任务管理器
我在本地环境中向Flink(v1.0.3)提交新作业时遇到此错误。 造成原因:java.lang.NoClassDefFoundError: org/apache/flink/stream/runtime/操作员/Checkpointorg.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink
我想知道是否有一种方法(或某种代码示例)在Flink流媒体应用程序中加载编码的预训练模型(用python编写)。所以我可以使用从文件系统加载的权重和来自流的数据来拟合模型。 先谢谢你