Storm, Kafka, and Flume together seem to make a fairly handy stack for real-time data processing.
I won't go into setting up Flume and Kafka for data collection on the server side; this post only covers receiving the data a Kafka producer sends and then processing it with Storm, all in a Java environment.
At first I wanted to launch everything from a plain main method, but then I couldn't use any of the features Spring Boot provides, so I tried to pull it into the Spring Boot framework. These frameworks are fairly independent and don't call into each other out of the box, so I found an example online and, after a series of adjustments, ended up with what follows. Writing it down in case I need it again later.
First, the pom. The hardest part was the Maven dependencies, because the logging packages conflict with each other quite badly.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>Storm-Springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<springboot.version>1.5.9.RELEASE</springboot.version>
<mybatis-spring-boot>1.2.0</mybatis-spring-boot>
<mysql-connector>5.1.44</mysql-connector>
<slf4j.version>1.7.25</slf4j.version>
<logback.version>1.2.3</logback.version>
<kafka.version>1.0.0</kafka.version>
<storm.version>1.2.3</storm.version>
<fastjson.version>1.2.41</fastjson.version>
<druid>1.1.8</druid>
<mybatis-spring-boot-starter.version>1.1.1</mybatis-spring-boot-starter.version>
<aspectj.version>1.9.1</aspectj.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
<!-- Spring Boot + MyBatis integration -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring-boot-starter.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Lombok: saves writing getters/setters by hand -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Druid connection pool -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid}</version>
</dependency>
<!-- required for AspectJ-style annotations -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>${aspectj.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>${aspectj.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.2.2</version>
</dependency>
<!-- storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>1.5.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<compilerVersion>${java.version}</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
Then, because the logging conflicts are so bad, the logging configuration needs some further adjustment.
So in application.yml I added

logging:
  config: classpath:logback.xml

to point the log configuration at a separate file. The whole application.yml now looks like this:
server:
  port: 8080
# log
logging:
  config: classpath:logback.xml
## mysql
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_database_name?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
    username: root
    password: 123
    driverClassName: com.mysql.jdbc.Driver
## kafka
kafka:
  servers: localhost:9092
  topicName: test
  autoCommit: false
  maxPollRecords: 100
  groupId: groupA
  commitRule: earliest
mybatis:
  mapper-locations: classpath:mapper/*.xml
At this point logback.xml looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds" >
<property name="LOG_HOME" value="logs"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- rollover daily -->
<fileNamePattern>${LOG_HOME}/mylog-%d{yyyy-MM-dd}.%i.txt</fileNamePattern>
<!-- each file should be at most 10MB, keep 31 days worth of history, but at most 10GB -->
<maxFileSize>10MB</maxFileSize>
<maxHistory>31</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</Pattern>
</layout>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE" />
</root>
</configuration>
The next step is wiring the two frameworks together.
First, change how the application starts by adjusting the startup class.
Add a new class next to the startup class (same package level):
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import com.example.demo.utils.GetSpringBean;
@SpringBootApplication
@MapperScan("com.example.demo.DAO")
public class SpringBootApp {
public static void run(String args) {
ConfigurableApplicationContext context = SpringApplication.run(SpringBootApp.class, args);
GetSpringBean springBean=new GetSpringBean();
springBean.setApplicationContext(context);
}
}
Then the startup class itself:
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.example.demo.utils.GetSpringBean;
import com.example.demo.utils.RunRedis;
import com.example.demo.utils.TopologyApp;
@SpringBootApplication
@MapperScan("com.example.demo.DAO")
public class Application extends SpringBootServletInitializer{
public static void main(String[] args) {
// Start the embedded Tomcat and initialize the Spring environment and all its Spring components
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
GetSpringBean springBean=new GetSpringBean();
springBean.setApplicationContext(context);
TopologyApp app = context.getBean(TopologyApp.class);
app.runStorm(args);
}
}
Now for the Storm-related pieces. First, a configuration bean that exposes the Kafka settings to the Storm components:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component("applicationConfiguration")
public class ApplicationConfiguration {
@Value("${kafka.topicName}")
private String topicName;
@Value("${kafka.servers}")
private String servers;
@Value("${kafka.maxPollRecords}")
private int maxPollRecords;
@Value("${kafka.commitRule}")
private String commitRule;
@Value("${kafka.autoCommit}")
private String autoCommit;
@Value("${kafka.groupId}")
private String groupId;
/**
* Get the topicName
* @return topicName
*/
public String getTopicName() {
return topicName;
}
/**
* Get the servers
* @return servers
*/
public String getServers() {
return servers;
}
/**
* Get maxPollRecords
* @return maxPollRecords
*/
public int getMaxPollRecords() {
return maxPollRecords;
}
/**
* Get the commitRule
* @return commitRule
*/
public String getCommitRule() {
return commitRule;
}
/**
* Get the autoCommit setting
* @return autoCommit
*/
public String getAutoCommit() {
return autoCommit;
}
/**
* Get the groupId
* @return groupId
*/
public String getGroupId() {
return groupId;
}
}
A class holding the constant names used by the topology components:
public class Constants {
public final static String KAFKA_SPOUT="KAFKA_SPOUT";
public final static String INSERT_BOLT="INSERT_BOLT";
public final static String FIELD="insert";
}
Plus a class for looking up Spring beans dynamically, so that the Storm components can reach things managed by the Spring Boot context:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
public class GetSpringBean implements ApplicationContextAware{
private static ApplicationContext context;
public static Object getBean(String name) {
return context.getBean(name);
}
public static <T> T getBean(Class<T> c) {
if(context==null){
//SpringBootApp.run("pancm");
}
return context.getBean(c);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
if(applicationContext!=null){
context = applicationContext;
}
}
}
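Any class that Storm (rather than Spring) instantiates can then pull beans out of the context like this, which is exactly what the spout below does:

// somewhere inside a spout or bolt, after the Spring context has started
ApplicationConfiguration app = GetSpringBean.getBean(ApplicationConfiguration.class);
String servers = app.getServers();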
Next is the producer on the Storm side, i.e. the spout. This is where we pick up the data that the Kafka producer pushed in; in other words, this is the natural place to put the Kafka consumer.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.example.demo.config.ApplicationConfiguration;
import com.example.demo.constant.Constants;
import com.example.demo.utils.GetSpringBean;
public class KafkaInsertDataSpout extends BaseRichSpout{
private static final long serialVersionUID = -2548451744178936478L;
private static final Logger logger = LoggerFactory.getLogger(KafkaInsertDataSpout.class);
private SpoutOutputCollector collector;
private KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private ApplicationConfiguration app;
@SuppressWarnings("rawtypes")
@Override
public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
app=GetSpringBean.getBean(ApplicationConfiguration.class);
kafkaInit();
this.collector = collector;
}
@Override
public void nextTuple() {
for (;;) {
try {
msgList = consumer.poll(100);
if (null != msgList && !msgList.isEmpty()) {
String msg = "";
List<String> list=new ArrayList<String>();
for (ConsumerRecord<String, String> record : msgList) {
msg = record.value();
if (null == msg || "".equals(msg.trim())) {
continue;
}
try{
list.add(msg);
}catch(Exception e){
logger.error("获取消息异常",msg);
continue;
}
}
this.collector.emit(new Values(JSON.toJSONString(list)));
consumer.commitAsync();
}
} catch (Exception e) {
logger.error("spout异常", e);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e1) {
logger.error("spout等待异常",e1);
}
}
}
}
/**
* Declare the field name (key) under which tuples are passed to the downstream bolt
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(Constants.FIELD));
}
private void kafkaInit(){
Properties props = new Properties();
props.put("bootstrap.servers", app.getServers());
props.put("max.poll.records", app.getMaxPollRecords());
props.put("enable.auto.commit", app.getAutoCommit());
props.put("group.id", app.getGroupId());
props.put("auto.offset.reset", app.getCommitRule());
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
String topic=app.getTopicName();
// note: this resets the committed offset of partition 0 back to 0 on every start
Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(0));
consumer.commitSync(hashMaps);
this.consumer.subscribe(Arrays.asList(topic));
logger.info("获取topic[" + topic + "]...");
}
}
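As a side note, the pom above also pulls in storm-kafka, which ships a ready-made KafkaSpout. If you would rather not hand-roll the consumer, something along these lines could replace the spout above. This is only a rough sketch, and the ZooKeeper address is my assumption rather than part of the original setup:

import java.util.UUID;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class KafkaSpoutFactory {
    // builds Storm's prebuilt Kafka spout for the given topic;
    // the ZooKeeper address is assumed to be the local default
    public static KafkaSpout create(String topic) {
        ZkHosts hosts = new ZkHosts("localhost:2181");
        // zkRoot and the consumer id are where the spout stores its offsets in ZooKeeper
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        // emit each Kafka message value as a single string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConfig);
    }
}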
Then the consumer on the Storm side, the bolt. Processing could be split into several stages; only one is done here (a sketch of chaining a second stage follows the TopologyApp code further down).
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.example.demo.constant.Constants;
import com.example.demo.utils.MessageProducer;
public class InsertBolt extends BaseRichBolt{
private static final long serialVersionUID = 6542256546124282695L;
private static final Logger logger = LoggerFactory.getLogger(InsertBolt.class);
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
}
@Override
public void execute(Tuple tuple) {
String msg=tuple.getStringByField(Constants.FIELD);
try{
List<String> list =JSON.parseArray(msg,String.class);
if(list!=null&&list.size()>0){
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String str = iterator.next();
// process str here — plug in your own handling logic
}
}
}catch(Exception e){
logger.error("Bolt处理数据异常",msg,e);
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(Constants.FIELD));
}
}
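Since the whole point of dragging Spring Boot in is to reach Spring-managed beans from inside the bolt (a MyBatis mapper, say), the "your own handling logic" placeholder would typically become a bean lookup plus an insert. Everything below is illustrative only: DataMapper, its insert method, and the table name are assumptions, not part of the original project.

package com.example.demo.DAO;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

// hypothetical mapper; it lives under com.example.demo.DAO so @MapperScan picks it up
@Mapper
public interface DataMapper {
    // table and column names are placeholders
    @Insert("INSERT INTO t_data (content) VALUES (#{content})")
    int insert(@Param("content") String content);
}

Then inside InsertBolt.execute(), in place of the placeholder comment:

// fetch the mapper through the Spring context, since Storm itself instantiates the bolt
DataMapper mapper = GetSpringBean.getBean(DataMapper.class);
mapper.insert(str);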
Finally, the class that ties the two parts together — the piece invoked from the startup class above:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.stereotype.Repository;
import com.example.demo.bolt.InsertBolt;
import com.example.demo.constant.Constants;
import com.example.demo.spout.KafkaInsertDataSpout;
@Repository
public class TopologyApp {
public void runStorm(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//parallelism_hint: the initial number of executors (i.e. threads) for this component
builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
//setNumTasks sets how many tasks this specific component gets, via TopologyBuilder's setNumTasks method
//shuffle grouping: tuples in the stream are distributed randomly, so every task of the bolt receives roughly the same number of tuples
builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
Config conf = new Config();
//how many acker threads to start; ackers track whether tuples have been fully processed (if not set, the system starts one by default)
conf.setNumAckers(1);
//number of worker processes, capped by the total number of slots in the Storm cluster; these are the processes that actually run the topology, while the parallelism above is the thread count. With 12 workers and a parallelism (executors) of 48, each process runs 4 threads.
conf.setNumWorkers(3);
try {
//remote cluster
if (args != null && args.length > 0) {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
//local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf, builder.createTopology());
}
} catch (Exception e) {
// don't swallow the failure silently before exiting
e.printStackTrace();
System.exit(1);
}
}
}
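If you do want the multi-stage processing mentioned before the bolt code, it comes down to one more setBolt call plus an actual emit in InsertBolt (it already declares an output field but never emits; it would need to keep the OutputCollector from prepare() and call collector.emit(new Values(str))). Here is a rough sketch with a made-up SecondBolt that only logs what it receives:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import com.example.demo.constant.Constants;

// hypothetical second stage: just prints whatever the first bolt emits
public class SecondBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("second stage got: " + tuple.getStringByField(Constants.FIELD));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, nothing to declare
    }
}

And in TopologyApp.runStorm():

// chain the extra stage onto the first bolt's output
builder.setBolt("SECOND_BOLT", new SecondBolt(), 1).shuffleGrouping(Constants.INSERT_BOLT);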
Then just start the Application class and everything runs.
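To push some test data through the topology, a plain Kafka producer on the side is enough. Below is a minimal sketch against the broker and topic from application.yml (localhost:9092, topic test); adjust it to your own setup:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // must match kafka.servers in application.yml
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            // topic name must match kafka.topicName in application.yml
            producer.send(new ProducerRecord<String, String>("test", "message-" + i));
        }
        producer.close();
    }
}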