'''Created Sep 28, 2021 Levi'''
import configparser
import sys
import traceback
import ssl
from time import sleep
from imaplib import IMAP4
import telegram
import keyring
from bs4 import BeautifulSoup
from telegram.error import BadRequest
from imap_tools import MailBox, AND
# Python >= 3.10 refuses the mail server's handshake with "dh key too small";
# lowering the OpenSSL security level to 1 lets the connection go through.
# NOTE(review): per the ssl docs a context built from
# SSLContext(PROTOCOL_TLSv1_2) does not verify certificates unless configured
# to, and PROTOCOL_TLSv1_2 is deprecated since Python 3.10 - confirm both are
# acceptable here.
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.set_ciphers('DEFAULT@SECLEVEL=1')
# Settings come from an INI-style file named 'bot', looked up relative to the
# current working directory.
config = configparser.ConfigParser()
config.read('bot')
# Two Telegram clients, one per token stored in the [AUTH] section.
bot = telegram.Bot(token=config['AUTH']['token'])
bot2 = telegram.Bot(token=config['AUTH']['bank'])
def pretty_exc():
    '''Print the traceback of the exception that is currently being handled.

    Every chunk returned by ``traceback.format_exception`` is written with
    its own ``print`` call, so each chunk is followed by an extra newline.
    '''
    rendered = traceback.format_exception(*sys.exc_info())
    for chunk in rendered:
        print(chunk)
def mask(msg, to_):
    '''Tell whether ``to_`` is the sender of ``msg`` or one of its recipients.

    Args:
        msg: object exposing ``from_``, ``to``, ``cc`` and ``bcc``
            (presumably an imap_tools message - confirm against callers).
        to_: value compared for equality with ``msg.from_`` and looked up
            with ``in`` inside ``msg.to``, ``msg.cc`` and ``msg.bcc``.

    Returns:
        True when at least one of the four checks succeeds, else False.
    '''
    # All checks are evaluated up front, in sender/to/cc/bcc order.
    checks = [to_ == msg.from_]
    for field in ('to', 'cc', 'bcc'):
        checks.append(to_ in getattr(msg, field))
    return any(checks)
def mask2(msg, to_):
    '''Tell whether ``to_`` matches the sender, a recipient or the subject.

    Args:
        msg: object exposing ``from_``, ``to``, ``cc``, ``bcc`` and
            ``subject`` (presumably an imap_tools message - confirm).
        to_: value compared for equality with ``msg.from_`` and looked up
            with ``in`` inside ``msg.to``, ``msg.cc``, ``msg.bcc`` and
            ``msg.subject``.

    Returns:
        True when at least one of the five checks succeeds, else False.
    '''
    # All checks are evaluated up front, in sender/to/cc/bcc/subject order.
    checks = [to_ == msg.from_]
    for field in ('to', 'cc', 'bcc', 'subject'):
        checks.append(to_ in getattr(msg, field))
    return any(checks)
def clean_html(text) -> str:
    '''Strip the markup from ``text`` and return its visible text.

    The text extracted by BeautifulSoup is split into lines, each line is
    stripped, phrases that share a line are separated, and blank entries are
    dropped.

    Args:
        text: HTML markup to clean.

    Returns:
        The non-empty phrases joined with a newline.
    '''
    soup = BeautifulSoup(text, 'html.parser')
    lines = (line.strip() for line in soup.get_text().splitlines())
    # Break multi-headlines into a line each: headlines sharing a line are
    # set apart by a run of spaces, so split on a double space.  Splitting on
    # a single space would put every individual word on its own line.
    phrases = (phrase.strip() for line in lines for phrase in line.split('  '))
    # Drop blank entries.
    return '\n'.join(phrase for phrase in phrases if phrase)
async def mail_alert(container) -> str | None:
    '''Forward unseen Inbox mails that involve the owner address to Telegram.

    Logs in to the IMAP host from ``config['email']``, fetches every unseen
    message of the ``Inbox`` folder without marking it seen, and sends each
    message that passes ``mask`` and whose uid is not in ``container`` to
    ``config['AUTH']['chatid']`` through ``bot``.  When Telegram rejects the
    text with ``BadRequest`` the first fifth of the body is tried, and after
    that a short "Message is too long" notice.

    Args:
        container: collection of already handled uids; it is only read here
            (membership test and ``len``), never modified.

    Returns:
        The uid of the last fetched unseen message (whether or not it was
        forwarded), or ``'empty box'`` when nothing unseen was found or any
        step failed.
    '''
    import asyncio  # function-scope import: the file header does not import asyncio

    owner = 'levente.marton@mzk.ro'
    result = 'empty box'
    mailbox = None  # stays None when the connection itself cannot be created
    try:
        chat_id = config['AUTH']['chatid']
        mailbox = MailBox(host=config['email']['host'], ssl_context=context)
        mailbox.login(config['email']['user'], keyring.get_password('yagmail', owner), initial_folder='Inbox')  # noqa E:mypy
        # Pause without freezing the event loop; time.sleep() would block
        # every other coroutine for the full 15 seconds.
        await asyncio.sleep(15)
        msgs = list(mailbox.fetch(AND(seen=False), mark_seen=False))
        send_failed = False
        for msg in msgs:
            if not mask(msg, owner) or msg.uid in container:
                continue
            body = msg.text
            if 'end of day extras/intraday' in msg.subject:
                # NOTE(review): the original replace() literal held a raw
                # line-separator character that cannot be recovered from the
                # source text.  splitlines() recognises every such separator,
                # so rejoining with a newline normalises whichever one these
                # mails use - confirm against a real message.
                body = '\n'.join(body.splitlines())
            # Fall back to the HTML part when the mail has no plain text.
            content = body if msg.text else msg.html
            header = f'{msg.from_}\nsubj: {msg.subject}\n\n'
            try:
                # Full text first, then only its first fifth.
                for attempt in (content, content[:len(content) // 5]):
                    try:
                        await bot.send_message(chat_id=chat_id, text=f'{header}{attempt}')
                    except BadRequest:
                        continue
                    print('message appended', len(container))
                    break
                else:
                    # Both attempts were rejected: send a notice instead.
                    await bot.send_message(chat_id=chat_id, text=f'{header}Message is too long')
                    print('message appended', len(container), '!! last message was to long !!')
            except Exception:
                # Report the failure and keep going with the remaining mails;
                # the run as a whole is then reported as 'empty box'.
                pretty_exc()
                send_failed = True
        if msgs and not send_failed:
            result = msgs[-1].uid
    except Exception:
        # Connection, login or fetch problems: make them visible instead of
        # swallowing them silently, then report 'empty box'.
        pretty_exc()
    finally:
        # Log out exactly once, and only if a connection object exists.
        if mailbox is not None:
            try:
                mailbox.logout()
            except (IMAP4.error, OSError):
                pass  # best effort: the connection may already be gone
    return result
async def mail_alert2(container) -> str | None:
# container: deque = deque(maxlen=500)
# while True:
try:
mailbox2 = MailBox(host=config['email']['host'], ssl_context=context)
mailbox2.login(config['email']['user'], keyring.get_password('yagmail', 'levente.marton@mzk.ro'), initial_folder='bank_events') # noqa E:mypy
sleep(15)
msgs = [msgs for msgs in mailbox2.fetch(AND(seen=False), mark_seen=True)]
for msg in msgs:
chk = mask2(msg, 'account')
if chk:
if msg.uid not in container:
soup = BeautifulSoup(msg.html, 'html.parser')
msg_text = soup.prettify().replace('
', '').replace('', '').replace('