mych是mysql、canal和h2的缩写,它的主要作用是可以将mysql中的数据同步到内存数据库h2中,并且保持和mysql数据库一样的结构,在mysql中查询的sql语句同样可以在内存数据库中查询。
为什么要做这样一个工具呢?初衷是为了提高效率。我们在做项目过程中经常会涉及到一些基础数据的维护,这些数据通常是存储在关系型数据库中(mysql),并且在后台管理系统中进行维护。在我们对外提供的接口中经常会涉及到对这些基础数据的访问,通常是直接去数据库查询,由于基础数据一般都是单表存储且数据量不会很大,所以对接口响应时间要求不是很高的情况下,这样做也没有什么问题。如果我们的接口对于响应时间要求比较苛刻,那么我们需要尽可能的缩短接口业务逻辑的处理时间,这样才能够快速的响应。缩短业务处理时间有几个方面可以做:减少查询数据库的次数、同步逻辑改成异步、多线程处理、使用缓存等等。mych工具也正是为了能提高查询效率,将之前从mysql中的查询转换为从本地jvm内存数据库中查询,从而达到提高查询效率的目的。
将数据库数据同步到内存中的方式不止一种,下面介绍几种方式:
- 在系统启动时查询数据库,将数据包装为jdk中的list或者map结构,然后使用定时任务定时的和数据库进行同步,这种方式的优点是在内存中访问数据会很快,缺点是当数据库中数据发生变化,内存中的数据不能及时更新,存在一定的延迟。
- 在1的基础上采用mq消息通知的方式通知内存更新数据。假设我们的后台管理系统是A系统,提供对外服务的是B系统。当我们在A系统中更改了基础数据表,可以向mq发送一条消息,B系统接收到消息后,将内存中保存的数据进行更新(这里要考虑多线程的情况,因为在更新时有可能有误操作),至于是更新某一条数据,还是全量更新取决于具体的规则设计。
以上两种方式是可以解决问题的,但缺点也很明显:
- 处理过程较为繁琐,需要程序员针对具体的基础数据表进行多次封装;
- 和业务代码耦合度较高,这部分工作属于非功能性需求,最好不要在工程中提现,不仅增加了工作量,还需要维护;
- 不具备可复用性,每个项目都需要独立开发;
- 不够灵活,定制化开发;
也正是基于以上几点,我开发了mych工具,它的原理是在项目启动时,根据配置参数去读取mysql数据库相关的表,并且解析出表结构和索引,然后在h2内存数据中创建同样结构的表和索引,而后将mysql中的数据都插入到内存数据库,这样就可以通过内存数据库查询我们需要的数据。利用canal工具监听mysql的binlog日志,当数据库数据发生变化时(我们只关注增、删、改事件),canal会通知到mych客户端,客户端对将接收到的数据变化同步更新到内存数据库,这样就能准实时的保证内存中的数据和mysql中的数据一致。内存数据库应该只支持查询操作,不能支持增删改操作。由于是基于jvm内存的,每次重启后都会重新加载数据,纯粹就是一个缓存的作用。
下面贴一些代码片段,大概介绍一下具体是如何实现的:
- Mych类,核心类,在初始化的时候调用,主要作用是创建内存数据库表,并且将mysql数据库中的数据同步到内存数据库中,主要用到的技术是jdbc相关api,尤其是元数据metadata相关的。
public void init(MySQLConfig mySQLConfig, List<String> tableNames, CanalConfig canalConfig) {
if (!flag.compareAndSet(false, true)) {
LOG.warn("Mych already initialized,Please don't call init method repeat.");
return;
}
LOG.info("Begin init Mych.");
try {
validateConfig(mySQLConfig, canalConfig);
} catch (Exception exception) {
LOG.warn("Initialization Mych failure.Because {}", exception.getMessage());
return;
}
try (Connection mysqlConnection = DBHelper.getMySQLConnection(mySQLConfig);) {
h2Connection = DBHelper.getH2Connection();
boolean allTable = false;
List<String> allTableNames = getAllTableNames(mysqlConnection, mySQLConfig.getUsername());
// 如果没有指定哪些表,则默认全部的表
if (tableNames == null || tableNames.isEmpty()) {
tableNames = allTableNames;
allTable = true;
}
for (String tableName : tableNames) {
if (!allTableNames.contains(tableName)) {
LOG.warn(
"Table [{}] not exists in database [{}].Please check the config of [spring.datasource.mych.tableNames].",
tableName, mySQLConfig.getDatabase());
continue;
}
initMemTable(mysqlConnection, tableName);
}
rowChangeListener = new RowChangeListener(canalConfig.getHost(), canalConfig.getPort(),
mySQLConfig.getDatabase(), allTable, tableNames);
rowChangeListener.start();
LOG.info("Init Mych complete.");
} catch (Exception e) {
LOG.error("Initialization Mych cause an exception.", e);
}
}
/**
*
* 功能描述:初始化内存表
*
* @author chao
* @param mysqlConn MySQL数据库连接
* @param tableName 表名称
* @param columns 表的列集合
* @param h2Conn h2数据库连接
* @throws Exception
*/
private void initMemTable(Connection mysqlConn, String tableName) throws Exception {
// 获取当前表的所有列
List<ColumnInfo> columns = getTableColumns(mysqlConn, tableName);
if (columns.isEmpty()) {
LOG.warn("Columns of table [{}] is empty.", tableName);
return;
}
//
Map<String, List<String>> indexInfo = getIndexInfo(mysqlConn, tableName);
// 创建内存表
Connection connection = getLogConnection(LOG);
try (Statement stmt = connection.createStatement();) {
stmt.executeUpdate(SqlHelper.buildCreateTabelSql(tableName, columns));
if (indexInfo != null && !indexInfo.isEmpty()) {
for (Map.Entry<String, List<String>> entry : indexInfo.entrySet()) {
stmt.executeUpdate(SqlHelper.buildCreateIndexSql(tableName, entry.getKey(), entry.getValue()));
}
}
}
// 同步mysql数据库数据到内存数据库
try (PreparedStatement prepareStatement = connection
.prepareStatement(SqlHelper.buildInsertSql(tableName, columns));
Statement statement = mysqlConn.createStatement();
ResultSet resultSet = statement.executeQuery(SqlHelper.buildSelectSql(tableName));) {
while (resultSet.next()) {
for (int i = 0; i < columns.size(); i++) {
prepareStatement.setObject(i + 1, resultSet.getObject(columns.get(i).getName()));
}
prepareStatement.execute();
}
}
}
/**
*
* 功能描述:获取数据库中所有的表名称
*
* @author chao
* @param con 数据库连接
* @param username MySQL用户名
* @return
* @throws Exception
*/
private List<String> getAllTableNames(Connection con, String username) throws Exception {
List<String> list = new ArrayList<String>();
try (ResultSet rs = con.getMetaData().getTables(con.getCatalog(), username, null, new String[] { "TABLE" });) {
while (rs.next()) {
list.add(rs.getString("TABLE_NAME"));
}
}
return list;
}
/**
*
* 功能描述:获取表的所有列
*
* @author chao
* @param con 数据库连接
* @param tableName 表名
* @return 列集合
* @throws Exception
*/
private List<ColumnInfo> getTableColumns(Connection con, String tableName) throws Exception {
List<String> primaryKeys = getTablePrimaryKeys(con, tableName);
List<ColumnInfo> columns = new ArrayList<ColumnInfo>();
try (PreparedStatement prep = con.prepareStatement(SqlHelper.buildSelectSql(tableName, 1));) {
ResultSetMetaData metadata = prep.executeQuery().getMetaData();
for (int i = 1; i <= metadata.getColumnCount(); i++) {
ColumnInfo column = new ColumnInfo();
column.setName(metadata.getColumnName(i));
column.setDataType(metadata.getColumnTypeName(i));
column.setPrecision(metadata.getPrecision(i));
column.setPrimaryKey(primaryKeys.contains(metadata.getColumnName(i)));
columns.add(column);
}
}
return columns;
}
/**
*
* 功能描述: 获取索引信息
*
* @author chao
* @param con
* @param tableName
* @return
* @throws Exception
*/
private Map<String, List<String>> getIndexInfo(Connection con, String tableName) throws Exception {
DatabaseMetaData dbMetaData = con.getMetaData();
ResultSet rs = dbMetaData.getIndexInfo(null, null, tableName, false, false);
Map<String, List<String>> map = new HashMap<>();
while (rs.next()) {
String indexName = rs.getString("INDEX_NAME");
if ("PRIMARY".equalsIgnoreCase(indexName)) {
continue;
}
String columnName = rs.getString("COLUMN_NAME");
List<String> list = map.get(indexName);
if (list == null) {
list = new ArrayList<>();
map.put(indexName, list);
}
list.add(columnName);
}
return map;
}
/**
*
* 功能描述:获取表的主键
*
* @author chao
* @param con 数据库连接
* @param tableName 表名称
* @return 主键集合
* @throws Exception
*/
private List<String> getTablePrimaryKeys(Connection con, String tableName) throws Exception {
List<String> primaryKeys = new ArrayList<String>();
try (ResultSet rs = con.getMetaData().getPrimaryKeys(null, null, tableName);) {
while (rs.next()) {
primaryKeys.add(rs.getString("COLUMN_NAME"));
}
}
return primaryKeys;
}
- RowChangeListener和RowChangeTask类,这两个是用来连接canal的客户端类,主要的业务逻辑在RowChangeTask中,由于考虑到canal服务器端可能会重启,当重启后程序应该能够自动连接,所以做了重连的操作,主要代码如下:
@Override
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),
"example", "", "");
connector.connect();
// connector.subscribe(".*\\..*");
String filter = buildFilter();
LOG.info("filter={}", filter);
connector.subscribe(filter);
while (true) {
try {
Message message = connector.getWithoutAck(100);
if (message.getId() == -1 || message.getEntries().isEmpty()) {
sleep(SLEEP_MILLIS);
} else {
handleRowChange(message.getEntries(), tables);
connector.ack(message.getId());
}
} catch (Exception e) {
LOG.error("Canal connector cause an exception.", e);
// 重新连接
try {
connector.disconnect();
connector.connect();
} catch (Exception exception) {
LOG.error("Reconnect to canal cause an exception.", e);
// 如果重新连接失败就休眠一段时间后再次重新连接
sleep(SLEEP_MILLIS);
}
}
}
}
/**
*
* 功能描述:处理记录变化
*
* @author chao
* @param list
* @param tableNames
*/
private void handleRowChange(List<Entry> list, List<String> tableNames) {
try {
for (Entry entry : list) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (StringUtils.isBlank(schemaName) || StringUtils.isBlank(tableName)) {
continue;
}
if (!StringUtils.equalsIgnoreCase(db, schemaName) || !tables.contains(tableName)) {
continue;
}
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
// 打印debug日志
LOG.debug("binlog[{}:{}],name[{}:{}],eventType[{}]", entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), rowChage.getEventType());
for (RowData rowData : rowChage.getRowDatasList()) {
EventHandlerManager.handle(tableName, rowChage.getEventType(), rowData);
}
}
} catch (Exception exception) {
LOG.error("Handle row change cause an exception.", exception);
}
}
private String buildFilter() {
if (allTable) {
return db + "\\..*";
}
StringBuilder builder = new StringBuilder();
for (String tableName : tables) {
builder.append(db).append("\\.").append(tableName).append(",");
}
return builder.substring(0, builder.length() - 1);
}
- InsertEventHandler、DeleteEventHandler和UpdateEventHandler类,这三个类分别用来处理增加、删除和修改数据的,代码比较简单,主要是用jdbc操作数据库,这里就不贴出代码了。
至此mych就完成了,目前没有提供查询数据的api,是想着把查询操作交给使用者自己来做,因为现在项目中基本使用orm来操作数据库,内存数据库也是数据库,在使用上只是新建一个数据源,后续可以继续改进一下,将这部分操作在mych中提供,这样更减少使用者的工作量。
后记:为了使用更为简单,做了一个mych的springboot-starter,这个starter非常简单,就是根据配置参数,调用Mych的初始化方法进行初始化操作,这样不用调用者显示调用了,至于如何写springboot-starter不是本文的重点,网上有很多的参考教程,这里不再详述了。
本文暂时没有评论,来添加一个吧(●'◡'●)