我有多个API,它们通过kafka(生成和使用消息)相互通信。在其中一个API中,我基于HTTP请求触发器生成消息(当调用endpoint时,生成一个meesage并发送给kafka),带有@Output和@EnableBinding注释。这些会议被订阅此主题的其他API使用。
现在,我尝试迁移到新的Spring-Cloud-Stream函数编程模型,并从文档中得出结论,使用外部源数据的StreamBridge是我的案例所需的方法(https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources). 然而,我不理解在没有定义源函数的情况下,如何根据命名约定正确配置源、bindingName和目标主题。我有以下配置,它成功地在“myFooTopic”上生成消息,但我在应用程序启动时注意到一些奇怪的日志,因为绑定似乎没有正确完成:
application.properties:
spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry
spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1
spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
此外,StreamBridge的代码为:
@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
private final StreamBridge streamBridge;
private static final String USER = "user";
public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");
try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}
private Message<HelloEventAvro> buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();
return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
在使用的依赖项中:
我的问题是:
Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
对于生成的以下消息,日志“未知”。频道“名称”不会再次出现。
我不明白为什么通道的名称是“未知”,而不是application.properties配置中提供的输出bindingName“user”。你能指导我了解一下我这边是否有配置错误吗?所有来自留档的sping-cloud-stream示例
编辑:
关于上述配置的测试,我还有一个问题。我试图通过以下示例为StreamBridge的这个特定用例编写一个测试([https://github.com/spring-cloud/spring-cloud-stream/blob/d8ed65a249ed4364b96d68f4f56b3d5b4de996a4/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716][1] )并根据我的上述应用程序进行调整。属性配置,并且出于某种原因,此测试在outputDestination中接收到的消息为空:
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message<HelloEventAvro> helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
我用debug深入研究了一下,看起来outputDestation是用一个通道(myUserTopic.destination)和2个消息队列而不是1创建的,如下所述:
如果我在OutputDestination的receive()方法中将bindingName从“user”更改为kafka主题“myUserTopic”的名称,它将按预期工作:创建一个通道(myUserTopic.destination)和一个消息队列(myUserTopic.destination):
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message<HelloEventAvro> helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
考虑到上述情况,我仍然不知道StreamBridge的send()方法中的bindingName如何与OutputDestination中的receive()方法相关联(如果我只有一个主题,那么这两个地方的名称不应该相同?)以及如何将其解析为Kafka主题目标名称集。
[1]: https://github.com/spring-cloud/spring-cloud-stream/blob/d8ed65a249ed4364b96d68f4f56b3d5b4de996a4/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716)
绑定约定与以前相同。您使用StreamBridge发送到绑定。如果此类绑定不存在,它将被创建,并且与绑定对应的实际目标名称相同,除非 . . . (继续阅读)
bridge.send("foo", message)
如果您在代理上有一个现有的目的地(例如,foo_destination
),您可以像以前一样进行目的地映射
spring.cloud.stream.bindings.foo.destination=foo_destination
换句话说,所有绑定属性都将应用于新创建的绑定(及时)。
如Gary在评论中所述,如果使用spring。云流动source=foo,将在初始化过程中预先创建绑定,但它将遵循相同的约定,就像您有一个函数,这意味着实际绑定名称将是foo-out-0。这是有意的,但我将跳过如何解释,以避免失去问题的主要上下文。
enable绑定用来设置form中的元素是否可用,例如button、input、select等。当绑定的参数为true时元素可用。 示例代码: //.W片段 <p> <input type='checkbox' bind-checked="hasCellphone" /> I have a cellphone </p> <p> Your cellphone number: <input
自3.1版以来,用于处理队列的主要API已被弃用。课堂评论中写道: 从3.1开始就不推荐使用函数式编程模型 我在网上搜索了很多解决方案,但没有找到关于如何迁移的可靠的E2E解释。 正在寻找以下方面的示例: 从队列读取 如果有几种方法可以做到这一点(正如我在网上看到的),我很乐意为每个选项提供解释和典型用例。
关于静态和动态之间的区别,我仍然有点困惑。据我所知,动态使用对象,而静态使用类型,动态在运行时解析,而静态在编译时解析。所以this.lastName.compare(s1.last名称)不应该使用动态绑定吗? 钥匙compareTo(list[position-1])使用动态绑定 (this . last name . compare to(S1 . last name))为什么使用静态绑定?
if绑定 if绑定应用在页面元素中,并通过表达式判断是否为元素添加子元素的绑定。if绑定在功能上非常像visible绑定,但在实现上却有很大的不同。visible绑定是为元素添加css样式来控制元素是否显示,if绑定是控制元素的字元素,如果表达式为true,则为元素添加子元素,否则清空子元素。 示例代码: //.W片段 <label> <input type="checkbox" bind-c
这是我的控制器: 我的观点是: 和VO类:
自定义绑定(Custom Binding)允许我们通过代码实现自定义绑定规则,从而完成更高级的业务需求。 示例代码 //.js片段 justep.Bind.bindingHandlers.yourBindingName = { init: function (element, valueAccessor, allBindings, viewModel, bindingContext) {