我试图使用spring cloud stream绑定实现一个自定义的Kafka分区器。我只想对用户主题进行自定义分区,而不对公司主题进行任何操作(在本例中,Kafka将使用DefaultPartitioner)。
我的绑定配置:
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
producer:
partitioned: true
partitionSelectorClass: config.UserPartitioner
我使用以下方式将消息发送到流中:
public void postUserStream(User user) throws ServiceException {
try {
LOG.info("Posting User {} into Kafka stream...", user);
MessageChannel messageChannel = messageStreams.outboundUser();
messageChannel
.send(MessageBuilder.withPayload(user)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
} catch (Exception ex) {
LOG.error("Error while populating User stream into Kafka.. ", ex);
throw ex;
}
}
我的UserPartitioner类:
public class UserPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
String partitionKey = null;
if (Objects.nonNull(value)) {
User user = (User) value;
partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
keyBytes = partitionKey.getBytes();
}
return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
}
}
我最终收到以下异常:
Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned
编辑:根据文档,还尝试了以下步骤:
User-Out:Destination:user ContentType:Application/JSON Producer:PartitionKeyExtractorClass:
@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
if(message.getPayload() instanceof BaseUser) {
BaseUser user = (BaseUser) message.getPayload();
return user.getId();
}
return 10;
}
}
更新2:适用于我的解决方案将partitioncount添加到绑定中,并在绑定器中将autoaddpartitions添加到true中:
spring:
logging:
level: info
cloud:
stream:
bindings:
user-out:
destination: user
contentType: application/json
producer:
partition-key-expression: headers['partitionKey']
partition-count: 4
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
没有属性分区
;吸气器取决于其他属性...
public boolean isPartitioned() {
return this.partitionKeyExpression != null
|| this.partitionKeyExtractorName != null;
}
partitionSelectorClass: config.UserPartitioner
UserPartitioner
是一个KafkaPartitioner
-它决定哪些使用者获得哪些分区(在使用者端)
PartitionSelectorClass
必须是PartitionSelectorStrategy
-它决定将记录发送到哪个分区(在生成器端)。
鉴于您的问题,我认为您只是将Partitione
与PartitionSelectorStrategy
混淆了,而您需要后者。
我在spring boot(消费者)应用程序中使用spring cloud stream kafka。即使应用程序无法连接到Kafka(Kafka代理已关闭),应用程序的运行状况也不准确。我读过关于Kafka健康检查的文章。在spring actuator health check中,kafka health check似乎已禁用。 因此,我成功地编写了以下代码来为我的应用程序启用Kafka健康检
试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。
spring-cloud-stream-kafka-elasticsearch The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-se
以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置
有没有办法配置默认消息 或 在上面的示例中,
我正在使用Spring开发一个应用程序。在Access Control Access一节中,我想使用Spring Security Acl(我是Acl的新手)。我想在我的应用程序中实现ACL基于两点: 应用程序应该具有以下五种权限:、、、和。 权限是分层的,当用户具有权限时,它应该能够,或者当用户具有权限时,它应该能够、和等。 更新: 我的应用程序是基于Spring MVC RESTful的。当用