
Integrating Spring Boot with Storm

郦良才
2023-12-01

Storm, Kafka and Flume together seem to be a pretty usable stack for real-time data processing.

I won't go into how to set up the Flume and Kafka data-collection side on the servers here; this post only covers receiving the data produced by a Kafka producer in a Java application and then processing it with Storm.

At first I wanted to launch everything directly from a plain main method, but then none of the framework's features were available, so I decided to integrate it into Spring Boot. These frameworks are fairly independent, though, and can't call into each other's features out of the box, so I took an example I found online and, after a series of adjustments, ended up with this. Writing it down in case I need it later.

First up is the pom. The hardest part for me was the Maven dependencies, because the logging packages conflict quite badly (there is a sketch of an extra exclusion right after the pom).

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
	<groupId>com.example</groupId>
	<artifactId>Storm-Springboot</artifactId>
	<version>0.0.1-SNAPSHOT</version>
  	<packaging>jar</packaging>
  	
     <properties>
     	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<springboot.version>1.5.9.RELEASE</springboot.version>
        <mybatis-spring-boot>1.2.0</mybatis-spring-boot>
        <mysql-connector>5.1.44</mysql-connector>
		<slf4j.version>1.7.25</slf4j.version>
	    <logback.version>1.2.3</logback.version>
	    <kafka.version>1.1.1</kafka.version>
	    <storm.version>1.2.2</storm.version>
	    <fastjson.version>1.2.41</fastjson.version>
	    <druid>1.1.8</druid>
	    <mybatis-spring-boot-starter.version>1.1.1</mybatis-spring-boot-starter.version>
	    <aspectj.version>1.9.1</aspectj.version>
	</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${springboot.version}</version>
    </dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<version>${springboot.version}</version>
		<scope>test</scope>
	</dependency>
	<!-- spring-boot整合mybatis -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>${mybatis-spring-boot-starter.version}</version>
    </dependency>
    <!-- MySQL 连接驱动依赖 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql-connector}</version>
    </dependency>
	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
        <version>1.5.2.RELEASE</version>
    </dependency>
    <dependency>
   	 <groupId>org.slf4j</groupId>
   	 <artifactId>slf4j-api</artifactId>
   	 <version>${slf4j.version}</version>
	</dependency>
	<!--可以不用写 get、set方法-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
    	<groupId>ch.qos.logback</groupId>
    	<artifactId>logback-classic</artifactId>
    	<version>${logback.version}</version>
	</dependency>
    <dependency>
    	<groupId>ch.qos.logback</groupId>
    	<artifactId>logback-core</artifactId>
    	<version>${logback.version}</version>
	</dependency>
    <!-- junit -->
	<dependency>
	   <groupId>junit</groupId>
	   <artifactId>junit</artifactId>
	   <version>4.7</version>
	</dependency>
    <!--fastjson 相关jar -->
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>${fastjson.version}</version>
	</dependency>	
		
	<!-- Druid 数据连接池依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>${druid}</version>
    </dependency>
    <!--使用AspectJ方式注解需要相应的包-->
	<dependency>
	    <groupId>org.aspectj</groupId>
	    <artifactId>aspectjrt</artifactId>
	    <version>${aspectj.version}</version>
	</dependency>
	<dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
        <version>${aspectj.version}</version>
    </dependency>
	
       
    <!-- kafka -->
  	<dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.11</artifactId>
         <version>1.1.1</version>
         <exclusions>
           <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
           </exclusion>
         </exclusions>
     </dependency>
     <!-- storm -->         
     <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-kafka</artifactId>
         <version>1.2.2</version>
     </dependency>
     <!-- storm -->     
     <dependency>
         <groupId>org.apache.storm</groupId>
	     <artifactId>storm-core</artifactId>
	     <version>1.2.2</version>
     </dependency>
	 <dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-redis</artifactId>
		<version>1.5.9.RELEASE</version>
	</dependency>
	<dependency>
	    <groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-redis</artifactId>
	    <version>1.3.2.RELEASE</version>
	</dependency>
    <dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-websocket</artifactId>
		<version>1.5.2.RELEASE</version>
	</dependency>
  </dependencies>

	<build>
	      <plugins>
	          <plugin>
	              <groupId>org.apache.maven.plugins</groupId>
	              <artifactId>maven-compiler-plugin</artifactId>
	              <version>2.3.2</version>
	              <configuration>
	                  <source>${java.version}</source>
	                  <target>${java.version}</target>
	                  <compilerVersion>${java.version}</compilerVersion>
	              </configuration>
	          </plugin>
	      </plugins>
	  </build>
</project>
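
One more note on the logging conflicts: in the 1.2.x line, storm-core itself pulls in the log4j2 SLF4J binding (log4j-slf4j-impl), which clashes with logback-classic and triggers SLF4J's "multiple bindings" warning. If that warning shows up at startup, this is a sketch of the kind of exclusion that resolves it (check the actual offender with mvn dependency:tree first):

     <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-core</artifactId>
         <version>1.2.2</version>
         <exclusions>
             <!-- storm-core's own SLF4J binding; exclude it so logback-classic stays the only binding -->
             <exclusion>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-slf4j-impl</artifactId>
             </exclusion>
         </exclusions>
     </dependency>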

Then, because the logging conflicts are so severe, the logging configuration needs a few more adjustments.

So add the following to application.yml, pointing the logging configuration at a separate file:

logging:
    config: classpath:logback.xml

With that in place, the whole application.yml looks like this:

server:
    port: 8080
# log
logging:
    config: classpath:logback.xml

## mysql
spring:
    datasource:
        url: jdbc:mysql://localhost:3306/your_database_name?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
        username: root
        password: 123
        driverClassName: com.mysql.jdbc.Driver

## kafka 
kafka:
    servers: localhost:9092
    topicName: test
    autoCommit: false
    maxPollRecords: 100
    groupId: groupA
    commitRule: earliest

mybatis:
    mapper-locations: classpath:mapper/*.xml

At this point logback.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<configuration  scan="true" scanPeriod="30 seconds" >
	<property name="LOG_HOME" value="logs"/>
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<layout class="ch.qos.logback.classic.PatternLayout">
			<Pattern>
				%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
			</Pattern>
		</layout>
	</appender>
	<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
		<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
			<!-- rollover daily -->
			<fileNamePattern>${LOG_HOME}/mylog-%d{yyyy-MM-dd}.%i.txt</fileNamePattern>
			<!-- each file should be at most 10MB, keep 31 days worth of history, but at most 10GB -->
			<maxFileSize>10MB</maxFileSize>
			<maxHistory>31</maxHistory>
			<totalSizeCap>10GB</totalSizeCap>
		</rollingPolicy>
		<layout class="ch.qos.logback.classic.PatternLayout">
			<Pattern>
				%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
			</Pattern>
		</layout>
	</appender>

	<root level="INFO">
		<appender-ref ref="STDOUT"/>
		<appender-ref ref="FILE" />
	</root>

</configuration>

The next step is wiring the two frameworks together.

First, change how the application starts by adjusting the startup classes.

Add a new class at the same level as the main application class:

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import com.example.demo.utils.GetSpringBean;

@SpringBootApplication
@MapperScan("com.example.demo.DAO")
public class SpringBootApp {

    /** Boots the Spring context on demand, for code that is not started via the main class. */
    public static void run(String args) {
        ConfigurableApplicationContext context = SpringApplication.run(SpringBootApp.class, args);
        GetSpringBean springBean=new GetSpringBean();
        springBean.setApplicationContext(context);
    }
 }

Then the main application class itself:

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;

import com.example.demo.utils.GetSpringBean;
import com.example.demo.utils.TopologyApp;

@SpringBootApplication
@MapperScan("com.example.demo.DAO")
public class Application extends SpringBootServletInitializer{
	
    public static void main(String[] args) {
        // Start the embedded Tomcat and initialize the Spring context and its components
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
        GetSpringBean springBean=new GetSpringBean();
        springBean.setApplicationContext(context);
        TopologyApp app = context.getBean(TopologyApp.class);
        app.runStorm(args);
    }
    
}

Now for the Storm-related parts. First, a bean that exposes the Kafka configuration:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component("applicationConfiguration")
public class ApplicationConfiguration {
	@Value("${kafka.topicName}")
    private String topicName; 
		
	@Value("${kafka.servers}")
    private String servers;

	@Value("${kafka.maxPollRecords}")
    private int maxPollRecords; 
		
	@Value("${kafka.commitRule}")
    private String commitRule;
	
	@Value("${kafka.autoCommit}")
	private String autoCommit;

	@Value("${kafka.groupId}")
    private String groupId;
	/**
	 * @return the Kafka topic name
	 */
	public String getTopicName() {
		return topicName;
	}

	/**
	 * @return the Kafka bootstrap servers
	 */
	public String getServers() {
		return servers;
	}

	/**
	 * @return the maximum number of records returned per poll
	 */
	public int getMaxPollRecords() {
		return maxPollRecords;
	}

	/**
	 * @return the auto.offset.reset rule
	 */
	public String getCommitRule() {
		return commitRule;
	}

	/**
	 * @return whether offsets are committed automatically
	 */
	public String getAutoCommit() {
		return autoCommit;
	}

	/**
	 * @return the consumer group id
	 */
	public String getGroupId() {
		return groupId;
	}

}

A class holding the constant names used by the spout and bolt:

public class Constants {
	
	public  final static String KAFKA_SPOUT="KAFKA_SPOUT"; 
	public  final static String INSERT_BOLT="INSERT_BOLT"; 
	public  final static String FIELD="insert";
}

And a class for fetching Spring beans at runtime, so the Storm components can use things managed by the Spring Boot context:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class GetSpringBean implements ApplicationContextAware{

	private static ApplicationContext context;

	public static Object getBean(String name) {
		return context.getBean(name);
	}

	public static <T> T getBean(Class<T> c) {
		if(context==null){
			// The Spring context has not been started yet; it could be bootstrapped here, e.g.:
			//SpringBootApp.run("pancm");
		}
		return context.getBean(c);
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext)
			throws BeansException {
		if(applicationContext!=null){
			context = applicationContext;
		}
	}
}
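
A note on the commented-out SpringBootApp.run line above: in local mode the main class has already started the Spring context before the topology runs, so context is never null there. On a real Storm cluster, however, the spout and bolts are deserialized in separate worker JVMs where no Spring context exists yet. If that setup is ever needed, one possible approach, purely as a sketch (the argument is an arbitrary placeholder, just like "pancm" above), is to bootstrap lazily:

	public static synchronized <T> T getBean(Class<T> c) {
		if (context == null) {
			// Start the Spring Boot context inside the worker JVM on first access
			SpringBootApp.run("storm-worker");
		}
		return context.getBean(c);
	}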

Next is the Storm spout, the producer on the Storm side. This is where the data written by the Kafka producer is picked up, which means the Kafka consumer is best placed here.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.example.demo.config.ApplicationConfiguration;
import com.example.demo.constant.Constants;
import com.example.demo.utils.GetSpringBean;

public class KafkaInsertDataSpout extends BaseRichSpout{

	private static final long serialVersionUID = -2548451744178936478L;
	private static final Logger logger = LoggerFactory.getLogger(KafkaInsertDataSpout.class);
	private SpoutOutputCollector collector;
	private KafkaConsumer<String, String> consumer;
	private ConsumerRecords<String, String> msgList;
	private ApplicationConfiguration app;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
		app=GetSpringBean.getBean(ApplicationConfiguration.class);
		kafkaInit();
		this.collector = collector;
	}
	
	@Override
	public void nextTuple() {
		// Storm calls nextTuple() in a loop itself, so there is no need to block here with an endless loop
		try {
			msgList = consumer.poll(100);
			if (null != msgList && !msgList.isEmpty()) {
				List<String> list = new ArrayList<String>();
				for (ConsumerRecord<String, String> record : msgList) {
					String msg = record.value();
					if (null == msg || "".equals(msg.trim())) {
						continue;
					}
					list.add(msg);
				}
				this.collector.emit(new Values(JSON.toJSONString(list)));
				consumer.commitAsync();
			}
		} catch (Exception e) {
			logger.error("Spout exception", e);
			try {
				TimeUnit.SECONDS.sleep(10);
			} catch (InterruptedException e1) {
				logger.error("Spout sleep interrupted", e1);
			}
		}
	}
	
	/**
	 * Declares the field name used when tuples are passed downstream to the bolt
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(Constants.FIELD));
	}
	
	private void kafkaInit(){
		Properties props = new Properties();
        props.put("bootstrap.servers", app.getServers());  
        props.put("max.poll.records", app.getMaxPollRecords());
        props.put("enable.auto.commit", app.getAutoCommit());
        props.put("group.id", app.getGroupId());
        props.put("auto.offset.reset", app.getCommitRule());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        String topic=app.getTopicName();
        
        // Reset the committed offset of partition 0 of this topic back to the beginning (only partition 0 is handled here)
        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
        hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(0));
        consumer.commitSync(hashMaps);
        
    	this.consumer.subscribe(Arrays.asList(topic));
    	logger.info("Subscribed to topic [" + topic + "]...");
	}	
}

Then the Storm bolt, the consumer on the Storm side. Processing can be split into several steps; only one is done here (a sketch of chaining a second bolt follows the topology class below).

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.example.demo.constant.Constants;

public class InsertBolt extends BaseRichBolt{

	private static final long serialVersionUID = 6542256546124282695L;

	private static final Logger logger = LoggerFactory.getLogger(InsertBolt.class);

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
	}
	   
	@Override
	public void execute(Tuple tuple) {
		String msg=tuple.getStringByField(Constants.FIELD);
		try{
			List<String> list =JSON.parseArray(msg,String.class);
			if(list!=null&&list.size()>0){
				Iterator<String> iterator = list.iterator();
				 while (iterator.hasNext()) {
					 String str = iterator.next();
					 // process each message here -- plug in your own business logic (see the sketch after this class)
				 }
			}
		}catch(Exception e){
			logger.error("Bolt failed to process data: {}", msg, e);
		}
	}
	
	@Override
	public void cleanup() {
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(Constants.FIELD));
	}
	
}
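
About the "plug in your own business logic" placeholder in execute(): the point of GetSpringBean is that the bolt can use any Spring-managed bean, for example a MyBatis mapper from the com.example.demo.DAO package picked up by @MapperScan. A minimal sketch of that idea, assuming a hypothetical DataMapper with an insert(String) method (both names are illustrative, not part of the project above):

	// Inside InsertBolt -- the rest of the class stays as above

	// Hypothetical MyBatis mapper; transient because Storm serializes the bolt when submitting the topology
	private transient DataMapper dataMapper;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map map, TopologyContext context, OutputCollector collector) {
		// prepare() runs after deserialization, so the Spring context is already available here
		this.dataMapper = GetSpringBean.getBean(DataMapper.class);
	}

	// ...and in execute(), the per-message step becomes:
	//     dataMapper.insert(str);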

Finally, the class that ties the two parts together; this is the piece that gets called from the main application class.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.stereotype.Repository;

import com.example.demo.bolt.InsertBolt;
import com.example.demo.constant.Constants;
import com.example.demo.spout.KafkaInsertDataSpout;

@Repository
public class TopologyApp {
	public  void runStorm(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		//parallelism_hint is the initial number of executors (threads) for the component
		builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
		//setNumTasks sets the number of tasks for this component on the TopologyBuilder;
		//shuffleGrouping distributes the stream's tuples randomly, so every bolt task receives roughly the same number of tuples
		builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
		Config conf = new Config();
		//number of acker threads; ackers track whether tuples are fully processed (Storm creates one by default if not set)
		conf.setNumAckers(1);
		//number of worker processes; the upper bound is the total number of slots in the Storm cluster. The parallelism above is the thread count:
		//with 12 workers and 48 executors, each worker process would run 4 threads.
		conf.setNumWorkers(3);
		try {
			//remote cluster
			if (args != null && args.length > 0) {
				StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} else {
				//local mode
				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology("TopologyApp", conf, builder.createTopology());
			}
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
	}
}
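
Since processing can be split across several bolts, a second step is wired into the topology the same way. A sketch with a hypothetical SecondBolt (for it to receive anything, InsertBolt must keep the OutputCollector from prepare() and emit a value for each processed message, since it already declares the output field):

		// Hypothetical second processing step, chained after INSERT_BOLT
		builder.setBolt("SECOND_BOLT", new SecondBolt(), 1).shuffleGrouping(Constants.INSERT_BOLT);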

Then just start the main application class and everything runs.
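
To verify the whole pipeline end to end, a few test messages can be pushed with a plain Kafka producer; a minimal sketch (the class name is arbitrary, the broker address and topic are taken from the application.yml above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
		for (int i = 0; i < 10; i++) {
			// "test" is the topic the spout subscribes to (kafka.topicName in application.yml)
			producer.send(new ProducerRecord<String, String>("test", "message-" + i));
		}
		producer.close();
	}
}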
