在本页中,您不能只进行一次输出
但我只需要使用Spring Cloud Stream Kafka活页夹进行一次输出
那我该怎么办?
一些文章说使用org.springframework.cloud.stream.function.StreamBridge但它不适合我
我让StreamBridge向Kafka发送主题,但Kafka不会为我的Spring Boot应用程序生成主题
这是我的application.yml和生产主题代码
// producer Springboot application
spring.cloud.stream:
kafka:
binder:
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
bindings:
deliveryIncoming:
destination : deliveryIncomingtopic
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
// wanna produce deliveryIncomingtopic and send to Spring's Consumer
}
// Consumer Springboot application
spring :
cloud:
stream:
kafka:
binder:
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
function:
definition : deliveryIncoming;
bindings:
deliveryIncoming-in-0:
destination : deliveryIncomingtopic
@Bean
public Consumer<KStream<String, String>> deliveryIncoming() {
return input ->
input.foreach((key, value) -> {
System.out.println("deliveryIncoming is playing");
System.out.println("Key: " + key + " Value: " + value);
});
}
对不起,我想我说的有点不清楚
我只想像下面这样做
生产(deliveryIncomingtopic)-
我只是想把事情弄清楚。您是否希望使用入站主题中的消息,然后生成/生成另一条消息到另一个输出主题?如果这是您的问题,那么我相信您在应用程序中遗漏了一些东西。yml。您需要为输出主题进行其他配置。例如:
由于您使用的是Spring Cloud Function(基于我在您的application.yml中看到的内容),因此您应该为输出主题添加更多配置,如下所示:
spring :
cloud:
stream:
kafka:
binder:
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
function:
definition: deliveryIncoming;deliveryOutput
bindings:
deliveryIncoming-in-0:
destination: deliveryIncomingtopic
deliveryOutput-out-0:
destination: deliveryOutput
并为producer函数定义另一个bean:
@Bean
public Producer<KStream<String, String>> deliveryOutput() {
// do your necessary logic here to outbound your message
}
希望这能符合你的期望。
如果是这种情况,那么您需要更改bean函数定义以返回java.util.Function
而不是java.util.消费者
。
@Bean
public Function<KStream<String, String>, KStream<String, String>> deliveryIncoming() {
return input ->
input.foreach((key, value) -> {
System.out.println("deliveryIncoming is playing");
System.out.println("Key: " + key + " Value: " + value);
});
}
但是,AFAIK...您仍然需要在application.yml.中定义输出通道。您可以使用相同的名称和不同的后缀。如下所示:
deliveryIncoming-in-0:
destination: <your_topic_name>
deliveryIncoming-out-0:
destination: <your_topic_name>
我有以下处理器bean方法签名: 相关物业: Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。 我正在使用嵌入式Kafka测试我的拓扑: 我的测试显示,来自的消息按预期到达并到达主题“out0”中,但“out1”主
我是Hadoop的新手,但这是我上个月的一个学习项目。 为了使这一点足够模糊,以便对其他人有用,让我先抛出基本目标……假设: < li >显然,您有一个大型数据集,包含数百万个基本ASCII文本文件。 < ul > < li >每个文件都是一个“记录” e. g. /user/hduser/data/customer1/YYYY-MM-DD, /user/hduser/data/customer2
我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册: 活页夹实现: 配置类: Spring活页夹文件: application.yml 我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
是否可以创建一个单例对象,并将其作为一个实例或集合或映射的一部分独立注入? 例如,我有两个类似的Instance: ... 我通过参数直接注入它们,在我的类中使用它们。但是,我也有一个用例,我想把它们都注入到一个集合中。 比如: 是否可以创建使用已定义实例的?如何在集合中单独使用相同的实例?
spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?