123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- '''Created Dec 3, 2021 Levi'''
- import os
- import platform
- import subprocess as sp
- import time
- from json import JSONDecodeError
- from urllib.parse import urljoin
- import requests
- import xlsxwriter as xlsw
- from .writer import Writer
def insert_keys(dict_, obj, pos):
    '''Return a new dict with the items of ``obj`` spliced into ``dict_``.

    The items of ``dict_`` that come before index ``pos`` (in iteration
    order) are kept first, then the items of ``obj``, then the remaining
    items of ``dict_``. Neither input is modified. When a key occurs more
    than once, it keeps the position of its first occurrence and the value
    of its last one.
    '''
    entries = list(dict_.items())
    merged = dict(entries[:pos])
    merged.update(obj.items())
    merged.update(entries[pos:])
    return merged
class anafAPI(object):
    '''Client for the ANAF web services rooted at ``BASEURL``.

    A single ``requests.Session`` is shared by all calls. After every
    request the client sleeps for ``REQUEST_DELAY`` seconds (presumably to
    stay under the service's rate limit -- confirm against the ANAF docs).
    '''
    BASEURL = 'https://webservicesp.anaf.ro/'
    VATURL = 'PlatitorTvaRest/api/v6/ws/tva'
    REPURL = 'bilant?an={}&cui={}'
    CULTURL = 'RegCult/api/v2/ws/cult'
    AGRURL = 'RegAgric/api/v2/ws/agric'

    # Pause, in seconds, applied after every request.
    REQUEST_DELAY = 1.1
    # Lines of a fiscal-code file starting with one of these are dropped.
    _SKIPPED_PREFIXES = ('HU', 'DE', 'GB', 'EL', 'LT', 'SK')
    # Keys get_vat() pads with 'Nan' when the service omits them, and the
    # index at which the first of them is expected in the record.
    _OPTIONAL_VAT_KEYS = ('telefon', 'fax', 'codPostal', 'act',
                          'stare_inregistrare')
    _OPTIONAL_VAT_KEYS_START = 5
    # Names of the Writer methods that to_excel() may dispatch to.
    _EXPORT_TYPES = ('to_rep', 'to_vat', 'to_cult')

    def __init__(self):
        '''Create the HTTP session reused by every request.'''
        self.session = requests.Session()
        self.lista_cf_ro = []

    def parse_list(self, file_name=None, list_=None, excl_=None):
        '''Load the list of fiscal codes to query and return it.

        :param file_name: optional text file of fiscal codes. Lines that
            start with one of ``_SKIPPED_PREFIXES`` or that match an entry
            of ``excl_`` are dropped; the surviving lines are written to
            ``lista_cf_new.txt`` in the current directory, then commas are
            removed and the text is split on whitespace.
        :param list_: optional ready-made list of codes; when given (and
            non-empty) it takes precedence over ``file_name``.
        :param excl_: optional collection of codes to leave out of the file.
        :return: the resulting list, also stored as ``self.lista_cf_ro``.
            With neither source given, the previously stored list (or an
            empty one) is returned.
        '''
        excluded = () if excl_ is None else excl_
        codes = getattr(self, 'lista_cf_ro', [])
        if file_name:
            with open(file_name, 'r') as fiscal_codes:
                kept = [line for line in fiscal_codes
                        if self._keep_line(line, excluded)]
            with open("lista_cf_new.txt", "w") as fileout:
                fileout.writelines(kept)
            codes = ' '.join(kept).replace(',', '').split()
        if list_:
            codes = list_
        self.lista_cf_ro = codes
        print('items in list', len(codes))
        return codes

    def _keep_line(self, line, excluded):
        '''Tell whether a raw line of the fiscal-code file survives filtering.'''
        if line.startswith(self._SKIPPED_PREFIXES):
            return False
        # Lines still carry their newline (and possibly a comma), so the
        # cleaned-up form is compared against the exclusion list as well.
        cleaned = line.replace(',', '').strip()
        return line not in excluded and cleaned not in excluded

    @staticmethod
    def _normalize_cui(cui):
        '''Return ``cui`` as a string without the ``RO`` country prefix.'''
        return str(cui).replace('RO', '')

    def _lookup(self, endpoint, cui, **post_kwargs):
        '''POST a one-item lookup for ``cui`` and return the first hit.

        The payload is ``[{"cui": ..., "data": <today>}]``. Returns the
        first element of the response's ``found`` list, or ``None`` when
        the body is not valid JSON or contains no such element. Errors
        raised by the HTTP call itself are not caught.
        '''
        url = urljoin(self.BASEURL, endpoint)
        payload = [{"cui": self._normalize_cui(cui),
                    "data": time.strftime('%Y-%m-%d')}]
        result = self.session.post(url, json=payload, **post_kwargs)
        try:
            record = result.json()['found'][0]
        except (KeyError, IndexError, ValueError):
            # ValueError covers every JSON decoding error variant.
            record = None
        time.sleep(self.REQUEST_DELAY)
        return record

    def _by_cui(self, record):
        '''Map ``record['cui']`` to the record's values minus the first one.

        Returns an empty dict when ``record`` has no ``cui`` key.
        Assumes ``cui`` is the record's first key -- TODO confirm.
        '''
        if 'cui' not in record:
            return {}
        return {record['cui']: self._dict_to_list(record, 0)}

    def get_vat(self, cui):
        '''Query the VAT registry (``VATURL``) for one fiscal code.

        Any key of ``_OPTIONAL_VAT_KEYS`` missing from the response is
        inserted with the value ``'Nan'``.

        :param cui: fiscal code; an ``RO`` prefix is stripped.
        :return: ``(new_dict, resp_dict)`` where ``resp_dict`` is the padded
            record and ``new_dict`` maps its ``cui`` to the other values.
            Both are empty dicts when the lookup yields nothing.
        '''
        record = self._lookup(self.VATURL, cui,
                              headers={'Content-Type': 'application/json'})
        if record is None:
            return {}, {}
        # NOTE(review): the insert position is assumed to advance for every
        # optional key, present or not, giving each key a fixed slot --
        # confirm against the expected spreadsheet column order.
        for pos, key in enumerate(self._OPTIONAL_VAT_KEYS,
                                  start=self._OPTIONAL_VAT_KEYS_START):
            if key not in record:
                record = insert_keys(record, {key: 'Nan'}, pos)
        return self._by_cui(record), record

    def get_cult(self, cui):
        '''Query the ``CULTURL`` registry for one fiscal code.

        :param cui: fiscal code; an ``RO`` prefix is stripped.
        :return: ``(new_dict, resp_dict)`` where ``resp_dict`` is the record
            found and ``new_dict`` maps its ``cui`` to the other values.
            Both are empty dicts when the lookup yields nothing.
        '''
        record = self._lookup(self.CULTURL, cui)
        if record is None:
            return {}, {}
        return self._by_cui(record), record

    def get_rep(self, cui, year):
        '''Fetch the ``REPURL`` report of ``cui`` for ``year``.

        The response's top-level entries other than ``i`` come first; the
        entries of the ``i`` list follow, sorted by the number left after
        removing ``I`` from their ``indicator`` field.

        :param cui: fiscal code; an ``RO`` prefix is stripped.
        :param year: year placed in the ``an`` query parameter.
        :return: ``(headers, vals)``, two parallel lists. Indicator headers
            are formatted ``'<number>-<val_den_indicator>'`` and their
            values are taken from ``val_indicator``.
        '''
        url = urljoin(self.BASEURL,
                      self.REPURL.format(year, self._normalize_cui(cui)))
        resp_dict = self.session.get(url).json()

        general = {key: val for key, val in resp_dict.items() if key != 'i'}
        # Pair each indicator with its numeric code rather than rewriting
        # the response in place.
        indicators = sorted(
            ((int(item['indicator'].replace('I', '')), item)
             for item in resp_dict.get('i') or []),
            key=lambda pair: pair[0])

        headers = list(general)
        headers.extend('{}-{}'.format(number, item["val_den_indicator"])
                       for number, item in indicators)
        vals = list(general.values())
        vals.extend(item["val_indicator"] for _, item in indicators)

        time.sleep(self.REQUEST_DELAY)
        return headers, vals

    def to_excel(self, type_, cui=None, func=None, year=None, range_=None):
        '''Export to ``<type_>.xlsx`` through the matching ``Writer`` method.

        :param type_: one of ``'to_rep'``, ``'to_vat'``, ``'to_cult'``; it
            names both the ``Writer`` method and the output file.
        :param cui, func, year, range_: forwarded positionally, in this
            order, to the ``Writer`` method, which defines their meaning.
        :raises KeyError: if ``type_`` is not a supported export type.
        '''
        if type_ not in self._EXPORT_TYPES:
            raise KeyError(type_)
        # Only one Writer is created: the one that actually does the export.
        writer = Writer(f'{type_}.xlsx')
        getattr(writer, type_)(cui, func, year, range_)
        # Remembered so that open() can locate the file without arguments.
        self.type_ = type_

    def open(self, file_name=None):
        '''Open a workbook with the operating system's default application.

        :param file_name: file to open; defaults to the file of the last
            successful ``to_excel`` call.
        :raises AttributeError: if ``file_name`` is omitted and ``to_excel``
            has not been called yet.
        :raises NotImplementedError: on platforms other than Windows, Linux
            and macOS.
        '''
        file_name = file_name or f'{self.type_}.xlsx'
        abs_path = os.path.abspath(file_name)
        system = platform.system()
        if system == 'Windows':
            # No shell involved, so paths with spaces or shell
            # metacharacters are handled safely.
            os.startfile(abs_path)
        elif system == 'Linux':
            sp.run(['xdg-open', abs_path])
        elif system == 'Darwin':
            sp.run(['open', abs_path])
        else:
            raise NotImplementedError(
                f'opening files is not supported on {system}')

    def _dict_to_list(self, s_dict, from_ind=None):
        '''Return the values of ``s_dict`` as a list, in iteration order.

        :param from_ind: when not ``None``, the value at this index is
            removed from the result.
        '''
        n_list = [s_dict[key] for key in s_dict]
        if from_ind is not None:
            n_list.pop(from_ind)
        return n_list
|