当前位置: 首页 > 知识库问答 >
问题:

阿尔帕卡消费者不使用通过 Docker 撰写运行的Kafka的消息

翟博雅
2023-03-14

我已经通过Docker compose运行了Kafka和Zookeeper。我可以使用Kafka终端向主题发送/使用消息,也可以通过Conduktor监控所有内容。但不幸的是,我无法通过使用阿尔帕卡连接器的Scala应用程序使用MSG。该应用程序连接到主题,但每当我向主题发送MSG时,都没有任何反应。

只有Kafka和动物园管理员正在通过码头工人组成运行。我直接在主机上运行 Scala 消费者应用程序。

泊坞窗撰写

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper

斯卡拉应用程序

object Main extends App {
  implicit val actorSystem = ActorSystem()

  import actorSystem.dispatcher

  val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withGroupId("new_id")
    .withCommitRefreshInterval(1.seconds)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withBootstrapServers("localhost:9092")

  Consumer
    .plainSource(kafkaConsumerSettings, Subscriptions.topics("test1"))
    .map(msg => msg.value())
    .runWith(Sink.foreach(println)).onComplete {
    case Failure(exception) => exception.printStackTrace()
    case Success(value) => println("done")
  }
}

应用程序-控制台输出

16:58:33.877 INFO  [akka.event.slf4j.Slf4jLogger]                     Slf4jLogger started
16:58:34.470 INFO  [akka.kafka.internal.SingleSourceLogic]            [1955f] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-591284224]
16:58:34.516 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = novo_id
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:58:34.701 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka version: 2.4.0
16:58:34.702 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka commitId: 77a89fcf8d7fa018
16:58:34.702 INFO  [org.apache.kafka.common.utils.AppInfoParser]      Kafka startTimeMs: 1585256314699
16:58:34.715 INFO  [org.apache.kafka.clients.consumer.KafkaConsumer]  [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Subscribed to topic(s): test1
16:58:35.308 INFO  [org.apache.kafka.clients.Metadata]                [Consumer clientId=consumer-novo_id-1, groupId=novo_id] Cluster ID: c2XBuDIJTI-gBs9guTvG

共有1个答案

弘靖琪
2023-03-14

导出< code > KAFKA _ ADVERTISED _ LISTENERS

描述如何公布主机名,以及客户端如何能够访问该主机名。该值被发布到ZooKeeper供客户端使用。

如果使用 SSLSASL 协议,则终结点值必须以下列格式指定协议:

>

  • SSL:SSL://SASL_SSL://<-code>

    SASL:< code > SASL _明文://或< code>SASL_SSL://

    KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092
    

    现在,您的使用者可以使用端口 29092

    .withBootstrapServers("localhost:29092")
    

  •  类似资料:
    • 但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?

    • 我做了一个poc,其中我使用spark流从Kafka读取数据。但我们的组织要么使用ApacheFlink,要么使用Kafka消费者从ApacheKafka读取数据,作为标准流程。所以我需要用Kafka消费者或ApacheFlink替换Kafka流媒体。在我的应用程序用例中,我需要从kafka读取数据,过滤json数据并将字段放入cassandra中,因此建议使用kafka consumer而不是f

    • 我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认

    • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?

    • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

    • D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......