目录
  • 一、安装
  • 二、定时执行一次
  • 三、间隔执行
    • 四、每日定时执行一次
  • 五、每几分钟执行一次
    • 六、每小时执行一次
      • 七、调度器分类

        一、安装

        pip install apscheduler

        二、定时执行一次

        • 新建一个scheduler调度器
        • 添加一个job store调度任务
        • 运行调度任务
        import datetime
        from apscheduler.schedulers.blocking import BlockingScheduler
        
        def task(name):
        print('%s告诉你现在时间是:%s' . format(name, datetime.datetime.now()))
        
        # 该任务将会在2022-05-20 13:14:52执行一次
        scheduler = BlockingScheduler()
        scheduler.add_job(task, 'date', run_date=datetime.datetime(2022, 5, 20, 13, 14, 52), args=['autofelix'], id='task')
        scheduler.start()

        三、间隔执行

        • 当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发
        • date 一次性指定日期
        • interval 在某个时间范围内间隔多长时间执行一次
        from apscheduler.schedulers.blocking import BlockingScheduler
        
        def task():
        print('我是飞兔小哥')
        
        # 每隔10秒数执行一次
        scheduler = BlockingScheduler()
        scheduler.add_job(task, 'interval', seconds=10, id='task')
        scheduler.start()

        四、每日定时执行一次

        • cron 和Linux crontab格式兼容,最为强大
        from apscheduler.schedulers.blocking import BlockingScheduler
        
        f = open('status.text', 'a', encoding='utf8')
        sc = BlockingScheduler()
        @sc.scheduled_job('cron', day_of_week='*', hour=1, minute='30', second='50')
        
        if name == '__main__':
        try:
        sc.start()
        f.write('定时任务成功执行')
        except Exception as e:
        sc.shutdown()
        f.write('定时任务执行失败')
        finally:
        f.close()

        五、每几分钟执行一次

        • /2:每隔2分钟执行一次
        • /1:每隔1分钟执行一次
        from apscheduler.schedulers.blocking import BlockingScheduler
        
        def task():
        print('你的任务每隔2分钟执行一次')
        
        scheduler = BlockingScheduler()
        scheduler.add_job(job1, 'cron', minute="/2", id='task')
        scheduler.start()

        六、每小时执行一次

        • jitter:代表可以上下浮动的秒数
        from apscheduler.schedulers.blocking import BlockingScheduler
        
        def task():
        print('你的任务每隔1小时执行一次')
        
        scheduler = BlockingScheduler()
        scheduler.add_job(task, 'interval', hours=1, id='task')
        # scheduler.add_job(task, 'interval', hours=1, id='task', jitter=120)
        scheduler.start()

        七、调度器分类

        • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时
        • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)
        • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用
        • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用
        • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用
        • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
        • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用
        声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。