SQL 入库流程
- 服务器与 MySQL 建立连接
- 依次经过 MySQL 服务器内存中 Server 层的分析器、优化器、执行器
- 执行器根据执行计划操作 InnoDB 引擎
- InnoDB 从磁盘数据文件中将 data 读到缓冲池中
- 修改之前,会写入 undo log 将 data 存起来
- 然后将缓冲池中的 data 改成 new_data
- 写入一条 redo log 将修改后的 new_data 存起来
- 写入一条 binlog 将修改后的值 new_data 存起来
- 后台 IO 线程将缓冲池中被修改的值刷入磁盘
redo log 就是重做日志的意思
com.github.shyiko mysql-binlog-connector-java 0.21.0
public class MySQLRecord {private String database;private String table;private final List records = new LinkedList<>();public MySQLRecord() {}public MySQLRecord(String database, String table) {this.database = database;this.table = table;}@Overridepublic String toString() {return "{\"database\"=\"" + database + "\"," +"\"table\"=" + table + "\"," +"\"records\"=" + records + "}";}public void setRecord(UpdateRecord record) {records.add(record);}public void setRecords(List records) {this.records.addAll(records);}public void setDatabase(String database) {this.database = database;}public void setTable(String table) {this.table = table;}public static class UpdateRecord {public String before;public String after;public UpdateRecord(String before, String after) {this.before = before;this.after = after;}@Overridepublic String toString() {return "{\"before\"=\"" + before + "\"," +"\"after\"=" + after + "\"}";}}}
public class mysql {private static final Map> dbMap = new HashMap<>();public static void main(String[] args) {BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "123456");client.setServerId(1);client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof TableMapEventData) {TableMapEventData tableMapEventData = (TableMapEventData) data;if (!dbMap.containsKey(tableMapEventData.getTableId())) {dbMap.put(tableMapEventData.getTableId(), new HashMap(){{put("database",tableMapEventData.getDatabase());put("table", tableMapEventData.getTable());}});}}if (data instanceof UpdateRowsEventData) {format(((UpdateRowsEventData) data).getTableId(), 0, ((UpdateRowsEventData) data).getRows());} else if (data instanceof WriteRowsEventData) {format(((WriteRowsEventData) data).getTableId(), 1, ((WriteRowsEventData) data).getRows());} else if (data instanceof DeleteRowsEventData) {format(((DeleteRowsEventData) data).getTableId(), -1, ((DeleteRowsEventData) data).getRows());}});try {client.connect();} catch (IOException e) {e.printStackTrace();}}// 0 update, 1 insert, -1 deletepublic static void format(Long tableId, int type, List> data) {MySQLRecord records = new MySQLRecord(dbMap.get(tableId).get("database"), dbMap.get(tableId).get("table"));for (Object row : data) {records.setRecord(new MySQLRecord.UpdateRecord(type == 0 ? Arrays.toString((Object[]) ((Map.Entry, ?>) row).getKey()) :type == 1 ? null : Arrays.toString((Object[]) row),type == 0 ? Arrays.toString((Object[]) ((Map.Entry, ?>) row).getValue()) :type == 1 ? Arrays.toString((Object[]) row) : null));}System.out.println(records);}}