在properties
文件中,我将specificaVroserde
作为默认值Serde,然后使用consumed.with(serdes.String(),serdes.String())
使用字符串值。
val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并使用我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时,会出现异常。
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
以下是发布topicTwo和TopicTrey的例外:
org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我有一个应用程序需要收听多个不同的主题;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个KafkaStreams实例使用相同的kafka属性,但我得到了如下所示的错误。 错误 流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。 问题 此错误意味着什么,以及导致此错误的原因。 假设您可以有应用程序的多个实
由于我不想多次使用流来单独收集每个属性,也不想使用来收集每个属性,是否有任何方法可以用单个流来获得上述属性。
我使用以下代码创建kafka流: 我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。 我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?
我有以下功能用于统一多个集合(包括重复元素): 如果集合的交集具有类似签名的函数(使用类型相等),那就太好了。例如: 我找到了一个相交函数的实现,但它不使用流: 是否有任何方法可以利用流实现类似于unify函数的功能?我在java8/StreamAPI方面没有太多经验,因为一些建议会非常有用。
我有一个大致如下的事件流: 在每次命中时,我都要使用mongoose在数据库中查找哈希 有时,我会遇到一堆共享相同哈希的事件,当前它会触发一堆调用,这些调用基本上返回相同的内容。 如果哈希相同,我如何优化此过程并防止被多次调用。
问题内容: 我有两个项目,其中一个(服务)包括第二个(核心)。我已经在以下Core项目中定义了此PropertyPlaceholderConfigurer: 我想在上层项目中扩展Core占位符,包括appConfig.properties和其他一些。我发现的唯一方法是在上层定义另一个不同的bean(不同的ID),并包括新的bean: 但是它产生的结果是找不到appConfig.properties