近年来,基于hadoop的sql框架层出不穷,presto也是其中的一员.从2012年发展至今,依然保持年轻的活力(版本迭代依然很快),presto的相关介绍,我们就不赘述了,相信看官多对presto有或多或少的了解,详细的一些说明可以看官网(https://prestodb.io)的说明.
presto自身功能和思想富有先进性,虽然由于是内存计算,稳定性方面还有很大提升空间,但整体依然在adhoc方面有很好的竞争力.其中在catalog加载的方式上来说比较的固化,官方并没有做出动态的方案出来,导致在添加catalog后必须重启整个集群才可以将新添加的catalog数据源添加到presto中,这无疑在实际的生产环境中很不友好.尤其是在一些中台项目中,需要动态规划的东西非常多.这种模式的catalog添加方式显然不能满足我们的开发需要.
因此,在环境的加持下,对presto的加载catlog的方式的源码进行了改造,使其具有热动态添加的功能.我们采用了外部数据库作为他的catlog资源库,对其进行热加载
(1)添加restful API请求接口.
为了使框架本身具有添加catalog的功能,需要使其本身具有Api访问接口的方式来来对catalog的资源进行调整的功能
1.新增CatalogResource类来实现api的请求接口
2.新增TimiCatalogStoreConfig类来实现与数据库交互的持久层
3.新增TimiCatalogStore类来替换原本的catlog加载类
4.新增CatalogInfo类来实现对catalog Model信息的解析
#1 CatalogResource
1 2 | @Path ( "/presto/catalog" ) public class CatalogResource{<br><br> |
@GET
@Path("test")
public Response test()
{
return Response.ok("Hello world").build();
}
在ServerMainModule类中setup方法,最后一行添加jaxrsBinder(binder).bind(CatalogResource.class);将添加的请求类添加进来,然后启动主服务,并确认所开启的presto的请求接口地址,默认端口是:8080请求http://localhost:8080/presto/catalog/test
返回 "Hello world" 则表示restful API 接口添加成功.
#2 TimiCatalogStoreConfig 类中主要实现了读取数据库连接配置,以及具体执行的catalog执行动作,并使用jaxrsBinder(binder).bind(TimiCatalogStoreConfig.class);注入到项目启动的容器中.并将Announcer,disabledCatalogs,ConnectorManager注入到类中.具体实现
1 2 3 4 5 6 7 8 9 10 11 12 | public class TimiCatalogStoreConfig { private final Announcer announcer; private static final Logger log = Logger.get(TimiCatalogStoreConfig. class ); private final Set<String> disabledCatalogs; private final ConnectorManager connectorManager;<br> public TimiCatalogStoreConfig(Announcer announcer,Set<String> disabledCatalogs,ConnectorManager connectorManager ) { this .announcer = announcer; this .disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs); this .connectorManager = connectorManager; } } |
然后就是实现对catlog增删查改动作,并将操作的结构实现到ConnectorManager中,
首先将Server中的CatalogStore替换成我们自定义实现的TimiCatalogStore并注入相关类
1 2 3 4 5 6 7 8 9 | @Inject public TimiCatalogStore(ConnectorManager connectorManager, Announcer announcer, StaticCatalogStoreConfig config,TimiCatalogStoreConfig catalogStoreConfig) { this (connectorManager, announcer, config.getCatalogConfigurationDir(), firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()), catalogStoreConfig ); } |
然后实现loadCatalogs方法,首次调用的时候使用load();方法加载mysql中存储的所有catlog,然后使用ScheduledExecutorService定时的方式从mysql中提取有变化的catlog加载到presto的ConnectorManager中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | public static void updateConnectorIdAnnouncement(Announcer announcer, CatalogName connectorId) { // // This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed // // get existing announcement ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements()); // update connectorIds property Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties()); String property = nullToEmpty(properties.get( "connectorIds" )); Set<String> connectorIds = new LinkedHashSet<>(Splitter.on( ',' ).trimResults().omitEmptyStrings().splitToList(property)); connectorIds.add(connectorId.toString()); properties.put( "connectorIds" , Joiner.on( ',' ).join(connectorIds)); // update announcement announcer.removeServiceAnnouncement(announcement.getId()); announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build()); announcer.forceAnnounce(); } |
在这里我们设定的1分钟从mysql库充更新一次catalog列表
1 2 3 4 5 6 | scheduledExecutorService.scheduleWithFixedDelay( new Runnable() { @Override public void run() { reload(); } }, 60 , 60 , TimeUnit.SECONDS); |
调用reload方法定时读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | public void reload() { try { //获取最新的catalogs Map<String, CatalogInfo> catalogInfos = catalogStoreConfig.load(); catalogInfos.forEach( (key, catalogInfo) -> { if (!catalogInfoMap.containsKey(key)) { //相同--catlog try { System.out.println( "添加数据源" +JSON.toJSONString(catalogInfo)); // log.info("添加数据源:{}",JSON.toJSONString(catalogInfos.get(key))); CatalogName catalogName = loadCatalog(catalogInfo); updateConnectorIdAnnouncement(announcer,catalogName); } catch (Exception e) { e.printStackTrace(); } } else { //不同catlog if (!JSON.toJSONString(catalogInfoMap.get(key)).equals(JSON.toJSONString(catalogInfo))){ connectorManager.dropConnection(catalogInfo.getCatalogName()); try { System.out.println( "添加数据源" +JSON.toJSONString(catalogInfo)); CatalogName catalogName = loadCatalog(catalogInfo); updateConnectorIdAnnouncement(announcer,catalogName); } catch (Exception e) { e.printStackTrace(); } } } } ); catalogInfoMap.putAll(catalogInfos); } catch (Exception e){ e.printStackTrace(); } } |
从mysql库中取出来的catlog信息和对现有的catlog进行对比,如果是不同的catlog就添加到pres