共计 5792 个字符,预计需要花费 15 分钟才能阅读完成。
前言
假期在家学习技术。最近在用一些小厂的 VPS,由于小厂不太稳定,需要给 MySQL 做多活配置;但目前没有找到好用的工具来完成 MySQL 之间的同步,所以就写了这么一个小工具。上图是我引入的库,下面是一些关键代码。
关键代码
package main
import (
"database/sql"
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
_ "github.com/go-sql-driver/mysql" // 导入 MySQL 驱动
"github.com/spf13/viper"
"log"
"os"
"strings"
"time"
)
// gMonitoredDBs holds the database names read from the
// source.monitored_databases config key in main. When it is non-empty, the
// event handlers skip events whose database is not covered by it.
var gMonitoredDBs []string
// saveErrorSQLToFile appends a failed SQL statement together with its error
// to a per-day log file named error_sql_<YYYY-MM-DD>.log in the current
// working directory. Failures to open or write that file are reported through
// the standard logger and otherwise ignored.
func saveErrorSQLToFile(sql string, err error) {
	name := "error_sql_" + time.Now().Format("2006-01-02") + ".log"

	f, openErr := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if openErr != nil {
		log.Printf("Failed to open error SQL log file: %v", openErr)
		return
	}
	defer f.Close()

	// One formatted line per failed statement.
	if _, writeErr := fmt.Fprintf(f, "SQL: %s, Error: %v\n", sql, err); writeErr != nil {
		log.Printf("Failed to write to error SQL log file: %v", writeErr)
	}
}
// getCurrentBinlogPosition connects to the MySQL server at addr with the given
// credentials, runs SHOW MASTER STATUS and returns the binlog file name and
// offset the server reports.
//
// The number of result columns differs between servers (MariaDB and MySQL
// builds without Executed_Gtid_Set return four columns instead of five), so
// only the first two columns (File, Position) are decoded and any remaining
// columns are scanned into throwaway buffers. When the statement yields no row
// the returned error wraps sql.ErrNoRows.
func getCurrentBinlogPosition(user, password, addr string) (mysql.Position, error) {
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/", user, password, addr)
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return mysql.Position{}, fmt.Errorf("opening source connection: %w", err)
	}
	defer db.Close()

	rows, err := db.Query("SHOW MASTER STATUS")
	if err != nil {
		return mysql.Position{}, fmt.Errorf("querying master status: %w", err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return mysql.Position{}, fmt.Errorf("reading master status columns: %w", err)
	}
	if len(cols) < 2 {
		return mysql.Position{}, fmt.Errorf("master status returned %d columns, want at least 2", len(cols))
	}

	if !rows.Next() {
		if err := rows.Err(); err != nil {
			return mysql.Position{}, fmt.Errorf("reading master status: %w", err)
		}
		// An empty result typically means binary logging is not enabled.
		return mysql.Position{}, fmt.Errorf("master status returned no rows: %w", sql.ErrNoRows)
	}

	var (
		logFile string
		logPos  uint32
	)
	dest := make([]interface{}, len(cols))
	dest[0] = &logFile
	dest[1] = &logPos
	// RawBytes accepts any value, including NULL, for the columns we ignore.
	for i := 2; i < len(dest); i++ {
		dest[i] = new(sql.RawBytes)
	}
	if err := rows.Scan(dest...); err != nil {
		return mysql.Position{}, fmt.Errorf("scanning master status: %w", err)
	}

	return mysql.Position{
		Name: logFile,
		Pos:  logPos,
	}, nil
}
// quoteIdentifier wraps a MySQL identifier in backticks, doubling any backtick
// it contains, so reserved words and unusual names are safe inside SQL text.
func quoteIdentifier(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// syncToTargetDB replays one canal rows event against targetDB.
//
// Depending on e.Action it issues, per affected row, an INSERT, an UPDATE or a
// DELETE against the event's schema and table. Row values are always passed as
// statement arguments. UPDATE and DELETE locate the row by comparing every
// column with the NULL-safe operator <=>, because "col = NULL" never matches
// and would silently skip rows that contain NULLs. Events with any other
// action are ignored.
//
// On the first failing statement the SQL text and error are recorded through
// saveErrorSQLToFile and the error is returned; later rows of the event are
// not processed.
func syncToTargetDB(targetDB *sql.DB, e *canal.RowsEvent) error {
	if len(e.Table.Columns) == 0 {
		return fmt.Errorf("table %s.%s has no column metadata", e.Table.Schema, e.Table.Name)
	}

	quoted := make([]string, 0, len(e.Table.Columns))
	for _, col := range e.Table.Columns {
		quoted = append(quoted, quoteIdentifier(col.Name))
	}
	table := quoteIdentifier(e.Table.Schema) + "." + quoteIdentifier(e.Table.Name)
	matchClause := strings.Join(quoted, " <=> ? AND ") + " <=> ?"

	// exec runs one statement and records it in the error log on failure.
	exec := func(query string, args []interface{}) error {
		if _, err := targetDB.Exec(query, args...); err != nil {
			saveErrorSQLToFile(query, err)
			return err
		}
		return nil
	}

	switch e.Action {
	case "insert":
		columns := strings.Join(quoted, ", ")
		placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(quoted)), ", ")
		query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, columns, placeholders)
		for _, row := range e.Rows {
			if err := exec(query, row); err != nil {
				return err
			}
		}
	case "update":
		// Update events carry rows in (before, after) pairs.
		if len(e.Rows)%2 != 0 {
			return fmt.Errorf("update event on %s.%s has %d rows, want an even number", e.Table.Schema, e.Table.Name, len(e.Rows))
		}
		setClause := strings.Join(quoted, " = ?, ") + " = ?"
		query := fmt.Sprintf("UPDATE %s SET %s WHERE %s", table, setClause, matchClause)
		for i := 0; i+1 < len(e.Rows); i += 2 {
			oldRow, newRow := e.Rows[i], e.Rows[i+1]
			// Build a fresh slice: appending to newRow directly could write
			// into the event's own backing array.
			args := make([]interface{}, 0, len(newRow)+len(oldRow))
			args = append(args, newRow...)
			args = append(args, oldRow...)
			if err := exec(query, args); err != nil {
				return err
			}
		}
	case "delete":
		query := fmt.Sprintf("DELETE FROM %s WHERE %s", table, matchClause)
		for _, row := range e.Rows {
			if err := exec(query, row); err != nil {
				return err
			}
		}
	}
	return nil
}
// CustomEventHandler is the canal event handler registered in main. It embeds
// canal.DummyEventHandler and defines its own String, OnRow and OnDDL methods.
type CustomEventHandler struct {
	canal.DummyEventHandler
}
// String returns the handler's name, "CustomEventHandler".
func (h *CustomEventHandler) String() string {
	const handlerName = "CustomEventHandler"
	return handlerName
}
// OnRow handles a row-change event: it logs the event and replays it on the
// target database.
//
// When gMonitoredDBs is non-empty, events whose schema is not exactly one of
// the listed names are skipped (an exact comparison, so "app" does not match
// "myapp").
//
// It returns an error only when the target database handle cannot be
// obtained. A failure while replaying the rows is logged (the failing SQL is
// recorded by syncToTargetDB) and nil is returned so that syncing continues.
func (h *CustomEventHandler) OnRow(e *canal.RowsEvent) error {
	dbName := e.Table.Schema
	if len(gMonitoredDBs) > 0 {
		monitored := false
		for _, name := range gMonitoredDBs {
			if name == dbName {
				monitored = true
				break
			}
		}
		if !monitored {
			log.Printf("Ignoring event in database %s", dbName)
			return nil
		}
	}

	log.Printf("Rows event in database %s, table %s: %s\n", e.Table.Schema, e.Table.Name, e.Action)
	for _, row := range e.Rows {
		log.Printf("%v", row)
	}

	targetDB, err := getTargetDB()
	if err != nil {
		log.Printf("Error getting target DB: %v", err)
		return err
	}
	// getTargetDB opens a new handle on every call; close it here so each
	// event does not leave a connection pool behind.
	defer targetDB.Close()

	if err := syncToTargetDB(targetDB, e); err != nil {
		log.Printf("Error syncing data: %v", err)
	}
	return nil
}
// OnDDL handles a DDL event by executing the statement on the target database.
//
// The database is taken from the event's Schema field (the default database of
// the session that ran the statement) rather than parsed out of the SQL text.
// When gMonitoredDBs is non-empty, statements whose schema is not exactly one
// of the listed names are skipped; this includes statements with an empty
// schema.
//
// It returns an error only when the target database handle cannot be
// obtained. A failure while executing the DDL is written to the error SQL log
// and logged, and nil is returned so that syncing continues.
func (h *CustomEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, event *replication.QueryEvent) error {
	query := string(event.Query)
	dbName := string(event.Schema)

	if len(gMonitoredDBs) > 0 {
		monitored := false
		for _, name := range gMonitoredDBs {
			if name == dbName {
				monitored = true
				break
			}
		}
		if !monitored {
			log.Printf("Ignoring DDL event in database %s", dbName)
			return nil
		}
	}
	log.Printf("DDL event: %s", query)

	targetDB, err := getTargetDB()
	if err != nil {
		log.Printf("Error getting target DB: %v", err)
		return err
	}
	// getTargetDB opens a new handle on every call; close it here so each
	// event does not leave a connection pool behind.
	defer targetDB.Close()

	if dbName != "" {
		// The target connection has no default database, so a DDL statement
		// that does not qualify its table name would fail. Limit this handle
		// to one connection so the USE below applies to the DDL that follows.
		targetDB.SetMaxOpenConns(1)
		useStmt := "USE `" + strings.ReplaceAll(dbName, "`", "``") + "`"
		if _, err := targetDB.Exec(useStmt); err != nil {
			// Not fatal: a fully qualified statement can still succeed.
			log.Printf("Error selecting database %s on target DB: %v", dbName, err)
		}
	}

	if _, err := targetDB.Exec(query); err != nil {
		saveErrorSQLToFile(query, err)
		log.Printf("Error executing DDL on target DB: %v", err)
	}
	return nil
}
// getTargetDB reads the config file through Viper (name "config", type YAML,
// searched in the current directory) and opens a database handle for the
// target server described by target.addr, target.user and target.password.
// Every call re-reads the config and returns a new *sql.DB; no default
// database is selected in the DSN.
func getTargetDB() (*sql.DB, error) {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		return nil, err
	}

	// DSN layout: user:password@tcp(addr)/
	dsn := fmt.Sprintf(
		"%s:%s@tcp(%s)/",
		viper.GetString("target.user"),
		viper.GetString("target.password"),
		viper.GetString("target.addr"),
	)

	db, openErr := sql.Open("mysql", dsn)
	if openErr != nil {
		return nil, openErr
	}
	return db, nil
}
// main loads the configuration, builds a canal instance for the source MySQL
// server, looks up the server's current binlog position and then streams
// binlog events from that position into CustomEventHandler. Any setup or
// runtime failure terminates the process via log.Fatal.
func main() {
	// Load the config file (name "config", type YAML, current directory).
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
	if readErr := viper.ReadInConfig(); readErr != nil {
		log.Fatalf("Failed to read config file: %v", readErr)
	}

	// Source server settings.
	var (
		srcAddr = viper.GetString("source.addr")
		srcUser = viper.GetString("source.user")
		srcPass = viper.GetString("source.password")
		dbs     = viper.GetStringSlice("source.monitored_databases")
	)

	// Canal configuration for the source server.
	cfg := canal.NewDefaultConfig()
	cfg.Addr, cfg.User, cfg.Password = srcAddr, srcUser, srcPass
	cfg.Flavor = "mysql"
	// NOTE(review): an empty ExecutionPath presumably disables the initial
	// mysqldump step; confirm against the canal documentation.
	cfg.Dump.ExecutionPath = ""
	if len(dbs) != 0 {
		cfg.Dump.Databases = dbs
		gMonitoredDBs = dbs
	}

	syncer, err := canal.NewCanal(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Start from wherever the source's binlog currently is.
	startPos, err := getCurrentBinlogPosition(srcUser, srcPass, srcAddr)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Starting from binlog position: %s %d\n", startPos.Name, startPos.Pos)

	// Register the handler that receives binlog events.
	syncer.SetEventHandler(&CustomEventHandler{})
	log.Printf("监控启动成功 当前监控的数据库: %v\n", dbs)

	// Stream the binlog starting at startPos.
	if runErr := syncer.RunFrom(startPos); runErr != nil {
		log.Fatal(runErr)
	}
}
正文完