如果是,请把我放在轨道上实现。
正如David还提到的,您可以使用KafkaProducer API在简单的Java中创建一个虚拟生产者,以便按照您的意愿安排和发送消息给Kafka。同样,如果你想要多个同时的生产者,你也可以用Flink来实现。使用Flink,您需要为producer和Consumer编写一个单独的作业。Kafka基本上支持异步处理体系结构,因此它没有队列机制。所以最好把生产者和消费者的工作分开。
但请多想想这个测试的用意:
您是否试图测试Kafka流式传输的持久性、复制、偏移管理功能
或者您正在尝试测试主题分区和Flink流并行性。
在这种情况下,单个或多个生产者但消息的键应该是非空的,您可以测试Flink执行器是如何与单个分区连接的,并观察它们的行为。
您可能需要测试更多的想法,其中每一个都需要在producer中执行或不执行特定的操作。
现在还不清楚你是否能像在《水槽》中那样在Kafka中做一个扇出(复制)。 我想让Kafka将数据保存到HDFS或S3,并将该数据的副本发送到Storm进行实时处理。Storm集合/分析的输出将存储在Cassandra中。我看到一些实现将所有数据从Kafka流到Storm,然后从Storm输出两个。但是,我想消除Storm对原始数据存储的依赖。 这可能吗?您知道任何类似的文档/示例/实现吗? 还有,
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
早些时候,我问了Flink一个简单的hello world示例。这给了我一些很好的例子! 然而,我想问一个更“流”的例子,我们每秒生成一个输入值。这在理想情况下是随机的,但即使每次都是相同的值也可以。 目标是获得一个无需/最少外部接触就能“移动”的流。 因此,我的问题是: 我发现如何显示这与外部生成数据和写入Kafka,或听一个公共源,但是我试图解决它与最小的依赖性(像在Nifi与Generate
我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码
我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大
假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?