技术文章

如何用python监视mysql数据库的更新?

其实每种数据库都能找到一些方法去监控数据的变化,比如mysql可以通过配置my.ini将数据库操作日志写到文本文件中,然后通过分析文本去获取变化。但是这样处理实在缺少Python精神:一是你的代码同特定数据库深度耦合,如果后续会迁移到其它数据库问题很多;二是这种代码安装部署很麻烦,需要系统管理员去配合修改mysql设置,而且对mysql的性能影响也需要测试人员进行深度测试。

最终,我选择了一种看起来有点笨,但却非常通用,而且对数据库的性能影响也能预估的方法:使用sql语句去监控数据表的变化。

这种方法具有以下优点:

  • 只使用sql语句,很容易移植到其它数据库系统中使用。
  • 定义好轮询间隔时间,可以有效的控制对数据库系统的资源占用。
  • 安装配置非常简单,无需修改数据库系统的设置。

设计思路非常简单,每隔固定间隔检查一下数据表,如果有新的记录触发回调函数。通常的业务需要两种监控模式,一是新增记录监听(我称之为listen),二是监控已有记录的变化(称之为moniter)。

新增记录的监听

所有待监听的表需要有一个自增的字段id,只要判断上一次轮询后有没有新的id出现即可。你需要将上一次处理的最后一个id存储下来,这里我只用了一个变量去存储,你可能需要把它持久存在磁盘或数据库里。代码原型如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
class BaseListener(object):
#使用一个线程启动监听
def __init__(self):
self.checkpoint = 0
self.listen_thread = threading.Thread(name="Listener", target=self.do_listen)
self.listen_thread.start()
def start(self):
self.stop_flag = False
def stop(self):
self.stop_flag = True
def set_checkpoint(self, v):
#设置监听的断点,如果需要可以持久存储在磁盘上
self.checkpoint = v
def get_checkpoint(self):
return self.checkpoint
def do_listen(self):
while True:
if not self.stop_flag:
#监听用sql语句,应当以id倒排,需要使用 WHERE id > {CHECK_POINT}进行筛选,如
sql = "SELECT * FROM a WHERE id>{CHECK_POINT} ORDER BY id DESC"
checkpoint = self.get_checkpoint()
sql_listen = sql.replace("{CHECK_POINT}", checkpoint)
# fetchall为读取全部记录的语句
recs = self.fetchall(sql_listen)
for rec in recs:
rec_id = rec.get('id')
self.callback(rec)
self.set_checkpoint(rec_id)
#根据情况设置轮询时间
time.sleep(1)
def callback(self, dictdata):
# 这是do_listen调用的一个回调函数,把数据传过来处理,在子类中实现
print "Should be implemented in subclasses!"

已有记录是否变化

为了更加通用,我们可以抽象为,某一个sql语句查询结果是否有变化。查询结果通常是一个结构体,在Python里面无法有效的比较一个结构体是否有变化,我们可以使用讨巧的办法:将这个结构体序列化后去做比较,我选择了pickle去做序列化操作,它比json更加高效和稳定一些。很明显,这里的一个关键是你需要存储上一次查询得到的数据才能和最近一次查询做比较。代码原型如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import pickle
class BaseMonitor(object):
"""
监听数据变化的基类
"""
def __init__(self):
self.prev_data = None
self.stop_flag = True
self.monitor_thread = threading.Thread(name="Monitor", target=self.do_monitor)
self.monitor_thread.start()
def start(self):
self.stop_flag = False
def stop(self):
self.stop_flag = True
def do_monitor(self):
while True:
if not self.stop_flag:
self.execute(self.extra_sql)
data = self.fetchall(self.base_sql)
if data:
str_data = pickle.dumps(data)
if str_data != self.prev_data:
self.callback(data)
self.prev_data = str_data
def callback(self, dictdata):
# 这是do_monitor调用的一个回调函数,把数据传过来处理,在子类中实现
print "Should be implemented in subclasses!"

如何使用

使用这两个类,只需要继承它们,并实现callback函数就好。如:

class ListenTest(BaseListener):
def callback(self, dictdata):
print "LISTEN:",dictdata
if __name__ == "__main__":
ad = ListenTest()
ad.start()