当前位置: 首页 > 工具软件 > Storm-Server > 使用案例 >

kafka, storm,hdfs日志处理方法(附storm-kafka, storm-hdfs使用方法)

施令雪
2023-12-01
主要内容

一、概述

二、集群安装

三、整合kafka与storm

四、融合kafka、storm与hdfs

五、创建kafka客户端代替kafka-console-producer(未完成,暂时使用其他人发送过来的日志文件)

部分内容转自:http://shiyanjun.cn/archives/934.html

一、概述

(一)机器环境

共4台机器:

nn01 dn01 dn02 dn03  192.168.169.91-94

storm集群:全用,其中nn01为nimbus与ui,其它为supervisor

kafka集群:dn01 dn02 dn03

zk集群:nn01 dn01 dn02

(二)本文主要内容3个示例

1、整合kafka与storm,将kafka中的数据经过kafkaSpout发送到storm中进行处理。

2、整合kafka、storm与hdfs,在上述示例的基础上,使用hdfsBolt将数据写入hdfs。

3、使用kafka客户端从日志文件中读取消息,并发送到kafka,然后继续第2步流程。注:上面2个示例均是使用kafka-console-producer来产生消息的。

说明:

1、事实上,以上3个示例为同一个示例的不同步骤,只是为了方便调试分成不同步骤而已,最终结果即是示例三。因此示例1、2中的部分代码不规范,均在示例3均作了完善。

2、在正式使用代码时,只需要完成集群安装及相应的示例代码即可。如需要使用示例3,则只需要完成第二及第五部分。

(三)关于代码

见:



二、集群安装

主要步骤如下:

1、安装及启动zookeeper集群

2、安装及启动storm集群

3、安装及启动kafka集群,并创建topic和producer,然后创建consumer验证集群正常

(一)安装及启动zookeeper集群

见安装zookeeper集群

(二)安装及启动storm集群

见安装storm集群

(三)安装及启动kafka集群,并创建topic和producer

1、使用3台机器搭建Kafka集群:

192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test

2、在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
3、首先,在gdc-dn01-test上准备Kafka安装文件,执行如下命令:

cd

wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

tar xvzf kafka_2.10-0.8.2.1.tgz

mv kafka_2.10-0.8.2.1 kafka


4、修改配置文件kafka/config/server.properties,修改如下内容:





broker.id=0

zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:


zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:


cd ~/zookeeper

bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:


create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。


5、然后,将配置好的安装文件同步到其他的dn02、dn03节点上:

scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop

scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop

6、最后,在dn02、dn03节点上配置修改配置文件kafka/config/server.properties内容如下所示:


broker.id=1  # 在dn02修改

broker.id=2  # 在dn03修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
7、在集群中的dn01、dn02、dn03这三个节点上分别启动Kafka,分别执行如下命令:


bin/kafka-server-start.sh config/server.properties &
可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
8、创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:


bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
9、查看创建的Topic,执行如下命令:


bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
结果信息如下所示:

Topic:my-replicated-topic5  PartitionCount:5  ReplicationFactor:3  Configs:
 Topic: my-replicated-topic5  Partition: 0  Leader: 2   Replicas: 2,0,1  Isr: 2,0,1
 Topic: my-replicated-topic5  Partition: 1  Leader: 0  Replicas: 0,1,2   Isr: 0,1,2
 Topic: my-replicated-topic5  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
 Topic: my-replicated-topic5  Partition: 3  Leader: 2   Replicas: 2,1,0  Isr: 2,1,0
 Topic: my-replicated-topic5  Partition: 4  Leader: 0  Replicas: 0,2,1  Isr: 0,2,1
上面Leader、Replicas、Isr的含义如下:

1    Partition: 分区
2    Leader   : 负责读写指定分区的节点
3    Replicas : 复制该分区log的节点列表
4    Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
11、在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:


bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5

12、在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

三、整合kafka与strom

主要步骤

1、创建storm-kafka topology

(1)创建maven project及pom.xml

(2)MyKafkaTopology.java

(3)提交topology到storm集群

2、在producer中输入内容,观察输出情况

(一)创建storm-kafka topology

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序 Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际 上,apache-storm-0.9.4这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm- kafka,可以直接使用,

1、在eclipse中创建maven project,pom.xml如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.ljh.demo</groupId>
  <artifactId>stormkafkademo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stormkafkademo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

   <dependencies>
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>0.9.4</version>
     <scope>provided</scope>
</dependency>
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-kafka</artifactId>
     <version>0.9.4</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.10</artifactId>
     <version>0.8.2.1</version>
     <exclusions>
          <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
          </exclusion>
          <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
          </exclusion>
     </exclusions>
