Storm框架整合springboot的方法
作者:本拉邓
Storm:最火的流式处理框架
伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦。
•实现一个实时计算系统
全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。
1.低延迟。都说了是实时计算系统了,延迟是一定要低的。
2.高性能。性能不高就是浪费机器,浪费机器是要受批评的哦。
3.分布式。系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。
4.可扩展。伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。
5.容错。这是分布式系统中通用问题。一个节点挂了不能影响我的应用。
好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。我们再继续往下看。
1.容易在上面开发应用程序。亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。
2.消息不丢失。用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。这个要求有点高哦。
诞 生
在2011年Storm开源之前,由于Hadoop的火红,整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,Hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。
有需求也就有创造,在Hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补Hadoop的实时性为目标而被创造出来。而在这个节骨眼上Storm横空出世了。
Storm带着流式计算的标签华丽丽滴出场了,看看它的一些卖点:
•分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。
•运维简单:Storm的部署的确简单。虽然没有Mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。
•高度容错:模块都是无状态的,随时宕机重启。
•无数据丢失:Storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。
•多语言:实际上,Storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用Java实现。
下面介绍下Storm框架整合springboot的方法
我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot
也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot
使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa
等功能呢?我们先来了解以下概念:
•Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
•SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。
•ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。
实现原理
Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。
•Spout.open方法实现
@Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { //启动Springboot应用 SpringStormApplication.run(); this.map = map; this.topologyContext = topologyContext; this.spoutOutputCollector = spoutOutputCollector; }
•Bolt.prepare方法实现
@Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { //启动Springboot应用 SpringStormApplication.run(); this.map = map; this.topologyContext = topologyContext; this.outputCollector = outputCollector; } •SpringStormApplication启动类 @SpringBootApplication @ComponentScan(value = "com.xxx.storm") public class SpringStormApplication { /** * 非工程启动入口,所以不用main方法 * @param args */ public static void run(String ...args) { SpringApplication app = new SpringApplication(SpringStormApplication.class); //我们并不需要web servlet功能,所以设置为WebApplicationType.NONE app.setWebApplicationType(WebApplicationType.NONE); //忽略掉banner输出 app.setBannerMode(Banner.Mode.OFF); //忽略Spring启动信息日志 app.setLogStartupInfo(false); app.run(args); } }
与我们传统的Springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:
@SpringBootApplication @ComponentScan(value = "com.xxx.web") public class PlatformApplication { public static void main(String[] args) { SpringApplication.run(PlatformApplication.class, args); } }
•在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils:
@Component public class BeanUtils implements ApplicationContextAware { private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (BeanUtils.applicationContext == null) { BeanUtils.applicationContext = applicationContext; } } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String name) { return getApplicationContext().getBean(name); } public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); } }
通过@Component
注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware
接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext
方法,将ApplicationContext
对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()
去获取Spring容器中的bean了。
写个简单例子
•在FilterBolt的execute方法中获取Spring bean
@Override public void execute(Tuple tuple) { FilterService filterService = (FilterService) BeanUtils.getBean("filterService"); filterService.deleteAll(); }
•定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。
@Service("filterService") public class FilterService { @Autowired UserRepository userRepository; public void deleteAll() { userRepository.deleteAll(); } }
将storm应用作为Springboot工程的一个子模块
工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j2</artifactId> </exclusion> <exclusion> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic2</artifactId> </exclusion> </exclusions> </dependency>
总结
以上所述是小编给大家介绍的Storm框架整合springboot的方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!