Browse Source

anaf spv api

marton levente 10 tháng trước cách đây
mục cha
commit
fed19f6beb
2 tập tin đã thay đổi với 123 bổ sung0 xóa
  1. 103 0
      anafapi/anafspv.py
  2. 20 0
      tests/test_anafspv.py

+ 103 - 0
anafapi/anafspv.py

@@ -0,0 +1,103 @@
+'''Created 24 Nov 2023 Levi'''
+import os
+from time import sleep
+from urllib3.util.retry import Retry
+from datetime import datetime
+from threading import Thread
+import requests
+import keyboard as kb
+import pywinctl as gw
+from requests.adapters import HTTPAdapter
+from selenium import webdriver
+from selenium.webdriver import ChromeOptions
+
+class anafSPV(object):
+    '''
+    classdocs
+    '''
+    KEYURL = 'https://webserviced.anaf.ro/SPVWS2/rest/listaMesaje?zile=1'
+    BASEURL = 'https://webserviced.anaf.ro/SPVWS2/rest/{}'
+    MESLIST = 'listaMesaje?zile={}'
+    DLURL = 'descarcare?id={}'
+    REQ = BASEURL.format('cerere?tip=')
+    ISTBIL = REQ + 'Istoric%20bilant&cui={}'
+    DATID = REQ + 'DATE%20IDENTIFICARE&cui={}'
+    VECT = REQ + 'VECTOR%20FISCAL&cui={}'
+    SINT = REQ + 'Situatie%20Sintetica&cui={}'
+    FROL = REQ + 'Fisa%20Rol&cui={}'
+    DLLOC = 'c:/Users/levi/Downloads/anaf/application/pdf/'
+
+    def __init__(self):
+        '''
+        Constructor
+        '''
+        self.session = requests.Session()
+        retry = Retry(connect=3, backoff_factor=1)
+        adapter = HTTPAdapter(max_retries=retry)
+        self.session.mount('http://', adapter)
+        self.session.mount('https://', adapter)
+
+    def get_cookies_from_browser(self):
+        chrome_options = ChromeOptions()
+        chrome_options.add_argument('--disable-dev-shm-usage')
+        chrome_options.add_argument('--no-sandbox')
+        chrome_options.add_argument('--disable-gpu')
+        chrome_options.add_argument('--window-size=0,0')
+        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
+
+        driver = webdriver.Chrome(options=chrome_options)
+        thread = Thread(target=self.accept_cert, args=())
+        thread.start()
+        driver.get(self.KEYURL)
+        self.cookies = driver.get_cookies()
+        driver.close()
+        self.requests_cookies = {}
+        for c in self.cookies:
+            self.requests_cookies[c['name']] = c['value']
+        return self.requests_cookies
+
+    def accept_cert(self):
+        # brave_ready.wait()
+        sleep(2)
+        win_2 = gw.getWindowsWithTitle('data:,', condition=gw.Re.CONTAINS)[0]
+        if win_2.isActive:
+            win_2.hide()
+        t_ = 0
+        while True:
+            try:
+                if t_ >= 1.5:
+                    break
+                win_ = gw.getWindowsWithTitle('Windows Security', condition=gw.Re.CONTAINS)[0]
+                if not win_.isActive:
+                    win_.activate()
+                    win_.acceptInput(True)
+                sleep(0.5)
+                kb.write(os.getenv('PKCS11PIN'))
+                kb.press_and_release('enter')
+                break
+            except Exception:
+                t_ += 0.01
+                sleep(0.01)
+
+    def get_messages(self, days=10):
+        response = self.session.get(self.BASEURL.format(self.MESLIST.format(days)), cookies=self.requests_cookies).json()
+        sorted_response = sorted(map(self._convert_and_extract_date, response['mesaje']), key=lambda x: x['created_date'])
+        filtered_list = sorted([item for item in response['mesaje'] if item['tip'] != 'RECIPISA'], key=lambda x: x['created_date'])
+        response['mesaje'] = filtered_list
+        return response
+
+    def download_message(self, cui, id_, details):
+        response = self.session.get(self.BASEURL.format(self.DLURL.format(id_)), cookies=self.requests_cookies)
+        os.makedirs(os.path.join(self.DLLOC, cui), exist_ok=True)
+        with open(os.path.join(self.DLLOC, cui, details + '.pdf'), 'wb') as file:
+            file.write(response.content)
+
+    def get_vector(self, cui):
+        response = self.session.get(self.VECT.format(cui), cookies=self.requests_cookies).json()
+        sorted_response = sorted(map(self._convert_and_extract_date, response['mesaje']), key=lambda x: x['created_date'])
+        return sorted_response
+
+    def _convert_and_extract_date(self, item):
+        item['created_date'] = datetime.strptime(item['data_creare'], '%d.%m.%Y %H:%M:%S')
+        del item['data_creare']
+        return item

+ 20 - 0
tests/test_anafspv.py

@@ -0,0 +1,20 @@
+'''Created 24 Nov 2023 Levi'''
+
+import unittest
+import pprint
+
+from anafapi.anafspv import anafSPV
+
+
+class spvTest(unittest.TestCase):
+    spv = anafSPV()
+
+    def test_get_messages(self):
+        requests_cookies = self.spv.get_cookies_from_browser()
+        messages = self.spv.get_messages(10)
+        pprint.pprint(messages)
+        self.spv.download_message(messages['mesaje'][2]['cif'], id_=messages['mesaje'][2]['id'], details=messages['mesaje'][2]['tip'])
+
+
+if __name__ == "__main__":
+    unittest.main()