Question:

Kafka producer in a Spring Cloud application does not publish anything

通迪
2023-03-14

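I started connecting my Spring Cloud application to Kafka and ZooKeeper, both hosted via a Docker Compose file. They connect, but when I run the application and expect the producer to publish a message, nothing happens... I don't know what the error is. I followed this example: https://github.com/ihuaylupo/manning-smia/tree/master/chapter10/licensing-service/src/main/java/com/optimagrowth/license/events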

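The workflow follows the GitHub repo example: when I call an endpoint on the publishing service, I expect my Kafka producer to publish a message to the topic and the Kafka consumer to consume it. Everything else works (the endpoint runs against the DB), but no Kafka message is produced or consumed, and there isn't even an error to tell me what I'm doing wrong...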

Docker Compose file:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    

  kafkaserver:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 #kafka 192.168.99.100 #kafka - ip because i want to access kafka and zookeeper from outside of containers, i.e localhost
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
      - listeners=PLAINTEXT://:9092
      - advertised.listeners=PLAINTEXT://192.168.99.100:9092
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper

Kafka consumer Spring Cloud properties:

spring.cloud.stream.bindings.inboundOrgChanges.destination=orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type=application/json
spring.cloud.stream.bindings.inboundOrgChanges.group=studentsGroup
spring.cloud.stream.kafka.binder.brokers=localhost #kafka
spring.cloud.stream.kafka.binder.zkNodes=localhost

Kafka producer Spring Cloud properties:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=http://192.168.99.100:2181
spring.cloud.stream.kafka.binder.brokers=http://192.168.99.100:9092

Kafka producer:

@EnableBinding(Source.class) // - on my main Spring app
@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}
@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}

@Service class ServiceEx {
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
        Optional<Organization> opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange("GET", organizationId);
        return (opt.isPresent()) ? opt.get() : null;
    }   // won't do anything
}

Edited: Docker Compose file:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - spring-cloud-network

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      - spring-cloud-network

  facultate:
    container_name: facultate
    build: C:\Users\marius\com.balabasciuc.springmicroservicesinaction\facultateservice
    restart: on-failure
    ports:
      - "1002:1002"
    environment:
      SPRING_CLOUD_CONFIG_URI: "http://config:7070"
      EUREKA_HOST: server
      EUREKA_PORT: 9001
      DATABASE_HOST: database
     # SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: kafka:9092
    depends_on:
      - kafka
      - zookeeper
      - server
    networks:
      - spring-cloud-network
volumes:
  simple:
    driver: local
networks:
  spring-cloud-network:
    driver: bridge

Spring Cloud producer properties:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka
spring.cloud.stream.kafka.binder.zkNodes=zookeeper

Docker logs:

facultate    | 2022-02-01 11:15:47.246  INFO 1 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
facultate    | 2022-02-01 11:15:47.247  INFO 1 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
facultate    | 2022-02-01 11:15:47.251  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
facultate    | 2022-02-01 11:15:48.138  INFO 1 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: orgChangeTopic
facultate    | 2022-02-01 11:15:48.150  INFO 1 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id =
facultate    |  connections.max.idle.ms = 300000
facultate    |  default.api.timeout.ms = 60000
facultate    |  metadata.max.age.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  receive.buffer.bytes = 65536
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |
facultate    | 2022-02-01 11:15:48.614  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:48.618  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:48.619  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714148612
facultate    | 2022-02-01 11:15:53.683  INFO 1 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
facultate    | 2022-02-01 11:15:53.767  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
facultate    | 2022-02-01 11:15:53.899  INFO 1 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
facultate    |  acks = 1
facultate    |  batch.size = 16384
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  buffer.memory = 33554432
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id = producer-1
facultate    |  compression.type = none
facultate    |  connections.max.idle.ms = 540000
facultate    |  delivery.timeout.ms = 120000
facultate    |  enable.idempotence = true
facultate    |  interceptor.classes = []
facultate    |  key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |  linger.ms = 0
facultate    |  max.block.ms = 60000
facultate    |  max.in.flight.requests.per.connection = 5
facultate    |  max.request.size = 1048576
facultate    |  metadata.max.age.ms = 300000
facultate    |  metadata.max.idle.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
facultate    |  receive.buffer.bytes = 32768
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |  transaction.timeout.ms = 60000
facultate    |  transactional.id = null
facultate    |  value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |
facultate    | 2022-02-01 11:15:54.147  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714154147
facultate    | 2022-02-01 11:15:54.293  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: RK0OLlwdRKK-oQ5dsWuHBw
facultate    | 2022-02-01 11:15:54.431  INFO 1 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application-1.output' has 1 subscriber(s).

Kafka container logs:

kafka        | [2022-02-01 10:57:12,073] INFO [Partition dresses-0 broker=1001] Log loaded for partition dresses-0 with initial high watermark 0 (kafka.cluster.Partition)
zookeeper    | 2022-02-01 10:57:09,689 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001f44830000 type:multi cxid:0x4e zxid:0x31 txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka        | [2022-02-01 11:15:52,573] INFO Creating topic orgChangeTopic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
kafka        | [2022-02-01 11:15:53,447] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(orgChangeTopic-0) (kafka.server.ReplicaFetcherManager)
kafka        | [2022-02-01 11:15:53,484] INFO [Log partition=orgChangeTopic-0, dir=/kafka/kafka-logs-bde9c032b736] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka        | [2022-02-01 11:15:53,488] INFO Created log for partition orgChangeTopic-0 in /kafka/kafka-logs-bde9c032b736/orgChangeTopic-0 with properties {} (kafka.log.LogManager)
kafka        | [2022-02-01 11:15:53,505] INFO [Partition orgChangeTopic-0 broker=1001] No checkpointed highwatermark is found for partition orgChangeTopic-0 (kafka.cluster.Partition)

When I call an endpoint that should cause a Kafka message to be produced, nothing appears in the application logs, or in the Kafka logs.

1 answer

连曜灿
2023-03-14

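The annotation-based programming model (e.g. EnableBinding, Source, Sink) is deprecated in Spring Cloud Stream and will be removed completely in the upcoming 4.0 line of the framework.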

You can add a source using the functional style, as follows:

@Bean
public Supplier<OrganizationChangeModel> source() {
  return () -> {
    // return an OrganizationChangeModel here
  };
}

In this case, the binding name becomes source-out-0, because the supplier method is named source.
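The naming convention can be sketched in plain Java. This is only an illustration: the helper names outputBindingName and destinationProperty are hypothetical and not part of Spring Cloud Stream, and the supplier returns a placeholder string rather than a real OrganizationChangeModel. It shows how the binding name is derived from the function name and which property key would then point that binding at the orgChangeTopic topic from the question.

```java
import java.util.function.Supplier;

public class FunctionalBindingSketch {

    /** Hypothetical helper: output binding name under the convention <functionName>-out-<index>. */
    static String outputBindingName(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Hypothetical helper: property key that maps a binding to its destination (topic). */
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    /** A plain Supplier; in a Spring app this would be returned from a @Bean method named source. */
    static Supplier<String> source() {
        return () -> "organization change payload (placeholder)";
    }

    public static void main(String[] args) {
        String binding = outputBindingName("source", 0);
        System.out.println(binding);
        // The property line a producer would need for this binding
        System.out.println(destinationProperty(binding) + "=orgChangeTopic");
        System.out.println(source().get());
    }
}
```

Under this convention, the output.destination property from the question would be replaced by the source-out-0 key printed above.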

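However, since in your case you want to publish programmatically from a REST endpoint, I suggest using the StreamBridge API instead. See the StreamBridge documentation for more details. The basic idea is to use StreamBridge's send method to publish data through an output binding.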
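A minimal sketch of that idea, kept free of Spring dependencies so that it runs on its own: the MessageBridge interface is a hypothetical stand-in that mirrors the shape of StreamBridge's send(String bindingName, Object data) method, and the binding name orgChange-out-0 is an assumption, not something from the question. In a real application you would inject StreamBridge and pass streamBridge::send instead of the lambda.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamBridgeSketch {

    /** Hypothetical stand-in mirroring StreamBridge.send(String bindingName, Object data). */
    @FunctionalInterface
    interface MessageBridge {
        boolean send(String bindingName, Object payload);
    }

    /** Payload with the same fields as OrganizationChangeModel in the question. */
    static final class OrganizationChange {
        final String type;
        final String action;
        final String organizationId;
        final String correlationId;

        OrganizationChange(String type, String action, String organizationId, String correlationId) {
            this.type = type;
            this.action = action;
            this.organizationId = organizationId;
            this.correlationId = correlationId;
        }

        @Override
        public String toString() {
            return "OrganizationChange[action=" + action + ", organizationId=" + organizationId + "]";
        }
    }

    /** Publishes on demand, e.g. from a service method behind a REST endpoint. */
    static final class OrganizationEventPublisher {
        // Assumed binding name; it would be mapped to the topic with
        // spring.cloud.stream.bindings.orgChange-out-0.destination=orgChangeTopic
        static final String BINDING = "orgChange-out-0";

        private final MessageBridge bridge;

        OrganizationEventPublisher(MessageBridge bridge) {
            this.bridge = bridge;
        }

        boolean publish(String action, String organizationId, String correlationId) {
            OrganizationChange change = new OrganizationChange(
                    OrganizationChange.class.getTypeName(), action, organizationId, correlationId);
            return bridge.send(BINDING, change);
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // The lambda records sends in memory; it stands in for streamBridge::send
        OrganizationEventPublisher publisher =
                new OrganizationEventPublisher((binding, payload) -> sent.add(binding + " <- " + payload));
        publisher.publish("GET", "org-42", "corr-1");
        sent.forEach(System.out::println);
    }
}
```

Taking the bridge as a constructor argument keeps the publisher testable without a broker, which also helps when checking whether the publish call is reached at all.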

Many of the samples in the Spring Cloud Stream samples repository use a dockerized Kafka setup. You may want to compare how it is set up there.
