当前位置: 首页 > 知识库问答 >
问题:

Apache Ignite Kafka连接问题

鄢飞鸾
2023-03-14

我正在尝试对Kafka消息流进行流处理和CEP。为此,我选择Apache Ignite首先实现一个原型。但是,我无法连接到队列:

使用KAFKA2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin

Kafka工作正常,我用一个消费者测试了它。然后启动ignite,然后在spring boot命令行应用程序中运行following。

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of StreamsConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic("test");

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        // set Kafka consumer configurations
        kafkaStreamer.setConsumerConfig(config);

        // set decoders
        StringDecoder keyDecoder = new StringDecoder(null);
        StringDecoder valueDecoder = new StringDecoder(null);

        kafkaStreamer.setKeyDecoder(keyDecoder);
        kafkaStreamer.setValueDecoder(valueDecoder);

        kafkaStreamer.start();
    } finally {
        kafkaStreamer.stop();
    }

当应用程序启动时,我得到

2017-02-23 10:25:23.409警告1388---[main]Kafka.utils.VerifiableProperties:属性bootstrap.servers无效2017-02-23 10:25:23.410信息1388---[main]Kafka.utils.VerifiableProperties:属性组ID被重写以测试2017-02-23 10:25:23.410警告1388---[main]Kafka.utils.VerifiableProperties:属性密钥.反序列化程序无效2017-02-23 10:25:23.411警告1388---[main]Kafka.utils.VerifiableProperties:属性密钥.序列化程序无效2017-02-23 10:25:23.411警告1388---[main]Kafka.utils.Verifi ableProperties:属性值.反序列化程序无效2017-02-23 10:25:23.411警告1388---[main]Kafka.utils.VerifiableProperties:属性值.序列化程序无效2017-02-23 10:25:23.411 INFO 1388---[main]Kafka.utils.VerifiableProperties:属性ZookeePer.Connect被覆盖到LocalHost:2181

然后

2017-02-23 10:25:24.057警告1388---[r-finder-thread]kafka.client.clientutils$:从代理[BrokerEndpoint(0,user.local,9092)]为主题[set(test)]提取相关id为0的主题元数据失败

java.nhtml" target="_blank">io.channels.ClosedChannelException:null在kafka.network.blockingchannel.send(blockingchannel.scala:110)~[kafka2.11-0.10.0.1.jar:na]在kafka.producer.syncproducer.liftedtree1$1(Syncproducer.scala:80)~[kafka2.11-0.10.0.1.jar:na]在kafka.producer.syncproducer.scala:80)~[kafka2.11-0.10.0.1.jar:na]在kafka.producer.syncproducer.kafka$producer.syncproducer.dosend(124)~[kafka2.11-0.10.0.1.jar:na]在kafka.client.clientutils.scala:59)[kafka2.11-0.10.0.1.jar:na]在kafka.client.clientutils.scala:94)[kafka2.11-0.10.0.1.jar:na]在kafka.client.client.clientutils.fetchtopicmetadata(Clientutils.scala:94)[kafka2.11-0.10.0.1.jar:na]在kafka.consumer.consumerfetchermanager$leaderfinderthread.doWork(consumerfetchermanager.scala:66)[kafka2.11-0.10.0.1.jar:na]在.run(SutdownableThread.Scala:63)[kafka2.11-0.10.0.1.jar:na]

并且从队列中读取也不起作用。有没有人知道怎么解决这个问题?

编辑:如果我注释finally块的内容,则会出现以下错误

