当前位置:网站首页>Bifrost 位点管理 之 异构中间件实现难点(1)

Bifrost 位点管理 之 异构中间件实现难点(1)

2020-11-09 23:53:19 布哦

Bifrost ---- 面向生产环境的 MySQL 同步到 Redis,MongoDB 等服务的异构中间件

源码下载 (你的点击 star 就是对 Bifrost 最大的支持!!!): Github Gitee

现在行里有不少的数据同步的工具,比如 datalink, 阿里的DTS , otter 等工具,甚至很多企业都在 otter 等开源工具里进行二次开发,实更了很多功能。

也和不少的开发人员,聊过类似工具的架构设计

大多数的开发人员架构设计 都是先用 canal binlog解析出来,每个表一个topic,然后再起额外的topic 消费线程,进行数据转换,再写入相对应的目标库里。其实这个设计没有毛病,而且看上去解偶设计非常得好。

咱在这里也先不讨论说一个表一个topic,等太多topic造成 kafka 磁盘IO性能下降明显 等问题。

只是有不少开发人员,对 kafka 集群的数据认为是绝对安全,并没有考滤到 kafka 数据集群崩溃等不可控的情况下造成数据丢失,

假如Kafka等工具出现严重不可恢复的情况下,怎么通知canal该从哪一个位点开始进行重新解析呢?当然完全重新来,也不是不可以,再全量一次。比如用阿里的datax,再来一次全量。

Bifrost 在开发设计的时候,考滤到性能,还有数据位点安全,还有资源最小化等多情况下,全局采用当前进程里的内存队列(当然在出现同步阻塞的时候,会自动启动文件队列功能)。全局采用内存队列,减少网络延迟,还有减少硬件资源的消耗。

Bifrost 全局有多个地方有对位点进行保存管理:

1. Bristol 模块里,在解析出 query,commit 事件的时候,对记录一次位点信息,保存到一个变量中,row_event,map_event 等其他事件的位点,并不会记录,这和binlog数据有关,map_event紧跟着row_event,row_event的解析又依懒map_event,假如 map_event之后,连接出错了,Bristol 需要自动重连等原因,假如重连接,没有map_event事件的数据,不就解析出错了么?

binlog.go

func (parser *eventParser) saveBinlog(event *EventReslut) {
   switch event.Header.EventType {
   case QUERY_EVENT,XID_EVENT:
      if event.BinlogFileName == "" {
         return
      }
      parser.binlogDump.Lock()
      parser.binlogFileName = event.BinlogFileName
      parser.binlogPosition = event.Header.LogPos
      parser.binlogTimestamp = event.Header.Timestamp
      parser.binlogDump.Unlock()
      break
   case ROTATE_EVENT:
      parser.binlogDump.Lock()
      parser.currentBinlogFileName = event.BinlogFileName
      parser.binlogDump.Unlock()
   default:
      break
   }
}

2. 每个数据源列表数据里,每3秒,调用 Bristol 模块,拿Bristol 最后一次解析的 query,commit事件的位点.

1).将位点保存到 Db数据源在内存里对应的变量中,用于界面显示显示,还有正常关闭的时候,将配置文件刷到db.bifrost文件中

2). 将DB数据源位点保存到 leveldb中,每隔2秒刷一次leveldb数据到磁盘

func (db *db) saveBinlog(){
   FileName,Position,Timestamp := db.binlogDump.GetBinlog()
   if FileName == ""{
      return
   }
   //db.Lock()
   //保存位点,这个位点在重启 配置文件恢复的时候
   //一个db有可能有多个channel,数据顺序不用担心,因为实际在重启的时候 会根据最小的 ToServerList 的位点进行自动替换
   db.binlogDumpFileName,db.binlogDumpPosition,db.binlogDumpTimestamp = FileName,Position,Timestamp
   if db.DBBinlogKey == nil{
      db.DBBinlogKey = getDBBinlogkey(db)
   }
   //db.Unlock()
   index := strings.IndexAny(FileName, ".")

   BinlogFileNum,_ := strconv.Atoi(FileName[index+1:])
   saveBinlogPosition(db.DBBinlogKey,BinlogFileNum,db.binlogDumpPosition)
}
func saveBinlogPositionToStorageFromCache()  {
   for {
      time.Sleep(2 * time.Second)
      for _, t := range TmpPositioin {
         t.Lock()
         for k, v := range t.Data {
            Val, _ := json.Marshal(v)
            storage.PutKeyVal([]byte(k) , Val)
         }
         t.Data = make(map[string]positionStruct,0)
         t.Unlock()
      }
   }
}

3. 每个数据同步队列对应最后同步成功和最后进入队列的2个位点

1). 最后同步成功的位点

    这个就是字段上的意思 ,最后同步到目标库的位点

2). 最后进入队列的位点

    每次写数据到队列的时候,都会更新最后一条更新到当前队列的数据位点

 不管是 最后同步成功 还是 最后进入队列的位点 ,都会和数据源位点一样,一份存到内存变量中,另一份存到leveldb中,每隔2秒进行刷到磁盘

 

以上 除了 Bristol 模块为了解决 mysql 连接异常,造成的自动重连的位点,不会被保存到磁盘,另外2种位点都会刷到磁盘,而且还会刷到leveldb中,为了解决中途断点或者被kill -9 等非正常退出的情况下,数据恢复的时候,才知道是从哪一个位点开始重新解析!

 

位点恢复

即然每个同步都保存了有相对应的位点信息,那肯定 就能算出一个,所有同步中,最小的位点,然后进行从算出来最小的位点进行重新开始解析。实际有,也有个别情况,是不需要参与位点大写计算的或者说,有个别情况,在两者之前位点比较的时候取更大位点的

1).  不管数据源的位点,还是同步队列的最后成功的位点,或者 最后进入队列的位点,首先配置文件中的位点数据和leveldb中取出来位点数据做对比,取大值。

 刷到leveldb的数据 是每2秒刷一次,数据理论上是延迟的,除非非正常关闭的时候,leveldb 前2位的位点数据已经进入到 磁盘,但是同步文件中的数据没刷到db.bifrost 文件的时候进程就被强制退出了。所以这里取两者的大值

2). 数据同步队列的 最后成功的位点 和 最后进入队列的位点 相同步的时候,放弃位点比较。

这里解决的问题是  假如有2个表配置了同步到 ClickHouse 或者 Redis 等目标库,Table1 更新了一条数据当前位点是 1000,然后再也没更新过数据了,以后都是Table2 这个表的数据更新,当Table2 同步数据的位点到了 3000 的时候,重启了Bfirost,那这个时候,应该是按3000这个位点开始重新解析,对不对?而不是从1000这个位点开始对吧?

会进行 保存 最后进入队列的位点 的位点,就是为了解决这个问题的

3). 假如所有同步队列 的 最后成功的位点 和 最后进入队列的位点 都是相同的情况下,直接取 数据源对应的最后的位点. 

相关源码:

server/recovery.go

func recoveryData(data map[string]dbSaveInfo,isStop bool){}

为了实现这个数据位点安全及性能,在位点管理上,做了不少事情 ,并不是代表这个方案好,而是我们在设计系统上,需要考滤极端情况下,数据怎么恢复,重新拉取一次全量任务,其实也是很不错的选择!

GitHub : https://github.com/brokercap/Bifrost

Gitee : https://gitee.com/jc3wish/Bifrost

版权声明
本文为[布哦]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4233862/blog/4710366