我遇到過和題主一樣的問題,其實(shí)每種數(shù)據(jù)庫都能找到一些方法去監(jiān)控?cái)?shù)據(jù)的變化,比如mysql可以通過配置my.ini將數(shù)據(jù)庫操作日志寫到文本文件中,然后通過分析文本去獲取變化。但是這樣處理實(shí)在缺少Python精神:一是你的代碼同特定數(shù)據(jù)庫深度耦合,如果后續(xù)會遷移到其它數(shù)據(jù)庫問題很多;二是這種代碼安裝部署很麻煩,需要系統(tǒng)管理員去配合修改mysql設(shè)置,而且對mysql的性能影響也需要測試人員進(jìn)行深度測試。
最終,我選擇了一種看起來有點(diǎn)笨,但卻非常通用,而且對數(shù)據(jù)庫的性能影響也能預(yù)估的方法:使用sql語句去監(jiān)控?cái)?shù)據(jù)表的變化。
這種方法具有以下優(yōu)點(diǎn):
只使用sql語句,很容易移植到其它數(shù)據(jù)庫系統(tǒng)中使用。定義好輪詢間隔時(shí)間,可以有效的控制對數(shù)據(jù)庫系統(tǒng)的資源占用。安裝配置非常簡單,無需修改數(shù)據(jù)庫系統(tǒng)的設(shè)置。設(shè)計(jì)思路非常簡單,每隔固定間隔檢查一下數(shù)據(jù)表,如果有新的記錄觸發(fā)回調(diào)函數(shù)。通常的業(yè)務(wù)需要兩種監(jiān)控模式,一是新增記錄監(jiān)聽(我稱之為listen),二是監(jiān)控已有記錄的變化(稱之為moniter)。
新增記錄的監(jiān)聽所有待監(jiān)聽的表需要有一個自增的字段id,只要判斷上一次輪詢后有沒有新的id出現(xiàn)即可。你需要將上一次處理的最后一個id存儲下來,這里我只用了一個變量去存儲,你可能需要把它持久存在磁盤或數(shù)據(jù)庫里。代碼原型如下:
#!/usr/bin/python# -*- coding: UTF-8 -*-import threadingclass BaseListener(object): #使用一個線程啟動監(jiān)聽 def __init__(self): self.checkpoint = 0 self.listen_thread = threading.Thread(name="Listener", target=self.do_listen) self.listen_thread.start() def start(self): self.stop_flag = False def stop(self): self.stop_flag = True def set_checkpoint(self, v): #設(shè)置監(jiān)聽的斷點(diǎn),如果需要可以持久存儲在磁盤上 self.checkpoint = v def get_checkpoint(self): return self.checkpoint def do_listen(self): while True: if not self.stop_flag: #監(jiān)聽用sql語句,應(yīng)當(dāng)以id倒排,需要使用 WHERE id > {CHECK_POINT}進(jìn)行篩選,如 sql = "SELECT * FROM a WHERE id>{CHECK_POINT} ORDER BY id DESC" checkpoint = self.get_checkpoint() sql_listen = sql.replace("{CHECK_POINT}", checkpoint) # fetchall為讀取全部記錄的語句 recs = self.fetchall(sql_listen) for rec in recs: rec_id = rec.get('id') self.callback(rec) self.set_checkpoint(rec_id) #根據(jù)情況設(shè)置輪詢時(shí)間 time.sleep(1) def callback(self, dictdata): # 這是do_listen調(diào)用的一個回調(diào)函數(shù),把數(shù)據(jù)傳過來處理,在子類中實(shí)現(xiàn) print "Should be implemented in subclasses!"已有記錄是否變化為了更加通用,我們可以抽象為,某一個sql語句查詢結(jié)果是否有變化。查詢結(jié)果通常是一個結(jié)構(gòu)體,在Python里面無法有效的比較一個結(jié)構(gòu)體是否有變化,我們可以使用討巧的辦法:將這個結(jié)構(gòu)體序列化后去做比較,我選擇了pickle去做序列化操作,它比json更加高效和穩(wěn)定一些。很明顯,這里的一個關(guān)鍵是你需要存儲上一次查詢得到的數(shù)據(jù)才能和最近一次查詢做比較。代碼原型如下:
#!/usr/bin/python# -*- coding: UTF-8 -*-import threadingimport pickleclass BaseMonitor(object): """ 監(jiān)聽數(shù)據(jù)變化的基類 """ def __init__(self): self.prev_data = None self.stop_flag = True self.monitor_thread = threading.Thread(name="Monitor", target=self.do_monitor) self.monitor_thread.start() def start(self): self.stop_flag = False def stop(self): self.stop_flag = True def do_monitor(self): while True: if not self.stop_flag: self.execute(self.extra_sql) data = self.fetchall(self.base_sql) if data: str_data = pickle.dumps(data) if str_data != self.prev_data: self.callback(data) self.prev_data = str_data def callback(self, dictdata): # 這是do_monitor調(diào)用的一個回調(diào)函數(shù),把數(shù)據(jù)傳過來處理,在子類中實(shí)現(xiàn) print "Should be implemented in subclasses!"如何使用使用這兩個類,只需要繼承它們,并實(shí)現(xiàn)callback函數(shù)就好。如:
class ListenTest(BaseListener): def callback(self, dictdata): print "LISTEN:",dictdataif __name__ == "__main__": ad = ListenTest() ad.start()希望上面的思路或多或少能幫到你。