123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- '''Created 9 Aug 2024 Levi'''
- import asyncio
- import pickle
- import sys
- import traceback
- import configparser
- from imaplib import IMAP4
- from time import sleep
- import telegram
- from telegram.error import BadRequest
- from imap_tools import MailBox, AND, OR
- from bs4 import BeautifulSoup
# Load the Gmail app password stored in the local pickle file 'gmail2tg'.
# NOTE(security): pickle.load can execute arbitrary code while unpickling,
# so this file must only ever come from a trusted source.
with open('gmail2tg', 'rb') as pckfile:
    app_password = pickle.load(pckfile)

# IMAP login details used by gmail2telegram().
IMAP_SERVER = 'imap.gmail.com'
EMAIL_ACCOUNT = 'deeejas@gmail.com'
EMAIL_PASSWORD = app_password

# Telegram settings are read from the INI-style file 'bot'; its [AUTH]
# section supplies the bot token ('token2') used to build the bot client.
config = configparser.ConfigParser()
config.read('bot')
gmail_bot = telegram.Bot(token=config['AUTH']['token2'])
def pretty_exc():
    """Print the formatted traceback of the exception currently being handled.

    Every chunk produced by ``traceback.format_exception`` is passed to
    ``print`` separately, so each chunk is followed by an extra newline.
    """
    formatted_chunks = traceback.format_exception(*sys.exc_info())
    for chunk in formatted_chunks:
        print(chunk)
def clean_html(text: str) -> str:
    """Convert an HTML fragment into compact plain text.

    The markup is parsed with BeautifulSoup and reduced to its text content.
    Each line is stripped, lines are broken wherever two consecutive spaces
    occur, and empty pieces are dropped.

    Args:
        text: HTML (or plain text) to clean.

    Returns:
        The remaining non-empty phrases joined with newlines.
    """
    plain = BeautifulSoup(text, 'html.parser').get_text()
    stripped_lines = (line.strip() for line in plain.splitlines())
    # Break multi-headlines into a line each. The separator is a double
    # space: splitting on a single space would put every word on its own line.
    phrases = (
        phrase.strip()
        for line in stripped_lines
        for phrase in line.split('  ')
    )
    # Drop blank pieces so the result contains no empty lines.
    return '\n'.join(phrase for phrase in phrases if phrase)
# Telegram rejects message texts longer than this many characters.
_TELEGRAM_MAX_LEN = 4096
# Everything from this marker onwards is cut from the forwarded text.
_TRAILER_MARKER = 'Intraday transactions'
# Wrapper tags stripped from the prettified markup, applied in this order.
_WRAPPER_TAGS = (
    '<br/>', '<html>', '<head>', '</head>', '<body>',
    '<div>', '</div>', '</body>', '</html>',
)


def _message_text(msg) -> str:
    """Build the plain-text body to forward for one fetched mail."""
    # Use the plain-text body when msg.html is empty, so such mails do not
    # end up as an empty Telegram message.
    markup = BeautifulSoup(msg.html or msg.text, 'html.parser').prettify()
    for tag in _WRAPPER_TAGS:
        markup = markup.replace(tag, '')
    # partition() leaves the text untouched when the marker is missing;
    # slicing with find()'s -1 result would silently drop the last character.
    markup = markup.partition(_TRAILER_MARKER)[0]
    return clean_html(markup)


async def _send_to_telegram(text: str, seen_count: int) -> None:
    """Send *text* to the configured chat, shortening it if it is refused."""
    chat_id = config['AUTH']['chatid']
    try:
        await gmail_bot.send_message(chat_id=chat_id, text=text)
        print('message appended', seen_count)
    except BadRequest:
        try:
            # Retry with the text cut down to Telegram's length limit.
            await gmail_bot.send_message(chat_id=chat_id, text=text[:_TELEGRAM_MAX_LEN])
            print('message appended', seen_count)
        except BadRequest:
            await gmail_bot.send_message(chat_id=chat_id, text='Message is too long')
            print('message appended', seen_count, '!! last message was to long !!')


async def gmail2telegram(container) -> list | str:
    """Forward unseen Gmail messages to Telegram.

    Logs in to the INBOX, waits 15 seconds, then fetches unseen mails from
    the "primary" and "updates" categories without marking them as seen.
    Every mail whose uid is not yet in *container* is converted to plain
    text and sent through ``gmail_bot``.

    Args:
        container: Collection of uids that were already handled; it is only
            read here, never modified.

    Returns:
        The list of uids handled in this pass, or the string ``'empty box'``
        when the pass failed with an exception (which is printed).
    """
    try:
        with MailBox(IMAP_SERVER).login(EMAIL_ACCOUNT, EMAIL_PASSWORD, 'INBOX') as mailbox:
            # asyncio.sleep yields to the event loop; time.sleep would block it.
            await asyncio.sleep(15)
            unseen = mailbox.fetch(
                AND(OR('X-GM-RAW "category:primary"', 'X-GM-RAW "category:updates"'), seen=False),  # type: ignore
                mark_seen=False,
            )
            ids = []
            for msg in unseen:
                if msg.uid in container:
                    continue
                try:
                    await _send_to_telegram(_message_text(msg), len(container))
                except Exception:
                    # Best effort: one broken mail must not abort the batch.
                    pretty_exc()
                # The uid is recorded even after a failure, so a mail that
                # cannot be delivered is not retried on every pass.
                ids.append(msg.uid)
            return ids
    except Exception:
        pretty_exc()
        return 'empty box'
async def main() -> None:
    """Poll Gmail forever, remembering which message uids were handled.

    Each pass runs ``gmail2telegram`` as a task and merges the uids it
    returns into a set that is handed to the next pass. On Ctrl-C the
    running task is cancelled and awaited before the coroutine returns.
    """
    container: set = set()
    # Defined before the try block so the handler can always iterate it.
    tasks: list[asyncio.Task] = []
    try:
        while True:
            # Real Task objects are required: bare coroutines have no
            # cancel() and cannot be awaited a second time.
            tasks = [asyncio.create_task(gmail2telegram(container))]
            results = await asyncio.gather(*tasks)
            for result in results:
                if isinstance(result, str):
                    # A string is the failure marker of gmail2telegram;
                    # iterating it would add single characters to the set.
                    # Pause so a persistent failure is not retried in a
                    # tight loop.
                    await asyncio.sleep(15)
                    continue
                container.update(result)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() an interrupt can reach this coroutine as a
        # CancelledError; as the top-level coroutine it ends cleanly here.
        print('Cancelling all tasks...')
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        print('All tasks cancelled.')
# Script entry point: run the polling loop when the file is executed directly.
if __name__ == '__main__':
    asyncio.run(main())
|