我正在使用Apache Flink和Kafka消费者阅读Kafka主题中的一些值。我还有一个通过读取文件获得的流。
根据收到的值,我想就不同的Kafka主题编写这个流。
基本上,我有一个与许多孩子联系在一起的领导者的网络。对于每个孩子,领导者需要编写在孩子特定的Kafka主题中读取的流,以便孩子可以阅读它。当子级启动时,它会在从领导者读取的Kafka主题中注册自己。问题是我不知道我有多少个孩子。
例如,我从Kafka主题中阅读1
,我只想在一个名为Topic1
的Kafka主题中编写流。我读了1-2
,我想写两个Kafka主题(Topic1
和Topic2
)。
我不知道这是否可能,因为为了写这个主题,我使用了Kafka生产者和addSink
方法,据我理解(从我的尝试来看),Flink似乎需要知道汇的数量先验。
但是,难道没有办法获得这种行为吗?
现在不建议使用KeyedSeriazationSchema。相反,您必须使用"KafkaSeriazationSchema"
同样可以通过重写serialize方法来实现。
public ProducerRecord<byte[], byte[]> serialize(
String inputString, @Nullable Long aLong){
return new ProducerRecord<>(customTopicName,
key.getBytes(StandardCharsets.UTF_8), inputString.getBytes(StandardCharsets.UTF_8));
}
如果我能很好地理解你的问题,我认为你可以用一个接收器来解决它,因为你可以根据正在处理的记录来选择Kafka主题。似乎源代码中的一个元素可能会写入多个主题,在这种情况下,您需要一个FlatMapFunction将每个源代码记录复制N次(每个输出主题一个)。我建议将(topic,record)作为一对(akaTuple2
)输出。
DataStream<Tuple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() {
public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) {
for (String topic : topics) {
out.collect(Tuple2.of(topic, value));
}
}
});
然后,您可以使用前面通过创建FlinkKafkaProducer计算出的主题,该主题带有一个KeyedSerializationSchema,在其中实现getTargetTopic
,以返回对中的第一个元素。
stream.addSink(new FlinkKafkaProducer10<>(
"default-topic",
new KeyedSerializationSchema<>() {
public String getTargetTopic(Tuple2<String, MyValue> element) {
return element.f0;
}
...
},
kafkaProperties)
);
我在试着让我的弹性搜索下沉并运行。然而,我得到了以下错误,并正在耗尽如何修复它的想法。任何帮助都很感激。以下是错误: 我运行的是CDH 5.3和elasticsearch 1.4.2
我的应用程序的MainActivity从用户那里获取一些数据来计算他/她的BMI和BMR,并将这些数据用于应用程序的主要部分,即另一个名为DailyActivity的活动。我希望这样,用户第一次启动应用程序时,它会要求提供数据。但在第一次之后,每次用户启动应用程序时,都应使用之前输入的数据直接启动DailyActivity。(按下后退按钮也不应显示MainActivity)。我如何实现这一点?
问题内容: 对于我来说,目前尚不清楚,在这种情况下,我想使用值接收器而不是始终使用指针接收器。 回顾一下文档: 该 文档 还说:“对于基本类型,切片和小型结构之类的类型,值接收器非常便宜,因此,除非该方法的语义要求使用指针,否则值接收器是高效且清晰的。” 首先, 它说“非常便宜”,但问题是它比指针接收器便宜。因此,我做了一个小的基准测试(基于要点的代码),向我展示了,即使对于只有一个字符串字段的结
技巧 有人问我,如何通过选项来指定动态连接器,而不使用缺省系统自带的动态连接器。我后来查了下ld的手册,有这么一个选项: -Ifile --dynamic-linker=file Set the name of the dynamic linker. This is only meaningful when generating dynamically linked ELF ex
我在我的崩溃,有没有人面临这样的问题? Thx,提前 异常java.lang.RuntimeException:无法启动接收方android.support.v4.media.session.mediabuttonreceiver:java.illegalstateException:找不到任何处理[REDACTED_DOMAIN_NAME]_button的服务或媒体浏览器服务实现 (ZygoTe
我目前正在使用SharedReferences跟踪通过AlarmManager启动的BroadcastReceiver中要执行工作的项列表。除了一个特定的场景外,一切都很好。当我触发一个新项目来执行工作时,让它完成工作,然后删除该项目(全部通过SharedReferences编辑),它在应用程序运行时工作得很好。当列表中没有任何内容,我打开任务管理器并终止应用程序时,该项突然出现在Broadcas