我要訂便當 (2): 用 Python + Sqlite 儲存訂單
前情回顧:上一集在 我要訂便當(1) —— 用 Python + Selenium 控制瀏覽器取得訂單 中,我們藉由自動化套件 Selenium 控制 Chrome 成功從訂便當網站裡取得訂單資訊了。但只能夠取得現在的訂單,和原本有新訂單的時候才通知的目標仍然有點差距,那麼,如何判斷有沒有新訂單呢?只要和上一次讀取時的訂單相比就能知道了。
目標:將訂單儲存起來,判斷有沒有新訂單。
這一篇主要的做法主要參考自 大數軟體 - 如何透過 Line 發送最新一集的漫畫 中,關於如何判斷是否有最新一集漫畫的部分。這系列的影片步驟明瞭,說明直接,同時標題也很對我胃口(我就喜歡把工具拿來生活周遭玩的感覺),因此這邊也推薦一下,有興趣的可以去看看。
回歸正題,這篇的第一部分就是要使用 Sqlite 將抓到的訂單儲存起來。Sqlite 顧名思義就是 SQL + Lite 的感覺,主打小巧輕便。它會將資料儲存在一個檔案中,並且支援精簡的 SQL 指令,可以說是相當方便。
在 Python 要對 Sqlite 做操作主要是藉由 sqlite3 這個包,因此在接下來的步驟前,請先安裝這個包。關於 Sqlite3 的基本操作,可以參閱 菜鳥教程的 SQLite - Python 教學。
基本上和一般資料庫的操作邏輯並無太多差異,主要也是以 sqlite3.connect
先連線到資料庫中,再使用如 connection.execute
等的執行語法來進行操作。同時連線之後也可以使用其他資料框架提供的儲存和讀取等方法來處理,上方的菜鳥教程已有完整的教學流程,因此這邊就不多做說明,以下將會直接寫出操作資料庫動作的函式。
儲存與讀取訂單
由於我們的情境相對簡單,只需要儲存和讀取。因此我們直接建立這兩個方法來操作。且因為我們在上一集讀回來的資料已經轉成 DataFrame,因此可以直接調用相關的方法進行操作。
def saveToDb(data, dbname, tablename):
''' 將資料表存放到 sqlLite 資料庫 '''
with lite.connect(dbname) as db:
data.to_sql(tablename, con = db, if_exists = 'replace')
def readFromDb(dbname, tablename):
''' 從資料庫取出上次資料表 '''
with lite.connect(dbname) as db:
return pandas.read_sql_query('SELECT * FROM {tablename}'.format(tablename = tablename), db)
def delDataInDb(dbname, tablename):
''' 將指定資料表的資料刪除 '''
with lite.connect(dbname) as db:
cursor = db.cursor()
cursor.execute('SELECT * FROM {tablename}'.format(tablename=tablename))
db.commit()
cursor.close()
另外也要考慮到可能是第一次連接到資料庫,若是指定的資料表不存在可能會報錯,因此加上一個檢查有沒有資料表的方法。
def IsDbTableExist(dbname, tablename):
''' 確認是否有資料表 '''
with lite.connect(dbname) as db:
c = db.cursor()
c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='{tablename}'".format(tablename = tablename))
if c.fetchone()[0]==1 :
return True
return False
這些小零件完成之後,就可以把它們加到主要的流程裡。最簡單的順序應該是這樣的
- 取得網路訂單資訊
- 和資料庫的訂單資訊比對
- 如果有差異,且訂單變多代表有新訂單! → 後續的通知等動作
- 如果沒有差異,或是訂單變少了,代表沒有新訂單。 → 按兵不動
- 將新的資料存回資料庫
- 結束
按照這個流程加入之前的主程式後,就會變得如下(全程式將附於文末)
def main():
url = 'https://dinbendon.net/do/login'
dbname = 'order.sqlite'
tablename = 'bandon'
order = fetch_bandon(url) # 連線到便當網取得訂單資訊
if order is None or len(order) == 0:
print("目前沒有進行中的訂單")
return 0
# 如果資料庫沒資料表(=第一次執行)先存一次建個環境
if IsDbTableExist(dbname, tablename) is False:
print('資料庫無過往訂單,即將儲存目前訂單並退出')
saveToDb(order, dbname, tablename)
print_order(order)
return 0
oldOrder = readFromDb(dbname, tablename)
orderList = list(order['目標'])
oldOrderList = list(oldOrder['目標'])
#print_order(order)
delDataInDb(dbname, tablename)
saveToDb(order, dbname, tablename)
# 如果上次訂單是空的
if oldOrder is None or len(oldOrder) == 0:
print('資料庫無過往訂單,即將儲存目前訂單並退出')
return 0
if orderList == oldOrderList or len(orderList) < len(oldOrderList):
print("訂單無變動,程式即將退出")
return 0
print("偵測到訂單變動!")
# 做點通知的事
print("尚未實作任何通知,程式即將終止…")
print_order(order)
完成之後,接著進行測試,首先用顧客帳號登入便當網,確認一下目前的訂單內容
接著先執行過一次
可以看見有確實抓到訂單部分,並且在 py 檔所在的位置也多出了一個 sqlite 檔案存放我們的資料。
那麼接著直接再執行一次,確認是不是會確認資料一致
接著登入便當網,自己發起一個團購試試
接著我們再執行試試能不能告訴我們有差異
可以看見確實有抓到差異了!
接著要實作任何通知都相當簡單了,只要將和之前不一樣的部分取出就可以取得新增的訂單,或是可以記錄每個訂單的開始和結束時間做管理,可以說能做的事相當多。
屆此已經完成了預定將訂單做儲存和偵測有沒有變動的部分,故這篇就先到這裡打住。原本考慮到這部分相較之下篇幅比較短,是不是該跟下一段落合併成一篇,但考量之後還是作罷。
畢竟大多時候也是做給自己往後參考用的,希望能把過程和看過的資料記錄下來,將來需要的時候可以自己抄自己,因此還是決定將不同套件的部份作切割,也是在這時候確定了這系列將會有四篇。
下一篇 將處理通知的部分,用到的技巧也在上面稍微提到過囉!那麼就下週再見~
補充:怎麼看 Sqlite 裡的資料?
紀錄一下看 Sqlite 檔案裡資料的方法。
由於 Sqlite 主打輕便可攜,因此平常的需求大多也是看資料而已。使用 DB Browser for SQLite 就足夠了
不過我個人是比較偏好直接從 VS Code 裡就打開看,這時候就需要安裝擴充套件。
我是採用 Sqlite 這個套件,該頁面就已經有操作示範了,使用相當簡單。
只要先用 Ctrl + Shift + P 叫出命令提示區,輸入 Sqlite 之後,開啟資料庫就可以用簡單的操作來看內容或執行 SQL 指令,這邊強烈推薦。
我要訂便當系列
- 我要訂便當(1) —— 用 Python + Selenium 控制瀏覽器取得訂單
- 我要訂便當(2) —— 用 Python + Sqlite 儲存訂單
- 我要訂便當(3) —— 用 Python + Line Notify 傳送通知
- 我要訂便當(4) —— 將 Python 腳本部署上 Heroku
- 我要訂便當(5) —— Heroku 填坑小記
參考資料
- 大數軟體 - 如何透過 Line 發送最新一集的漫畫
- 菜鳥教程 - SQLite Python
- SQLite 資料庫介紹 - Aaron Ho
- DAY22 - 搞懂如何導入sqlite - iT邦幫忙
- Python 學習筆記 : 資料庫存取測試 (一) SQLite - 小狐狸事務所
附錄:目前程式碼
from selenium import webdriver
import re
import time
import pandas
import sqlite3 as lite
# 自動檢查團購便當網
def main():
url = 'https://dinbendon.net/do/login'
dbname = 'order.sqlite'
tablename = 'bandon'
order = fetch_bandon(url) # 連線到便當網取得訂單資訊
if order is None or len(order) == 0:
print("目前沒有進行中的訂單")
return 0
# 如果資料庫沒資料表(=第一次執行)先存一次建個環境
if IsDbTableExist(dbname, tablename) is False:
print('資料庫無過往訂單,即將儲存目前訂單並退出')
saveToDb(order, dbname, tablename)
print_order(order)
return 0
oldOrder = readFromDb(dbname, tablename)
orderList = list(order['目標'])
oldOrderList = list(oldOrder['目標'])
#print_order(order)
delDataInDb(dbname, tablename)
saveToDb(order, dbname, tablename)
# 如果上次訂單是空的
if oldOrder is None or len(oldOrder) == 0:
print('資料庫無過往訂單,即將儲存目前訂單並退出')
return 0
if orderList == oldOrderList or len(orderList) < len(oldOrderList):
print("訂單無變動,程式即將退出")
return 0
print("偵測到訂單變動!")
# 做點通知的事
print("尚未實作任何通知,程式即將終止…")
print_order(order)
def fetch_bandon(url, username="guest", password="guest"):
''' 開啟瀏覽器並連線到便當網取得資料 '''
options = webdriver.ChromeOptions()
#options.add_argument('headless')
driver = webdriver.Chrome(options=options)
driver.get(url) # 連線到訂便當頁面
time.sleep(1) # 演一下
# 輸入帳密
driver.find_element_by_name("username").send_keys(username)
driver.find_element_by_name("password").send_keys(password)
# 輸入驗證碼
ques = driver.find_elements_by_class_name("alignRight")[2].text
temp = re.findall(r"\d+\.?\d*", ques)
answer = int(temp[0]) + int(temp[1])
driver.find_element_by_name("result").send_keys(answer)
# 提交表單
driver.find_element_by_name("submit").click()
#time.sleep(1)
# 取出訂單表格列
rows = driver.find_elements_by_css_selector(
"div#inProgressBox>table>tbody>tr")
if len(rows) == 0:
driver.close()
return list()
bandons = [list(map(getText, row.find_elements_by_css_selector(
"td>div>a>span"))) for row in rows] # 取出每一列資料的文字
driver.close()
tableHeader = ['人數', '發起人', '目標']
bandons_df = pandas.DataFrame(bandons, columns=tableHeader)
return bandons_df
def print_order(data):
'''列印訂單資料,看起來整齊一點'''
for index, row in data.iterrows():
if row is not None:
print('({hcount:>4s}) {orderer}: {order:<40s}'.format(
orderer = str(row['發起人']),
order = str(row['目標']),
hcount = str(row['人數'])))
def getText(x):
return x.text
def saveToDb(data, dbname, tablename):
''' 將資料表存放到 sqlLite 資料庫 '''
with lite.connect(dbname) as db:
data.to_sql(tablename, con=db, if_exists='replace')
def readFromDb(dbname, tablename):
''' 從資料庫取出上次資料表 '''
with lite.connect(dbname) as db:
return pandas.read_sql_query('SELECT * FROM {tablename}'.format(tablename=tablename), db)
def delDataInDb(dbname, tablename):
''' 將指定資料表的資料刪除 '''
with lite.connect(dbname) as db:
cursor = db.cursor()
cursor.execute('SELECT * FROM {tablename}'.format(tablename=tablename))
db.commit()
cursor.close()
def IsDbTableExist(dbname, tablename):
''' 確認是否有資料表 '''
with lite.connect(dbname) as db:
c = db.cursor()
c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='{tablename}'".format(tablename=tablename))
if c.fetchone()[0] == 1:
return True
return False
if __name__ == '__main__':
main()
其他文章
哈囉,如果你也有 LikeCoin,也覺得我的文章有幫上忙的話,還請不吝給我拍拍手呦,謝謝~ ;)