dynamic-datasource + sharding 动态刷新表

祁默
2023-12-01

需求

在系统中可能会同时遇到动态数据源切换、分库分表以及动态分表的情况。

动态数据源切换采用:

<dependency>
   <groupId>com.baomidou</groupId>
   <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
   <version>3.2.1</version>
</dependency>

分表分库采用

<dependency>
   <groupId>org.apache.shardingsphere</groupId>
   <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
   <version>4.1.1</version>
</dependency>
<!-- for spring namespace -->
<dependency>
   <groupId>org.apache.shardingsphere</groupId>
   <artifactId>sharding-jdbc-spring-namespace</artifactId>
   <version>4.1.1</version>
</dependency>

将 shardingsphere 的数据源(ShardingDataSource)交给 dynamic-datasource 的 DynamicRoutingDataSource 管理

package com.iot.cloud.iotdevice.config;

/**
 * @author lj
 * @title: DataSourceConfiguration
 * @projectName iot-platform-cloud
 * @description: TODO  管理多数据源 shardingsphere的数据源交给dynamic-datasource去维护。
 * @date 2022/2/17 001713:52
 */

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.provider.AbstractDataSourceProvider;
import com.baomidou.dynamic.datasource.provider.DynamicDataSourceProvider;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DataSourceProperty;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceProperties;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;


@Configuration
@AutoConfigureBefore({DynamicDataSourceAutoConfiguration.class, SpringBootConfiguration.class})
public class DataSourceConfiguration {

    /**
     * 动态数据源配置项
     */
    @Autowired
    private DynamicDataSourceProperties dynamicDataSourceProperties;

    @Value("${spring.shardingsphere.datasource.names:}")
    private String SHARDING_DATA_SOURCE_NAME;
    
    @Lazy
    @Resource
    ShardingDataSource shardingDataSource;

    @Bean
    public DynamicDataSourceProvider dynamicDataSourceProvider() {
        Map<String, DataSourceProperty> datasourceMap = dynamicDataSourceProperties.getDatasource();
        return new AbstractDataSourceProvider() {
            @Override
            public Map<String, DataSource> loadDataSources() {
                Map<String, DataSource> dataSourceMap = createDataSourceMap(datasourceMap);

                // 将 shardingjdbc 管理的数据源也交给动态数据源管理
                dataSourceMap.put(SHARDING_DATA_SOURCE_NAME, shardingDataSource);
                return dataSourceMap;
            }
        };
    }
    @Primary
    @Bean
    public DataSource dataSource(DynamicDataSourceProvider dynamicDataSourceProvider) {
        DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
        dataSource.setPrimary(dynamicDataSourceProperties.getPrimary());
        dataSource.setStrict(dynamicDataSourceProperties.getStrict());
        dataSource.setStrategy(dynamicDataSourceProperties.getStrategy());
        dataSource.setProvider(dynamicDataSourceProvider);
        dataSource.setP6spy(dynamicDataSourceProperties.getP6spy());
        dataSource.setSeata(dynamicDataSourceProperties.getSeata());
        return dataSource;
    }
}
 

特别注意:这里注入的数据源类型必须声明为 ShardingDataSource,否则后面刷新 Sharding 动态表时,强制类型转换会失败。

动态刷新表

package com.iot.cloud.iotdevice.init;

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.iot.cloud.iotdevice.config.TableNamesConfig;
import com.iot.cloud.iotdevice.constant.DataSourceConstant;
import lombok.extern.log4j.Log4j2;
import org.apache.ibatis.javassist.Modifier;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource;
import org.apache.shardingsphere.underlying.common.config.exception.ShardingSphereConfigurationException;
import org.apache.shardingsphere.underlying.common.rule.DataNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Administrator
 * @title: lj
 * @projectName citymanagementplatform
 * @description: TODO
 * @date 2021/1/12 00129:41
 */
@Log4j2
@Component
@EnableConfigurationProperties(TableNamesConfig.class)
public class DeviceCreateTableHandler {

    @Autowired
    private TableNamesConfig tableNamesConfig;

    @Value("${spring.shardingsphere.datasource.names:}")
    private String SHARDING_DATA_SOURCE_NAME;

    @Autowired
    private DataSource dataSource;


    /***
     * 启动的时候初始化一下 动态表单的配置    初化当前年
     */
    @PostConstruct
    private void intData() {
        Date date = new Date();
        SimpleDateFormat format = new SimpleDateFormat("yyyy");
        String s = format.format(date);
        actualTablesRefresh(Integer.parseInt(s));
    }

    /**
     * 初始化下一天的
     */
    @Scheduled(cron = "0 0 20 31 12 ?")
    public void nextInit() {
        Date date = new Date();
        SimpleDateFormat format = new SimpleDateFormat("yyyy");
        String s = format.format(date);
        actualTablesRefresh(Integer.parseInt(s) + 1);
    }

