Merge branch 'main' into staging/reactivate_cnpj
folhesgabriel authored Jan 2, 2025
2 parents c13f2fd + 1fadaba commit bbee8e5
Showing 7 changed files with 103 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pipelines/datasets/br_rf_cafir/constants.py
@@ -8,7 +8,7 @@


class constants(Enum):
URL = ["https://dadosabertos.rfb.gov.br/CAFIR/"]
URL = ["https://dadosabertos.rfb.gov.br/dados/cafir/"]

PATH = [
"/tmp/input/br_rf_cafir",
1 change: 0 additions & 1 deletion pipelines/datasets/br_rf_cafir/flows.py
@@ -31,7 +31,6 @@
with Flow(
name="br_rf_cafir.imoveis_rurais", code_owners=["Gabriel Pisa"]
) as br_rf_cafir_imoveis_rurais:
# Parameters
dataset_id = Parameter("dataset_id", default="br_rf_cafir", required=True)
table_id = Parameter("table_id", default="imoveis_rurais", required=True)
update_metadata = Parameter("update_metadata", default=False, required=False)
13 changes: 10 additions & 3 deletions pipelines/datasets/br_rf_cafir/tasks.py
@@ -5,7 +5,7 @@


import os
from datetime import datetime
from datetime import datetime, timedelta

import pandas as pd
from prefect import task
@@ -20,9 +20,13 @@
strip_string,
)
from pipelines.utils.utils import log
from pipelines.constants import constants


@task
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def parse_files_parse_date(url) -> tuple[list[datetime], list[str]]:
"""Extrai os nomes dos arquivos e a data de disponibilização dos dados no FTP
Expand All @@ -39,7 +43,10 @@ def parse_files_parse_date(url) -> tuple[list[datetime], list[str]]:
return date_files


@task
@task(
max_retries=3,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> str:
"""Essa task faz o download dos arquivos do FTP, faz o parse dos dados e salva os arquivos em um diretório temporário.
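
For reference, the retry arguments added to the @task decorators above use Prefect's standard task-retry options; a minimal sketch of the same pattern, with a hypothetical stand-in for constants.TASK_RETRY_DELAY.value:

import requests
from datetime import timedelta
from prefect import task

TASK_RETRY_DELAY_SECONDS = 10  # hypothetical stand-in for constants.TASK_RETRY_DELAY.value

@task(
    max_retries=2,
    retry_delay=timedelta(seconds=TASK_RETRY_DELAY_SECONDS),
)
def fetch_listing(url: str) -> str:
    # Prefect re-runs the task up to max_retries times if it raises,
    # waiting retry_delay between attempts.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text
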
93 changes: 55 additions & 38 deletions pipelines/datasets/br_rf_cafir/utils.py
@@ -6,12 +6,14 @@
import os
import unicodedata
from datetime import datetime
import time

import pandas as pd
import requests
from bs4 import BeautifulSoup

from pipelines.utils.utils import log
from typing import Tuple, List


def strip_string(x: pd.DataFrame) -> pd.DataFrame:
@@ -103,53 +105,68 @@ def remove_accent(input_str: pd.DataFrame, pattern: str = "all") -> pd.DataFrame
return input_str


def parse_date_parse_files(url: str) -> tuple[list[datetime], list[str]]:
"""Faz o parse da data de atualização e dos links de download dos arquivos
def parse_date_parse_files(url: str, retries: int = 3, backoff_factor: int = 2) -> Tuple[datetime, List[str]]:
"""
Faz o parse da data de atualização e dos links de download dos arquivos.
Args:
url (string): A url do ftp do CAFIR da receita federal
url (str): A URL do FTP do CAFIR da Receita Federal.
retries (int): Número de tentativas em caso de falha por timeout.
backoff_factor (int): Fator de espera exponencial entre tentativas.
Returns:
tuple[list[datetime],list[str]]: Retorna uma tupla com duas listas. A primeira contém uma lista de datas de atualização dos dados e a segunda contém uma lista com os nomes dos arquivos.
Tuple[datetime, List[str]]: Retorna uma tupla com a data de atualização e os nomes dos arquivos.
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3",
"Sec-GPC": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i",
}

xpath_release_date = "tr td:nth-of-type(3)"
response = requests.get(url)
attempt = 0

# Checa se a requisição foi bem sucedida
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")

## 1. parsing da data de atualização ##

# Seleciona tags com base no xpath
td_elements = soup.select(xpath_release_date)
# Extrai texto das tags selecionadas
td_texts = [td.get_text(strip=True) for td in td_elements]
# selecionar someente data
td_texts = [td[0:10] for td in td_texts]
# seleciona data
# são liberados 20 arquivos a cada atualização, a data é a mesma o que varia é a hora.
td_texts = td_texts[3]
# converte para data
release_data = datetime.strptime(td_texts, "%Y-%m-%d").date()
log(f"realease date: {release_data}")

## 2. parsing dos nomes dos arquivos ##

a_elements = soup.find_all("a")
# Extrai todos href
href_values = [a["href"] for a in a_elements]
# Filtra href com nomes dos arquivos
files_name = [href for href in href_values if "csv" in href]
log(f"files name: {files_name}")
return release_data, files_name
while attempt < retries:
try:
response = requests.get(url, headers=headers, timeout=(10, 30))

# Checa se a requisição foi bem-sucedida
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")

# 1. Parsing da data de atualização
td_elements = soup.select(xpath_release_date)
td_texts = [td.get_text(strip=True)[:10] for td in td_elements]
release_data = datetime.strptime(td_texts[3], "%Y-%m-%d").date()
log(f"Release date: {release_data}")

# 2. Parsing dos nomes dos arquivos
a_elements = soup.find_all("a")
href_values = [a["href"] for a in a_elements if a.has_attr("href")]
files_name = [href for href in href_values if "csv" in href]
log(f"Files name: {files_name}")

return release_data, files_name

else:
log(f"Erro na requisição: {response.status_code}")
raise requests.RequestException(f"HTTP {response.status_code}")

except (requests.Timeout, requests.ConnectionError) as e:
attempt += 1
wait_time = backoff_factor ** attempt
log(f"Erro: {e}. Tentando novamente em {wait_time} segundos...")
time.sleep(wait_time)

raise TimeoutError("Falha após várias tentativas de conectar ao servidor.")

else:
log(
f"Não foi possível acessar o site :/. O status da requisição foi: {response.status_code}"
)
raise FileNotFoundError


def download_csv_files(url, file_name, download_directory):
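
The rewritten parse_date_parse_files above wraps the request in a manual retry loop; the same exponential-backoff idea in isolation, as an illustrative sketch (the helper name is made up and is not part of this commit):

import time
import requests

def get_with_backoff(url: str, retries: int = 3, backoff_factor: int = 2) -> requests.Response:
    """Retry a GET on timeouts/connection errors, waiting backoff_factor ** attempt seconds (2, 4, 8...)."""
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=(10, 30))
            response.raise_for_status()
            return response
        except (requests.Timeout, requests.ConnectionError):
            if attempt == retries:
                raise
            time.sleep(backoff_factor ** attempt)

Doubling the wait between attempts keeps transient hiccups on the Receita Federal server from failing the whole flow while avoiding a tight retry loop.
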
1 change: 0 additions & 1 deletion pipelines/datasets/br_rf_cno/flows.py
@@ -39,7 +39,6 @@
with Flow(
name="br_rf_cno.tables", code_owners=["Gabriel Pisa"]
) as br_rf_cno_tables:
# Parameters
dataset_id = Parameter("dataset_id", default="br_rf_cno", required=True)
table_id = Parameter("table_id", default="microdados", required=True)
table_ids = Parameter("table_ids", default=['microdados', 'areas', 'cnaes', 'vinculos'], required=False)
36 changes: 24 additions & 12 deletions pipelines/datasets/br_rf_cno/tasks.py
@@ -11,6 +11,8 @@
import pandas as pd
import shutil
import asyncio
import time
from requests.exceptions import ConnectionError, HTTPError

from prefect import task

@@ -19,7 +21,8 @@
from pipelines.utils.utils import log


#NOTE: O crawler falhará se o nome do arquivo mudar.


@task
def check_need_for_update(url: str) -> str:
"""
@@ -34,30 +37,40 @@ def check_need_for_update(url: str) -> str:
Raises:
requests.HTTPError: If there is an HTTP error when making the request.
ValueError: If the file 'cno.zip' is not found in the URL.
#NOTE: O crawler falhará se o nome do arquivo mudar.
"""
log('---- Extracting most recent update date from CNO FTP')


response = requests.get(url)
if response.status_code != 200:
raise requests.HTTPError(f"HTTP error occurred: Status code {response.status_code}")

retries = 5
delay = 2

for attempt in range(retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
break
except ConnectionError as e:
log(f"Connection attempt {attempt + 1}/{retries} failed: {e}")
if attempt < retries - 1:
time.sleep(delay)
delay *= 2
else:
raise
except HTTPError as e:
raise requests.HTTPError(f"HTTP error occurred: {e}")

soup = BeautifulSoup(response.content, 'html.parser')
rows = soup.find_all('tr')


max_file_date = None

# A lógica é simples: processa cada 'table data' (td) de cada linha 'tr'
for row in rows:
cells = row.find_all('td')


if len(cells) < 4:
continue


link = cells[1].find('a')
if not link:
continue
@@ -66,13 +79,12 @@
if name != "cno.zip":
continue


date = cells[2].get_text(strip=True)
max_file_date = datetime.strptime(date, "%Y-%m-%d %H:%M").strftime("%Y-%m-%d")
break

if not max_file_date:
raise ValueError("File 'cno.zip' not found on the FTP site. Check the api endpoint: https://arquivos.receitafederal.gov.br/dados/cno/ to see folder structure or file name has changed")
raise ValueError("File 'cno.zip' not found on the FTP site. Check the API endpoint to see if the folder structure or file name has changed.")

log(f"---- Most recent update date for 'cno.zip': {max_file_date}")

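The loop above scans an Apache-style directory listing for cno.zip and keeps only the date part of its timestamp; a small self-contained illustration of that parsing, using a made-up HTML snippet:

from datetime import datetime
from bs4 import BeautifulSoup

# Made-up listing row in the shape the task expects: [icon, name/link, last modified, size].
html = """
<table>
  <tr><td></td><td><a href="cno.zip">cno.zip</a></td><td>2025-01-02 09:15</td><td>1.2G</td></tr>
</table>
"""

soup = BeautifulSoup(html, "html.parser")
for row in soup.find_all("tr"):
    cells = row.find_all("td")
    if len(cells) >= 4 and cells[1].get_text(strip=True) == "cno.zip":
        date = cells[2].get_text(strip=True)
        print(datetime.strptime(date, "%Y-%m-%d %H:%M").strftime("%Y-%m-%d"))  # 2025-01-02
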
14 changes: 13 additions & 1 deletion pipelines/datasets/br_rf_cno/utils.py
@@ -30,7 +30,19 @@ async def download_chunk(
Returns:
None
"""
headers = {"Range": f"bytes={start}-{end}"}
headers = {
"Range": f"bytes={start}-{end}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3",
"Sec-GPC": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i"
}
async with semaphore:
response = await client.get(url, headers=headers, timeout=60.0)
with open(filepath, "r+b") as f:
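
The headers added above accompany a Range request that fetches one byte slice of the file; a rough standalone sketch of ranged downloading with httpx (URL and chunk size are placeholders, and the server is assumed to honour Range requests). In practice the total size would come from a prior HEAD request's Content-Length.

import asyncio
import httpx

async def fetch_range(client: httpx.AsyncClient, url: str, start: int, end: int) -> bytes:
    # Ask the server for bytes [start, end] only; a compliant server answers 206 Partial Content.
    response = await client.get(url, headers={"Range": f"bytes={start}-{end}"}, timeout=60.0)
    response.raise_for_status()
    return response.content

async def download_in_chunks(url: str, size: int, chunk_size: int = 8 * 1024 * 1024) -> bytes:
    # Split [0, size) into inclusive byte ranges and fetch them concurrently.
    ranges = [(start, min(start + chunk_size, size) - 1) for start in range(0, size, chunk_size)]
    async with httpx.AsyncClient() as client:
        parts = await asyncio.gather(*(fetch_range(client, url, s, e) for s, e in ranges))
    return b"".join(parts)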
