''' Created on Jun 21, 2018 @author: levente.marton ''' import os import shutil import re import datetime from dataclasses import dataclass from pypxlib import Table @dataclass class Company: name: str vat_code: str reg_number: str address: str location: str county: str shortname: str obs: str adm: str admp: str admcnp: str mail: str = None price: int = None @dataclass class Account: clasa: str simbol: str denumire: str soldid: str = None soldic: str = None precedentd: str = None precedentc: str = None curentd: str = None curentc: str = None def soldf(self, type_): if type_ == 'd': return round((self.soldid + self.precedentd + self.curentd) - (self.soldic + self.precedentc + self.curentc)) elif type_ == 'c': return round((self.soldic + self.precedentc + self.curentc) - (self.soldid + self.precedentd + self.curentd)) class WinMentor(object): ''' Class to read winmentor database ''' def __init__(self, winment_path=os.getenv('WINMENT', 'c:/winment/data/').replace('\\', '/')): ''' Constructor ''' self.winment_path = winment_path @property def get_winment_path(self): return self.winment_path @get_winment_path.setter def set_winment_path(self, value): # self.value = value self.winment_path = value return self.winment_path def update_copy(self, db_file) -> str: '''returns the updated version of a .db file :param db_file is a .db from winmentor''' # self._to_file = db_file _, file_name = os.path.split(db_file) dir_list = db_file.split('/') # !!NOTE: if path is ?/winment/data/ then this must be 3 & 5 # if is ?/winment/winment/data/ then 4 & 6 dir_to_file = dir_list[4] if len(dir_list) != 6: os.makedirs(os.path.join('cash', dir_to_file), exist_ok=True) cashed_copy_mtime = -1 if os.path.isfile(os.path.join('./cash/', dir_to_file, file_name)): cashed_copy_mtime = os.stat(os.path.join('./cash/', dir_to_file, file_name)).st_mtime if os.stat(db_file).st_mtime != cashed_copy_mtime: shutil.copy2(db_file, os.path.join('./cash', dir_to_file)) return(os.path.join('./cash/', dir_to_file, file_name)) else: os.makedirs('cash', exist_ok=True) cashed_copy_mtime = -1 if os.path.isfile('./cash/' + file_name): cashed_copy_mtime = os.stat('./cash/' + file_name).st_mtime if os.stat(db_file).st_mtime != cashed_copy_mtime: shutil.copy2(db_file, './cash') return('./cash/' + file_name) def make_list(self, db_file, headers) -> list: '''returns a list of lists with elements from a given db file :param db_file is .db file from winmentor :param headers are the fields of the specified db file''' file_name = self.update_copy(db_file) with Table(file_name) as table: m_list = [] for row in table: item = [] for header in headers: element = row[header] if type(element) == datetime.date: if element < datetime.date(1999, 1, 1): element = '' item.append(element) m_list.append(item) return m_list def make_sal_list(self, file_1, file_2) -> list: '''returns list with all_ employees in current month :param file_1 is npers.db from shortname :param file_2 is likisal.db shortname/current_month''' file_name1 = self.update_copy(self.winment_path + file_1) file_name2 = self.update_copy(self.winment_path + file_2) with Table(file_name1) as perss, Table(file_name2) as sals: empl_all = [] # for field in perss.fields: for sal in sals: for pers in perss: # print(pers) if pers.Cod == sal.Cod: empl = [] empl.append(pers.Cod) empl.append(f'{pers.Nume} {pers.Prenume}') empl.append(pers.DataAngF.strftime('%d-%m-%Y')) empl.append(sal.VenitBrut) empl.append(sal.SalRealizat) empl.append(sal.CO) empl.append(round(sal.SalOra, 2)) empl.append(sal.ContribAngajat) empl.append(round(sal.ContribAngajator, 2)) empl.append(sal.VenitNet) empl.append(sal.Impozit) empl.append(sal.SalarNet) empl.append(sal.OreLucrate) empl.append(sal.ZileLuk) empl.append(sal.ZileCO) empl_all.append(empl) return empl_all def gen_firms(self, db_file, headers) -> list: '''generates company list from firme.db file :param db_file is firme.db from winment/data folder''' file_db = self.update_copy(self.winment_path + db_file) with Table(file_db) as table: for row in table: yield [row[header] for header in headers] def firmlist(self, headers, db_file='/firme.db', ban=None) -> list: '''returns company list from firme.db file :param db_file is firme.db from winment/data folder :param list headers is the fields from firme.db file :param list|str ban companies excluded from list''' comp_list = [] for a_company in self.gen_firms(db_file, headers): company = Company( name=a_company[0], vat_code=a_company[1], reg_number=a_company[2], address=a_company[3], location=a_company[4], county=a_company[5], shortname=a_company[6], adm=a_company[7], admp=a_company[8], admcnp=a_company[9], obs=a_company[10]) comp_list.append(company) if ban: if type(ban) is list: for r in comp_list: for company_shortname in ban: if company_shortname == r.shortname: comp_list.remove(r) else: for r in comp_list: if r == ban: comp_list.remove(r) return comp_list def filtered_firmlist(self, headers, db_file='/firme.db', ban=None) -> list: '''generates company list from firme.db file :param db_file is firme.db from winment/data folder :param list headers is the fields from firme.db file :param list|str ban companies excluded from list''' if ban: if type(ban) is list: for a_company in self.gen_firms(db_file, headers): company = Company( name=a_company[0], vat_code=a_company[1], reg_number=a_company[2], address=a_company[3], location=a_company[4], county=a_company[5], shortname=a_company[6], adm=a_company[7], admp=a_company[8], admcnp=a_company[9], obs=a_company[10]) if a_company[6] not in ban: yield [company.name, company.vat_code, company.reg_number, company.address, company.location, company.county, company.shortname, company.adm, company.admp, company.admcnp, company.obs] else: for a_company in self.gen_firms(db_file, headers): company = Company( name=a_company[0], vat_code=a_company[1], reg_number=a_company[2], address=a_company[3], location=a_company[4], county=a_company[5], shortname=a_company[6], adm=a_company[7], admp=a_company[8], admcnp=a_company[9], obs=a_company[10]) yield [company.name, company.vat_code, company.reg_number, company.address, company.location, company.county, company.shortname, company.adm, company.admp, company.admcnp, company.obs] def get_last_month(self, short_name) -> str: '''returns last month of a company in format YYYY_MM :param str short_name is the company shortname''' short_name = self.winment_path + short_name month_folders = [f for f in os.listdir(short_name) if re.match(r'[0-9_]+$', f)] month_folders.reverse() return month_folders[0] def get_bank_accounts(self, short_name, db_file='/nbanca.db'): #....................................................................... # TO DO: put an all_ parameter to yield all_ accounts or # just one, # make a named tuple with bank accounts. #....................................................................... headers = ['Codbanca', 'Denumire', 'NrCont'] headers2 = ['COD', 'DENUMIRE'] short_name = self.winment_path + short_name nbanks = self.update_copy(short_name + '/nbanci.db') file_db = self.update_copy(short_name + db_file) bank_codes = {} with Table(file_db) as table, Table(nbanks) as nbanks: # tables = zip(table, nbanks) for bank in nbanks: data = [bank[header] for header in headers2] bdict = {data[i]: data[i + 1] for i in range(0, len(data), 2)} bank_codes.update(bdict) for row in table: if row.NrCont: bank_account = [row[header] for header in headers] bank_account.append(bank_codes[bank_account[0]]) if bank_account[2].startswith(' '): yield bank_account def get_oblig(self, short_name, db_file='/ObligPI.DB'): headers = ['Part', 'TipDoc', 'Doc', 'NrDoc', 'DataDoc', 'Valoare', 'Rest'] short_name = self.winment_path + short_name # nbanks = self.update_copy(short_name + '/nbanci.db') oblig_db = self.update_copy(short_name + db_file) with Table(oblig_db) as table: # , Table(nbanks) as nbanks all_ = [] parts = [] tips = [] docs = [] nrs = [] dates = [] values = [] rests = [] for row in table: if row.TipDoc == 1: parts.append(row.Part) all_.append(parts) tips.append(row.TipDoc) all_.append(tips) docs.append(row.Doc) all_.append(docs) nrs.append(row.NrDoc) all_.append(nrs) dates.append(row.DataDoc) all_.append(dates) values.append(row.Valoare) all_.append(values) rests.append(row.Rest) all_.append(rests) df_dict = dict(zip(headers, all_)) return df_dict def corp_list(self, name=None): '''returns company list from actual shortnames from winmwnt/data folder ''' dir_list = [f.name for f in os.scandir(self.winment_path) if f.is_dir() and '@' not in f.name] if name: if type(name) is list: for r in name: dir_list.remove(r) else: dir_list.remove(name) return dir_list def verif_corp(self, file_='/firme.db', ban=None): headers = ['Denumire', 'CF', 'J', 'Adresa', 'Oras', 'Judet', 'Prescurtat', 'Obs'] corplist = self.make_list(self.winment_path + file_, headers) # dbRead.make_list(m_path + '/FIRME.DB', headers) if ban: if type(ban) is list: for r in corplist: for i in ban: if i == r[6]: corplist.remove(r) else: for r in corplist: if r == ban: corplist.remove(r) return corplist def verif_cont(self, file_) -> list: '''returns an account with its values from the balance :param file_ is shortname/ncont.db''' accounts = [] headers = ['Clasa', 'Simbol', 'Denumire', 'SoldID', 'SoldIC', 'PrecedentD', 'PrecedentC', 'CurentD', 'CurentC'] accountlist = self.make_list(self.winment_path + file_, headers) # + lunaCurenta for elem in accountlist: account = Account(clasa=elem[0], simbol=elem[1], denumire=elem[2], soldid=elem[3], soldic=elem[4], precedentd=elem[5], precedentc=elem[6], curentd=elem[7], curentc=elem[8]) accounts.append(account) return accountlist def an_inc(self, account_list, boolind, ind2, ind3): '''returns the annual turnover in given year :param int boolind:account class number :param int ind2, ind3:debit or credit position in balance (6-rulaj curent debit, 8-rulaj cumulat debit, index starting from 0)''' tt = 0 for account in account_list: # n = len(account[boolind]) == 3 and account[boolind] != '...' and int(account[boolind]) > 700 and int(account[boolind]) < 760 n = account[boolind] == 7 can = account[1][:2] != '76' # print(account[1][:2]) if n and can: tt += int(account[ind2]) + int(account[ind3]) return tt def divid(self, account_list, account='457') -> int: '''returns dividends/year :param account_list is account from ncont.db''' div_ = 0 for acc in account_list: if acc[1][:3] == account: div_ += acc[8] + acc[6] return round(div_) def divid_current(self, account_list, account='457') -> int: '''returns dividends from current month :param account_list is account from ncont.db''' div_ = 0 for acc in account_list: if acc[1][:3] == account: div_ += acc[8] return round(div_) def divid_intermed(self, account_list, account='463') -> int: '''returns dividends from current result/year :param account_list is account from ncont.db''' div_ = 0 for acc in account_list: if acc[1][:3] == account: div_ += acc[7] + acc[5] return round(div_) def divid_inter_current(self, account_list, account='463') -> int: '''returns dividends from current results in the current month :param account_list is account from ncont.db''' div_ = 0 for acc in account_list: if acc[1][:3] == account: div_ += acc[7] return round(div_) def spons(self, account_list, account='658.02') -> int: '''returns sponsored money/year :param account_list is account from ncont.db''' spons_ = 0 for acc in account_list: if acc[1] == account: spons_ += acc[8] + acc[6] return round(spons_) def result(self, account_list, boolind, ind2, ind3) -> int: # account='121' '''returns the final result in given year :param int boolind:account class number :param int ind2, ind3:debit or credit position in balance (6-rulaj curent debit, 8-rulaj cumulat debit, index starting from 0)''' res_minus = res_plus = 0 for r_minus in account_list: p = r_minus[boolind] == 6 if p: res_minus += int(r_minus[ind2]) + int(r_minus[ind3]) for r_plus in account_list: i = r_plus[boolind] == 7 if i: res_plus += int(r_plus[ind2]) + int(r_plus[ind3]) return res_plus - res_minus def ins_payable(self, account_list, account='431') -> int: '''returns insurences payable in current month :param account_list is account from ncont.db''' ins = 0 for acc in account_list: p = acc[1][:3] == account if p: ins += acc[8] return round(ins) def CAS_payable(self, account_list, accounts=('431.02', '431.05')) -> int: '''returns CAS payable in current month :param account_list is account from ncont.db''' CAS = 0 acc_1, acc_2 = accounts for acc in account_list: p = acc[1] == acc_1 d = acc[1] == acc_2 if p: CAS += acc[8] if d: CAS += acc[8] return round(CAS) def CASS_payable(self, account_list, accounts=('431.04', '431.06')) -> int: '''returns CASS payable in current month :param account_list is account from ncont.db''' CASS = 0 acc_1, acc_2 = accounts for acc in account_list: p = acc[1] == acc_1 d = acc[1] == acc_2 if p: CASS += acc[8] if d: CASS += acc[8] return round(CASS) def sal_tax_payable(self, account_list, account='431.44') -> int: '''returns salary tax payable in current month :param account_list is account from ncont.db''' tax = 0 for acc in account_list: p = acc[1] == account if p: tax += acc[8] return round(tax) def cam_payable(self, account_list, account='436') -> int: '''returns CAM payable in current month :param account_list is account from ncont.db''' cam = 0 for acc in account_list: p = acc[1][:3] == account if p: cam += acc[8] return round(cam) def vat_payable(self, account_list, accounts=('442.03', '442.04')) -> int: '''returns VAT payable in current month :param account_list is account from ncont.db''' tt = 0 acc_1, acc_2 = accounts for acc in account_list: p = acc[1] == acc_1 d = acc[1] == acc_2 if p: tt += acc[8] elif d: tt -= acc[7] + acc[8] return round(tt) def vat_final(self, acc_list, accounts=('442.03', '442.04')) -> int: '''returns VAT final payable in current month :param account_list is account from ncont.db''' tt = 0 acc_1, acc_2 = accounts for acc in acc_list: account = Account(clasa=acc[0], simbol=acc[1], denumire=acc[2], soldid=acc[3], soldic=acc[4], precedentd=acc[5], precedentc=acc[6], curentd=acc[7], curentc=acc[8]) payable = account.simbol == acc_1 deductible = account.simbol == acc_2 if payable: tt += account.soldf('c') elif deductible: tt -= account.soldf('d') return round(tt) def tax_payable(self, account_list, account='441') -> int: '''returns income TAX payable in current month :param account_list is account from ncont.db''' tax = 0 for acc in account_list: p = acc[1][:3] == account if p: tax += acc[8] return round(tax) def div_tax_payable(self, acc_list, accounts='446.07') -> int: '''returns dividend TAX payable in current month :param account_list is account from ncont.db''' tt = 0 # acc_1, acc_2 = accounts for acc in acc_list: account = Account(clasa=acc[0], simbol=acc[1], denumire=acc[2], soldid=acc[3], soldic=acc[4], precedentd=acc[5], precedentc=acc[6], curentd=acc[7], curentc=acc[8]) payable = account.simbol == accounts # deductible = account.simbol == acc_2 if payable: tt += account.curentc # elif deductible: # tt -= account.soldf('d') return round(tt) def advance_final(self, acc_list, accounts='542') -> int: '''returns final dvances/year :param account_list is account from ncont.db''' tt = 0 # acc_1, acc_2 = accounts for acc in acc_list: account = Account(clasa=acc[0], simbol=acc[1], denumire=acc[2], soldid=acc[3], soldic=acc[4], precedentd=acc[5], precedentc=acc[6], curentd=acc[7], curentc=acc[8]) payable = account.simbol[:3] == accounts # deductible = account.simbol == acc_2 if payable: tt += account.soldf('d') # elif deductible: # tt -= account.soldf('d') return round(tt) def deb_div(self, acc_list, accounts='461') -> int: '''returns advances transfered to deb. diversi/year :param account_list is account from ncont.db''' tt = 0 # acc_1, acc_2 = accounts for acc in acc_list: account = Account(clasa=acc[0], simbol=acc[1], denumire=acc[2], soldid=acc[3], soldic=acc[4], precedentd=acc[5], precedentc=acc[6], curentd=acc[7], curentc=acc[8]) payable = account.simbol[:3] == accounts # deductible = account.simbol == acc_2 if payable: tt += account.soldf('d') # elif deductible: # tt -= account.soldf('d') return round(tt) def credit_final(self, acc_list, accounts='455') -> int: '''returns credited ammount/year :param account_list is account from ncont.db''' tt = 0 # acc_1, acc_2 = accounts for acc in acc_list: account = Account(clasa=acc[0], simbol=acc[1], denumire=acc[2], soldid=acc[3], soldic=acc[4], precedentd=acc[5], precedentc=acc[6], curentd=acc[7], curentc=acc[8]) payable = account.simbol[:3] == accounts # deductible = account.simbol == acc_2 if payable: tt += account.soldf('d') # elif deductible: # tt -= account.soldf('d') return round(tt) if __name__ == '__main__': mentor = WinMentor() # accounts = list(mentor.get_bank_accounts('WEBS')) # account_num = [n for n in accounts[1] if n.startswith(' ')] for account in mentor.get_bank_accounts('WEBS'): # if account[2].startswith(' '): print(account)