</dependency>

  </dependencies>
</project>

2、开发一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:

package com.ljh.demo.stormkafkademo;

import java.util.Arrays;

import java.util.HashMap;

import java.util.Iterator;

import java.util.Map;

import java.util.Map.Entry;

import java.util.concurrent.atomic.AtomicInteger;



import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;



import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;



public class MyKafkaTopology {



     public static class KafkaWordSplitter extends BaseRichBolt {



          private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);

          private static final long serialVersionUID = 886149197481637894L;

          private OutputCollector collector;



          public void prepare(Map stormConf, TopologyContext context,

                    OutputCollector collector) {

               this.collector = collector;

          }



          public void execute(Tuple input) {

               String line = input.getString(0);

               LOG.info("RECV[kafka -> splitter] " + line);

               String[] words = line.split("\\s+");

               for(String word : words) {

                    LOG.info("EMIT[splitter -> counter] " + word);

                    collector.emit(input, new Values(word, 1));

               }

               collector.ack(input);

          }



          public void declareOutputFields(OutputFieldsDeclarer declarer) {

               declarer.declare(new Fields("word", "count"));

          }



     }



     public static class WordCounter extends BaseRichBolt {



          private static final Log LOG = LogFactory.getLog(WordCounter.class);

          private static final long serialVersionUID = 886149197481637894L;

          private OutputCollector collector;

          private Map<String, AtomicInteger> counterMap;



          public void prepare(Map stormConf, TopologyContext context,

                    OutputCollector collector) {

               this.collector = collector;

               this.counterMap = new HashMap<String, AtomicInteger>();

          }



          public void execute(Tuple input) {

               String word = input.getString(0);

               int count = input.getInteger(1);

               LOG.info("RECV[splitter -> counter] " + word + " : " + count);

               AtomicInteger ai = this.counterMap.get(word);

               if(ai == null) {

                    ai = new AtomicInteger();

                    this.counterMap.put(word, ai);

               }

               ai.addAndGet(count);

               collector.ack(input);

               LOG.info("CHECK statistics map: " + this.counterMap);

          }



          @Override

          public void cleanup() {

               LOG.info("The final result:");

               Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();

               while(iter.hasNext()) {

                    Entry<String, AtomicInteger> entry = iter.next();

                    LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());

               }



          }



          public void declareOutputFields(OutputFieldsDeclarer declarer) {

               declarer.declare(new Fields("word", "count"));

          }

     }



     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

          String zks = "192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181";

          String topic = "my-replicated-topic5";

          String zkRoot = "/storm"; // default zookeeper root configuration for storm

          String id = "word";



          BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");

          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);

          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

          spoutConf.forceFromStart = false;

          spoutConf.zkServers = Arrays.asList(new String[] {"192.168.169.91", "192.168.169.92", "192.168.169.93"});

          spoutConf.zkPort = 2181;



          TopologyBuilder builder = new TopologyBuilder();

          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5

          builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");

          builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));



          Config conf = new Config();



          String name = MyKafkaTopology.class.getSimpleName();

          System.out.println("test");

          if (args != null && args.length > 0) {

               // Nimbus host name passed from command line

               conf.put(Config.NIMBUS_HOST, args[0]);

               conf.setNumWorkers(3);

               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());

          } else {

               conf.setMaxTaskParallelism(3);

               LocalCluster cluster = new LocalCluster();

               cluster.submitTopology(name, conf, builder.createTopology());

               Thread.sleep(60000);

               cluster.shutdown();

          }

     }

}



3、提交我们开发的Topology程序到storm集群中


storm jar stormkafkademo-0.0.1-SNAPSHOT.jar com.ljh.demo.stormkafkademo.MyKafkaTopology 192.168.169.91

(二)查看topology的运行结果
可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:


spoutConf.forceFromStart = false;
该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读 取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的 Topic数据不被重复处理,是在数据源的位置进行状态记录。














四、整合kafka、storm与hdfs
主要步骤:
1、创建topic并启动producer
2、WordSplitterBolt.java
3、WordCounterBolt.java
4、KafkaStormHdfsTopology.java
5、pom.xml内容如下:
6、编译后把集群提交到storm
7、在kafka-console-producer中输入内容,并查看结果
注:本示例使用第二部分创建的集群

1、创建topic并启动producer
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test3
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic test3
2、WordSplitterBolt.java
package com.ljh.demo.stormkafkademo;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/*
 * 将kafka发送过来的数据按照空格拆分
 */
