'''Created 6 Jul 2022 Levi''' # coding: UTF-8 import platform import unittest import os from typing import Union import ujson from lxml import etree from anafapi.anafoauth import Anafoauth, Efactoauth from anafapi import utils from anafapi.en16931 import Invoice if platform.system() == 'Linux': token_file_path = '/home/deeejas/git/anafapi/anafapi/{}' elif platform.system() == 'Windows': token_file_path = os.path.join(os.path.expanduser('~'), 'git/anafapi/anafapi/{}') class oauthTest(unittest.TestCase): anoauth = Anafoauth( # client_id='16cbef8754bef0bcf015529353427e8a7e3ee71d56879565', # client_secret='c6aa66d89002f36461f93facd990e6bf5bc706e1df897e8a7e3ee71d56879565', client_id='312afcb54fae740eaa721aa7364b7e8a7e3ee71d52220c66', client_secret='3eba7d34e1c516403033bcdf22619ab1942bec0bb0ef7e8a7e3ee71d52220c66', # redirect_uri='https://www.anaf.ro/oauth2callback', redirect_uri='https://oauth.pstmn.io/v1/browser-callback', ) def test_is_valid(self): self.anoauth.load_token_file(token_file_path.format('efact.json')) # print(self.anoauth.to) print(self.anoauth.is_valid) def test_browser_redirect(self): print(self.anoauth._browser_redirect()) def test_get_token(self): self.anoauth.get_token() self.anoauth.save_token(token_file_path.format('efact.json')) def test_get_jwt_token(self): # self.anoauth.get_jwt_token() self.anoauth.get_jwt_token_requests() # self.anoauth.save_jwt_token(token_file_path.format('efact_jwt.json')) def test_revive_token(self): self.anoauth.load_token_file(token_file_path.format('efact.json')) self.anoauth.revive_token() self.anoauth.save_token(token_file_path.format('efact_refreshed.json')) class efactTest(unittest.TestCase): efactoauth = Efactoauth( # client_id='16cbef8754bef0bcf015529353427e8a7e3ee71d56879565', # client_secret='c6aa66d89002f36461f93facd990e6bf5bc706e1df897e8a7e3ee71d56879565', client_id='312afcb54fae740eaa721aa7364b7e8a7e3ee71d52220c66', client_secret='3eba7d34e1c516403033bcdf22619ab1942bec0bb0ef7e8a7e3ee71d52220c66', # redirect_uri='https://www.anaf.ro/oauth2callback', redirect_uri='https://oauth.pstmn.io/v1/browser-callback' ) efactoauth.load_token_file(token_file_path.format('efact_jwt.json')) efactoauth.load_session() def test_add_invoice(self): # self.efactoauth.validate_invoice('C:/Users/DECEL/FACT1/MZK/2023_06/MZK2321602.xml', env='prod') self.efactoauth.add_invoice('17259191', r'C:/Users/DECEL/FACT1/MZK/2023_06/MZK2321602.xml', env='prod') def test_bulk_add_invoice(self): for root, _, files in os.walk(r'f:/DECEL/EFACT/KINVEST/2024_01/'): for file in files: if file.endswith('.xml'): inv_file = os.path.join(root, file) self.efactoauth.add_invoice('31434328', inv_file, env='prod') def test_transform_to_pdf(self): self.efactoauth.transform_to_pdf(r'c:/Users/DECEL/FACT1/MZK/2023_01_03/MZK2414.xml') def test_bulk_transform(self): for root, _, files in os.walk(r'f:/DECEL/EFACT/KINVEST/2024_01/'): for file in files: if file.endswith('.xml'): inv_file = os.path.join(root, file) self.efactoauth.transform_to_pdf(inv_file, no_validation=True) def test_get_all_messages(self, vat_id, days, root_path, env, filter_, extract): self.efactoauth.get_all_messages(vat_id, days=days, env=env, root_path=root_path, filter_=filter_, extract=extract) def test_pretty_messages(self): self.efactoauth.pretty_messages('17259191', env='prod') def test_download_invoice(self): self.efactoauth.download_invoice('3105899770', path=r'C:/Users/DECEL/FACT1/MZK/2023_06', env='prod') def test_getxml_to_erp(self): # self.efactoauth.getxml_from_zip(path='../efact/3016038776.zip', dest_f='') # latest = self.efactoauth.get_latest_invoice('efact') inv_files = utils.get_not_booked('17259191') # inv_files = [r'C:/Users/Levi/git/anafapi/tests/17259191/FACTURA PRIMITA/2024_04/08/16702141/4238612905.xml'] for latest in inv_files: if 'booked' not in latest: inv = self.efactoauth.parse_inv(invoice=latest) print(inv.seller_party.name, inv.invoice_id, inv.issue_date.date()) # for line in inv.lines: # print(line.tax) self.efactoauth.addinvoice_tonextup(invoice=inv, kind='stock', template=32, exp_plan=691, warehouse_code=23, generic=True, generic_value='1767', send=True) os.rename(latest, latest.replace('.xml'.lower(), '_booked.xml')) # os.remove(latest) def test_partner_code(self): with open(r'C:/Users/Levi/git/ciel_rest_api/parts.json') as partner_file: partners = ujson.load(partner_file) invoice = Invoice.from_xml(r'C:\Users\Levi\git\anafapi\tests\17259191\FACTURA PRIMITA\2024_04\08\16702141\4238612905_booked.xml') tax_code = invoice.seller_party.tax_scheme_id.replace('ro'.upper(), '') for partner in partners: if tax_code == partner['TaxCode']: print(partner['Code'], partner['Name']) break def test_verify_invoice(self): self.efactoauth.verify_invoice('4075641152', env='prod') def test_isvalid(self): self.efactoauth.load_token_file(token_file_path.format(r'efact.json')) print(self.efactoauth.is_valid) def test_get_latest_invoice(self): latest = self.efactoauth.get_latest_invoice('efact') print(latest) # os.replace(latest, latest.replace('.xml'.lower(), '_booked.xml')) def test_validate_invoice(self): # print(os.listdir('c:/Users/DECEL/FACT1/MZK/2023.01.03/')) for root, _dir, files in os.walk(r'c:\Users\levi\FACT1\ALP\2024_04_08'): for file in files: # print(file) self.efactoauth.validate_invoice(os.path.join(root, file)) # self.efactoauth.validate_invoice('c:/Users/levi/FACT1/MZK/2024_01_23/MZK241157.xml') def test_rename_all(self, dir_path, kind: Union[str, 'seller', 'buyer']): # @UndefinedVariable if kind == 'seller': account = 'AccountingCustomerParty' # print(os.path.basename(dir_path)) # dir_path = os.path.join(dir_path, os.path.basename(dir_path)) else: account = 'AccountingSupplierParty' # next_subfolder = next(os.walk(dir_path))[1][0] for root_f, _dirs, files in os.walk(dir_path): for file in files: # print(root_f.split('/')) # print(os.path.basename(dir_path)) # print(root_f, file) if file.endswith('.xml') and file.count('_') < 2 and 'cif error' not in root_f: # print(os.path.join(root_f, file)) tree = etree.parse(os.path.join(root_f, file)) root = tree.getroot() inv_id = root.xpath('//cbc:ID', namespaces={'cbc': 'urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2'}) inv_id = inv_id[0].text parties = root.xpath('//cbc:RegistrationName', namespaces={'cbc': 'urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2'}) for partie in parties: parent_p_type = partie.getparent().getparent().getparent() if etree.QName(parent_p_type).localname == account: # @UndefinedVariable # print(partie.text) file_partie = partie.text is_date = root.xpath('//cbc:IssueDate', namespaces={'cbc': 'urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2'}) is_date = is_date[0].text partie_ids = root.xpath('//cbc:CompanyID', namespaces={'cbc': 'urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2'}) for partie_id in partie_ids: parent_p_type = partie_id.getparent().getparent().getparent() parent_t_type = partie_id.getparent() # print(parent_t_type) if etree.QName(parent_t_type).localname == 'PartyTaxScheme' and etree.QName(parent_p_type).localname == account: # @UndefinedVariable # print(partie_id.text) file_partn_id = partie_id.text new_name = '{}_{}_{}_{}.xml'.format(file.replace('.xml', ''), inv_id, file_partie, is_date) current_path = os.path.join(root_f, file) new_path = os.path.join(root_f, new_name) # # !This will rename all files # os.rename(current_path, new_path) print(new_name) else: print('Nothing to rename') def test_parse_inv(self): # inv = self.efactoauth.parse_inv('C:/Users/Levi/git/anafapi/tests/17259191/8262600/FACTURA PRIMITA/01_2024/31/4154212674_booked.xml') # print(self.efactoauth._get_series(inv.invoice_id)) inv_files = utils.get_not_booked('17259191') print(inv_files) if __name__ == "__main__": # unittest.main() # efactTest().test_add_invoice() efactTest().test_get_all_messages('17259191', 2, '17259191', 'prod', 'P', True) # efactTest().test_rename_all(os.path.join(os.path.expanduser('~'), 'git/anafapi/tests/17259191/FACTURA TRIMISA'), kind='seller') # efactTest().test_rename_all(os.path.join(os.path.expanduser('~'), 'git/anafapi/tests/17259191/FACTURA PRIMITA'), kind='buyer') # anoauth.get_token()