Question:

Unable to consume a Kafka topic with a Kafka consumer?

胡利
2023-03-14

I'm new to Kafka. I can read records from a file and send them to a Kafka topic through a producer, but I can't consume the same topic through a consumer.

Note: you can read the data from any text file. I'm using Kafka 2.11-0.9.0.0.

Here is my code:

package implementation;
import java.io.BufferedReader;
//import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
//import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
//import java.io.OutputStreamWriter;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.BasicConfigurator;

import kafka.producer.ProducerConfig;

public class File_to_Kafka extends ProducerConfig{

    Properties configProperties;
    public File_to_Kafka(Properties originalProps) {
        super(originalProps);
        configProperties = originalProps;
        // TODO Auto-generated constructor stub
    }

    public String topicName = "temp"+Math.random();

    public String groupId = UUID.randomUUID().toString();           



        public void producerKafka(Properties configProperties) throws IOException
        {


            FileInputStream fis = new FileInputStream("/home/nick/Desktop/Database-Kafka-ElasticSearch/src/main/java/resources/properties.xml");

            configProperties.load(fis);
            System.out.println(configProperties);
            org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(configProperties);

            File f1 = new File("/home/niket/Desktop/sample-example.txt");
            FileInputStream fis1 = new FileInputStream(f1);
            BufferedReader br1 = new BufferedReader(new InputStreamReader(fis1));
            String str = br1.readLine();
            //while(br1.readLine()!=null)
            while(str != null)
            {
                ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, str);
                producer.send(rec);
                str = br1.readLine();
            }

            br1.close();
            fis.close();
            fis1.close();
            producer.close();

        }

        public void consumerKafka() throws InterruptedException
        {
            ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
            consumerRunnable.start();
            Thread.sleep(100);
            consumerRunnable.getKafkaConsumer().wakeup();
            System.out.println("Stopping consumer .....");
            consumerRunnable.join();
        }

        private static class ConsumerThread extends Thread{

            private String topicName;
            private String groupId;
            private KafkaConsumer<String, String> kafkaConsumer;


            public ConsumerThread(String topicName, String groupId2) {
                super();
                this.topicName = topicName;
                this.groupId = groupId2;
            }

            public void run()
            {
                Properties configProperties = new Properties();
                configProperties.put("bootstrap.servers","localhost:9092");
                configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                configProperties.put("group.id", groupId);
                configProperties.put("CLIENT_ID_CONFIG", "simple");

                //Figure out where to start processing messages from
                kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
                kafkaConsumer.subscribe(Arrays.asList(topicName));
                int count=0;



                //Start Processing Messages
                try {
                        while(true) {
                            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                            count = 0;
                            for (ConsumerRecord<String, String> record : records)
                            {
                                    System.out.println(record.value());
                                    count++;
                            }
                            kafkaConsumer.commitAsync();
                            if(count==records.count())
                                break;
                        }

                    }
                catch (WakeupException e) {
                    // TODO: handle exception
                    System.out.println("Exception caught : "+ e.getMessage());
                }
                finally {
                    kafkaConsumer.close();
                    System.out.println("After Closing KafkaConsumer");
                }

            }
            public KafkaConsumer<String,String> getKafkaConsumer(){
                 return this.kafkaConsumer;
              }


        }

        public static void main(String [] args) throws IOException, InterruptedException
        {
            BasicConfigurator.configure();
            Properties configProperties = new Properties();
            configProperties.put("bootstrap.servers", "localhost:9092");
            configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            configProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configProperties.put("metadata.broker.list", "localhost:9092");
            File_to_Kafka obj = new File_to_Kafka(configProperties);
            obj.producerKafka(configProperties);
            obj.consumerKafka();

        }

}

Here is the output:

0 [main] INFO kafka.utils.VerifiableProperties  - Verifying properties
61 [main] WARN kafka.utils.VerifiableProperties  - Property bootstrap.servers is not valid
62 [main] WARN kafka.utils.VerifiableProperties  - Property key.serializer is not valid
62 [main] INFO kafka.utils.VerifiableProperties  - Property metadata.broker.list is overridden to localhost:9092
62 [main] WARN kafka.utils.VerifiableProperties  - Property value.serializer is not valid
{<name>BOOTSTRAP_SERVERS_CONFIG=(bootstrap.servers)</name>, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, <value>org.apache.kafka.common.serialization.StringSerializer</value>=, <name>KEY_SERIALIZER_CLASS_CONFIG</name>=, metadata.broker.list=localhost:9092, <configuration>=, <?xml=version="1.0"?>, <name>KEY_DESERIALIZER_CLASS_CONFIG</name>=, <property>=, <value>org.apache.kafka.common.serialization.StringDeserializer</value>=, <value>localhost=9092</value>, bootstrap.servers=localhost:9092, <name>VALUE_DESERIALIZER_CLASS_CONFIG</name>=, <value>org.apache.kafka.common.serialization.ByteArraySerializer</value>=, </property>=, value.serializer=org.apache.kafka.common.serialization.StringSerializer, </configuration>=, <name>VALUE_SERIALIZER_CLASS_CONFIG</name>=}
86 [main] INFO org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0

93 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bufferpool-wait-time
96 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name buffer-exhausted-records
99 [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = [])
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:client-id-producer-1
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:client-id-producer-1
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:client-id-producer-1
118 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:client-id-producer-1
122 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name batch-size
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name compression-rate
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name queue-time
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name request-time
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name produce-throttle-time
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-per-request
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-retries
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name errors
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-size-max
126 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Starting Kafka producer I/O thread.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <value>localhost = 9092</value> was supplied but isn't a known config.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration </configuration> =  was supplied but isn't a known config.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <property> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <value>org.apache.kafka.common.serialization.StringDeserializer</value> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <name>VALUE_DESERIALIZER_CLASS_CONFIG</name> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <value>org.apache.kafka.common.serialization.StringSerializer</value> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <configuration> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <value>org.apache.kafka.common.serialization.ByteArraySerializer</value> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <name>KEY_SERIALIZER_CLASS_CONFIG</name> =  was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <name>BOOTSTRAP_SERVERS_CONFIG = (bootstrap.servers)</name> was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration </property> =  was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <name>VALUE_SERIALIZER_CLASS_CONFIG</name> =  was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <name>KEY_DESERIALIZER_CLASS_CONFIG</name> =  was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration metadata.broker.list = localhost:9092 was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig  - The configuration <?xml = version="1.0"?> was supplied but isn't a known config.
130 [main] INFO org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.9.0.0
131 [main] INFO org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : fc7243c2af4b2b4a
131 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - Kafka producer started
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initialize connection to node -1 for sending metadata request
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at localhost:9092.
254 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1
267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470088, sendTimeMs=0) to node -1
502 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient  - Error while fetching metadata with correlation id 0 : {temp0.8655521798253616=LEADER_NOT_AVAILABLE}
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [])
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Adding node 0 to nodes ever seen
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initialize connection to node 0 for sending metadata request
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 0 at niket-Lenovo-Y50-70:9092.
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.bytes-sent
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.bytes-received
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-0.latency
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 0
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470433, sendTimeMs=0) to node 0
611 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 3 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [Partition(topic = temp0.8655521798253616, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.temp0.8655521798253616.records-per-batch
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.temp0.8655521798253616.bytes
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.temp0.8655521798253616.compression-rate
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.temp0.8655521798253616.record-retries
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.temp0.8655521798253616.record-errors
646 [main] INFO org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
647 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-closed:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-created:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent-received:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-received:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name select-time:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name io-time:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-sent
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-received
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.latency
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-0.bytes-sent
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-0.bytes-received
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-0.latency
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Shutdown of Kafka producer I/O thread has completed.
669 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - The Kafka producer has closed.
674 [Thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = 2a4549ce-0e9d-4a66-9573-c5b4c47b3b34
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [localhost:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = true
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1024
    send.buffer.bytes = 131072
    auto.offset.reset = latest

675 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Starting the Kafka consumer
675 [Thread-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = [])
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:client-id-consumer-1
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name heartbeat-latency
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name join-latency
684 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name sync-latency
685 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name commit-latency
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-fetched
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-fetched
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-latency
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-lag
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-throttle-time
688 [Thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig  - The configuration CLIENT_ID_CONFIG = simple was supplied but isn't a known config.
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.9.0.0
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : fc7243c2af4b2b4a
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Kafka consumer created
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Subscribed to topic(s): temp0.8655521798253616
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing group metadata request to broker -1
690 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at localhost:9092.
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
691 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1
Stopping consumer .....
Exception caught : null
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-closed:client-id-consumer-1
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-created:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent-received:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-received:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name select-time:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name io-time:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-sent
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-received
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.latency
774 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - The Kafka consumer has closed.
After Closing KafkaConsumer

1 answer

王才英
2023-03-14

It looks like there is a problem in your main method: your flow appears to get stuck in the producer's while loop.

Try starting the consumer in a separate main class; you should then be able to see the consumer fetch records.
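The logs above also hint at two configuration problems: the consumer warns that `CLIENT_ID_CONFIG = simple` isn't a known config (the literal constant name was passed instead of the key `client.id`), and `auto.offset.reset = latest` combined with a fresh random `group.id` means the consumer only sees records produced *after* it subscribes. A minimal sketch of corrected consumer settings (the `ConsumerProps` class and method names here are illustrative, not from the question):

```java
import java.util.Properties;

public class ConsumerProps {

    // Build consumer settings. "auto.offset.reset" = "earliest" makes a
    // brand-new consumer group start from the beginning of the topic instead
    // of the 0.9 default "latest", which skips records produced before the
    // consumer joined.
    public static Properties consumerProps(String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("group.id", groupId);
        p.put("client.id", "simple");          // real config key, not "CLIENT_ID_CONFIG"
        p.put("auto.offset.reset", "earliest"); // read records produced before subscribe()
        return p;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("demo-group");
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

Passing these properties to `new KafkaConsumer<String, String>(...)` and polling in a plain loop, without calling `wakeup()` after only 100 ms as the question's `consumerKafka()` does, should give the consumer enough time to join the group and fetch the records the producer just wrote.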