public class WordSplitterBolt extends BaseRichBolt {

    private static final Log LOG = LogFactory.getLog(WordSplitterBolt.class);
    private static final long serialVersionUID = 65437289133090921L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getString(0);
        LOG.info("RECV[kafka -> splitter] " + line);
        String[] words = line.split("\\s+");
        for (String word : words) {
            LOG.info("EMIT[splitter -> counter] " + word);
            // collector.emit(new Values(word,1));
            collector.emit(input, new Values(word, 1));
        }
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
3、WordCounterBolt.java
package com.ljh.demo.stormkafkademo.bolt;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCounterBolt extends BaseRichBolt {
    private static final Log LOG = LogFactory.getLog(WordCounterBolt.class);
    private static final long serialVersionUID = 8557689093276234L;
    private OutputCollector collector;
    private Map<String, AtomicInteger> counterMap;
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
        this.collector = collector;
        this.counterMap = new HashMap<String, AtomicInteger>();
    }
    public void execute(Tuple input) {
        String word = input.getString(0);
        int count = input.getInteger(1);
        LOG.info("RECV[splitter -> counter] " + word + " : " + count);
        AtomicInteger ai = this.counterMap.get(word);
        if (ai == null) {
            ai = new AtomicInteger();
            this.counterMap.put(word, ai);
        }
        ai.addAndGet(count);
        collector.emit(input, new Values(word, ai));
        // collector.ack(input);
        LOG.info("CHECK statistics map: " + this.counterMap);
    }
    //在topology被kill掉时执行,一般用于本地调试
    @Override
    public void cleanup() {
        LOG.info("The final result:");
        Iterator<Entry<String, AtomicInteger>> iter = this.counterMap
                .entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, AtomicInteger> entry = iter.next();
            LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
4、KafkaStormHdfsTopology.java
package com.ljh.demo.stormkafkademo;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import com.ljh.demo.stormkafkademo.bolt.WordCounterBolt;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
 * 本类完成以下内容
 * 1、从kafka中读取消息:通过kafkaSpout
 * 2、在storm中对消息进行处理,主要完成(1)WordSplitterBolt对消息进行拆分(2)WordCounterBolt进行统计
 * 3、将结果输出到hdfs:通过hdfsBolt
 */
public class KafkaStormHdfsTopology {
    private static final String h1 = "192.168.169.91";
    private static final String h2 = "192.168.169.92";
    private static final String h3 = "192.168.169.93";

    //args[0]:nimbus hostname; args[1]:topology名称; args[2]:topic名称。 三都均为optional
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, InterruptedException {

        String zks = h1 + ":2181," + h2 + ":2181," + h3 + ":2181";
        String topic = null;

        // 指定topic名称
        if (args.length <= 2) {
            topic = "kafka-storm-hdfs-demo";
        } else {
            topic = args[2];
        }
        // storm信息在zookeeper中的默认存储僧
        String zkRoot = "/storm";
        String id = "word";

        // 1、配置spout从kafka中读取消息
        // 默认情况下,brokers的目录位于/brokers,为方便管理,将kafka相关的信息都放入了/kafka中。
        BrokerHosts brokerHosts = new ZkHosts(zks, "/kafka/brokers");
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] { h1, h2, h3 });
        spoutConf.zkPort = 2181;

        // 2、配置hdfs-bolt,将结果写入hdfs中
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t");
        // 每收到1000个tuple同步一次到hdfs
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // 每分钟同步一次到hdfs。只要满足1分钟,或者1000tuple均会触发同步。
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f,
                TimeUnit.MINUTES); // rotate files
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set
                                                                                // file
                                                                                // name
                                                                                // format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://gdc-nn01-test:9000")
                .withFileNameFormat(fileNameFormat).withRecordFormat(format)
                .withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);

        // 3、创建topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka鎴戜滑鍒涘缓浜嗕竴涓5鍒嗗尯鐨凾opic锛岃繖閲屽苟琛屽害璁剧疆涓5
        builder.setBolt("word-splitter", new WordSplitterBolt(), 2)
                .shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounterBolt()).fieldsGrouping(
                "word-splitter", new Fields("word"));
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping(
                "word-counter");

        // 4、启动topology
        Config conf = new Config();
        String topologyName = null;
        if (args.length <= 1) {
            topologyName = KafkaStormHdfsTopology.class.getSimpleName();
        } else {
            topologyName = args[1];
        }
        String nimbusHost = null;
        if(args.length == 0){
            nimbusHost = h1;
        }else{
            nimbusHost = args[0];
        }

        System.out.println("test");

        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, nimbusHost);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(topologyName, conf,
                    builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();

        }

    }

}
5、pom.xml内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ljh.demo</groupId>
    <artifactId>stormkafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>stormkafkademo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>0.9.4</version>
        </dependency>
        


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
</project>
6、编译后把集群提交到storm
storm jar stormkafkademo-0.0.1-SNAPSHOT.jar com.ljh.demo.stormkafkademo.MyKafkaTopology 192.168.169.91 ksh test3

