当前位置: 首页 > 知识库问答 >
问题:

Kubernetes/Spring云数据流>Spring.Cloud.stream.bindings.output。生产者忽略了目标

阳宾实
2023-03-14

我正在尝试根据 http://cloud.spring.io/spring-cloud-dataflow/ 中解释的非常简单的示例运行“Hello,world”Spring Cloud 数据流流。我能够创建一个简单的源代码和接收器,并使用 Kafka 在本地 SCDF 服务器上运行它,因此到目前为止一切都是正确的,并且在 SCDF 指定的主题中生成和使用消息。

现在,我正在尝试根据http://docs.spring.io/spring-cloud-dataflow-server-kubernetes/docs/current-SNAPSHOT/reference/htmlsingle/#_getting_started.中列出的说明将其部署到我的私有云中。使用此部署,我能够毫无问题地部署一个简单的“时间|日志”开箱即用流,但我的示例失败了,因为生产者没有在创建pod时指定的主题中写入(例如,spring.cloud.stream.bindings.output.destination=ntest33.nites-源9),而是在主题“输出”中写入。我对接收器组件也有类似的问题,它在主题“输入”中创建并期望消息。

我使用仪表板创建了流定义

nsource1 | log

源的容器参数是:

--spring.cloud.stream.bindings.output.producer.requiredGroups=ntest34
--spring.cloud.stream.bindings.output.destination=ntest34.nsource1

源组件的代码片段是包 xxxx;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableBinding(Source.class)
public class HelloNitesApplication
{
public static void main(String[] args)
{
    SpringApplication.run(HelloNitesApplication.class, args);
}

@Bean
@InboundChannelAdapter(value = Source.OUTPUT)
public MessageSource<String> timerMessageSource()
{
    return () -> new GenericMessage<>("Hello " + new SimpleDateFormat().format(new Date()));
}

在日志中我可以清楚地看到

2017-04-07T09:44:34.596842965Z 2017-04-07 09:44:34593 INFO main o.s.i.c.DirectChannel:81-频道应用程序。output'有1个订阅者。

问题是,如何正确覆盖必须生成/使用消息的主题,或者使用什么属性和值使其在k8s上工作?

更新:我在使用RabbitMQ时也有类似的问题

2017-04-07T12:56:40.43505177Z 2017-04-07 12:56:40.435信息7-[主]o.s.integration.channel。DirectChannel:Channel”应用程序。output'有1个订阅者。

共有2个答案

程峻
2023-03-14

能否提供有关如何设置该配置属性的更多详细信息?这个特性非常基本,所以这应该是可行的。如果您正在使用流定义来设置它,请使用流定义更新您的问题。

通道名称保持“输出”,因为这是应用程序内部使用的。

黄泰宁
2023-03-14

问题在于我的码头工人形象。我仍然不知道细节,但使用https://spring.io/guides/gs/spring-boot-docker/指出的docker文件实例化了docker容器中的2个进程,一个有参数,另一个没有,这是一个正常运行时间,因此正在使用。

解决办法是替换

ENTRYPOINT [ "sh", "-c", "java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar" ]

具有

ENTRYPOINT [ "java", "-jar", "/app.jar" ]

它开始工作。该示例指出第一个入口点以及为什么创建 2 个进程必须有充分的理由,但原因仍然超出了我的理解范围。

 类似资料:
  • 我有一个服务,它从不同的Spring云流通道(绑定到EventHub/Kafka主题)生成和使用消息。有几种设置类似的服务。 配置如下所示 生产者/发布者代码如下所示 类似地,我还有多个其他发布者发布到不同的活动中心/主题。请注意,每个已发布的消息都有一个租户id标头。这是我的多租户应用程序特定于跟踪租户上下文的内容。还请注意,在发送消息时,我正在获取要发布到的频道。 我的消费者代码如下所示 同样

  • 我设置了一个Spring云流Kafka制作人和消费者,有3个Kafka经纪人在运行。我已经设置了min.insync。将副本复制到4,以查看生产者错误处理的工作方式。消息通道。send(发送) 以上是我的生产者配置。虽然retries设置为3,但生产者仍会多次重试。虽然sync设置为true,但发送呼叫会立即发出。虽然定义了错误通道和目标,并且将errorChannelEnabled设置为true

  • 只是在试用春云流 当我把它作为一个独立的jar运行时(就像另一个springboot应用程序一样),它尊重这个应用程序。物业和所有工程按预期进行。 当我使用SCDF创建流时,它会忽略属性,并使用约定流名称创建交换。app_名称不是我想要的。 我希望在某个时候,我希望这些应用程序与SpringCloudConfig服务器集成,在那里我可以从不同的来源完全外部化配置。 有人可以建议我遗漏了什么,以便S

  • 我需要更改Kafka设置中的值序列化器/反序列化器(出于测试目的,我一直在使用IntegerSerializer/IntegerDeserializer)。使用javaapi,它完全按照预期工作;但是,当使用控制台工具时,它似乎无法正常工作。 我所做的所有故障排除都让我得出了一个结论:Kafka控制台制作人似乎忽略了序列化程序选项。我尝试了和,并将其设置为带有参数。 它不仅不能将数据序列化为整数,

  • Spring Boot 2.0.1.RELEASE项目包含Spring Data JPA和Spring Data REST。似乎忽略了RESTendpoint中的<code>sort</code>参数(但在单元测试中使用相同的存储库方法)。存储库如下: } 实体(已编辑)如下: 当我尝试使用导出的RESTendpoint调用它时,例如: http://localhost:8080/api/v1/o

  • 我有一个存储库,其findAll方法用标记。我正在将生成的SQL打印到日志中,我可以看到当直接在Java中使用时,它会生成正确的连接选择(即)。但是当我通过REST调用它时,这种情况不会发生。我要么得到一个异常,BC。懒加载代理不能序列化,或者如果我添加杰克逊-数据库-hibernate5,我可以看到额外的查询。 我尝试在这里生成最小复制:https://github.com/cptwunderl