start_mail_alert.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536
  1. '''Created 16 Nov 2022 Levi'''
  2. from collections import deque
  3. import asyncio
  4. import mail_alert
  5. async def main() -> None:
  6. container: set = set()
  7. try:
  8. while True:
  9. # loop = asyncio.get_event_loop()
  10. # tasks = map(asyncio.create_task,
  11. # [mail_alert.mail_alert(container)
  12. # # mail_alert.mail_alert2()
  13. # # mail_alert.mail_alert3()
  14. # ])
  15. tasks = [mail_alert.mail_alert(container),
  16. mail_alert.mail_alert2(container),
  17. mail_alert.mail_alert3(container)]
  18. # result = await asyncio.wait(tasks)
  19. results = await asyncio.gather(*tasks)
  20. # print(result)
  21. for result in results:
  22. if result not in container:
  23. container.add(result)
  24. # await asyncio.sleep(1)
  25. except KeyboardInterrupt:
  26. print('Cancelling all tasks...')
  27. for task in tasks:
  28. task.cancel() # noqa
  29. await asyncio.gather(*tasks, return_exceptions=True)
  30. print('All tasks cancelled.')
  31. if __name__ == '__main__':
  32. asyncio.run(main())