06 07 2021

简介

ShardingSphere使用ThreadLocal管理分片键值进行Hint强制路由。可以通过编程的方式向HintManager中添加分片值,该分片值仅在当前线程内生效。 Hint方式主要使用场景:

1.分片字段既不存在于SQL中,也不存在于数据库表结构中,而是存在于外部业务逻辑中。

2.强制在主库进行某些数据操作。

基于Hint实现数据分片和强制路由

1、项目构建

创建一个SpringBoot项目,引入如下依赖:

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <parent>
          <groupId>cn.zwqh</groupId>
          <artifactId>sharding-sphere-4.1.1</artifactId>
          <version>1.0-SNAPSHOT</version>
      </parent>

      <groupId>cn.zwqh</groupId>
      <artifactId>sharding-sphere-demo-6</artifactId>
      <!-- No <version> element: the version is inherited from the parent. The former
           ${parent.version} expression is deprecated and triggers Maven model warnings. -->
      <packaging>jar</packaging>
      <name>sharding-sphere-demo-6</name>
      <description>Demo project for Spring Boot</description>

      <!-- Dependency versions are not declared here; they are expected to come from the parent POM. -->
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter</artifactId>
          </dependency>
          <dependency>
              <groupId>org.mybatis.spring.boot</groupId>
              <artifactId>mybatis-spring-boot-starter</artifactId>
          </dependency>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>mysql</groupId>
              <artifactId>mysql-connector-java</artifactId>
          </dependency>
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>druid-spring-boot-starter</artifactId>
          </dependency>
          <dependency>
              <groupId>org.apache.shardingsphere</groupId>
              <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
              <exclusions>
                  <exclusion>
                      <groupId>org.junit.vintage</groupId>
                      <artifactId>junit-vintage-engine</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
      </dependencies>

      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>3.8.1</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                      <encoding>UTF-8</encoding>
                  </configuration>
              </plugin>
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
                  <configuration>
                      <mainClass>cn.zwqh.shardingspheredemo6.ShardingSphereDemo6Application</mainClass>
                  </configuration>
                  <executions>
                      <execution>
                          <id>repackage</id>
                          <goals>
                              <goal>repackage</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  </project>

2、创建数据库 sharding_sphere_0,sharding_sphere_1

3、创建实体类

Order

  1. @Data
  2. public class Order {
  3. private long orderId;
  4. private int userId;
  5. private long addressId;
  6. private String status;
  7. }

OrderItem

  1. @Data
  2. public class OrderItem{
  3. private long orderItemId;
  4. private long orderId;
  5. private int userId;
  6. private String status;
  7. }

Address

  1. @Data
  2. public class Address {
  3. private Long addressId;
  4. private String addressName;
  5. }

4、Mapper层

CommonMapper

  /**
   * Minimal CRUD/DDL contract shared by all table mappers of the demo.
   *
   * @param <T> entity type handled by the mapper
   * @param <P> primary key type of the entity
   */
  public interface CommonMapper<T, P> {

      /**
       * Creates the table when it does not exist yet.
       *
       * @throws SQLException if the statement fails
       */
      void createTableIfNotExists() throws SQLException;

      /**
       * Drops the table.
       *
       * @throws SQLException if the statement fails
       */
      void dropTable() throws SQLException;

      /**
       * Removes every row from the table.
       *
       * @throws SQLException if the statement fails
       */
      void truncateTable() throws SQLException;

      /**
       * Inserts a new row.
       *
       * @param entity the entity to persist
       * @return the primary key of the inserted row
       * @throws SQLException if the statement fails
       */
      P insert(T entity) throws SQLException;

      /**
       * Deletes the row identified by the given primary key.
       *
       * @param primaryKey primary key of the row to delete
       * @throws SQLException if the statement fails
       */
      void delete(P primaryKey) throws SQLException;

      /**
       * Reads all rows.
       *
       * @return every entity found
       * @throws SQLException if the query fails
       */
      List<T> selectAll() throws SQLException;
  }

