当前位置: 首页 > 知识库问答 >
问题:

StreamBridge绑定而不是Enable绑定和自Spring Cloud Stream 3.1版本以来已弃用的输出注释

皮景龙
2023-03-14

我有多个API,它们通过kafka(生成和使用消息)相互通信。在其中一个API中,我基于HTTP请求触发器生成消息(当调用endpoint时,生成一个meesage并发送给kafka),带有@Output和@EnableBinding注释。这些会议被订阅此主题的其他API使用。

现在,我尝试迁移到新的Spring-Cloud-Stream函数编程模型,并从文档中得出结论,使用外部源数据的StreamBridge是我的案例所需的方法(https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources). 然而,我不理解在没有定义源函数的情况下,如何根据命名约定正确配置源、bindingName和目标主题。我有以下配置,它成功地在“myFooTopic”上生成消息,但我在应用程序启动时注意到一些奇怪的日志,因为绑定似乎没有正确完成:

application.properties:

spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry

spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1

spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']

spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=

此外,StreamBridge的代码为:

@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {

    private final StreamBridge streamBridge;
    private static final String USER = "user";

    public void sendToChannel(String message) {
        log.info("sendToChannel - sending to channel hello event");

        try {
            if (streamBridge.send(USER, buildChannelMessage(message))) {
                log.info("sendToChannel - message was successfully sent ");
            } else {
                log.error("sendToChannel - failed to send message");
            }
        } catch (Exception e) {
            log.error("sendToChannel - error while sending message on output binding {}", USER, e);
        }
    }

    private Message<HelloEventAvro> buildChannelMessage(String message) {
        HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
                .setHelloMessage(message)
                .build();
        long timestamp = Instant.now().toEpochMilli();

        return MessageBuilder.withPayload(helloEventAvro)
                .setHeader("partitionId", 1)
                .setHeader("X-Timestamp", timestamp)
                .build();
    }
}

在使用的依赖项中:

  • Spring启动2.6.2
  • kafka客户端2.8.1
  • 春-云-流3.2.1
  • 春-云-流-绑定器-Kafka3.2.1
  • Spring-集成-kafka 5.5.8

我的问题是:

  • 生产者与myUserTopic的绑定是否正确,或者是否需要添加属性spring。云流动源=用户
  • “user”bindingName是正确的,或者考虑到我没有配置bean供应商,它应该遵守“user-out-0”的约定
  • 当应用程序启动后生成第一条消息时,我可以看到以下日志:
    Using kafka topic for outbound: myUserTopic (which is correct)
    Caching the binder: kafka
    Retrieving cached binder: kafka
    .....
    Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)

对于生成的以下消息,日志“未知”。频道“名称”不会再次出现。

我不明白为什么通道的名称是“未知”,而不是application.properties配置中提供的输出bindingName“user”。你能指导我了解一下我这边是否有配置错误吗?所有来自留档的sping-cloud-stream示例

编辑:

关于上述配置的测试,我还有一个问题。我试图通过以下示例为StreamBridge的这个特定用例编写一个测试([https://github.com/spring-cloud/spring-cloud-stream/blob/d8ed65a249ed4364b96d68f4f56b3d5b4de996a4/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716][1] )并根据我的上述应用程序进行调整。属性配置,并且出于某种原因,此测试在outputDestination中接收到的消息为空:

public class StreamBridgeTests {

    @Test
    public void testSendingMessageToDestination() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
                .getCompleteConfiguration(Application.class))
                .web(WebApplicationType.NONE).run()) {

            HelloEventAvro helloEventAvro = buildHelloEventAvro();

            Message<HelloEventAvro> helloEventAvroMessage = MessageBuilder
                    .withPayload(helloEventAvro)
                    .setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
                    .build();

            StreamBridge bridge = context.getBean(StreamBridge.class);
            bridge.send("user", helloEventAvroMessage);

            OutputDestination outputDestination = context.getBean(OutputDestination.class);
            Message<byte[]> message = outputDestination.receive(100, "user");
            assertThat(new String(message.getPayload())).contains("hello");
        }
    }
}

我用debug深入研究了一下,看起来outputDestation是用一个通道(myUserTopic.destination)和2个消息队列而不是1创建的,如下所述:

  • 大小为0的user.destination(其中"user"=my bindingName)
  • myUserTopic.destination大小为1,生成的helloEventAvroMessage

如果我在OutputDestination的receive()方法中将bindingName从“user”更改为kafka主题“myUserTopic”的名称,它将按预期工作:创建一个通道(myUserTopic.destination)和一个消息队列(myUserTopic.destination):

public class StreamBridgeTests {

    @Test
    public void testSendingMessageToDestination() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
                .getCompleteConfiguration(Application.class))
                .web(WebApplicationType.NONE).run()) {

            HelloEventAvro helloEventAvro = buildHelloEventAvro();

            Message<HelloEventAvro> helloEventAvroMessage = MessageBuilder
                    .withPayload(helloEventAvro)
                    .setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
                    .build();

            StreamBridge bridge = context.getBean(StreamBridge.class);
            bridge.send("user", helloEventAvroMessage);

            OutputDestination outputDestination = context.getBean(OutputDestination.class);
            Message<byte[]> message = outputDestination.receive(100, "myUserTopic");
            assertThat(new String(message.getPayload())).contains("hello");
        }
    }
}

考虑到上述情况,我仍然不知道StreamBridge的send()方法中的bindingName如何与OutputDestination中的receive()方法相关联(如果我只有一个主题,那么这两个地方的名称不应该相同?)以及如何将其解析为Kafka主题目标名称集。

[1]: https://github.com/spring-cloud/spring-cloud-stream/blob/d8ed65a249ed4364b96d68f4f56b3d5b4de996a4/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716)

共有1个答案

张坚白
2023-03-14

绑定约定与以前相同。您使用StreamBridge发送到绑定。如果此类绑定不存在,它将被创建,并且与绑定对应的实际目标名称相同,除非 . . . (继续阅读)

bridge.send("foo", message) 

如果您在代理上有一个现有的目的地(例如,foo_destination),您可以像以前一样进行目的地映射

spring.cloud.stream.bindings.foo.destination=foo_destination

换句话说,所有绑定属性都将应用于新创建的绑定(及时)。

如Gary在评论中所述,如果使用spring。云流动source=foo,将在初始化过程中预先创建绑定,但它将遵循相同的约定,就像您有一个函数,这意味着实际绑定名称将是foo-out-0。这是有意的,但我将跳过如何解释,以避免失去问题的主要上下文。

 类似资料:
  • enable绑定用来设置form中的元素是否可用,例如button、input、select等。当绑定的参数为true时元素可用。 示例代码: //.W片段 <p> <input type='checkbox' bind-checked="hasCellphone" /> I have a cellphone </p> <p> Your cellphone number: <input

  • 自3.1版以来,用于处理队列的主要API已被弃用。课堂评论中写道: 从3.1开始就不推荐使用函数式编程模型 我在网上搜索了很多解决方案,但没有找到关于如何迁移的可靠的E2E解释。 正在寻找以下方面的示例: 从队列读取 如果有几种方法可以做到这一点(正如我在网上看到的),我很乐意为每个选项提供解释和典型用例。

  • 关于静态和动态之间的区别,我仍然有点困惑。据我所知,动态使用对象,而静态使用类型,动态在运行时解析,而静态在编译时解析。所以this.lastName.compare(s1.last名称)不应该使用动态绑定吗? 钥匙compareTo(list[position-1])使用动态绑定 (this . last name . compare to(S1 . last name))为什么使用静态绑定?

  • if绑定 if绑定应用在页面元素中,并通过表达式判断是否为元素添加子元素的绑定。if绑定在功能上非常像visible绑定,但在实现上却有很大的不同。visible绑定是为元素添加css样式来控制元素是否显示,if绑定是控制元素的字元素,如果表达式为true,则为元素添加子元素,否则清空子元素。 示例代码: //.W片段 <label> <input type="checkbox" bind-c

  • 这是我的控制器: 我的观点是: 和VO类:

  • 自定义绑定(Custom Binding)允许我们通过代码实现自定义绑定规则,从而完成更高级的业务需求。 示例代码 //.js片段 justep.Bind.bindingHandlers.yourBindingName = { init: function (element, valueAccessor, allBindings, viewModel, bindingContext) {