当前位置: 首页 > 知识库问答 >
问题:

如何利用Spring-Cloud-Stream实现自定义kafka分区

孙阳旭
2023-03-14

我试图使用spring cloud stream绑定实现一个自定义的Kafka分区器。我只想对用户主题进行自定义分区,而不对公司主题进行任何操作(在本例中,Kafka将使用DefaultPartitioner)。

我的绑定配置:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json
spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json
		      producer:
            partitioned: true
            partitionSelectorClass: config.UserPartitioner

我使用以下方式将消息发送到流中:

public void postUserStream(User user) throws ServiceException {
        try {
            LOG.info("Posting User {} into Kafka stream...", user);
            MessageChannel messageChannel = messageStreams.outboundUser();
            messageChannel
                    .send(MessageBuilder.withPayload(user)
                            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
        } catch (Exception ex) {
            LOG.error("Error while populating User stream into Kafka.. ", ex);
            throw ex;
        }
    }

我的UserPartitioner类:

public class UserPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
          Cluster cluster) {

        String partitionKey = null;
        if (Objects.nonNull(value)) {
            User user = (User) value;
            partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
            keyBytes = partitionKey.getBytes();
        }
        return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
    }
}

我最终收到以下异常:

Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned

编辑:根据文档,还尝试了以下步骤:

User-Out:Destination:user ContentType:Application/JSON Producer:PartitionKeyExtractorClass:

@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {

	@Override
	public Object extractKey(Message<?> message) {
		if(message.getPayload() instanceof BaseUser) {
			BaseUser user = (BaseUser) message.getPayload();
			return user.getId();
		}
		return 10;
	}
    
}

更新2:适用于我的解决方案将partitioncount添加到绑定中,并在绑定器中将autoaddpartitions添加到true中:

spring:
  logging:
    level: info
  cloud:
    stream:
      bindings:
        user-out:
          destination: user
          contentType: application/json
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 4
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true

共有1个答案

孔华池
2023-03-14

没有属性分区;吸气器取决于其他属性...

public boolean isPartitioned() {
    return this.partitionKeyExpression != null
            || this.partitionKeyExtractorName != null;
}

partitionSelectorClass: config.UserPartitioner

UserPartitioner是一个KafkaPartitioner-它决定哪些使用者获得哪些分区(在使用者端)

PartitionSelectorClass必须是PartitionSelectorStrategy-它决定将记录发送到哪个分区(在生成器端)。

鉴于您的问题,我认为您只是将PartitionePartitionSelectorStrategy混淆了,而您需要后者。

 类似资料:
  • 我在spring boot(消费者)应用程序中使用spring cloud stream kafka。即使应用程序无法连接到Kafka(Kafka代理已关闭),应用程序的运行状况也不准确。我读过关于Kafka健康检查的文章。在spring actuator health check中,kafka health check似乎已禁用。 因此,我成功地编写了以下代码来为我的应用程序启用Kafka健康检

  • 试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。

  • spring-cloud-stream-kafka-elasticsearch The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-se

  • 以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置

  • 有没有办法配置默认消息 或 在上面的示例中,

  • 我正在使用Spring开发一个应用程序。在Access Control Access一节中,我想使用Spring Security Acl(我是Acl的新手)。我想在我的应用程序中实现ACL基于两点: 应用程序应该具有以下五种权限:、、、和。 权限是分层的,当用户具有权限时,它应该能够,或者当用户具有权限时,它应该能够、和等。 更新: 我的应用程序是基于Spring MVC RESTful的。当用