Question:

Unable to consume Kafka messages with Apache Storm

邵博艺
2023-03-14

I have developed an application that consumes Kafka messages using Apache Storm. When I run the topology in a LocalCluster from Eclipse, it works fine and the messages are consumed as expected. But when I run it with the storm command (bin\storm jar ..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kafka topology), the topology starts but does not consume any messages. Am I doing something wrong, or can you guide me on how to track down the problem?

Topology code

public class Topology {

public Properties configs;
public BoltBuilder boltBuilder;
public SpoutBuilder spoutBuilder;   

public Topology(String configFile) throws Exception {
    configs = new Properties();

    InputStream is = null;
    try {
        is = this.getClass().getResourceAsStream("/application.properties");
        configs.load(is);
        //configs.load(Topology.class.getResourceAsStream("/application.properties"));
        boltBuilder = new BoltBuilder(configs);
        spoutBuilder = new SpoutBuilder(configs);
    } catch (Exception ex) {
        ex.printStackTrace();
        System.exit(0);
    }
}

private void submitTopology() throws Exception {
    System.out.println("Entered in submitTopology");
    TopologyBuilder builder = new TopologyBuilder();    
    KafkaSpout<?, ?> kafkaSpout = spoutBuilder.buildKafkaSpout();
    SinkTypeBolt sinkTypeBolt = boltBuilder.buildSinkTypeBolt();
    MongoDBBolt mongoBolt = boltBuilder.buildMongoDBBolt();


    //set the kafkaSpout to topology
    //parallelism hint for kafkaSpout - defines the number of executors/threads to be spawned per container
    int kafkaSpoutCount = Integer.parseInt(configs.getProperty(Keys.KAFKA_SPOUT_COUNT));
    builder.setSpout(configs.getProperty(Keys.KAFKA_SPOUT_ID), kafkaSpout, kafkaSpoutCount);


    //set the sinktype bolt
    int sinkBoltCount = Integer.parseInt(configs.getProperty(Keys.SINK_BOLT_COUNT));
    builder.setBolt(configs.getProperty(Keys.SINK_TYPE_BOLT_ID),sinkTypeBolt,sinkBoltCount).shuffleGrouping(configs.getProperty(Keys.KAFKA_SPOUT_ID));

    //set the mongodb bolt
    int mongoBoltCount = Integer.parseInt(configs.getProperty(Keys.MONGO_BOLT_COUNT));
    builder.setBolt(configs.getProperty(Keys.MONGO_BOLT_ID),mongoBolt,mongoBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID),Keys.MONGODB_STREAM);


    String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);

    Config conf = new Config();
    //Defines how many worker processes have to be created for the topology in the cluster.
    conf.setNumWorkers(1);

    System.out.println("Submitting Topology");
    //StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
    System.out.println("Topology submitted");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, builder.createTopology());
}

public static void main(String[] args) throws Exception {
    String configFile;
    if (args.length == 0) {
        System.out.println("Missing input : config file location, using default");
        configFile = "application.properties";
    } else{
        configFile = args[0];
    }

    Topology ingestionTopology = new Topology(configFile);
    ingestionTopology.submitTopology();
}

}

Spout code

public class SpoutBuilder {

public Properties configs = null;

public SpoutBuilder(Properties configs) {
    this.configs = configs;
}
public KafkaSpout<?, ?> buildKafkaSpout() {
    String servers = configs.getProperty(Keys.KAFKA_BROKER);
    String topic = configs.getProperty(Keys.KAFKA_TOPIC);
    String group = configs.getProperty(Keys.KAFKA_CONSUMERGROUP);

    return new KafkaSpout<>(getKafkaSpoutConfig(servers,topic,group));
}

protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic, String group) {
    return KafkaSpoutConfig.builder(bootstrapServers, new String[]{topic})
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, group)
        .setRetry(getRetryService())
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
        .setMaxUncommittedOffsets(250)
        .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
        .setTupleTrackingEnforced(true)
        .setEmitNullTuples(false)
        .setRecordTranslator(new DefaultRecordTranslator<String, String>())
        .build();
}

protected KafkaSpoutRetryService getRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
        TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
}

}

BoltBuilder code

public class BoltBuilder {

public Properties configs = null;

public BoltBuilder(Properties configs) {
    this.configs = configs;
}

public SinkTypeBolt buildSinkTypeBolt() {
    return new SinkTypeBolt();
}

public MongoDBBolt buildMongoDBBolt() {
    String host = configs.getProperty(Keys.MONGO_HOST);
    int port = Integer.parseInt(configs.getProperty(Keys.MONGO_PORT));
    String db = configs.getProperty(Keys.MONGO_DATABASE);
    String collection = configs.getProperty(Keys.MONGO_COLLECTION);
    return new MongoDBBolt(host, port, db, collection);
}

}

SinkTypeBolt code

public class SinkTypeBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;

public void execute(Tuple tuple) {
    String value = tuple.getString(4);
    System.out.println("Received in SinkType bolt : "+value);
    if (value != null && !value.isEmpty()){
        collector.emit(Keys.MONGODB_STREAM,new Values(value));
        System.out.println("Emitted : "+value);
    }
    collector.ack(tuple);   
}

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream(Keys.MONGODB_STREAM, new Fields("content"));
}

}

MongoDBBolt code

public class MongoDBBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private MongoDatabase mongoDB;
private MongoClient mongoClient;
private String collection;

public String host;
public int port ;
public String db;

protected MongoDBBolt(String host, int port, String db,String collection) {
    this.host = host;
    this.port = port;
    this.db = db;
    this.collection = collection;
}

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.mongoClient = new MongoClient(host,port);
    this.mongoDB = mongoClient.getDatabase(db);
}

public void execute(Tuple input) {
    Document mongoDoc = getMongoDocForInput(input);
    try{
        mongoDB.getCollection(collection).insertOne(mongoDoc);
        collector.ack(input);
    }catch(Exception e) {
        e.printStackTrace();
        collector.fail(input);
    }
}

@Override
public void cleanup() {
    this.mongoClient.close();
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // TODO Auto-generated method stub
}

public Document  getMongoDocForInput(Tuple input) {
    Document doc = new Document();
    String content = (String) input.getValueByField("content");
    String[] parts = content.trim().split(" ");
    System.out.println("Received in MongoDB bolt "+content);
    try {
        for(String part : parts) {
            String[] subParts = part.split(":");
            String fieldName = subParts[0];
            String value = subParts[1];
            doc.append(fieldName, value);
        }
    } catch(Exception e) {

    }
    return doc;
}

}

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.0.4</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.kafka_storm.util.Topology</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.4</version>
        </plugin>
    </plugins>
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include> **/*.properties</include>
            </includes>
        </resource>
    </resources>
</build>

Storm UI (screenshot)

1 answer

郎健柏
2023-03-14

Just to be sure: when you submit the topology with storm jar, you did remember to use the StormSubmitter line in your topology instead of LocalCluster, right?
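In code terms, the submission path needs to call StormSubmitter when the jar is deployed to the cluster. Below is a minimal sketch of what submitTopology could look like; it reuses the configs and Keys members from the posted Topology class, and the runLocally switch between local and cluster mode is only an illustrative choice, not something from the original code:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// inside the posted Topology class
private void submitTopology(boolean runLocally) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // ... same spout/bolt wiring as in the posted Topology class ...

    Config conf = new Config();
    conf.setNumWorkers(1);
    String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);

    if (runLocally) {
        // Local testing only: this is the path that works inside Eclipse
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, builder.createTopology());
    } else {
        // Cluster mode: this call must be active when submitting with `storm jar`
        StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
    }
}

In the code as posted, the StormSubmitter call is commented out and only the LocalCluster branch runs, so nothing is actually deployed to the cluster when the jar is submitted.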

Also, check that you have started all the required daemons, i.e. at least storm nimbus and storm supervisor should be running (plus a ZooKeeper installation).

The next place to look is the log files. In your Storm directory you will find a logs directory; look at logs/workers-artifacts/.

  • 我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?