当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka活页夹连接到码头-组成Kafka

裴令秋
2023-03-14
version: '3'

services:
  kafka-0:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-0
    restart: always
    ports:
      - "9094:9092"
    expose:
      - "9094"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9094
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-0
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  kafka-1:
    image: confluentinc/cp-kafka:5.2.1
    container_name: kafka-1
    restart: always
    ports:
      - "9095:9093"
    expose:
      - "9095"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9095
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
      - KAFKA_ADVERTISED_HOST_NAME=kafka-1
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

 zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: zookeeper
    ports:
      - "2182:2181"
    expose:
      - "2182"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2182
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9094,127.0.0.1:9095
@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {

    @Autowired
    private Source source;

    private String[] users = {"Glenn", "Sabby", "Mark", "Janne", "Ilaya"};

    @Scheduled(fixedDelay = 1000)
    public void sendEvents() {
        UsageDetail usageDetail = new UsageDetail();
        usageDetail.setUserId(this.users[new Random().nextInt(5)]);
        usageDetail.setDuration(new Random().nextInt(300));
        usageDetail.setData(new Random().nextInt(700));
        this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
    }
}
bootstrap.servers = [127.0.0.1:9094, 127.0.0.1:9095]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

2020-04-15 20:32:20.303  INFO 10384 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2020-04-15 20:32:20.314 ERROR 10384 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:268) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:243) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at io.spring.dataflow.sample.usagedetailsender.UsageDetailSenderApplication.main(UsageDetailSenderApplication.java:10) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) ~[idea_rt.jar:na]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    ... 32 common frames omitted

共有1个答案

申屠健
2023-03-14

localhost:...上需要一个广告侦听器。

看这里。

这是来自cp-all-in-one在5.4.1-POST中的代理

  broker:
    image: ${REPOSITORY}/cp-server:${TAG}
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
 类似资料:
  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml:

  • 我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 由于失去了与Azure EventHub的连接,我需要将spring-cloud-stream Kafka套接字配置为活动状态。根据推荐页面https://github.com/azure/azure-event-hubs-for-kafka/blob/master/configuration.md ,我需要将设置为true,但在spring-cloud-stream中找不到要设置的配置