    /**
     * 动态更新 处理化的表配置
     *
     * @param data 表名称
     */
    public void actualTablesRefresh(Integer data) {
        try {
            DynamicRoutingDataSource dd =(DynamicRoutingDataSource) this.dataSource;
            Map<String, DataSource> map = dd.getCurrentDataSources();
            if (tableNamesConfig.getNames() == null || tableNamesConfig.getNames().length == 0) {
                log.info("dynamic.table.names为空");
                return;
            }
            for (int i = 0; i < tableNamesConfig.getNames().length; i++) {
                // 这里 前缀为 数据库简称  后面为表名称
                String dbName = tableNamesConfig.getNames()[i];
                String [] dbNames = dbName.split("-");
                DataSource dsource =  map.get(dbNames[0]);
                ShardingDataSource dataSource = (ShardingDataSource) dsource;
                TableRule tableRule = null;
                try {
                    tableRule = dataSource.getRuntimeContext().getRule().getTableRule(dbNames[1]);
                } catch (ShardingSphereConfigurationException e) {
                    log.error("逻辑表:{},不存在配置!", dbNames[1]);
                }
                // 动态刷新 actualDataNodes
                List<DataNode> dataNodes = tableRule.getActualDataNodes();
                Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes");
                Field modifiersField = Field.class.getDeclaredField("modifiers");
                modifiersField.setAccessible(true);
                modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL);
                actualDataNodesField.setAccessible(true);
                List<DataNode> newDataNodes = new ArrayList<>();
                int time = tableNamesConfig.getStartYear();
                String dataSourceName = dataNodes.get(0).getDataSourceName();
                while (true) {
                    DataNode dataNode = new DataNode(dataSourceName + "." + dbNames[1] + "_" + time);
                    newDataNodes.add(dataNode);
                    time = time + 1;
                    if (time > data.intValue()) {
                        break;
                    }
                }
                actualDataNodesField.set(tableRule, newDataNodes);
                Set<String> actualTables = Sets.newHashSet();
                Map<DataNode, Integer> dataNodeIndexMap = Maps.newHashMap();
                AtomicInteger index = new AtomicInteger(0);
                newDataNodes.forEach(dataNode -> {
                    actualTables.add(dataNode.getTableName());
                    if (index.intValue() == 0) {
                        dataNodeIndexMap.put(dataNode, 0);
                    } else {
                        dataNodeIndexMap.put(dataNode, index.intValue());
                    }
                    index.incrementAndGet();
                });
                // 动态刷新 actualTablesField
                Field actualTablesField = TableRule.class.getDeclaredField("actualTables");
                actualTablesField.setAccessible(true);
                actualTablesField.set(tableRule, actualTables);
                // 动态刷新 dataNodeIndexMapField
                Field dataNodeIndexMapField = TableRule.class.getDeclaredField("dataNodeIndexMap");
                dataNodeIndexMapField.setAccessible(true);
                dataNodeIndexMapField.set(tableRule, dataNodeIndexMap);
                // 动态刷新 datasourceToTablesMapField
                Map<String, Collection<String>> datasourceToTablesMap = Maps.newHashMap();
                datasourceToTablesMap.put(dataSourceName, actualTables);
                Field datasourceToTablesMapField = TableRule.class.getDeclaredField("datasourceToTablesMap");
                datasourceToTablesMapField.setAccessible(true);
                datasourceToTablesMapField.set(tableRule, datasourceToTablesMap);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.info("初始化 动态表单失败" + e.getMessage());
        }
    }
}
 

特别注意:我这里是按年分表,具体的分表策略请按各自的需求实现。

配置 yml

spring:
  main:
    allow-bean-definition-overriding: true
  ######## 配置多数据源 ########
  datasource:
    dynamic:
      #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
      strict: true
      datasource:
        master:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:${DATASOURCE_DBTYPE:mysql}://${DATASOURCE_HOST:192.168.0.252}:${DATASOURCE_PORT:3306}/iot_platform?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai
          username: ${DATASOURCE_USERNAME:root}
          password: ${DATASOURCE_PASSWORD:123456}
  ###   这里采用的按年分表策略  对拉取的公厕硬件设备数据进行分表 暂时不分库 #####
  shardingsphere:
    props:
      sql:
        show: true
    datasource:
      names: toilet
      ###  主数据库 ###
      toilet:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:${DATASOURCE_DBTYPE:mysql}://${DATASOURCE_HOST:192.168.0.252}:${DATASOURCE_PORT:3306}/iot_toilet?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai
        username: ${DATASOURCE_USERNAME:root}
        password: ${DATASOURCE_PASSWORD:123456}
    sharding:
      tables:
        wc_passenger:
          ### 数据表 按年分表 ####   流量
          actual-data-nodes: toilet.wc_passenger
          table-strategy:
            standard:
              sharding-column: created_time
              precise-algorithm-class-name: xxx
              range-algorithm-class-name: xxxx

 类似资料: