编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

mysql内存数据库同步工具mych(两个mysql数据库之间数据同步)

wxchong 2024-07-16 10:07:27 开源技术 20 ℃ 0 评论

mych是mysql、canal和h2的缩写,它的主要作用是可以将mysql中的数据同步到内存数据库h2中,并且保持和mysql数据库一样的结构,在mysql中查询的sql语句同样可以在内存数据库中查询。

为什么要做这样一个工具呢?初衷是为了提高效率。我们在做项目过程中经常会涉及到一些基础数据的维护,这些数据通常是存储在关系型数据库中(mysql),并且在后台管理系统中进行维护。在我们对外提供的接口中经常会涉及到对这些基础数据的访问,通常是直接去数据库查询,由于基础数据一般都是单表存储且数据量不会很大,所以对接口响应时间要求不是很高的情况下,这样做也没有什么问题。如果我们的接口对于响应时间要求比较苛刻,那么我们需要尽可能的缩短接口业务逻辑的处理时间,这样才能够快速的响应。缩短业务处理时间有几个方面可以做:减少查询数据库的次数、同步逻辑改成异步、多线程处理、使用缓存等等。mych工具也正是为了能提高查询效率,将之前从mysql中的查询转换为从本地jvm内存数据库中查询,从而达到提高查询效率的目的。

将数据库数据同步到内存中的方式不止一种,下面介绍几种方式:

  1. 在系统启动时查询数据库,将数据包装为jdk中的list或者map结构,然后使用定时任务定时的和数据库进行同步,这种方式的优点是在内存中访问数据会很快,缺点是当数据库中数据发生变化,内存中的数据不能及时更新,存在一定的延迟。
  2. 在1的基础上采用mq消息通知的方式通知内存更新数据。假设我们的后台管理系统是A系统,提供对外服务的是B系统。当我们在A系统中更改了基础数据表,可以向mq发送一条消息,B系统接收到消息后,将内存中保存的数据进行更新(这里要考虑多线程的情况,因为在更新时有可能有误操作),至于是更新某一条数据,还是全量更新取决于具体的规则设计。

以上两种方式是可以解决问题的,但缺点也很明显:

  1. 处理过程较为繁琐,需要程序员针对具体的基础数据表进行多次封装;
  2. 和业务代码耦合度较高,这部分工作属于非功能性需求,最好不要在工程中提现,不仅增加了工作量,还需要维护;
  3. 不具备可复用性,每个项目都需要独立开发;
  4. 不够灵活,定制化开发;

也正是基于以上几点,我开发了mych工具,它的原理是在项目启动时,根据配置参数去读取mysql数据库相关的表,并且解析出表结构和索引,然后在h2内存数据中创建同样结构的表和索引,而后将mysql中的数据都插入到内存数据库,这样就可以通过内存数据库查询我们需要的数据。利用canal工具监听mysql的binlog日志,当数据库数据发生变化时(我们只关注增、删、改事件),canal会通知到mych客户端,客户端对将接收到的数据变化同步更新到内存数据库,这样就能准实时的保证内存中的数据和mysql中的数据一致。内存数据库应该只支持查询操作,不能支持增删改操作。由于是基于jvm内存的,每次重启后都会重新加载数据,纯粹就是一个缓存的作用。

下面贴一些代码片段,大概介绍一下具体是如何实现的:

  • Mych类,核心类,在初始化的时候调用,主要作用是创建内存数据库表,并且将mysql数据库中的数据同步到内存数据库中,主要用到的技术是jdbc相关api,尤其是元数据metadata相关的。

public void init(MySQLConfig mySQLConfig, List<String> tableNames, CanalConfig canalConfig) {

if (!flag.compareAndSet(false, true)) {

LOG.warn("Mych already initialized,Please don't call init method repeat.");

return;

}

LOG.info("Begin init Mych.");

try {

validateConfig(mySQLConfig, canalConfig);

} catch (Exception exception) {

LOG.warn("Initialization Mych failure.Because {}", exception.getMessage());

return;

}

try (Connection mysqlConnection = DBHelper.getMySQLConnection(mySQLConfig);) {

h2Connection = DBHelper.getH2Connection();

boolean allTable = false;

List<String> allTableNames = getAllTableNames(mysqlConnection, mySQLConfig.getUsername());

// 如果没有指定哪些表,则默认全部的表

if (tableNames == null || tableNames.isEmpty()) {

tableNames = allTableNames;

allTable = true;

}

for (String tableName : tableNames) {

if (!allTableNames.contains(tableName)) {

LOG.warn(

"Table [{}] not exists in database [{}].Please check the config of [spring.datasource.mych.tableNames].",

tableName, mySQLConfig.getDatabase());

continue;

}

initMemTable(mysqlConnection, tableName);

}

rowChangeListener = new RowChangeListener(canalConfig.getHost(), canalConfig.getPort(),

mySQLConfig.getDatabase(), allTable, tableNames);

rowChangeListener.start();

LOG.info("Init Mych complete.");

} catch (Exception e) {

LOG.error("Initialization Mych cause an exception.", e);

}

}

/**

*

* 功能描述:初始化内存表

*

* @author chao

* @param mysqlConn MySQL数据库连接

* @param tableName 表名称

* @param columns 表的列集合

* @param h2Conn h2数据库连接

* @throws Exception

*/

private void initMemTable(Connection mysqlConn, String tableName) throws Exception {

// 获取当前表的所有列

List<ColumnInfo> columns = getTableColumns(mysqlConn, tableName);

if (columns.isEmpty()) {

LOG.warn("Columns of table [{}] is empty.", tableName);

return;

}

//

Map<String, List<String>> indexInfo = getIndexInfo(mysqlConn, tableName);

// 创建内存表

Connection connection = getLogConnection(LOG);

try (Statement stmt = connection.createStatement();) {

stmt.executeUpdate(SqlHelper.buildCreateTabelSql(tableName, columns));

if (indexInfo != null && !indexInfo.isEmpty()) {

for (Map.Entry<String, List<String>> entry : indexInfo.entrySet()) {

stmt.executeUpdate(SqlHelper.buildCreateIndexSql(tableName, entry.getKey(), entry.getValue()));

}

}

}

// 同步mysql数据库数据到内存数据库

try (PreparedStatement prepareStatement = connection

.prepareStatement(SqlHelper.buildInsertSql(tableName, columns));

Statement statement = mysqlConn.createStatement();

ResultSet resultSet = statement.executeQuery(SqlHelper.buildSelectSql(tableName));) {

while (resultSet.next()) {

for (int i = 0; i < columns.size(); i++) {

prepareStatement.setObject(i + 1, resultSet.getObject(columns.get(i).getName()));

}

prepareStatement.execute();

}

}

}

/**

*

* 功能描述:获取数据库中所有的表名称

*

* @author chao

* @param con 数据库连接

* @param username MySQL用户名

* @return

* @throws Exception

*/

private List<String> getAllTableNames(Connection con, String username) throws Exception {

List<String> list = new ArrayList<String>();

try (ResultSet rs = con.getMetaData().getTables(con.getCatalog(), username, null, new String[] { "TABLE" });) {

while (rs.next()) {

list.add(rs.getString("TABLE_NAME"));

}

}

return list;

}

/**

*

* 功能描述:获取表的所有列

*

* @author chao

* @param con 数据库连接

* @param tableName 表名

* @return 列集合

* @throws Exception

*/

private List<ColumnInfo> getTableColumns(Connection con, String tableName) throws Exception {

List<String> primaryKeys = getTablePrimaryKeys(con, tableName);

List<ColumnInfo> columns = new ArrayList<ColumnInfo>();

try (PreparedStatement prep = con.prepareStatement(SqlHelper.buildSelectSql(tableName, 1));) {

ResultSetMetaData metadata = prep.executeQuery().getMetaData();

for (int i = 1; i <= metadata.getColumnCount(); i++) {

ColumnInfo column = new ColumnInfo();

column.setName(metadata.getColumnName(i));

column.setDataType(metadata.getColumnTypeName(i));

column.setPrecision(metadata.getPrecision(i));

column.setPrimaryKey(primaryKeys.contains(metadata.getColumnName(i)));

columns.add(column);

}

}

return columns;

}

/**

*

* 功能描述: 获取索引信息

*

* @author chao

* @param con

* @param tableName

* @return

* @throws Exception

*/

private Map<String, List<String>> getIndexInfo(Connection con, String tableName) throws Exception {

DatabaseMetaData dbMetaData = con.getMetaData();

ResultSet rs = dbMetaData.getIndexInfo(null, null, tableName, false, false);

Map<String, List<String>> map = new HashMap<>();

while (rs.next()) {

String indexName = rs.getString("INDEX_NAME");

if ("PRIMARY".equalsIgnoreCase(indexName)) {

continue;

}

String columnName = rs.getString("COLUMN_NAME");

List<String> list = map.get(indexName);

if (list == null) {

list = new ArrayList<>();

map.put(indexName, list);

}

list.add(columnName);

}

return map;

}

/**

*

* 功能描述:获取表的主键

*

* @author chao

* @param con 数据库连接

* @param tableName 表名称

* @return 主键集合

* @throws Exception

*/

private List<String> getTablePrimaryKeys(Connection con, String tableName) throws Exception {

List<String> primaryKeys = new ArrayList<String>();

try (ResultSet rs = con.getMetaData().getPrimaryKeys(null, null, tableName);) {

while (rs.next()) {

primaryKeys.add(rs.getString("COLUMN_NAME"));

}

}

return primaryKeys;

}

  • RowChangeListener和RowChangeTask类,这两个是用来连接canal的客户端类,主要的业务逻辑在RowChangeTask中,由于考虑到canal服务器端可能会重启,当重启后程序应该能够自动连接,所以做了重连的操作,主要代码如下:

@Override

public void run() {

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),

"example", "", "");

connector.connect();

// connector.subscribe(".*\\..*");

String filter = buildFilter();

LOG.info("filter={}", filter);

connector.subscribe(filter);

while (true) {

try {

Message message = connector.getWithoutAck(100);

if (message.getId() == -1 || message.getEntries().isEmpty()) {

sleep(SLEEP_MILLIS);

} else {

handleRowChange(message.getEntries(), tables);

connector.ack(message.getId());

}

} catch (Exception e) {

LOG.error("Canal connector cause an exception.", e);

// 重新连接

try {

connector.disconnect();

connector.connect();

} catch (Exception exception) {

LOG.error("Reconnect to canal cause an exception.", e);

// 如果重新连接失败就休眠一段时间后再次重新连接

sleep(SLEEP_MILLIS);

}

}

}

}

/**

*

* 功能描述:处理记录变化

*

* @author chao

* @param list

* @param tableNames

*/

private void handleRowChange(List<Entry> list, List<String> tableNames) {

try {

for (Entry entry : list) {

String schemaName = entry.getHeader().getSchemaName();

String tableName = entry.getHeader().getTableName();

if (StringUtils.isBlank(schemaName) || StringUtils.isBlank(tableName)) {

continue;

}

if (!StringUtils.equalsIgnoreCase(db, schemaName) || !tables.contains(tableName)) {

continue;

}

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN

|| entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());

// 打印debug日志

LOG.debug("binlog[{}:{}],name[{}:{}],eventType[{}]", entry.getHeader().getLogfileName(),

entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(),

entry.getHeader().getTableName(), rowChage.getEventType());

for (RowData rowData : rowChage.getRowDatasList()) {

EventHandlerManager.handle(tableName, rowChage.getEventType(), rowData);

}

}

} catch (Exception exception) {

LOG.error("Handle row change cause an exception.", exception);

}

}

private String buildFilter() {

if (allTable) {

return db + "\\..*";

}

StringBuilder builder = new StringBuilder();

for (String tableName : tables) {

builder.append(db).append("\\.").append(tableName).append(",");

}

return builder.substring(0, builder.length() - 1);

}

  • InsertEventHandler、DeleteEventHandler和UpdateEventHandler类,这三个类分别用来处理增加、删除和修改数据的,代码比较简单,主要是用jdbc操作数据库,这里就不贴出代码了。

至此mych就完成了,目前没有提供查询数据的api,是想着把查询操作交给使用者自己来做,因为现在项目中基本使用orm来操作数据库,内存数据库也是数据库,在使用上只是新建一个数据源,后续可以继续改进一下,将这部分操作在mych中提供,这样更减少使用者的工作量。

后记:为了使用更为简单,做了一个mych的springboot-starter,这个starter非常简单,就是根据配置参数,调用Mych的初始化方法进行初始化操作,这样不用调用者显示调用了,至于如何写springboot-starter不是本文的重点,网上有很多的参考教程,这里不再详述了。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表