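I started connecting my Spring Cloud application to Kafka and ZooKeeper, both hosted through a Docker Compose file. They connect, but when I run the application and expect the producer to publish a message, nothing happens. I don't know what the error is. I followed this example: https://github.com/ihuaylupo/manning-smia/tree/master/chapter10/licensing-service/src/main/java/com/optimagrowth/license/events
The workflow follows the GitHub repo: when I call an endpoint on the publishing service, I expect my Kafka producer to publish a message to the topic and the Kafka consumer to consume it. Everything else works (the endpoint executes against the DB), but Kafka neither produces nor consumes the message, and there is not even an error to tell me what I'm doing wrong.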
Docker Compose file:
zookeeper:
  image: wurstmeister/zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
kafkaserver:
  image: wurstmeister/kafka
  container_name: kafka
  ports:
    - "9092:9092"
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 #kafka 192.168.99.100 #kafka - ip because i want to access kafka and zookeeper from outside of containers, i.e localhost
    - KAFKA_ADVERTISED_PORT=9092
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
    - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
    - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    - listeners=PLAINTEXT://:9092
    - advertised.listeners=PLAINTEXT://192.168.99.100:9092
  volumes:
    - "/var/run/docker.sock:/var/run/docker.sock"
  depends_on:
    - zookeeper
Kafka consumer Spring Cloud properties:
spring.cloud.stream.bindings.inboundOrgChanges.destination=orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type=application/json
spring.cloud.stream.bindings.inboundOrgChanges.group=studentsGroup
spring.cloud.stream.kafka.binder.brokers=localhost #kafka
spring.cloud.stream.kafka.binder.zkNodes=localhost
Kafka producer Spring Cloud properties:
spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=http://192.168.99.100:2181
spring.cloud.stream.kafka.binder.brokers=http://192.168.99.100:9092
Kafka producer:
@EnableBinding(Source.class) // - on my main Spring app
@Component
public class SimpleSourceBean {
    private Source source;
    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source) {
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId) {
        logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}

@Service class ServiceEx {
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
        Optional<Organization> opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange("GET", organizationId);
        return (opt.isPresent()) ? opt.get() : null;
    } // won't do anything
}
Edited: Docker Compose file:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - spring-cloud-network
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      - spring-cloud-network
  facultate:
    container_name: facultate
    build: C:\Users\marius\com.balabasciuc.springmicroservicesinaction\facultateservice
    restart: on-failure
    ports:
      - "1002:1002"
    environment:
      SPRING_CLOUD_CONFIG_URI: "http://config:7070"
      EUREKA_HOST: server
      EUREKA_PORT: 9001
      DATABASE_HOST: database
      # SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: kafka:9092
    depends_on:
      - kafka
      - zookeeper
      - server
    networks:
      - spring-cloud-network
volumes:
  simple:
    driver: local
networks:
  spring-cloud-network:
    driver: bridge
Spring Cloud producer properties:
spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka
spring.cloud.stream.kafka.binder.zkNodes=zookeeper
Docker logs:
facultate | 2022-02-01 11:15:47.246 INFO 1 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
facultate | 2022-02-01 11:15:47.247 INFO 1 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
facultate | 2022-02-01 11:15:47.251 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
facultate | 2022-02-01 11:15:47.876 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: kafka
facultate | 2022-02-01 11:15:47.876 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: kafka
facultate | 2022-02-01 11:15:48.138 INFO 1 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: orgChangeTopic
facultate | 2022-02-01 11:15:48.150 INFO 1 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
facultate | bootstrap.servers = [kafka:9092]
facultate | client.dns.lookup = use_all_dns_ips
facultate | client.id =
facultate | connections.max.idle.ms = 300000
facultate | default.api.timeout.ms = 60000
facultate | metadata.max.age.ms = 300000
facultate | metric.reporters = []
facultate | metrics.num.samples = 2
facultate | metrics.recording.level = INFO
facultate | metrics.sample.window.ms = 30000
facultate | receive.buffer.bytes = 65536
facultate | reconnect.backoff.max.ms = 1000
facultate | reconnect.backoff.ms = 50
facultate | request.timeout.ms = 30000
facultate | retries = 2147483647
facultate | retry.backoff.ms = 100
facultate | sasl.client.callback.handler.class = null
facultate | sasl.jaas.config = null
facultate | sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate | sasl.kerberos.min.time.before.relogin = 60000
facultate | sasl.kerberos.service.name = null
facultate | sasl.kerberos.ticket.renew.jitter = 0.05
facultate | sasl.kerberos.ticket.renew.window.factor = 0.8
facultate | sasl.login.callback.handler.class = null
facultate | sasl.login.class = null
facultate | sasl.login.refresh.buffer.seconds = 300
facultate | sasl.login.refresh.min.period.seconds = 60
facultate | sasl.login.refresh.window.factor = 0.8
facultate | sasl.login.refresh.window.jitter = 0.05
facultate | sasl.mechanism = GSSAPI
facultate | security.protocol = PLAINTEXT
facultate | security.providers = null
facultate | send.buffer.bytes = 131072
facultate | socket.connection.setup.timeout.max.ms = 30000
facultate | socket.connection.setup.timeout.ms = 10000
facultate | ssl.cipher.suites = null
facultate | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate | ssl.endpoint.identification.algorithm = https
facultate | ssl.engine.factory.class = null
facultate | ssl.key.password = null
facultate | ssl.keymanager.algorithm = SunX509
facultate | ssl.keystore.certificate.chain = null
facultate | ssl.keystore.key = null
facultate | ssl.keystore.location = null
facultate | ssl.keystore.password = null
facultate | ssl.keystore.type = JKS
facultate | ssl.protocol = TLSv1.3
facultate | ssl.provider = null
facultate | ssl.secure.random.implementation = null
facultate | ssl.trustmanager.algorithm = PKIX
facultate | ssl.truststore.certificates = null
facultate | ssl.truststore.location = null
facultate | ssl.truststore.password = null
facultate | ssl.truststore.type = JKS
facultate |
facultate | 2022-02-01 11:15:48.614 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
facultate | 2022-02-01 11:15:48.618 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
facultate | 2022-02-01 11:15:48.619 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1643714148612
facultate | 2022-02-01 11:15:53.683 INFO 1 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
facultate | 2022-02-01 11:15:53.767 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
facultate | 2022-02-01 11:15:53.775 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
facultate | 2022-02-01 11:15:53.775 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
facultate | 2022-02-01 11:15:53.899 INFO 1 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
facultate | acks = 1
facultate | batch.size = 16384
facultate | bootstrap.servers = [kafka:9092]
facultate | buffer.memory = 33554432
facultate | client.dns.lookup = use_all_dns_ips
facultate | client.id = producer-1
facultate | compression.type = none
facultate | connections.max.idle.ms = 540000
facultate | delivery.timeout.ms = 120000
facultate | enable.idempotence = true
facultate | interceptor.classes = []
facultate | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate | linger.ms = 0
facultate | max.block.ms = 60000
facultate | max.in.flight.requests.per.connection = 5
facultate | max.request.size = 1048576
facultate | metadata.max.age.ms = 300000
facultate | metadata.max.idle.ms = 300000
facultate | metric.reporters = []
facultate | metrics.num.samples = 2
facultate | metrics.recording.level = INFO
facultate | metrics.sample.window.ms = 30000
facultate | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
facultate | receive.buffer.bytes = 32768
facultate | reconnect.backoff.max.ms = 1000
facultate | reconnect.backoff.ms = 50
facultate | request.timeout.ms = 30000
facultate | retries = 2147483647
facultate | retry.backoff.ms = 100
facultate | sasl.client.callback.handler.class = null
facultate | sasl.jaas.config = null
facultate | sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate | sasl.kerberos.min.time.before.relogin = 60000
facultate | sasl.kerberos.service.name = null
facultate | sasl.kerberos.ticket.renew.jitter = 0.05
facultate | sasl.kerberos.ticket.renew.window.factor = 0.8
facultate | sasl.login.callback.handler.class = null
facultate | sasl.login.class = null
facultate | sasl.login.refresh.buffer.seconds = 300
facultate | sasl.login.refresh.min.period.seconds = 60
facultate | sasl.login.refresh.window.factor = 0.8
facultate | sasl.login.refresh.window.jitter = 0.05
facultate | sasl.mechanism = GSSAPI
facultate | security.protocol = PLAINTEXT
facultate | security.providers = null
facultate | send.buffer.bytes = 131072
facultate | socket.connection.setup.timeout.max.ms = 30000
facultate | socket.connection.setup.timeout.ms = 10000
facultate | ssl.cipher.suites = null
facultate | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate | ssl.endpoint.identification.algorithm = https
facultate | ssl.engine.factory.class = null
facultate | ssl.key.password = null
facultate | ssl.keymanager.algorithm = SunX509
facultate | ssl.keystore.certificate.chain = null
facultate | ssl.keystore.key = null
facultate | ssl.keystore.location = null
facultate | ssl.keystore.password = null
facultate | ssl.keystore.type = JKS
facultate | ssl.protocol = TLSv1.3
facultate | ssl.provider = null
facultate | ssl.secure.random.implementation = null
facultate | ssl.trustmanager.algorithm = PKIX
facultate | ssl.truststore.certificates = null
facultate | ssl.truststore.location = null
facultate | ssl.truststore.password = null
facultate | ssl.truststore.type = JKS
facultate | transaction.timeout.ms = 60000
facultate | transactional.id = null
facultate | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate |
facultate | 2022-02-01 11:15:54.147 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
facultate | 2022-02-01 11:15:54.148 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
facultate | 2022-02-01 11:15:54.148 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1643714154147
facultate | 2022-02-01 11:15:54.293 INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: RK0OLlwdRKK-oQ5dsWuHBw
facultate | 2022-02-01 11:15:54.431 INFO 1 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.output' has 1 subscriber(s).
Kafka container logs:
kafka | [2022-02-01 10:57:12,073] INFO [Partition dresses-0 broker=1001] Log loaded for partition dresses-0 with initial high watermark 0 (kafka.cluster.Partition)
zookeeper | 2022-02-01 10:57:09,689 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001f44830000 type:multi cxid:0x4e zxid:0x31 txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka | [2022-02-01 11:15:52,573] INFO Creating topic orgChangeTopic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
kafka | [2022-02-01 11:15:53,447] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(orgChangeTopic-0) (kafka.server.ReplicaFetcherManager)
kafka | [2022-02-01 11:15:53,484] INFO [Log partition=orgChangeTopic-0, dir=/kafka/kafka-logs-bde9c032b736] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2022-02-01 11:15:53,488] INFO Created log for partition orgChangeTopic-0 in /kafka/kafka-logs-bde9c032b736/orgChangeTopic-0 with properties {} (kafka.log.LogManager)
kafka | [2022-02-01 11:15:53,505] INFO [Partition orgChangeTopic-0 broker=1001] No checkpointed highwatermark is found for partition orgChangeTopic-0 (kafka.cluster.Partition)
If I call an endpoint that should make the producer send a Kafka message, nothing shows up in the application logs, or in the Kafka logs either.
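The annotation-based programming model (for example EnableBinding, Source, Sink, and so on) is deprecated in Spring Cloud Stream and will be removed completely in the upcoming 4.0 line of the framework.
You can add a source in the functional style as follows: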
@Bean
public Supplier<OrganizationChangeModel> source() {
    return () -> {
        // return OrganizationChangeModel here.
    };
}
In this case, the binding name becomes source-out-0, because the supplier method is named source.
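As a rough sketch of the configuration side, assuming the supplier bean is named source and the topic is still orgChangeTopic as in the question, the functional binding could be configured with properties along these lines. The broker address kafka:9092 is taken from the logs in the question; spring.cloud.function.definition is generally only needed when more than one functional bean is present.

```properties
# Which functional bean to bind (assumed bean name: source)
spring.cloud.function.definition=source
# Binding names follow the pattern <functionName>-out-<index>
spring.cloud.stream.bindings.source-out-0.destination=orgChangeTopic
spring.cloud.stream.bindings.source-out-0.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka:9092
```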
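However, in your case, since you want to publish programmatically from a REST endpoint, I suggest using the StreamBridge API instead. See the StreamBridge documentation for more details. The basic idea is to use StreamBridge's send method to publish data through an output binding.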
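To make the idea concrete, here is a minimal sketch rather than a definitive implementation. So that it compiles without Spring on the classpath, the publisher takes a BiFunction with the same shape as StreamBridge.send(String bindingName, Object data), which returns a boolean. The class names OrganizationChange and OrganizationChangePublisher and the binding name "output" are assumptions made for illustration; the binding name is chosen to line up with spring.cloud.stream.bindings.output.destination from the question.

```java
import java.util.function.BiFunction;

// Hypothetical payload type standing in for OrganizationChangeModel from the question.
final class OrganizationChange {
    final String action;
    final String organizationId;

    OrganizationChange(String action, String organizationId) {
        this.action = action;
        this.organizationId = organizationId;
    }
}

// Publishes change events through a send function shaped like
// StreamBridge.send(String bindingName, Object data).
final class OrganizationChangePublisher {
    // Assumed binding name; it should match spring.cloud.stream.bindings.<name>.destination.
    static final String BINDING_NAME = "output";

    private final BiFunction<String, Object, Boolean> sender;

    OrganizationChangePublisher(BiFunction<String, Object, Boolean> sender) {
        this.sender = sender;
    }

    // Returns whatever the send function reports: true if the message was accepted.
    boolean publish(String action, String organizationId) {
        OrganizationChange change = new OrganizationChange(action, organizationId);
        return sender.apply(BINDING_NAME, change);
    }
}
```

In a Spring application, the publisher would presumably be built from an injected StreamBridge, for example new OrganizationChangePublisher((binding, data) -> streamBridge.send(binding, data)), and then called from the service method that handles the REST request. Checking the boolean result gives at least one visible signal when a send is not accepted.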
Many of the samples in the Spring Cloud Stream samples repository use this dockerized version of Kafka. You may want to compare how it is set up there.