version: '3'
services:
kafka-0:
image: confluentinc/cp-kafka:5.2.1
container_name: kafka-0
restart: always
ports:
- "9094:9092"
expose:
- "9094"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9094
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
- KAFKA_ADVERTISED_HOST_NAME=kafka-0
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
kafka-1:
image: confluentinc/cp-kafka:5.2.1
container_name: kafka-1
restart: always
ports:
- "9095:9093"
expose:
- "9095"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9095
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
- KAFKA_ADVERTISED_HOST_NAME=kafka-1
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:5.3.1
container_name: zookeeper
ports:
- "2182:2181"
expose:
- "2182"
environment:
- ZOOKEEPER_CLIENT_PORT=2182
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9094,127.0.0.1:9095
@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {
@Autowired
private Source source;
private String[] users = {"Glenn", "Sabby", "Mark", "Janne", "Ilaya"};
@Scheduled(fixedDelay = 1000)
public void sendEvents() {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
}
}
bootstrap.servers = [127.0.0.1:9094, 127.0.0.1:9095]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
2020-04-15 20:32:20.303 INFO 10384 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2020-04-15 20:32:20.314 ERROR 10384 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:268) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:243) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at io.spring.dataflow.sample.usagedetailsender.UsageDetailSenderApplication.main(UsageDetailSenderApplication.java:10) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) ~[idea_rt.jar:na]
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
... 32 common frames omitted
在localhost:...
上需要一个广告侦听器。
看这里。
这是来自cp-all-in-one在5.4.1-POST中的代理
broker:
image: ${REPOSITORY}/cp-server:${TAG}
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml:
我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
由于失去了与Azure EventHub的连接,我需要将spring-cloud-stream Kafka套接字配置为活动状态。根据推荐页面https://github.com/azure/azure-event-hubs-for-kafka/blob/master/configuration.md ,我需要将设置为true,但在spring-cloud-stream中找不到要设置的配置