当前位置: 首页 > 知识库问答 >
问题:

我的Kafka喷发不使用来自 HDP 中的Kafka代理的消息

冀望
2023-03-14

我开发了storm拓扑结构,以从hortonworks上的kafka代理接收JSONArray数据,

我不知道为什么我的kafkaSpout不使用HDP中来自Kafka Brokers的消息,但是Storm拓扑已成功提交,但当我可视化拓扑时:0%的数据已被消耗!!

拓扑可视化

这是我的计划类:

public class ClientInfosSheme implements Scheme{
private static final long serialVersionUID = -2990121166902741545L;
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);
public String codeBanque;
public String codeAgence;
public String codeGuichet;
public String devise;
public String numCompte;
public String codeClient;
public String codeOperation;
public String sensOperation;
public String montantOperation;
public String dateValeur;
public String dateComptable;
public String utilisateur;

public static final String CODEBANQUE="codeBanque";
public static final String CODEAGENCE="codeAgence";
public static final String CODEGUICHET="codeGuichet";
public static final String DEVISE="devise";
public static final String NUMCOMPTE="numCompte";
public static final String CODECLIENT="codeClient";
public static final String CODEOPERATION="codeOperation";
public static final String SENSOPERATION="sensOperation";
public static final String MONTANTOPERATION="montantOperation";
public static final String DATEVALEUR="dateValeur";
public static final String DATECOMPTABLE="dateComptable";
public static final String UTILISATEUR="utilisateur";

public List<Object> deserialize(byte[] bytes) {

        try{
            String clientInfos = new String(bytes, "UTF-8");
               JSONArray JSON = new JSONArray(clientInfos);
                for(int i=0;i<JSON.length();i++) {
                    JSONObject object_clientInfos=JSON.getJSONObject(i);   
                try{     

                    //Récupérations des données

                        this.codeBanque=object_clientInfos.getString("codeBanque");
                        this.codeAgence=object_clientInfos.getString("codeAgence");
                        this.codeGuichet=object_clientInfos.getString("codeGuichet");
                        this.devise=object_clientInfos.getString("devise");
                        this.numCompte=object_clientInfos.getString("numCompte");
                        this.codeClient=object_clientInfos.getString("codeClient");
                        this.codeOperation=object_clientInfos.getString("codeOperation");
                        this.sensOperation=object_clientInfos.getString("sensOperation");
                        this.montantOperation=object_clientInfos.getString("montantOperation");
                        this.dateValeur=object_clientInfos.getString("dateValeur");
                        this.dateComptable=object_clientInfos.getString("dateComptable");
                        this.utilisateur=object_clientInfos.getString("utilisateur");

                    }
                    catch(Exception e) 
                              {
                                  e.printStackTrace(); 
                              }


    }// End For Loop



      } catch (JSONException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (UnsupportedEncodingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
         return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation,
                 montantOperation,dateValeur, dateComptable,utilisateur); 

}// End Function deserialize

public Fields getOutputFields() {
        return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE,
                CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR);
    }


}

和属性文件:

#Broker host
kafka.zookeeper.host.port=sandbox.hortonworks.com

#Kafka topic to consume.
kafka.topic=INFOCLIENT

#Location in ZK for the Kafka spout to store state.
kafka.zkRoot=/client_infos_sprout

#Kafka Spout Executors.
spout.thread.count=1

当我使用另一个消费者时,存储在Kafka Brokers中的数据如下:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]

所以我的问题是为什么它不消费来自Kafka经纪人的信息?

拜托我需要帮助

共有1个答案

栾烨华
2023-03-14

正如您在日志中发现的那样,您的 Spout 不会“消耗”消息,因为拓扑有错误并且不会确认元组 - 因此 Spout 将重放它们。这是按设计工作的。

拓扑稳定后,您将观察到偏移量正在增加。在此之前,Spout将向拓扑发送消息,但您将无法观察到结果。

如果没有看到calculCleRib方法,以及它是如何集成到您的拓扑中的,我们就无法帮助您调试这个方面。

 类似资料:
  • 我有以下场景: 生产者通过Confluent的REST代理(在Confluent的模式注册表上注册模式)向Kafka主题发送Avro编码的消息,如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consument-avro-messages所述 Spring Cloud Stream enabled mes

  • 我用的是Kafka。请在下面找到测试程序。 我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑) 当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(

  • 我创建了一个带有10个分区的Kafka主题,并尝试通过单个Kafka消费者来消费消息。但是,kafka consumer并不是从所有分区读取消息。更具体地说,它只使用来自5个特定分区的消息。示例:使用者仅使用来自[0,1,2,3,4]的消息。在重新启动之后,如果它开始使用来自[5,6,7,8,9]的消息,那么它将只使用来自这些分区的消息。下面是kafka-consumer-offset-check

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。