OrderMapper

  1. public interface OrderMapper extends CommonMapper<Order,Long>{
  2. }

OrderMapperImpl

  1. public class OrderMapperImpl implements OrderMapper {
  2. private DataSource dataSource;
  3. public OrderMapperImpl(final DataSource dataSource) {
  4. this.dataSource = dataSource;
  5. }
  6. @Override
  7. public void createTableIfNotExists() throws SQLException {
  8. String sql = "CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))";
  9. try (Connection connection = dataSource.getConnection();
  10. Statement statement = connection.createStatement()) {
  11. statement.executeUpdate(sql);
  12. }
  13. }
  14. @Override
  15. public void dropTable() throws SQLException {
  16. String sql = "DROP TABLE t_order";
  17. try (Connection connection = dataSource.getConnection();
  18. Statement statement = connection.createStatement()) {
  19. statement.executeUpdate(sql);
  20. }
  21. }
  22. @Override
  23. public void truncateTable() throws SQLException {
  24. String sql = "TRUNCATE TABLE t_order";
  25. try (Connection connection = dataSource.getConnection();
  26. Statement statement = connection.createStatement()) {
  27. statement.executeUpdate(sql);
  28. }
  29. }
  30. @Override
  31. public Long insert(final Order order) throws SQLException {
  32. String sql = "INSERT INTO t_order (user_id, address_id, status) VALUES (?, ?, ?)";
  33. try (Connection connection = dataSource.getConnection();
  34. PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
  35. preparedStatement.setInt(1, order.getUserId());
  36. preparedStatement.setLong(2, order.getAddressId());
  37. preparedStatement.setString(3, order.getStatus());
  38. preparedStatement.executeUpdate();
  39. try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
  40. if (resultSet.next()) {
  41. order.setOrderId(resultSet.getLong(1));
  42. }
  43. }
  44. }
  45. return order.getOrderId();
  46. }
  47. @Override
  48. public void delete(final Long orderId) throws SQLException {
  49. String sql = "DELETE FROM t_order WHERE order_id=?";
  50. try (Connection connection = dataSource.getConnection();
  51. PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
  52. preparedStatement.setLong(1, orderId);
  53. preparedStatement.executeUpdate();
  54. }
  55. }
  56. @Override
  57. public List<Order> selectAll() throws SQLException {
  58. String sql = "SELECT * FROM t_order";
  59. return getOrders(sql);
  60. }
  61. protected List<Order> getOrders(final String sql) throws SQLException {
  62. List<Order> result = new LinkedList<>();
  63. try (Connection connection = dataSource.getConnection();
  64. PreparedStatement preparedStatement = connection.prepareStatement(sql);
  65. ResultSet resultSet = preparedStatement.executeQuery()) {
  66. while (resultSet.next()) {
  67. Order order = new Order();
  68. order.setOrderId(resultSet.getLong(1));
  69. order.setUserId(resultSet.getInt(2));
  70. order.setAddressId(resultSet.getLong(3));
  71. order.setStatus(resultSet.getString(4));
  72. result.add(order);
  73. }
  74. }
  75. return result;
  76. }
  77. }

OrderItemMapper

  1. public interface OrderItemMapper extends CommonMapper<OrderItem,Long> {
  2. }

OrderItemMapperImpl

  1. public class OrderItemMapperImpl implements OrderItemMapper{
  2. private DataSource dataSource;
  3. public OrderItemMapperImpl(final DataSource dataSource) {
  4. this.dataSource = dataSource;
  5. }
  6. @Override
  7. public void createTableIfNotExists() throws SQLException {
  8. String sql = "CREATE TABLE IF NOT EXISTS t_order_item "
  9. + "(order_item_id BIGINT NOT NULL AUTO_INCREMENT, order_id BIGINT NOT NULL, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_item_id))";
  10. try (Connection connection = dataSource.getConnection();
  11. Statement statement = connection.createStatement()) {
  12. statement.executeUpdate(sql);
  13. }
  14. }
  15. @Override
  16. public void dropTable() throws SQLException {
  17. String sql = "DROP TABLE t_order_item";
  18. try (Connection connection = dataSource.getConnection();
  19. Statement statement = connection.createStatement()) {
  20. statement.executeUpdate(sql);
  21. }
  22. }
  23. @Override
  24. public void truncateTable() throws SQLException {
  25. String sql = "TRUNCATE TABLE t_order_item";
  26. try (Connection connection = dataSource.getConnection();
  27. Statement statement = connection.createStatement()) {
  28. statement.executeUpdate(sql);
  29. }
  30. }
  31. @Override
  32. public Long insert(final OrderItem orderItem) throws SQLException {
  33. String sql = "INSERT INTO t_order_item (order_id, user_id, status) VALUES (?, ?, ?)";
  34. try (Connection connection = dataSource.getConnection();
  35. PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
  36. preparedStatement.setLong(1, orderItem.getOrderId());
  37. preparedStatement.setInt(2, orderItem.getUserId());
  38. preparedStatement.setString(3, orderItem.getStatus());
  39. preparedStatement.executeUpdate();
  40. try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
  41. if (resultSet.next()) {
  42. orderItem.setOrderItemId(resultSet.getLong(1));
  43. }
  44. }
  45. }
  46. return orderItem.getOrderItemId();
  47. }
  48. @Override
  49. public void delete(final Long orderItemId) throws SQLException {
  50. String sql = "DELETE FROM t_order_item WHERE order_item_id=?";
  51. try (Connection connection = dataSource.getConnection();
  52. PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
  53. preparedStatement.setLong(1, orderItemId);
  54. preparedStatement.executeUpdate();
  55. }
  56. }
  57. @Override
  58. public List<OrderItem> selectAll() throws SQLException {
  59. String sql = "SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id";
  60. return getOrderItems(sql);
  61. }
  62. protected List<OrderItem> getOrderItems(final String sql) throws SQLException {
  63. List<OrderItem> result = new LinkedList<>();
  64. try (Connection connection = dataSource.getConnection();
  65. PreparedStatement preparedStatement = connection.prepareStatement(sql);
  66. ResultSet resultSet = preparedStatement.executeQuery()) {
  67. while (resultSet.next()) {
  68. OrderItem orderItem = new OrderItem();
  69. orderItem.setOrderItemId(resultSet.getLong(1));
  70. orderItem.setOrderId(resultSet.getLong(2));
  71. orderItem.setUserId(resultSet.getInt(3));
  72. orderItem.setStatus(resultSet.getString(4));
  73. result.add(orderItem);
  74. }
  75. }
  76. return result;
  77. }
  78. }

AddressMapper

  1. public interface AddressMapper extends CommonMapper<Address, Long> {
  2. }

