使用jstorm本地模式topology时,可以是jstorm包含springboot,也可以是springboot 包含jstorm,这不影响jar和topology的运行。所以本地模式下,jstorm与springboot或springcloud如何进行集成,都没有影响,但是如果将topology提交到集群去运行时,代码的工程结构就得是jstorm包含springboot了。
首先是pom.xml配置调整,解决在集群上运行时冲突:
1、log相关的类包冲突
java.lang.NoSuchMethodError: ch.qos.logback.classic.LoggerContext.removeObject(Ljava/lang/String;)V
2、 多个defaults.yaml资源冲突
Invalid configuration defaults.yaml:Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar.
在topology真正开始运行时启动springboot,解决获取不到Application以及发布到spring里面的bean的问题。
...
<!-- Exclude Spring Boot's log4j-to-slf4j bridge and the Logback binding so they do not clash
     with the logging jars already on the JStorm cluster classpath
     (avoids NoSuchMethodError: LoggerContext.removeObject). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
...
<!-- "provided" scope keeps the JStorm jars out of the topology jar; bundling them is what
     triggers "Found multiple defaults.yaml resources" when submitting to the cluster. -->
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
....
由于topology在jstorm集群中运行时,是会将topology序列化后传递到worker上,所以springboot只在nimbus上提交时启动,意义是不大的,需要在topology运行的worker上也启动springboot。
主要有以下几处调整:
调整SpringApplication类
SpringApplication
import com.ctrip.framework.apollo.spring.annotation.EnableApolloConfig;
import com.ctrip.framework.foundation.internals.provider.DefaultServerProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.logging.LoggingSystem;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Spring Boot entry point that can be started from any JStorm JVM (the submitting process
 * as well as each worker), publishing the resulting context through {@code SpringUtils}.
 *
 * <p>All entry points are {@code static synchronized}, so concurrent callers in the same
 * JVM are serialized on the class monitor.
 */
@Slf4j
@EnableApolloConfig
@SpringBootApplication(exclude = {GsonAutoConfiguration.class,
        org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class})
public class SpringApplication {

    /** Fallback shown in the startup banner when the local host address cannot be resolved. */
    private static final String UNKNOWN_HOST = "unknown";

    /**
     * Starts Spring Boot with no command-line arguments, unless a context already exists.
     */
    public synchronized static void runSpring() {
        runSpring(new String[0]);
    }

    /**
     * Starts Spring Boot unless {@code SpringUtils} already holds an application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public synchronized static void runSpring(String[] args) {
        if (Objects.nonNull(SpringUtils.getApplicationContext())) {
            // Already started in this JVM; nothing to do.
            return;
        }
        try {
            run(args);
        } catch (UnknownHostException e) {
            log.error("启动springboot时出现异常", e);
        }
    }

    /**
     * Builds and runs a non-web Spring application, stores the context in
     * {@code SpringUtils}, and logs a startup banner.
     *
     * <p>This method does not check whether a context already exists; use
     * {@link #runSpring(String[])} for the guarded variant.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws UnknownHostException kept in the signature for backward compatibility; a
     *         failed host lookup no longer aborts startup (see {@link #resolveHostAddress()})
     */
    public synchronized static void run(String[] args) throws UnknownHostException {
        log.info("apollo env is {}", getApolloEnv());
        ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder()
                .sources(SpringApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
        // Publish the context before any further work so beans are reachable via SpringUtils.
        SpringUtils.setApplicationContext(applicationContext);

        Environment env = applicationContext.getEnvironment();
        log.info(
                "\n----------------------------------------------------------\n\t"
                        + "Application '{}' is running! Access URLs:\n\t"
                        + "Local: \t\thttp://127.0.0.1:{}\n\t"
                        + "External: \thttp://{}:{}\n----------------------------------------------------------",
                env.getProperty("spring.application.name"),
                env.getProperty("server.port"),
                resolveHostAddress(),
                env.getProperty("server.port"));

        String configServerStatus = env.getProperty("configserver.status");
        log.info(
                "\n----------------------------------------------------------\n\t"
                        + "Config Server: \t{}\n----------------------------------------------------------",
                configServerStatus == null
                        ? "Not found or not setup for this application"
                        : configServerStatus);
    }

    /**
     * Resolves the local host address for the startup banner.
     *
     * <p>The address is informational only, so a lookup failure is logged and replaced by a
     * placeholder instead of being reported as a Spring Boot startup failure.
     *
     * @return the local host address, or {@code "unknown"} if it cannot be resolved
     */
    private static String resolveHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            log.warn("Unable to resolve local host address for the startup banner", e);
            return UNKNOWN_HOST;
        }
    }

    /**
     * Reads the environment type reported by a freshly initialized Apollo
     * {@code DefaultServerProvider}.
     *
     * @return the value of {@code DefaultServerProvider#getEnvType()}
     */
    private static String getApolloEnv() {
        DefaultServerProvider defaultServerProvider = new DefaultServerProvider();
        defaultServerProvider.initialize();
        return defaultServerProvider.getEnvType();
    }
}
SpringUtils
import org.springframework.context.ApplicationContext;
/**
 * Static holder for the Spring {@link ApplicationContext}, giving code that is not managed
 * by Spring (e.g. JStorm spouts and bolts) access to beans.
 */