[2M2017-02-27 16:42:27.780[0;39M[31Merror[0;39M[35M29946[0;39M[2M---[0;39M[2M[2M[3M]池-3-线程-1][0;39M[36M[0;39M[2M:[0;39M]39M消息被忽略,原因是错误[msg=messageandmetadata(test,0,Message(magic=1,attributes=0,CreateTime=-1,crc=2558126716,key=java.nio.heapbytebuffer[pos=0 lim=1 cap=79],payload=java.nio.heapbytebuffer[pos=0 lim=74 CAP=74]),15941704,Kafka.Serializer.StringDecoder@74A96647,Kafka.Serializer.StringDecoder@42849D34,-1,CreateTime)]

IllegalStateException:数据流已经关闭。在org.apache.ignite.internal.processors.DataStreamer.DataStreamerImpl.Enterbusy(DataStreamerImpl.java:401)~[ignite-core-1.8.0.jar:1.8.0]在org.apache.ignite.internal.processors.DataStreamer.DataStreamerImpl.AddDataInternal(DataStreamerImpl.java:613)~[ignite-core-1.8.0.jar:1.8.0]在org.apache.ignite.internal.processors.DataStreamer.DataStreamer.AddData(DataStreamerImpl.java:667)~[ignite-core-1.8.0.jar:1.8.0]在.ignite.stream.kafka.kafkastreamer$1.run(kafkastreamer.java:180)~[ignite-kafka-1.8.0.jar:1.8.0]在java.util.concurrent.executors$runnableadapter.call(executors.java:511)[NA:1.8.0_111]在java.util.concurrent.futuretask.run(futuretask.java:266)[NA:1.8.0_111]在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.1142)[NA:1.8.0_111]在java.util.concurrent.threadpoolexecutor$works er.run(ThreadPoolExecutor.java:617)[NA:1.8.0_111]位于java.lang.Thread.run(Thread.java:745)[NA:1.8.0_111]

谢谢!

共有1个答案

甄佐
2023-03-14

我认为发生这种情况是因为KafkastReamer在启动后立即关闭(KafkastReamer.stop()调用finally块)。kafkastreamer.start()不是同步的,它只是从Kafka中旋转出要消耗的线程并退出。

 类似资料:
  • 问题内容: 如果我在闲置了一段时间后启动应用程序,那么我曾经遇到以下错误。(我正在使用Spring + Hibernate + MySQL作为DB) 我通过将以下内容添加到我的servlet-context.xml中解决了这个问题。 我在这里问了这个问题,这个问题是解决方案所特有的。我需要知道为什么会遇到这个错误。 我尝试了上面链接中提供的第一个选项(使用autoReconnect = true配

  • 在连接jstatd和visualvm时遇到了一些问题。以下是我设置的详细信息: 杰斯塔德。政策 叫做与 牵引端口 港口又好又开放 正在运行的应用程序是在vmware上运行的,尽管可以毫无问题地访问该应用程序。 如果有人对连接visualvm有任何想法,那就太好了。

  • 我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf

  • 我创建了一个测试帐户来开始使用BrowserStack。我关注了以下页面:在Browserstack Automate上运行量角器测试,这真的很有帮助。 现在我得到: 这是什么意思?我没有任何请求。我只是打开一个页面,点击一个元素,就这样了。

  • 我正试图在公司网络中使用公司服务器进行登录。我可以RDC到服务器,ping服务器,还可以使用get Service-ComputerName DBServer获取Windows服务状态。但是,WinRM会话不允许我进入服务器。 我的电脑: Windows 10 服务器: Windows Server 2012 在DBServer上: 在客户端(我的机器): WinRM的防火墙端口对HTTP和HTT

  • 在我的服务器上做netstat后,当我试图将我的数据库与nifi连接时,显示此错误,而当我通过同一台服务器连接时,它被连接,我已经看到了3台相同的服务器,其中安装了nifi密码db dbuser ip地址端口,每件事都被检查了很多次。有人能帮我解决问题吗?

  • 我用的是Kafka 0.8.2-beta,有2台Ubuntu 14虚拟机: 172.30.141.127正在运行动物园管理员 172.30.141.184在经营一家Kafka经纪人 我正在启动动物园管理员实例,如果一切顺利的话。然后,我尝试启动代理并将其连接到172.30.141.127:2181。它似乎能够在特定的端口上连接并建立会话,但是由于一些似乎没有记录的异常,它失去了连接。 代理输出: