gmail2telegram.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. '''Created 9 Aug 2024 Levi'''
  2. import asyncio
  3. import pickle
  4. import sys
  5. import traceback
  6. import configparser
  7. from imaplib import IMAP4
  8. from time import sleep
  9. import telegram
  10. from telegram.error import BadRequest
  11. from imap_tools import MailBox, AND, OR
  12. from bs4 import BeautifulSoup
  13. with open('gmail2tg', 'rb') as pckfile:
  14. app_password = pickle.load(pckfile)
  15. # Set up your credentials
  16. IMAP_SERVER = 'imap.gmail.com'
  17. EMAIL_ACCOUNT = 'deeejas@gmail.com'
  18. EMAIL_PASSWORD = app_password
  19. config = configparser.ConfigParser()
  20. config.read('bot')
  21. gmail_bot = telegram.Bot(token=config['AUTH']['token2'])
  22. def pretty_exc():
  23. exc_type, _exc_obj, exc_tb = sys.exc_info()
  24. # fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
  25. for tb in list(traceback.format_exception(exc_type, _exc_obj, exc_tb)):
  26. print(tb)
  27. def clean_html(text) -> str:
  28. soup = BeautifulSoup(text, 'html.parser')
  29. stripped_text = soup.get_text()
  30. lines = (line.strip() for line in stripped_text.splitlines())
  31. # break multi-headlines into a line each
  32. chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
  33. # drop blank lines
  34. text = '\n'.join(chunk for chunk in chunks if chunk)
  35. return text
  36. async def gmail2telegram(container) -> list | str:
  37. # global container
  38. try:
  39. with MailBox(IMAP_SERVER).login(EMAIL_ACCOUNT, EMAIL_PASSWORD, 'INBOX') as mailbox:
  40. sleep(15)
  41. primary_emails = mailbox.fetch(AND(OR('X-GM-RAW "category:primary"', 'X-GM-RAW "category:updates"'), seen=False), mark_seen=False) # type:ignore
  42. # print(container)
  43. ids = []
  44. for msg in primary_emails:
  45. # chk = msg.uid not in container
  46. # print(chk)
  47. if msg.uid not in container:
  48. soup = BeautifulSoup(msg.html, 'html.parser')
  49. msg_text = soup.prettify().replace('<br/>', '').replace('<html>', '').replace('<head>', '').replace('</head>', '').replace('<body>', '').replace('<div>', '').replace('</div>', '').replace('</body>', '').replace('</html>', '')
  50. msg_text = msg_text[:msg_text.find('Intraday transactions')]
  51. msg_text = clean_html(msg_text)
  52. try:
  53. if len(msg.text) or len(msg.html) != 0:
  54. await gmail_bot.send_message(chat_id=config['AUTH']['chatid'], text=msg_text)
  55. else:
  56. await gmail_bot.send_message(chat_id=config['AUTH']['chatid'], text=msg_text)
  57. print('message appended', len(container))
  58. except BadRequest:
  59. try:
  60. if len(msg.text) or len(msg.html) != 0:
  61. await bot2.send_message(chat_id=config['AUTH']['chatid'], text=msg_text // 5) # noqa E:mypy
  62. else:
  63. await bot2.send_message(chat_id=config['AUTH']['chatid'], text=msg_text // 5) # noqa E:mypy
  64. print('message appended', len(container))
  65. except BadRequest:
  66. await gmail_bot.send_message(chat_id=config['AUTH']['chatid'], text=f'Message is too long')
  67. print('message appended', len(container), '!! last message was to long !!')
  68. except Exception:
  69. try:
  70. # mailbox.logout()
  71. pretty_exc()
  72. except IMAP4.abort:
  73. pass
  74. ids.append(msg.uid)
  75. # mailbox.logout()
  76. # print(msg.uid)
  77. return ids
  78. except Exception:
  79. pretty_exc()
  80. return 'empty box'
  81. async def main() -> None:
  82. container: set = set()
  83. try:
  84. while True:
  85. tasks = [gmail2telegram(container)]
  86. results = await asyncio.gather(*tasks)
  87. # print(results)
  88. for result in results[0]:
  89. # print(result)
  90. if result not in container:
  91. container.add(result)
  92. # await asyncio.sleep(1)
  93. except KeyboardInterrupt:
  94. print('Cancelling all tasks...')
  95. for task in tasks:
  96. task.cancel() # type: ignore
  97. await asyncio.gather(*tasks, return_exceptions=True)
  98. print('All tasks cancelled.')
  99. if __name__ == '__main__':
  100. asyncio.run(main())