AddressMapperImpl

  1. public class AddressMapperImpl implements AddressMapper {
  2. private DataSource dataSource;
  3. public AddressMapperImpl(final DataSource dataSource) {
  4. this.dataSource = dataSource;
  5. }
  6. @Override
  7. public void createTableIfNotExists() throws SQLException {
  8. String sql = "CREATE TABLE IF NOT EXISTS t_address "
  9. + "(address_id BIGINT NOT NULL, address_name VARCHAR(100) NOT NULL, PRIMARY KEY (address_id))";
  10. try (Connection connection = dataSource.getConnection();
  11. Statement statement = connection.createStatement()) {
  12. statement.executeUpdate(sql);
  13. }
  14. }
  15. @Override
  16. public void dropTable() throws SQLException {
  17. String sql = "DROP TABLE t_address";
  18. try (Connection connection = dataSource.getConnection();
  19. Statement statement = connection.createStatement()) {
  20. statement.executeUpdate(sql);
  21. }
  22. }
  23. @Override
  24. public void truncateTable() throws SQLException {
  25. String sql = "TRUNCATE TABLE t_address";
  26. try (Connection connection = dataSource.getConnection();
  27. Statement statement = connection.createStatement()) {
  28. statement.executeUpdate(sql);
  29. }
  30. }
  31. @Override
  32. public Long insert(final Address entity) throws SQLException {
  33. String sql = "INSERT INTO t_address (address_id, address_name) VALUES (?, ?)";
  34. try (Connection connection = dataSource.getConnection();
  35. PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
  36. preparedStatement.setLong(1, entity.getAddressId());
  37. preparedStatement.setString(2, entity.getAddressName());
  38. preparedStatement.executeUpdate();
  39. }
  40. return entity.getAddressId();
  41. }
  42. @Override
  43. public void delete(final Long primaryKey) throws SQLException {
  44. String sql = "DELETE FROM t_address WHERE address_id=?";
  45. try (Connection connection = dataSource.getConnection();
  46. PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
  47. preparedStatement.setLong(1, primaryKey);
  48. preparedStatement.executeUpdate();
  49. }
  50. }
  51. @Override
  52. public List<Address> selectAll() throws SQLException {
  53. String sql = "SELECT * FROM t_address";
  54. return getAddress(sql);
  55. }
  56. private List<Address> getAddress(final String sql) throws SQLException {
  57. List<Address> result = new LinkedList<>();
  58. try (Connection connection = dataSource.getConnection();
  59. PreparedStatement preparedStatement = connection.prepareStatement(sql);
  60. ResultSet resultSet = preparedStatement.executeQuery()) {
  61. while (resultSet.next()) {
  62. Address address = new Address();
  63. address.setAddressId(resultSet.getLong(1));
  64. address.setAddressName(resultSet.getString(2));
  65. result.add(address);
  66. }
  67. }
  68. return result;
  69. }
  70. }

5、Service层

ExampleService

  /**
   * Lifecycle of one demo run: set up the schema, execute the scenario, print and clean up.
   */
  public interface ExampleService {

      /**
       * Initializes the environment.
       *
       * @throws SQLException if a statement fails
       */
      void initEnvironment() throws SQLException;

      /**
       * Cleans up the environment.
       *
       * @throws SQLException if a statement fails
       */
      void cleanEnvironment() throws SQLException;

      /**
       * Runs the scenario that is expected to succeed.
       *
       * @throws SQLException if a statement fails
       */
      void processSuccess() throws SQLException;

      /**
       * Runs the scenario that is expected to fail.
       *
       * @throws SQLException if a statement fails
       */
      void processFailure() throws SQLException;

      /**
       * Prints the current data.
       *
       * @throws SQLException if a query fails
       */
      void printData() throws SQLException;
  }

OrderServiceImpl

  1. public class OrderServiceImpl implements ExampleService{
  2. private OrderMapperImpl orderMapperImpl;
  3. private OrderItemMapperImpl orderItemMapperImpl;
  4. private AddressMapperImpl addressMapperImpl;
  5. public OrderServiceImpl(final DataSource dataSource) {
  6. this.orderMapperImpl = new OrderMapperImpl(dataSource);
  7. this.orderItemMapperImpl = new OrderItemMapperImpl(dataSource);
  8. this.addressMapperImpl = new AddressMapperImpl(dataSource);
  9. }
  10. public OrderServiceImpl( OrderMapperImpl orderMapperImpl, OrderItemMapperImpl orderItemMapperImpl, AddressMapperImpl addressMapperImpl) {
  11. this.orderMapperImpl = orderMapperImpl;
  12. this.orderItemMapperImpl = orderItemMapperImpl;
  13. this.addressMapperImpl = addressMapperImpl;
  14. }
  15. @Override
  16. public void initEnvironment() throws SQLException {
  17. orderMapperImpl.createTableIfNotExists();
  18. orderItemMapperImpl.createTableIfNotExists();
  19. orderMapperImpl.truncateTable();
  20. orderItemMapperImpl.truncateTable();
  21. initAddressTable();
  22. }
  23. private void initAddressTable() throws SQLException {
  24. addressMapperImpl.createTableIfNotExists();
  25. addressMapperImpl.truncateTable();
  26. initAddressData();
  27. }
  28. private void initAddressData() throws SQLException {
  29. for (int i = 0; i < 10; i++) {
  30. insertAddress(i);
  31. }
  32. }
  33. private void insertAddress(final int i) throws SQLException {
  34. Address address = new Address();
  35. address.setAddressId((long) i);
  36. address.setAddressName("address_" + i);
  37. addressMapperImpl.insert(address);
  38. }
  39. @Override
  40. public void cleanEnvironment() throws SQLException {
  41. orderMapperImpl.dropTable();
  42. orderItemMapperImpl.dropTable();
  43. addressMapperImpl.dropTable();
  44. }
  45. @Override
  46. public void processSuccess() throws SQLException {
  47. System.out.println("-------------- Process Success Begin ---------------");
  48. List<Long> orderIds = insertData();
  49. printData();
  50. // deleteData(orderIds);
  51. // printData();
  52. System.out.println("-------------- Process Success Finish --------------");
  53. }
  54. @Override
  55. public void processFailure() throws SQLException {
  56. System.out.println("-------------- Process Failure Begin ---------------");
  57. insertData();
  58. System.out.println("-------------- Process Failure Finish --------------");
  59. throw new RuntimeException("Exception occur for transaction test.");
  60. }
  61. private List<Long> insertData() throws SQLException {
  62. System.out.println("---------------------------- Insert Data ----------------------------");
  63. List<Long> result = new ArrayList<>(10);
  64. for (int i = 1; i <= 10; i++) {
  65. Order order = insertOrder(i);
  66. insertOrderItem(i, order);
  67. result.add(order.getOrderId());
  68. }
  69. return result;
  70. }
  71. private Order insertOrder(final int i) throws SQLException {
  72. Order order = new Order();
  73. order.setUserId(i);
  74. order.setAddressId(i);
  75. order.setStatus("INSERT_TEST");
  76. orderMapperImpl.insert(order);
  77. return order;
  78. }
  79. private void insertOrderItem(final int i, final Order order) throws SQLException {
  80. OrderItem item = new OrderItem();
  81. item.setOrderId(order.getOrderId());
  82. item.setUserId(i);
  83. item.setStatus("INSERT_TEST");
  84. orderItemMapperImpl.insert(item);
  85. }
  86. private void deleteData(final List<Long> orderIds) throws SQLException {
  87. System.out.println("---------------------------- Delete Data ----------------------------");
  88. for (Long each : orderIds) {
  89. orderMapperImpl.delete(each);
  90. orderItemMapperImpl.delete(each);
  91. }
  92. }
  93. @Override
  94. public void printData() throws SQLException {
  95. System.out.println("---------------------------- Print Order Data -----------------------");
  96. for (Object each : orderMapperImpl.selectAll()) {
  97. System.out.println(each);
  98. }
  99. System.out.println("---------------------------- Print OrderItem Data -------------------");
  100. for (Object each : orderItemMapperImpl.selectAll()) {
  101. System.out.println(each);
  102. }
  103. }
  104. }

6、Hint 配置

HintType 枚举

  /**
   * Scenarios the hint demo can run; each one selects a different YAML configuration.
   */
  public enum HintType {

      /** Hint-based database sharding only. */
      DATABASE_ONLY,

      /** Hint-based database and table sharding. */
      DATABASE_TABLES,

      /** Master-slave setup with routing forced to the master. */
      MASTER_ONLY
  }

hint-databases-only.yaml 分库配置文件

  # Two Druid data sources. DruidDataSource exposes the connection string as the
  # "url" property (there is no "jdbcUrl" setter), so "url" must be used here.
  dataSources:
    ds_0: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/sharding_sphere_0
      username: root
      password: 123456
    ds_1: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/sharding_sphere_1
      username: root
      password: 123456

  shardingRule:
    tables:
      t_order:
        actualDataNodes: ds_${0..1}.t_order
        keyGenerator:
          type: SNOWFLAKE
          column: order_id
          props:
            worker.id: 123
      t_order_item:
        actualDataNodes: ds_${0..1}.t_order_item
    bindingTables:
      - t_order,t_order_item
    # Broadcast tables
    broadcastTables:
      - t_address
    # Default database sharding strategy
    defaultDatabaseStrategy:
      hint:
        algorithmClassName: cn.zwqh.shardingspheredemo6.hint.MyHintAlgorithm
    # Default table sharding strategy
    defaultTableStrategy:
      # No table sharding
      none:

  props:
    sql.show: true