public class SpringUtils {
    static ApplicationContext applicationContext;

    /**
     * @return the stored application context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Stores the application context for later bean lookups.
     *
     * @param applicationContext the context to publish
     */
    public static void setApplicationContext(ApplicationContext applicationContext) {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name the bean name
     * @return the bean instance
     * @throws NullPointerException if no application context has been set
     */
    public static Object getBean(String name) {
        return requireContext().getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz the required bean type
     * @param <T> the bean type
     * @return the bean instance
     * @throws NullPointerException if no application context has been set
     */
    public static <T> T getBean(Class<T> clazz) {
        return requireContext().getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name the bean name
     * @param clazz the required bean type
     * @param <T> the bean type
     * @return the bean instance
     * @throws NullPointerException if no application context has been set
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return requireContext().getBean(name, clazz);
    }

    /**
     * Returns the stored context, failing with a descriptive message when it is missing.
     * The exception type is still NullPointerException, as it was before, so existing
     * callers see the same failure type with a clearer cause.
     */
    private static ApplicationContext requireContext() {
        return java.util.Objects.requireNonNull(
                getApplicationContext(),
                "Spring ApplicationContext has not been initialized; "
                        + "call SpringApplication.runSpring() first");
    }
}
调整Jstorm Main-Class的main方法
/**
 * JStorm Main-Class: starts Spring Boot first, then obtains the TopologyBuilder bean
 * from the Spring context to create and submit the topology.
 */
public class CalBootstrap {
public static void main(String[] args) throws Exception {
// Start Spring Boot so the beans below can be looked up.
SpringApplication.runSpring(args);
// topologyBuilder encapsulates the creation and submission logic of the JStorm topology.
TopologyBuilder topologyBuilder = SpringUtils.getBean(TopologyBuilder.class);
// NOTE(review): "topologyBuilder();" is not valid Java (a local variable cannot be invoked).
// A method call on topologyBuilder was presumably intended; the method name is not visible here — confirm.
topologyBuilder();
// Invoke the builder's destory() (sic; spelling as declared by TopologyBuilder) when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(() -> topologyBuilder.destory()));
}
}
调整Jstorm IRichSpout或IRichBolt的开始和准备方法
在IRichSpout#open()方法中加入启动springboot的逻辑
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import lombok.extern.slf4j.Slf4j;
/**
 * Spout example that starts Spring Boot from {@code open()}, so the Spring context is
 * also available in the JVM where the spout actually runs.
 */
@Slf4j
public class SourceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 2664953801711903206L;
    private SpoutOutputCollector spoutOutputCollector;

    /**
     * Starts Spring Boot (a no-op if it is already running in this JVM) and then reads the
     * spout configuration bean from the Spring context.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Start Spring Boot before touching any Spring-managed bean.
        SpringApplication.runSpring();
        SpoutProperties properties = SpringUtils.getBean(SpoutProperties.class);
        // Log the bean that was just fetched (the original referenced an undefined variable).
        log.info("starting....................{}", properties);
        .......
    }
    ......
}
或者在IRichBolt#prepare()中加入启动springboot的逻辑
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Bolt example that starts Spring Boot from prepare(), mirroring what the spout example
 * does in open(). The rest of the bolt is elided in this excerpt.
 */
public class CalBolt implements IRichBolt {
private static final long serialVersionUID = -8234268118517500220L;
private OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// Start Spring Boot; runSpring() returns immediately if a context already exists.
SpringApplication.runSpring();
....
}
......
}
以上就基本完成了jstorm和springboot的集成,可以发现,jstorm与springboot集成时的冲突主要是log4j-to-slf4j和logback-classic的依赖冲突,在springboot中排除掉就可以了,这也是网上找到的主流解决方案;但其实,还有另一种更优雅,也更简单的解决方法,请看下面的<集成spring cloud>。
jstorm与spring cloud集成出现的冲突与springboot一样,也是因为log相关。原因是spring-cloud-context包中的spring.factories里面的org.springframework.cloud.bootstrap.LoggingSystemShutdownListener;在启动时,运行到LoggingSystemShutdownListener.shutdownLogging()方法时会报java.lang.NoSuchMethodError: ch.qos.logback.classic.LoggerContext.removeObject(Ljava/lang/String;)V
解决方法是在启动springboot前,在系统属性中设置"org.springframework.boot.logging.LoggingSystem"="none",表示springboot不再处理log的逻辑。
/**
 * Excerpt of SpringApplication showing the alternative fix for the logging conflict:
 * Spring Boot's logging system is switched off via a system property before startup.
 */
public class SpringApplication {
.....
/**
 * Builds and runs a non-web Spring application, stores the context in SpringUtils,
 * and logs a startup banner.
 *
 * @param args command-line arguments forwarded to Spring Boot
 * @throws UnknownHostException if the local host address cannot be resolved for the banner
 */
public synchronized static void run(String[] args) throws UnknownHostException {
// Set "org.springframework.boot.logging.LoggingSystem"="none" before Spring Boot starts,
// so that Spring Boot no longer handles logging.
System.setProperty(LoggingSystem.SYSTEM_PROPERTY, LoggingSystem.NONE);
log.info("apollo env is {}", getApolloEnv());
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder()
.sources(SpringApplication.class)
.web(WebApplicationType.NONE)
.run(args);
// Publish the context so beans are reachable through SpringUtils.
SpringUtils.setApplicationContext(applicationContext);
Environment env = applicationContext.getEnvironment();
log.info(
"\n----------------------------------------------------------\n\t"
+ "Application '{}' is running! Access URLs:\n\t"
+ "Local: \t\thttp://127.0.0.1:{}\n\t"
+ "External: \thttp://{}:{}\n----------------------------------------------------------",
env.getProperty("spring.application.name"),
env.getProperty("server.port"),
InetAddress.getLocalHost().getHostAddress(),
env.getProperty("server.port"));
String configServerStatus = env.getProperty("configserver.status");
log.info(
"\n----------------------------------------------------------\n\t"
+ "Config Server: \t{}\n----------------------------------------------------------",
configServerStatus == null
? "Not found or not setup for this application"
: configServerStatus);
}
}
在pom.xml中添加apollo客户端依赖
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.2.0</version>
</dependency>
在工程的resources目录中添加apollo-env.properties文件
dev.meta=http://***.***.**.*:****
fat.meta=http://***.***.**.*:****
uat.meta=http://***.***.**.*:****
pro.meta=http://***.***.**.*:****
在SpringApplication类中添加@EnableApolloConfig注解
// @EnableApolloConfig is the annotation added for the Apollo integration; the class body is elided here.
@Slf4j
@EnableApolloConfig
@SpringBootApplication(exclude = {GsonAutoConfiguration.class,
org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class})
public class SpringApplication {
.......
}
调试时添加-Denv=dev参数
在服务器上运行时,创建文件 /opt/settings/server.properties
env=pro