Skip to content

Commit

Permalink
Merge branch 'main' into basedosdados-2.0.0b26
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 23, 2025
2 parents d779c8a + b8fb73f commit bc70760
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 190 deletions.
15 changes: 14 additions & 1 deletion pipelines/datasets/br_rf_cafir/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@


class constants(Enum):
URL = ["https://dadosabertos.rfb.gov.br/dados/cafir/"]
URL = ["https://arquivos.receitafederal.gov.br/cafir/"]

HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3",
"Sec-GPC": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i",
}

PATH = [
"/tmp/input/br_rf_cafir",
Expand Down
29 changes: 21 additions & 8 deletions pipelines/datasets/br_rf_cafir/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from pipelines.constants import constants
from pipelines.datasets.br_rf_cafir.constants import constants as br_rf_cafir_constants
from pipelines.datasets.br_rf_cafir.schedules import schedule_br_rf_cafir_imoveis_rurais
from pipelines.datasets.br_rf_cafir.tasks import parse_data, parse_files_parse_date
from pipelines.datasets.br_rf_cafir.tasks import (
task_decide_files_to_download,
task_parse_api_metadata,
task_download_files
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
Expand Down Expand Up @@ -46,15 +50,22 @@
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

info = parse_files_parse_date(url=br_rf_cafir_constants.URL.value[0])
log_task("Checando se os dados estão desatualizados")
df_metadata = task_parse_api_metadata(
url=br_rf_cafir_constants.URL.value[0],
headers=br_rf_cafir_constants.HEADERS.value
)

arquivos, data_atualizacao = task_decide_files_to_download(
df=df_metadata,
upstream_tasks=[df_metadata],
)

is_outdated = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=info[0],
data_source_max_date=data_atualizacao,
date_format="%Y-%m-%d",
upstream_tasks=[info],
upstream_tasks=[arquivos],
)

with case(is_outdated, False):
Expand All @@ -63,10 +74,12 @@
with case(is_outdated, True):
log_task("Existem atualizações! A run será inciada")

file_path = parse_data(
file_path = task_download_files(
url=br_rf_cafir_constants.URL.value[0],
other_task_output=info,
upstream_tasks=[info, is_outdated],
file_list=arquivos,
headers=br_rf_cafir_constants.HEADERS.value,
data_atualizacao=data_atualizacao,
upstream_tasks=[arquivos, is_outdated],
)

wait_upload_table = create_table_and_upload_to_gcs(
Expand Down
91 changes: 31 additions & 60 deletions pipelines/datasets/br_rf_cafir/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
from pipelines.datasets.br_rf_cafir.constants import constants as br_rf_cafir_constants
from pipelines.datasets.br_rf_cafir.utils import (
download_csv_files,
parse_date_parse_files,
preserve_zeros,
remove_accent,
remove_non_ascii_from_df,
parse_api_metadata,
decide_files_to_download,
strip_string,
)
from pipelines.utils.utils import log
Expand All @@ -27,48 +26,36 @@
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def parse_files_parse_date(url) -> tuple[list[datetime], list[str]]:
"""Extrai os nomes dos arquivos e a data de disponibilização dos dados no FTP
def task_parse_api_metadata(url: str, headers:dict) -> pd.DataFrame:
return parse_api_metadata(url=url, headers=headers)

Args:
url (string): URL do FTP
Returns:
Tuple: Retorna uma tupla com duas listas. A primeira contém uma lista de datas de atualização dos dados e a segunda contém uma lista com os nomes dos arquivos.
"""
log("######## download_files_parse_data ########")

date_files = parse_date_parse_files(url)

return date_files
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def task_decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = None, data_maxima: bool = True) -> tuple[list[str],list[datetime]]:
return decide_files_to_download(df=df, data_especifica=data_especifica, data_maxima=data_maxima)


@task(
max_retries=3,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> str:
def task_download_files(url: str, file_list: list[str], data_atualizacao:[datetime.date], headers: dict) -> str:
"""Essa task faz o download dos arquivos do FTP, faz o parse dos dados e salva os arquivos em um diretório temporário.
Returns:
str: Caminho do diretório temporário
"""

date = other_task_output[0]
log(f"###### Extraindo dados para data: {date}")

files_list = other_task_output[1]
log(f"###### Extraindo files: {files_list}")

# inicializa counter para ser usado na nomeação dos arquivos repetindo o padrão de divulgação dos dados
counter = 0
log(f"###### -----COUNTER: {counter}")
date = data_atualizacao
log(f"------ Extraindo dados para data: {date}")

list_n_cols = []
files_list = file_list
log(f"------ Os seguintes arquivos foram selecionados para download: {files_list}")

for file in files_list:
counter += 1
log(f"###### X-----COUNTER: {counter}")


log(f"Baixando arquivo: {file} de {url}")

Expand All @@ -79,6 +66,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
download_csv_files(
file_name=file,
url=complete_url,
headers=headers,
download_directory=br_rf_cafir_constants.PATH.value[0],
)

Expand All @@ -95,58 +83,41 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
converters={
col: preserve_zeros for col in br_rf_cafir_constants.COLUMN_NAMES.value
},
encoding="latin-1",
encoding="ISO-8859-1",
)

list_n_cols.append(df.shape[1])

# remove acentos
df["nome"] = df["nome"].apply(remove_accent)
df["endereco"] = df["endereco"].apply(remove_accent)

# remove não ascii
df = remove_non_ascii_from_df(df)

# tira os espacos em branco
df = df.applymap(strip_string)

log(f"Saving file: {file}")
log(f"Salvando arquivo: {file}")

# constroi diretório
os.makedirs(
br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais/data={date}/",
exist_ok=True,
)

#NOTE: Com modificação do formato de divulgação do FTP os arquivos passaram a ser divulgados csvs particionados por UF
#A partir de 2025, a nomenclaruta dos no Storage arquivos mudou para: "imoveis_rurais_uf_numero.csv" no lugar de "imoveris_rurais_numero.csv"

save_path = (
br_rf_cafir_constants.PATH.value[1]
+ f"/imoveis_rurais/data={date}/"
+ "imoveis_rurais_"
+ str(counter)
#extrai uf e numeração do nome do arquivo
+ file.split(".")[-2]
+ ".csv"
)
log(f"save path: {save_path}")

# save new file as csv
df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8",escapechar='\\')

# resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo.
df = pd.read_csv(save_path, dtype=str)
df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8",escapechar='\\')

log(f"----- Removendo o arquivo: {os.listdir(br_rf_cafir_constants.PATH.value[0])}")
log(f"Arquivo salvo: {save_path.split('/')[-1]}")

# remove o arquivo de input
os.system("rm -rf " + br_rf_cafir_constants.PATH.value[0] + "/" + "*")
del df

log(f"----- Removendo o arquivo: {os.listdir(br_rf_cafir_constants.PATH.value[0])} do diretório de input")

log(f"list_n_cols: O NUMERO DE COLUNAS É {list_n_cols}")

# gera paths
files_path = (
br_rf_cafir_constants.PATH.value[1]
+ "/"
+ br_rf_cafir_constants.TABLE.value[0]
+ "/"
)
# remove o arquivo de input
os.remove(os.path.join(br_rf_cafir_constants.PATH.value[0], file))

return files_path
return br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais"
Loading

0 comments on commit bc70760

Please sign in to comment.