当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka,嵌入式Kafka测试

充运浩
2023-03-14

我们正在用我们的Servicetest和嵌入式Kafka观察一个奇怪的行为。

该测试是一个Spock测试,我们使用JUnit规则KafkaEmbedded并传播brokersAsString如下:

@ClassRule
@Shared
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1)

@Autowired
KafkaListenerEndpointRegistry endpointRegistry

def setupSpec() {
    System.setProperty("kafka.bootstrapServers",  embeddedKafka.getBrokersAsString())
}
public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions)
        throws Exception {

        log.info(
            "Waiting for " + container.getContainerProperties().getTopics() + " to connect to " + partitions + " " +
                "partitions.")

        int n = 0;
        int count = 0;
        while (n++ < 600 && count < partitions) {
            count = 0;
            container.getAssignedPartitions().each {
                TopicPartition it ->
                    log.info(it.topic() + ":" + it.partition() + "; ")
            }

            if (container.getAssignedPartitions() != null) {
                count = container.getAssignedPartitions().size();
            }
            if (count < partitions) {
                Thread.sleep(100);
            }
        }
     }
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.696  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.807  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.811  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.812  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.602  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:04.065  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:04.066  INFO 1160 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http)
2016-07-29 11:24:04.073  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456)
2016-07-29 11:24:04.237  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : jetty-9.2.17.v20160517
2016-07-29 11:24:04.265  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@6a8598e7{/__admin,null,AVAILABLE}
2016-07-29 11:24:04.270  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@104ea372{/,null,AVAILABLE}
2016-07-29 11:24:04.279  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@3c9b416a{HTTP/1.1}{0.0.0.0:50811}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@7c214597{SSL-http/1.1}{0.0.0.0:50812}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : Started @25813ms
2016-07-29 11:24:04.632  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting...
2016-07-29 11:24:04.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.^
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]
2016-07-29 11:24:13.740  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:16.644  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:16.666  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[staggering-0]
2016-07-29 11:24:16.750  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:23.559  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[moa-0]
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[deliveryZipCode_v1-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]

现在让我们困惑的是,如果我们等待两个分区连接,等待就会超时。只有当我们等待一个分区连接时,过一段时间一切都会成功运行。

我们是否理解错了代码,在嵌入式Kafka中每个主题有两个分区?只给我们的听众分配一个是正常的吗?

共有1个答案

茹展鹏
2023-03-14

我无法解释你看到的薄片;是的,默认情况下每个主题获得2个分区。我刚刚运行了一个框架容器测试,看到了这个...

09:24:06.139 INFO  [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions revoked:[]
09:24:06.611 INFO  [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions assigned:[testTopic3-1, testTopic3-0]
 类似资料:
  • 我编写了一个基本的Spring Boot服务,它通过rest API使用一些数据,并将其发布到rabbitmq和kafka。 为了测试处理kafka生成的服务类,我遵循以下指南:https://www.baeldung.com/spring-boot-kafka-testing 孤立地说,测试(KafkaMessagingServiceIMTest)在intellij想法和命令行上的mvn中都可以

  • 我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,

  • 所以我用了这个嵌入Kafka的例子,还有这个 我对这个示例做了一点更改,并用一些数据库(如h2db)更新了kafka侦听器。 现在在我的单元测试中,当我想检查数据在数据库中是否可用时,我得到NULL。另外,我不确定如何手动检查数据库,因为h2是一个内存基础数据库。 这是更新的部分:在接收器类中 在单元测试中: 但 dt 始终为空。此外,我也无法检查数据库,因为它在测试停止后停止。有人知道如何使它可

  • 我们正在使用Cucumber为out应用程序做一些集成测试,我们在测试一个时遇到了一些问题。我们设法使用了一个嵌入式Kafka并在其中生成数据。 但消费者从未收到任何数据,我们也不知道发生了什么。 这是我们的代码: 制作人配置 消费者配置 集成测试 步骤定义 消费者表示,同样的代码在生产引导服务器上运行良好,但嵌入式Kafka从未实现过 日志中的一切看起来都很好,但我们不知道遗漏了什么。 提前感谢

  • 我正在使用Edgware版本中的Spring Cloud Stream binder发送Kafka消息。我也在使用Spring Sleuth和Zipkin。 Spring使用自定义类将标头嵌入到Kafka消息中。这会给一些必须处理此自定义解码的消息的非Spring消费者带来问题。 我的问题是:有没有办法为Spring配置消息头的自定义编码器/解码器(例如普通JSON)?或者可能使用Kafka标题?

  • 我正试图为我的Kafka消费者编写一个集成测试。我已经看完了官方的参考文件,但当我开始测试时,我只看到这个无限重复: -2019-04-03 15:47:34.002WARN 13120 --- [ main]org.apache.kafka.clients.NetworkClient:[消费者clientId=消费者-1, group pId=my-group]无法建立到节点-1的连接。经纪人可