我有一个接受@Payload字符串的@StreamListener。为了测试这个侦听器类,我使用嵌入式Kafka编写了一个Junit类。运行我的测试类时,我得到以下错误
错误O.S.I.Handler.LoggingHandler-org.SpringFramework.Messaging.Converter.MessageConversionException:无法在此处输入代码
将GenericMessage从[[B]转换为[java.lang.String]
如果我将@Payload的数据类型从String更改为Byte[],那么我的侦听器类将选择消息。
有人能帮我知道这是什么问题吗?我猜这是与Could Steam配置有关的东西。
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
topics = {
"input",
"output"})
public class TestUtils {
public static final String KEY_SERIALIZER = "key.serializer";
public static final String VALUE_SERIALIZER = "value.serializer";
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@BeforeEach
public void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
}
@Test
public void someTest() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(KEY_SERIALIZER, StringSerializer.class);
senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
template.setDefaultTopic("input");
template.sendDefault("foo");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"input_group",
"false",
this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", StringDeserializer.class);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("output"));
ConsumerRecords<String, String> records = consumer.poll(10_000);
consumer.commitSync();
Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
consumer:
enable-dlq: true
dlq-name: output
dlq-producer-properties:
retries: 1
binder:
brokers: ${spring.embedded.kafka.brokers}
replicationFactor: ${replication_factor:1}
autoCreateTopics: true
autoAddPartitions: true
configuration:
retries: 1
batch.size: 16384
linger.ms: 1
enable.idempotence: true
buffer.memory: 33554432
request.timeout.ms: 3000
transaction.timeout.ms: 3000
max.block.ms: ${kafka_max_block_time:5000}
max.poll.records: 80
poll.timeout: 10000
commit.retries: 1
commit.retry.interval: 1000
session.timeout.ms.config: 50000
shutdown.signal: INT,TERM
acks: "all"
bindings:
output:
destination: output
contentType: application/json
producer:
partitionCount: ${partition_count:1}
input:
destination: input
contentType: application/json
partitioned: true
group: input_group
请检查是否已模拟ObjectMapper,因为ObjectMapper无法将字节[]转换为字符串。
Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式环境中运行良好,包括开发人员自己的笔记本电脑,裸机数据中心,以及Cloud Foundry等托管平台。
Spring Cloud 为开发人员提供了工具,以快速构建分布式系统中的某些常见模式(例如:配置管理、服务发现、智能路由、微代理、控制总线、一次性令牌、全局锁、分布式会话、群集状态等)。分布式系统的协调导致了样板式样,并且使用Spring Cloud开发人员可以快速站起来实现这些样板的服务和应用程序。它们可以在任何分布式环境中正常工作,包括开发人员自己的笔记本电脑,裸机数据中心以及Cloud Fo
Cloudfoundry的Spring Cloud可以轻松地在Cloud Foundry(平台即服务)中运行 Spring Cloud应用程序 。Cloud Foundry有一个“服务”的概念,它是“绑定”到应用程序的中间件,本质上为其提供包含凭据的环境变量(例如,用于服务的位置和用户名)。 spring-cloud-cloudfoundry-web项目为Cloud Foundry中的webapp
我们最近从Spring Cloud Netflix Ribbon迁移到Spring Cloud LoadBalancer,并使用Spring Cloud kubernetes作为发现客户端。 现在spring.cloud.kubernetes.ribbon.mode(https://cloud.spring.io/spring-cloud-static/spring-cloud-kubernete
主要内容:Spring Cloud Config,Spring Cloud Config 工作原理,Spring Cloud Config 的特点,搭建 Config 服务端,搭建 Config 客户端,手动刷新配置,Config+Bus 实现配置的动态刷新在分布式微服务系统中,几乎所有服务的运行都离不开配置文件的支持,这些配置文件通常由各个服务自行管理,以 properties 或 yml 格式保存在各个微服务的类路径下,例如 application.properties 或 applicat
主要内容:API 网关,Spring Cloud Gateway ,Gateway 的工作流程,Predicate 断言,Spring Cloud Gateway 动态路由,Filter 过滤器在微服务架构中,一个系统往往由多个微服务组成,而这些服务可能部署在不同机房、不同地区、不同域名下。这种情况下,客户端(例如浏览器、手机、软件工具等)想要直接请求这些服务,就需要知道它们具体的地址信息,例如 IP 地址、端口号等。 这种客户端直接请求服务的方式存在以下问题: 当服务数量众多时,客户端需要维护
主要内容:熔断器,Spring Cloud Hystrix ,Hystrix 服务降级,示例1,全局降级方法,解耦降级逻辑,Hystrix 服务熔断,Hystrix 故障监控在微服务架构中,一个应用往往由多个服务组成,这些服务之间相互依赖,依赖关系错综复杂。 例如一个微服务系统中存在 A、B、C、D、E、F 等多个服务,它们的依赖关系如下图。 图1:服务依赖关系 通常情况下,一个用户请求往往需要多个服务配合才能完成。如图 1 所示,在所有服务都处于可用状态时,请求 1 需要调用 A、D、E、F
主要内容:OpenFeign,Feign VS OpenFeign ,OpenFeign 实现远程服务调用,OpenFeign 超时控制,OpenFeign 日志增强Netflix Feign 是 Netflix 公司发布的一种实现负载均衡和服务调用的开源组件。Spring Cloud 将其与 Netflix 中的其他开源服务组件(例如 Eureka、Ribbon 以及 Hystrix 等)一起整合进 Spring Cloud Netflix 模块中,整合后全称为 Spring Cloud Ne