python周期任務調度工具Schedule使用詳解_第1頁
python周期任務調度工具Schedule使用詳解_第2頁
python周期任務調度工具Schedule使用詳解_第3頁
python周期任務調度工具Schedule使用詳解_第4頁
python周期任務調度工具Schedule使用詳解_第5頁
已閱讀5頁,還剩6頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第python周期任務調度工具Schedule使用詳解目錄1.準備2.基本使用參數傳遞獲取目前所有的作業取消所有作業標簽功能設定作業截止時間3.高級使用裝飾器安排作業并行執行日志記錄異常處理如果你想周期性地執行某個Python腳本,最出名的選擇應該是Crontab腳本,但是Crontab具有以下缺點:

1.不方便執行秒級任務。

2.當需要執行的定時任務有上百個的時候,Crontab的管理就會特別不方便。

還有一個選擇是Celery,但是Celery的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery不會是一個好選擇。

在你想要使用一個輕量級的任務調度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納Crontab的所有基本功能,那么Schedule模塊是你的不二之選。

使用它來調度任務可能只需要幾行代碼,感受一下:

importschedule

importtime

defjob():

print("I'mworking...")

schedule.every(10).minutes.do(job)

whileTrue:

schedule.run_pending()

time.sleep(1)

上面的代碼表示每10分鐘執行一次job函數,非常簡單方便。你只需要引入schedule模塊,通過調用scedule.every(時間數).時間類型.do(job)發布周期任務。

發布后的周期任務需要用run_pending函數來檢測是否執行,因此需要一個While循環不斷地輪詢這個函數。

下面具體講講Schedule模塊的安裝和初級、進階使用方法。

1.準備

開始之前,你要確保Python和pip已經成功安裝在電腦上,請選擇以下任一種方式輸入命令安裝依賴:

Windows環境打開Cmd(開始-運行-CMD)。

MacOS環境打開Terminal(command+空格輸入Terminal)。

如果你用的是VSCode編輯器或Pycharm,可以直接使用界面下方的Terminal.

pipinstallschedule

2.基本使用

最基本的使用在文首已經提到過,下面給大家展示更多的調度任務例子:

importschedule

importtime

defjob():

print("I'mworking...")

#每十分鐘執行任務

schedule.every(10).minutes.do(job)

#每個小時執行任務

schedule.every().hour.do(job)

#每天的10:30執行任務

schedule.every().day.at("10:30").do(job)

#每個月執行任務

schedule.every().monday.do(job)

#每個星期三的13:15分執行任務

schedule.every().wednesday.at("13:15").do(job)

#每分鐘的第17秒執行任務

schedule.every().minute.at(":17").do(job)

whileTrue:

schedule.run_pending()

time.sleep(1)

可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運行一次任務的話,可以這么配

importschedule

importtime

defjob_that_executes_once():

#此處編寫的任務只會執行一次...

returnschedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

whileTrue:

schedule.run_pending()

time.sleep(1)

參數傳遞

如果你有參數需要傳遞給作業去執行,你只需要這么做:

importschedule

defgreet(name):

print('Hello',name)

#do()將額外的參數傳遞給job函數

schedule.every(2).seconds.do(greet,name='Alice')

schedule.every(4).seconds.do(greet,name='Bob')

獲取目前所有的作業

如果你想獲取目前所有的作業:

importschedule

defhello():

print('Helloworld')

schedule.every().second.do(hello)

all_jobs=schedule.get_jobs()

取消所有作業

如果某些機制觸發了,你需要立即清除當前程序的所有作業:

importschedule

defgreet(name):

print('Hello{}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

標簽功能

在設置作業的時候,為了后續方便管理作業,你可以給作業打個標簽,這樣你可以通過標簽過濾獲取作業或取消作業。

importschedule

defgreet(name):

print('Hello{}'.format(name))

#.tag打標簽

schedule.every().day.do(greet,'Andrea').tag('daily-tasks','friend')

schedule.every().hour.do(greet,'John').tag('hourly-tasks','friend')

schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')

schedule.every().day.do(greet,'Derek').tag('daily-tasks','guest')

#get_jobs(標簽):可以獲取所有該標簽的任務

friends=schedule.get_jobs('friend')

#取消所有daily-tasks標簽的任務

schedule.clear('daily-tasks')

設定作業截止時間

如果你需要讓某個作業到某個時間截止,你可以通過這個方法:

importschedule

fromdatetimeimportdatetime,timedelta,time

defjob():

print('Boo')

#每個小時運行作業,18:30后停止

schedule.every(1).hours.until("18:30").do(job)

#每個小時運行作業,2030-01-0118:33today

schedule.every(1).hours.until("2030-01-0118:33").do(job)

#每個小時運行作業,8個小時后停止

schedule.every(1).hours.until(timedelta(hours=8)).do(job)

#每個小時運行作業,11:32:42后停止

schedule.every(1).hours.until(time(11,33,42)).do(job)

#每個小時運行作業,2025-5-1711:36:20后停止

schedule.every(1).hours.until(datetime(2025,5,17,11,36,20)).do(job)

截止日期之后,該作業將無法運行。

立即運行所有作業,而不管其安排如何

如果某個機制觸發了,你需要立即運行所有作業,可以調用schedule.run_all():

importschedule

defjob_1():

print('Foo')

defjob_2():

print('Bar')

schedule.every().monday.at("12:40").do(job_1)

schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

#立即運行所有作業,每次作業間隔10秒

schedule.run_all(delay_seconds=10)

3.高級使用

裝飾器安排作業

如果你覺得設定作業這種形式太啰嗦了,也可以使用裝飾器模式:

fromscheduleimportevery,repeat,run_pending

importtime

#此裝飾器效果等同于schedule.every(10).minutes.do(job)

@repeat(every(10).minutes)

defjob():

print("Iamascheduledjob")

whileTrue:

run_pending()

time.sleep(1)

并行執行

默認情況下,Schedule按順序執行所有作業。其背后的原因是,很難找到讓每個人都高興的并行執行模型。

不過你可以通過多線程的形式來運行每個作業以解決此限制:

importthreading

importtime

importschedule

defjob1():

print("I'mrunningonthread%s"%threading.current_thread())

defjob2():

print("I'mrunningonthread%s"%threading.current_thread())

defjob3():

print("I'mrunningonthread%s"%threading.current_thread())

defrun_threaded(job_func):

job_thread=threading.Thread(target=job_func)

job_thread.start()

schedule.every(10).seconds.do(run_threaded,job1)

schedule.every(10).seconds.do(run_threaded,job2)

schedule.every(10).seconds.do(run_threaded,job3)

whileTrue:

schedule.run_pending()

time.sleep(1)

日志記錄

Schedule模塊同時也支持logging日志記錄,這么使用:

importschedule

importlogging

logging.basicConfig()

schedule_logger=logging.getLogger('schedule')

#日志級別為DEBUG

schedule_logger.setLevel(level=logging.DEBUG)

defjob():

print("Hello,Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running*all*1jobswith0sdelayinbetween

DEBUG:schedule:RunningjobJob(interval=1,unit=seconds,do=job,args=(),kwargs={})

Hello,Logs

DEBUG:schedule:Deleting*all*jobs

異常處理

Schedule不會自動捕捉異常,它遇到異常會直接拋出,這會導致一個嚴重的問題:后續所有的作業都會被中斷執行,因此我們需要捕捉到這些異常。

你可以手動捕捉,但是某些你預料不到的情況需要程序進行自動捕獲,加一個裝飾器就能做到了:

importfunctools

defcatch_exceptions(cancel_on_failure=False):

defcatch_exceptions_d

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論