当前位置: 首页 > 工具软件 > JStorm > 使用案例 >

JStorm与springboot(cloud)集成

葛书
2023-12-01

集成spring boot

本地模式

使用jstorm本地模式topology时,可以是jstorm包含springboot,也可以是springboot 包含jstorm,这不影响jar和topology的运行。所以本地模式下,jstorm与springboot或springcloud如何进行集成,都没有影响,但是如果将topology提交到集群去运行时,代码的工程结构就得是jstorm包含springboot了。

集群模式

首先是pom.xml配置调整,解决在集群上运行时冲突:
1、log相关的类包冲突

java.lang.NoSuchMethodError: ch.qos.logback.classic.LoggerContext.removeObject(Ljava/lang/String;)V

2、 多个defaults.yaml资源冲突

Invalid configuration defaults.yaml:Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar.

在topology真正开始运行时启动springboot,解决获取不到Application以及发布到spring里面的bean的问题。

pom.xml配置

  • 排除spring包中有关log4j-to-slf4j和logback-classic的依赖
  • 将jstorm-core设置为provided范围
...

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j2</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>

...

<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
</dependency>

....

springboot启动

由于topology在jstorm集群中运行时,是会将topology序列化后传递到worker上,所以springboot只在nimbus上提交时启动,意义是不大的,需要在topology运行的worker上也启动springboot。
主要有以下几处调整:

  1. 调整SpringApplication类
    SpringApplication

    import com.ctrip.framework.apollo.spring.annotation.EnableApolloConfig;
    import com.ctrip.framework.foundation.internals.provider.DefaultServerProvider;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.WebApplicationType;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.boot.logging.LoggingSystem;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.core.env.Environment;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    @Slf4j
    @EnableApolloConfig
    @SpringBootApplication(exclude = {GsonAutoConfiguration.class,
            org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class})
    public class SpringApplication {
        private static AtomicBoolean springStarted = new AtomicBoolean(false);
    
    
        public synchronized static void runSpring() {
            runSpring(new String[0]);
        }
    
        public synchronized static void runSpring(String[] args) {
            if(!Objects.isNull(SpringUtils.getApplicationContext())){
                return;
            }
            try{
                run(args);
            }catch (UnknownHostException e){
                log.error("启动springboot时出现异常", e);
            }
        }
    
    
        public synchronized static void run(String[] args) throws UnknownHostException {
            log.info("apollo env is {}", getApolloEnv());
            ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder()
                    .sources(SpringApplication.class)
                    .web(WebApplicationType.NONE)
                    .run(args);
            SpringUtils.setApplicationContext(applicationContext);
            Environment env = applicationContext.getEnvironment();
            log.info(
                    "\n----------------------------------------------------------\n\t"
                            + "Application '{}' is running! Access URLs:\n\t"
                            + "Local: \t\thttp://127.0.0.1:{}\n\t"
                            + "External: \thttp://{}:{}\n----------------------------------------------------------",
                    env.getProperty("spring.application.name"),
                    env.getProperty("server.port"),
                    InetAddress.getLocalHost().getHostAddress(),
                    env.getProperty("server.port"));
    
            String configServerStatus = env.getProperty("configserver.status");
            log.info(
                    "\n----------------------------------------------------------\n\t"
                            + "Config Server: \t{}\n----------------------------------------------------------",
                    configServerStatus == null
                            ? "Not found or not setup for this application"
                            : configServerStatus);
        }
    
    
        private static String getApolloEnv(){
            DefaultServerProvider defaultServerProvider = new DefaultServerProvider();
            defaultServerProvider.initialize();
            return defaultServerProvider.getEnvType();
        }
    }
    

    SpringUtils

    import org.springframework.context.ApplicationContext;
    
    public class SpringUtils {
        static ApplicationContext applicationContext;
    
    
        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
    
        public static void setApplicationContext(ApplicationContext applicationContext) {
            SpringUtils.applicationContext = applicationContext;
        }
    
        /**
         * 通过name获取 Bean
         * @param name
         * @return
         */
        public static Object getBean(String name){
            return getApplicationContext().getBean(name);
    
        }
    
    
        /**
         * 通过class获取Bean
         * @param clazz
         * @param <T>
         * @return
         */
        public static <T> T getBean(Class<T> clazz){
            return getApplicationContext().getBean(clazz);
        }
    
        /**
         * 通过name,以及Clazz返回指定的Bean
         * @param name
         * @param clazz
         * @param <T>
         * @return
         */
        public static <T> T getBean(String name,Class<T> clazz){
            return getApplicationContext().getBean(name, clazz);
        }
    }
    
  2. 调整Jstorm Main-Class的main方法

    public class CalBootstrap {
    
        public static void main(String[] args) throws Exception {
        		//启动springboot
            SpringApplication.runSpring(args);
            // topologyBuilder 封装了jstorm topology的创建和提交逻辑
            TopologyBuilder topologyBuilder = SpringUtils.getBean(TopologyBuilder.class);
            topologyBuilder();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> topologyBuilder.destory()));
        }
    }
    
  3. 调整Jstorm IRichSpout或IRichBolt的开始和准备方法
    在IRichSpout#open()方法中加入启动springboot的逻辑

	import backtype.storm.spout.SpoutOutputCollector;
	import backtype.storm.task.TopologyContext;
	import backtype.storm.topology.OutputFieldsDeclarer;
	import backtype.storm.topology.base.BaseRichSpout;
	import lombok.extern.slf4j.Slf4j;

	
	@Slf4j
	public class SourceSpout extends BaseRichSpout {
	
	    private static final long serialVersionUID = 2664953801711903206L;
	
	    private SpoutOutputCollector spoutOutputCollector;
	
	    @Override
	    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
	    	 	//启动springboot
	        SpringApplication.runSpring();
	
	        SpoutProperties properties = SpringUtils.getBean(SpoutProperties.class);
			   
	
	        log.info("starting....................{}", kafkaProperties);
			  .......
	    }
	    ...... 
	}