hint-databases-tables.yaml 分库分表配置文件

  # Two Druid data sources, one per physical database.
  dataSources:
    ds_0: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/sharding_sphere_0
      username: root
      password: 123456
    ds_1: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/sharding_sphere_1
      username: root
      password: 123456

  shardingRule:
    tables:
      # t_order is routed purely by hint values, for both the database and the table.
      t_order:
        actualDataNodes: ds_${0..1}.t_order_${0..1}
        databaseStrategy:
          hint:
            algorithmClassName: cn.zwqh.shardingspheredemo6.hint.MyHintAlgorithm
        tableStrategy:
          hint:
            algorithmClassName: cn.zwqh.shardingspheredemo6.hint.MyHintAlgorithm
        keyGenerator:
          type: SNOWFLAKE
          column: order_id
          props:
            worker.id: 123
      # t_order_item declares no strategy of its own, so the default strategies below apply.
      t_order_item:
        actualDataNodes: ds_${0..1}.t_order_item_${0..1}
    bindingTables:
      - t_order,t_order_item
    broadcastTables:
      - t_address
    defaultDatabaseStrategy:
      inline:
        shardingColumn: user_id
        algorithmExpression: ds_${user_id % 2}
    defaultTableStrategy:
      inline:
        shardingColumn: order_id
        algorithmExpression: t_order_item_${order_id % 2}

  props:
    sql.show: true

hint-master-only.yaml 读写分离配置文件

  # One master and one slave. DruidDataSource exposes the connection string as the
  # "url" property (there is no "jdbcUrl" setter), so "url" must be used here.
  dataSources:
    ds_master: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/sharding_sphere_0
      username: root
      password: 123456
    ds_slave_0: !!com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3307/sharding_sphere_1
      username: root
      password: 123456

  masterSlaveRule:
    name: ds_ms
    masterDataSourceName: ds_master
    slaveDataSourceNames: [ds_slave_0]

  props:
    sql.show: true

MyHintAlgorithm 自定义分片算法

  1. /**
  2. * Hint分片算法需要用户实现org.apache.shardingsphere.api.sharding.hint.HintShardingAlgorithm接口。
  3. * ShardingSphere在进行Routing时,如果发现LogicTable的TableRule采用了 Hint的分片算法,将会从HintManager中获取分片值进行路由操作。
  4. */
  5. @Component
  6. public class MyHintAlgorithm implements HintShardingAlgorithm<Long> {
  7. @Override
  8. public Collection<String> doSharding(final Collection<String> availableTargetNames, final HintShardingValue<Long> shardingValue) {
  9. Collection<String> result = new ArrayList<>();
  10. for (String each : availableTargetNames) {
  11. System.out.println("---------------each"+each);
  12. for (Long value : shardingValue.getValues()) {
  13. if (each.endsWith(String.valueOf(value % 2))) {
  14. result.add(each);
  15. }
  16. }
  17. }
  18. return result;
  19. }
  20. }

7、测试类

HintMain

  1. public class HintMain {
  2. /**
  3. * 分库分表
  4. */
  5. private static final HintType TYPE = HintType.DATABASE_TABLES;
  6. /**
  7. * 分库
  8. */
  9. //private static final HintType TYPE = HintType.DATABASE_ONLY;
  10. /**
  11. * 读写分离
  12. */
  13. //private static final HintType TYPE = HintType.MASTER_ONLY;
  14. /**
  15. * demo运行入口
  16. *
  17. * @param args
  18. * @throws IOException
  19. * @throws SQLException
  20. */
  21. public static void main(String[] args) throws IOException, SQLException {
  22. DataSource dataSource = getDataSource();
  23. ExampleService exampleService = getExampleService(dataSource);
  24. exampleService.initEnvironment();
  25. exampleService.processSuccess();
  26. processWithHintValue(dataSource);
  27. // exampleService.cleanEnvironment();
  28. }
  29. /**
  30. * 根据TYPE创建不同配置的数据源
  31. *
  32. * @return
  33. * @throws IOException
  34. * @throws SQLException
  35. */
  36. private static DataSource getDataSource() throws IOException, SQLException {
  37. switch (TYPE) {
  38. case DATABASE_TABLES:
  39. return YamlShardingDataSourceFactory.createDataSource(getFile("/META-INF/hint-databases-tables.yaml"));
  40. case DATABASE_ONLY:
  41. return YamlShardingDataSourceFactory.createDataSource(getFile("/META-INF/hint-databases-only.yaml"));
  42. case MASTER_ONLY:
  43. return YamlMasterSlaveDataSourceFactory.createDataSource(getFile("/META-INF/hint-master-only.yaml"));
  44. default:
  45. throw new UnsupportedOperationException("unsupported type");
  46. }
  47. }
  48. /**
  49. * 获取配置文件
  50. *
  51. * @param configFile 绝对路径或者相对路径都可
  52. * @return
  53. */
  54. private static File getFile(final String configFile) {
  55. return new File(Thread.currentThread().getClass().getResource(configFile).getFile());
  56. }
  57. private static ExampleService getExampleService(final DataSource dataSource) {
  58. return new OrderServiceImpl(dataSource);
  59. }
  60. private static void processWithHintValue(final DataSource dataSource) throws SQLException {
  61. try (HintManager hintManager = HintManager.getInstance();
  62. Connection connection = dataSource.getConnection();
  63. Statement statement = connection.createStatement()) {
  64. setHintValue(hintManager);
  65. statement.execute("select * from t_order");
  66. statement.execute("SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id");
  67. statement.execute("select * from t_order_item");
  68. statement.execute("INSERT INTO t_order (user_id, address_id, status) VALUES (1, 1, 'init')");
  69. }
  70. }
  71. private static void setHintValue(final HintManager hintManager) {
  72. switch (TYPE) {
  73. case DATABASE_TABLES:
  74. //设置数据源分片键值
  75. hintManager.addDatabaseShardingValue("t_order", 0L);
  76. //设置表分片键值
  77. hintManager.addTableShardingValue("t_order", 1L);
  78. return;
  79. case DATABASE_ONLY:
  80. //设置数据源分片键值
  81. hintManager.setDatabaseShardingValue(1L);
  82. return;
  83. case MASTER_ONLY:
  84. //设置主库路由
  85. hintManager.setMasterRouteOnly();
  86. return;
  87. default:
  88. throw new UnsupportedOperationException("unsupported type");
  89. }
  90. }
  91. }

添加分片键值

  • 使用hintManager.addDatabaseShardingValue来添加数据源分片键值。
  • 使用hintManager.addTableShardingValue来添加表分片键值。

分库不分表情况下,强制路由至某一个分库时,可使用hintManager.setDatabaseShardingValue方式添加分片键值。通过此方式添加分片键值后,将跳过SQL解析和改写阶段,从而提高整体执行效率。

设置主库路由

  • 使用hintManager.setMasterRouteOnly设置主库路由。

清除分片键值

分片键值保存在ThreadLocal中,所以需要在操作结束时调用hintManager.close()来清除ThreadLocal中的内容。

HintManager实现了AutoCloseable接口,推荐使用try-with-resources语句自动关闭。

JDK 8 中实现资源的自动关闭,要求所有资源必须在try子句中初始化。

8、测试结果

在执行过程中,如果没有指定分片算法,则使用默认分片策略。

只有使用HintManager,并进行相应的设置,才能使用指定的分片算法,否则使用默认分片策略。

源码地址

github

码云

延伸阅读
  1. MySQL 5.7 详细安装步骤
  2. Linux下MySQL的彻底卸载
  3. 一文读懂 MySQL 事务
  4. MySQL 怎么解决幻读问题
  5. MySQL join的使用和原理
发表评论