7、在kafka-console-producer中输入内容,并查看结果
(1)在192.168.169.91:8080中查看数据的发送850712.png以及是否存在错误
(2)到hdfs中查看内容
hadoop fs -ls /storm,并cat相应的文件
输出结果样例:
as      292
possible.       64
reasons,        32
tend    32
skip    32
OS      63
packaged        32
it      64
a       63
to      224
try     98
put     32
in      32
OS      64
hierarchy,      32
can     32

六、一些问题:

1、配置kafka时,如果使用zookeeper create /kafka创建了节点,kafka与storm集成时new ZkHosts(zks) 需要改成 new ZkHosts(zks,”/kafka/brokers”),不然会报java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。

storm-kafka插件默认kafka的 zk_path如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;

2、如果出现以下问题,代表偏移量出错,建议重新开一个topic
ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.  

3、在页面查看状态时,当在producer生产消息后,页面会有一定的延迟,应该是有缓存,缓存20个字节后一次提交,因此可以尝试一次大量提交word。

4、当没有某个topic,或者是某个topic的node放置不在默认位置时,会有以下异常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam         


5、关于日志
在初次运行程序时,可能会出现各种各样的错误,一般错误均可在日志中发现,在本例中,需要重点关注的日志有:
(1)supervisor上的work日志,位于$STORM_HOME/logs,如果集群正常,但某个topology运行出现错误,一般可以在这些work日志中找到问题。最常见的是CLASSNOTFOUNDEXCEPTION, CLASSNOTDEFINDEXCEPTION,都是缺包导致的,将它们放入$STORM_HOME/lib即可。
(2)nimbus上的日志,位于$STORM_HOME/logs,主要观察整个集群的状态,有以下4个文件
access.log  metrics.log  nimbus.log  ui.log
(3)kafka的日志,位于$KAFKA_HOME/logs,观察kafka是否运行正常。

6.关于emit与transfer(转自http://www.reader8.cn/jiaocheng/20120801/2057699.html)
 storm ui上emit和transferred的区别
最开始对storm ui上展示出来的emit和transferred数量不是很明白, 于是在storm-user上google了一把, 发现有人也有跟我一样的困惑, nathan做了详细的回答:

emitted栏显示的数字表示的是调用OutputCollector的emit方法的次数.

transferred栏显示的数字表示的是实际tuple发送到下一个task的计数.

如果一个bolt A使用all group的方式(每一个bolt都要接收到)向bolt B发射tuple, 此时bolt B启动了5个task, 那么trasferred显示的数量将是emitted的5倍.

如果一个bolt A内部执行了emit操作, 但是没有指定tuple的接受者, 那么transferred将为0.

这里还有关于spout, bolt之间的emitted数量的关系讨论, 也解释了我的一些疑惑:
有 的bolt的execture方法中并没有emit tuple, 但是storm ui中依然有显示emitted, 主要是因为它调用了ack方法, 而该方法将emit ack tuple到系统默认的acker bolt. 因此如果anchor方式emit一个tuple, emitted一般会包含向acker bolt发射tuple的数量.

另外collector.emit(new Values(xxx))和collector.emit(tuple, new Values(xxx)) 这两种不同的emit方法也会影响后面bolt的emitted和transferred, 如果是前者, 则后续bolt的这两个值都是0, 因为前一个emit方法是非安全的, 不再使用acker来进行校验.

7、kafka中出现以下异常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
原因是集群默认每次只能接受约1M的消息,如果客户端一次发送的消息大于这个数值则会导致异常。
在server.properties中添加以下参数
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同时在consumer.properties中添加以下参数:
fetch.message.max.bytes=1073741824
然后重启kafka进程即可,现在每次最大可接收100M的消息。

8、修改kafka数据的默认放置位置:
server.properties中,将
log.dir=/tmp/kafka-los
改为
log.dir=/home/data/kafka

9、删除kafka中的topic
bin/kafka-topics.sh  --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --delete --topic test2

10、
delete.topic.enable=true
默认为false,即delete topic时只是marked for deletion,但并不会真正删除topic。

 

 类似资料: