golang类似canal监听MySQL的binlog变更进行数据库多活部署

4次阅读
没有评论

共计 5792 个字符,预计需要花费 15 分钟才能阅读完成。

golang类似canal监听MySQL的binlog变更进行数据库多活部署

前言

假期在家学习技术,最近用一些小厂的VPS由于小厂不大稳定会做MySQL多活的配置,但是目前没有找到很好的工具做MySQL之间的同步的工作所以就做了这么一个小工具,上图是我引入的库,下面是一些关键代码

关键代码

package main

import (
    "database/sql"
    "fmt"
    "github.com/go-mysql-org/go-mysql/canal"
    "github.com/go-mysql-org/go-mysql/mysql"
    "github.com/go-mysql-org/go-mysql/replication"
    _ "github.com/go-sql-driver/mysql" // 导入 MySQL 驱动
    "github.com/spf13/viper"
    "log"
    "os"
    "strings"
    "time"
)

var gMonitoredDBs []string

// 将出错的 SQL 保存到按日期命名的文件
func saveErrorSQLToFile(sql string, err error) {
    date := time.Now().Format("2006-01-02")
    fileName := fmt.Sprintf("error_sql_%s.log", date)
    file, openErr := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if openErr != nil {
        log.Printf("Failed to open error SQL log file: %v", openErr)
        return
    }
    defer file.Close()

    logEntry := fmt.Sprintf("SQL: %s, Error: %v\n", sql, err)
    if _, writeErr := file.WriteString(logEntry); writeErr != nil {
        log.Printf("Failed to write to error SQL log file: %v", writeErr)
    }
}

// 获取当前 MySQL 的 binlog Position
func getCurrentBinlogPosition(user, password, addr string) (mysql.Position, error) {
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/", user, password, addr)
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return mysql.Position{}, err
    }
    defer db.Close()

    var logFile string
    var logPos uint32
    var binlogDoDB string
    var binlogIgnoreDB string
    var executedGtidSet string
    err = db.QueryRow("SHOW MASTER STATUS").Scan(&logFile, &logPos, &binlogDoDB, &binlogIgnoreDB, &executedGtidSet)
    if err != nil {
        return mysql.Position{}, err
    }

    return mysql.Position{
        Name: logFile,
        Pos:  logPos,
    }, nil
}

// 同步数据到目标数据库
func syncToTargetDB(targetDB *sql.DB, e *canal.RowsEvent) error {
    var query string
    columnNames := make([]string, 0, len(e.Table.Columns))
    for _, col := range e.Table.Columns {
        columnNames = append(columnNames, col.Name)
    }
    columns := strings.Join(columnNames, ", ")
    placeholders := strings.Repeat("?, ", len(columnNames))
    placeholders = strings.TrimRight(placeholders, ", ")

    switch e.Action {
    case "insert":
        for _, row := range e.Rows {
            query = fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES (%s)", e.Table.Schema, e.Table.Name, columns, placeholders)
            _, err := targetDB.Exec(query, row...)
            if err != nil {
                saveErrorSQLToFile(query, err)
                return err
            }
        }
    case "update":
        for i := 0; i < len(e.Rows); i += 2 {
            oldRow := e.Rows[i]
            newRow := e.Rows[i+1]
            setClause := ""
            whereClause := ""
            for _, col := range columnNames {
                if setClause != "" {
                    setClause += ", "
                }
                setClause += fmt.Sprintf("%s = ?", col)
                if whereClause != "" {
                    whereClause += " AND "
                }
                whereClause += fmt.Sprintf("%s = ?", col)
            }
            query = fmt.Sprintf("UPDATE %s.%s SET %s WHERE %s", e.Table.Schema, e.Table.Name, setClause, whereClause)
            args := append(newRow, oldRow...)
            _, err := targetDB.Exec(query, args...)
            if err != nil {
                saveErrorSQLToFile(query, err)
                return err
            }
        }
    case "delete":
        columns := strings.Join(columnNames, " = ? AND ")
        columns += " = ?"
        for _, row := range e.Rows {
            query = fmt.Sprintf("DELETE FROM %s.%s WHERE %s", e.Table.Schema, e.Table.Name, columns)
            _, err := targetDB.Exec(query, row...)
            if err != nil {
                saveErrorSQLToFile(query, err)
                return err
            }
        }
    }
    return nil
}

type CustomEventHandler struct {
    canal.DummyEventHandler
}

func (h *CustomEventHandler) String() string {
    return "CustomEventHandler"
}

func (h *CustomEventHandler) OnRow(e *canal.RowsEvent) error {
    // 检查数据库是否在监控列表中
    dbName := e.Table.Schema
    if len(gMonitoredDBs) > 0 && !strings.Contains(strings.Join(gMonitoredDBs, ","), dbName) {
        log.Printf("Ignoring event in database %s", dbName)
        return nil
    }
    log.Printf("Rows event in database %s, table %s: %s\n", e.Table.Schema, e.Table.Name, e.Action)
    for _, row := range e.Rows {
        log.Printf("%v", row)
    }
    // 记录当前的 Position
    // 假设 c 是 canal 实例,这里需要在 main 函数中传递
    // currentPos := c.SyncedPosition()
    // log.Printf("Current binlog position: %s %d", currentPos.Name, currentPos.Pos)

    // 同步数据到目标数据库
    targetDB, err := getTargetDB()
    if err != nil {
        log.Printf("Error getting target DB: %v", err)
        return err
    }
    err = syncToTargetDB(targetDB, e)
    if err != nil {
        log.Printf("Error syncing data: %v", err)
    }
    return nil
}

func (h *CustomEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, event *replication.QueryEvent) error {
    query := string(event.Query)
    // 检查数据库是否在监控列表中
    dbName := strings.SplitN(query, ".", 2)[0]
    if len(gMonitoredDBs) > 0 && !strings.Contains(strings.Join(gMonitoredDBs, ","), dbName) {
        log.Printf("Ignoring DDL event in database %s", dbName)
        return nil
    }
    log.Printf("DDL event: %s", query)
    // 记录当前的 Position
    // 假设 c 是 canal 实例,这里需要在 main 函数中传递
    // currentPos := c.SyncedPosition()
    // log.Printf("Current binlog position: %s %d", currentPos.Name, currentPos.Pos)

    // 执行 DDL 语句到目标数据库
    targetDB, err := getTargetDB()
    if err != nil {
        log.Printf("Error getting target DB: %v", err)
        return err
    }
    _, err = targetDB.Exec(query)
    if err != nil {
        saveErrorSQLToFile(query, err)
        log.Printf("Error executing DDL on target DB: %v", err)
    }
    return nil
}

func getTargetDB() (*sql.DB, error) {
    // 初始化 Viper
    viper.SetConfigName("config")
    viper.SetConfigType("yaml")
    viper.AddConfigPath(".")

    err := viper.ReadInConfig()
    if err != nil {
        return nil, err
    }

    // 获取目标数据库配置
    targetAddr := viper.GetString("target.addr")
    targetUser := viper.GetString("target.user")
    targetPassword := viper.GetString("target.password")

    // 连接目标数据库
    targetDSN := fmt.Sprintf("%s:%s@tcp(%s)/", targetUser, targetPassword, targetAddr)
    targetDB, err := sql.Open("mysql", targetDSN)
    if err != nil {
        return nil, err
    }
    return targetDB, nil
}

func main() {
    // 初始化 Viper
    viper.SetConfigName("config")
    viper.SetConfigType("yaml")
    viper.AddConfigPath(".")

    err := viper.ReadInConfig()
    if err != nil {
        log.Fatalf("Failed to read config file: %v", err)
    }

    // 获取源数据库配置
    sourceAddr := viper.GetString("source.addr")
    sourceUser := viper.GetString("source.user")
    sourcePassword := viper.GetString("source.password")
    monitoredDBs := viper.GetStringSlice("source.monitored_databases")

    // 创建 canal 配置
    cfg := canal.NewDefaultConfig()
    cfg.Addr = sourceAddr
    cfg.User = sourceUser
    cfg.Password = sourcePassword
    cfg.Flavor = "mysql"
    cfg.Dump.ExecutionPath = ""
    if len(monitoredDBs) > 0 {
        cfg.Dump.Databases = monitoredDBs
        gMonitoredDBs = monitoredDBs
    }

    // 创建 canal 实例
    c, err := canal.NewCanal(cfg)
    if err != nil {
        log.Fatal(err)
    }

    // 获取当前的 binlog Position
    pos, err := getCurrentBinlogPosition(sourceUser, sourcePassword, sourceAddr)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Starting from binlog position: %s %d\n", pos.Name, pos.Pos)

    // 注册 binlog 事件处理函数
    c.SetEventHandler(&CustomEventHandler{})
    log.Printf("监控启动成功 当前监控的数据库: %v\n", monitoredDBs)
    // 从指定位置开始同步 binlog
    err = c.RunFrom(pos)
    if err != nil {
        log.Fatal(err)
    }
}
正文完
 0
Eric chan
版权声明:本站原创文章,由 Eric chan 于2025-05-04发表,共计5792字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。