或者在IRichBolt#prepare()中加入启动springboot的逻辑
	import backtype.storm.task.OutputCollector;
	import backtype.storm.task.TopologyContext;
	import backtype.storm.topology.IRichBolt;
	import backtype.storm.topology.OutputFieldsDeclarer;
	import backtype.storm.tuple.Fields;
	import backtype.storm.tuple.Tuple;
	import backtype.storm.tuple.Values;	
	import java.util.Map;
	import java.util.Objects;
	import java.util.concurrent.TimeUnit;
	
	
	public class CalBolt implements IRichBolt {
	
	    private static final long serialVersionUID = -8234268118517500220L;

	    private OutputCollector outputCollector;
	
	
	    @Override
	    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	        //启动springboot
	        SpringApplication.runSpring();
				....
	    }
	    ......
	}

以上就基本完成了jstorm和springboot的集成,可以发现,jstorm与springboot的集成的冲突主要在log4j-to-slf4j和logback-classic的依赖冲突,在springboot中排除掉就可以了,这也是网上找到的主流解决方案;但其实,还有另一种更优雅,也更简单的解决方法,请求下面的<集成spring cloud>。

集成spring cloud

jstorm与spring cloud集成出现的冲突与springboot一样,也是因为log相关。原因是spring-cloud-context包中的spring.factories里面的org.springframework.cloud.bootstrap.LoggingSystemShutdownListener;在启动时,运行到LoggingSystemShutdownListener.shutdownLogging()方法时会报java.lang.NoSuchMethodError: ch.qos.logback.classic.LoggerContext.removeObject(Ljava/lang/String;)V

解决方法是在启动springboot前,在系统属性中设置"org.springframework.boot.logging.LoggingSystem"=“none”,表示springboot不再处理log的逻辑。

public class SpringApplication {
    .....
	
	
    public synchronized static void run(String[] args) throws UnknownHostException {
    		//设置"org.springframework.boot.logging.LoggingSystem"="none"
        System.setProperty(LoggingSystem.SYSTEM_PROPERTY, LoggingSystem.NONE);
        log.info("apollo env is {}", getApolloEnv());
        ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder()
                .sources(SpringApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
        SpringUtils.setApplicationContext(applicationContext);
        Environment env = applicationContext.getEnvironment();
        log.info(
                "\n----------------------------------------------------------\n\t"
                        + "Application '{}' is running! Access URLs:\n\t"
                        + "Local: \t\thttp://127.0.0.1:{}\n\t"
                        + "External: \thttp://{}:{}\n----------------------------------------------------------",
                env.getProperty("spring.application.name"),
                env.getProperty("server.port"),
                InetAddress.getLocalHost().getHostAddress(),
                env.getProperty("server.port"));
	
        String configServerStatus = env.getProperty("configserver.status");
        log.info(
                "\n----------------------------------------------------------\n\t"
                        + "Config Server: \t{}\n----------------------------------------------------------",
                configServerStatus == null
                        ? "Not found or not setup for this application"
                        : configServerStatus);
    }
}

集成apollo

在pom.xml中添加apollo客户端依赖

<dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>1.2.0</version>
</dependency>

在工程的resources目录中添加apollo-env.properties文件

dev.meta=http://***.***.**.*:****
fat.meta=http://***.***.**.*:****
uat.meta=http://***.***.**.*:****
pro.meta=http://***.***.**.*:****

在SpringApplication类中添加@EnableApolloConfig注解

@Slf4j
@EnableApolloConfig
@SpringBootApplication(exclude = {GsonAutoConfiguration.class,
	        org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration.class})
public class SpringApplication {
	.......
}

调试时添加-Denv=dev参数
在服务器上运行时,创建文件 /opt/settings/server.properties

env=pro
 类似资料: