[dados] br_ans_beneficiario.informacao_consolidada #1582

Closed
wants to merge 35 commits into from
Commits
d77723d
feat: add code br_ans_beneficiario
guissalustiano Mar 2, 2023
c5b26d1
feat: add metadata
guissalustiano Mar 2, 2023
19eab3d
fix: wrong type when load from input
guissalustiano Mar 2, 2023
8f4f7fd
remove .vscode
guissalustiano Mar 2, 2023
f6e998b
feat: remove run
guissalustiano Mar 2, 2023
1e94764
change observation_level to municipality
guissalustiano Mar 3, 2023
6e0c002
one line cleaning description
guissalustiano Mar 3, 2023
87b66c1
add release
guissalustiano Mar 3, 2023
a9a792d
Merge branch 'master' into master
mergify[bot] Mar 4, 2023
f988e9f
add row number
guissalustiano Mar 6, 2023
ed23b17
update code range
guissalustiano Mar 6, 2023
4843e9d
Merge branch 'master' into master
mergify[bot] Mar 6, 2023
78a9b3f
Merge branch 'master' into master
mergify[bot] Mar 6, 2023
4064fbe
Merge branch 'master' into master
mergify[bot] Mar 7, 2023
942df45
Merge branch 'master' into master
mergify[bot] Mar 13, 2023
d7440e5
Merge branch 'master' into master
mergify[bot] Mar 14, 2023
41b1c6a
Merge branch 'master' into master
mergify[bot] Mar 15, 2023
22698b0
Merge branch 'master' into master
mergify[bot] Mar 15, 2023
4d8f8e4
Merge branch 'master' into master
mergify[bot] Mar 15, 2023
8d3f1f8
code partition by state
guissalustiano Mar 28, 2023
b6d2604
remove index and fix uf partition name
guissalustiano Apr 2, 2023
97dd160
add observetion about uf
guissalustiano Apr 2, 2023
efc497d
fix pr comments
guissalustiano Apr 17, 2023
c572990
fix table config qtd to quantidade
guissalustiano Apr 19, 2023
5a12ca8
Merge branch 'basedosdados:master' into master
guissalustiano Apr 19, 2023
d1f4ac8
feat: add count lines script
guissalustiano Apr 28, 2023
2b49b05
Merge branch 'master' into master
mergify[bot] Apr 28, 2023
cdda122
Merge branch 'master' into master
mergify[bot] Apr 28, 2023
15408d2
Merge branch 'master' into master
mergify[bot] May 2, 2023
4eb78a8
Merge branch 'master' into master
mergify[bot] Jun 16, 2023
0ee05e3
Merge branch 'master' into master
mergify[bot] Jun 19, 2023
52c7549
Merge branch 'master' into master
mergify[bot] Jun 21, 2023
48a4094
Merge branch 'master' into master
mergify[bot] Jun 23, 2023
f4ff95f
Merge branch 'master' into master
mergify[bot] Jun 29, 2023
6dc507f
Merge branch 'master' into master
mergify[bot] Jul 3, 2023
41 changes: 41 additions & 0 deletions bases/br_ans_beneficiario/dataset_config.yaml
@@ -0,0 +1,41 @@

# Name (slug) of the dataset in CKAN
# Examples: br-ibge-populacao, br-tse-eleicoes
name: br-ans-beneficiarios

# Dataset title, shown in the search engine.
# Example: População brasileira
title: Beneficiarios de plano de Saúde

# Which organization provides the original data?
# Options: choose from this list -> https://basedosdados.org/api/3/action/organization_list
# If the organization is not in the list above, or its name does not follow the style manual,
# create or rename the organization at https://basedosdados.org/organization/
# Examples: br-ibge, br-tse, br-rj-gov
organization: br-ans

# Dataset description
notes:

# Which themes characterize the dataset?
# Options: choose from this list -> https://basedosdados.org/api/3/action/group_list
# Important: fill in the key, not the value.
groups:
- saude

# Which tags characterize the dataset?
# Options: choose from this list -> https://basedosdados.org/api/3/action/tag_list
# Examples:
# - fertilidade
# - preco
# - desmatamento
# If you create new tags, the rules are:
# - lowercase letters
# - no accents
# - always singular
# - do not repeat group names (e.g. educacao, saude, meio ambiente, economia, etc.)
tags:
@tricktx (Contributor), Mar 2, 2023:

Add the tags.

Contributor Author:

Any ideas on which tags to add?


# Do not change this field.
# Date of the last metadata modification, generated automatically by CKAN.
metadata_modified:
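
The tag rules listed in the comments above (lowercase letters, no accents) can be checked mechanically before filling in `tags:`. A minimal sketch, assuming a hypothetical helper named `normalize_tag` that is not part of this PR or of the basedosdados tooling. It does not attempt the singular-form rule or the check against group names, which still need a human eye.

```python
import unicodedata


def normalize_tag(raw: str) -> str:
    """Lowercase a tag and strip accents (hypothetical helper, not from the PR)."""
    # NFKD splits each accented letter into a base letter plus a combining mark.
    decomposed = unicodedata.normalize("NFKD", raw.strip().lower())
    # Drop the combining marks and keep only the base characters.
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
```

For example, `normalize_tag("Preço")` gives `preco`, which matches the style of the example tags in the template.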
2 changes: 2 additions & 0 deletions bases/br_ans_beneficiario/informacao_consolidada/.gitignore
@@ -0,0 +1,2 @@
input
output
@@ -0,0 +1,15 @@
from glob import glob
import pandas as pd
from loguru import logger

csv_filepaths = glob('../input/**/*.csv', recursive=True)

count = 0
for filepath in csv_filepaths:
    logger.info(f"reading {filepath}")
    df = pd.read_csv(filepath, encoding="utf-8", index_col=0)
    size = len(df.index)
    logger.debug(f"{filepath} has {size}")
    count += size

logger.info(f"Total: {count}")
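
The script above loads every cached CSV fully into a DataFrame just to count its rows. A lighter alternative is to stream each file with the standard library. This is only a sketch under stated assumptions: `count_csv_rows` is a hypothetical name, each file is assumed to have exactly one header line, and the files are assumed to be UTF-8 with standard CSV quoting, as written by `df.to_csv` in the pipeline.

```python
import csv
from pathlib import Path


def count_csv_rows(root: Path) -> int:
    """Count data rows (one header line per file excluded) in all CSVs under root."""
    total = 0
    for filepath in sorted(root.rglob("*.csv")):
        with filepath.open(newline="", encoding="utf-8") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # skip the header row, if the file has one
            total += sum(1 for _ in reader)
    return total
```

Calling `count_csv_rows(Path("../input"))` would cover the same files as the glob in the script, without holding any of them in memory.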
164 changes: 164 additions & 0 deletions bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py
@@ -0,0 +1,164 @@
from typing import Optional, Tuple
from dateutil.relativedelta import relativedelta
import pandas as pd
from multiprocessing import Pool
from datetime import datetime
from loguru import logger
from pathlib import Path
from ftputil import FTPHost
from io import BytesIO
import tempfile
import zipfile
from functools import reduce

host = FTPHost('ftp.dadosabertos.ans.gov.br', 'anonymous')
host.keep_alive()

TABLE_NAME = 'informacoes_consolidadas_de_beneficiarios'
FTP_PATH = 'FTP/PDA/informacoes_consolidadas_de_beneficiarios'
# Column dtypes of the raw ANS CSV files.
RAW_COLLUNS_TYPE = {
    '#ID_CMPT_MOVEL': str,
    'CD_OPERADORA': str,
    'NM_RAZAO_SOCIAL': str,
    'NR_CNPJ': str,
    'MODALIDADE_OPERADORA': str,
    'SG_UF': str,
    'CD_MUNICIPIO': str,
    'NM_MUNICIPIO': str,
    'TP_SEXO': str,
    'DE_FAIXA_ETARIA': str,
    'DE_FAIXA_ETARIA_REAJ': str,
    'CD_PLANO': str,
    'TP_VIGENCIA_PLANO': str,
    'DE_CONTRATACAO_PLANO': str,
    'DE_SEGMENTACAO_PLANO': str,
    'DE_ABRG_GEOGRAFICA_PLANO': str,
    'COBERTURA_ASSIST_PLAN': str,
    'TIPO_VINCULO': str,
    'QT_BENEFICIARIO_ATIVO': int,
    'QT_BENEFICIARIO_ADERIDO': int,
    'QT_BENEFICIARIO_CANCELADO': int,
    'DT_CARGA': str,
}


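def range_year_month(start: datetime, stop: Optional[datetime] = None, step=relativedelta(months=1)):
    # Yield one datetime per step, from start to stop inclusive.
    # Resolve the default here: a datetime.now() default would be evaluated only once, at import.
    if stop is None:
        stop = datetime.now()
    while start <= stop:
        yield start
        start = start + step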

def host_months_path():
    # Yield (FTP directory, month) pairs for every month covered by the dataset.
    for date in range_year_month(datetime(2014, 5, 1), datetime(2022, 12, 31)):
        month_path = date.strftime("%Y%m")
        yield FTP_PATH + '/' + month_path, date


def host_list(basepath: str):
    # List the entries of an FTP directory in reverse order, as full paths.
    for path in host.listdir(basepath)[::-1]:
        complete_path = host.path.join(basepath, str(path))
        yield complete_path


def host_read(path: str) -> BytesIO:
    # Download a remote file to a temporary file and return its bytes in memory.
    filename, file_extension = host.path.splitext(host.path.basename(path))
    with tempfile.NamedTemporaryFile(prefix=filename, suffix=file_extension) as tmp:
        logger.info(f'ftp downloading {path}')
        host.download(path, tmp.name)
        return BytesIO(tmp.read())

def read_csv_zip_to_dataframe(path) -> pd.DataFrame:
    # Download the zip and load the first CSV member found inside it.
    zfile = host_read(path)
    with zipfile.ZipFile(zfile) as zref:
        for filename in zref.namelist():
            if not filename.endswith('.csv'):
                continue
            path = host.path.join(host.path.dirname(path), filename)
            logger.debug(f"load {path}")

            filecontent = BytesIO(zref.read(filename))

            return pd.read_csv(filecontent, sep=';', encoding='cp1252', dtype=RAW_COLLUNS_TYPE)
    raise Exception(f"CSV not found in {path}")

def process(df: pd.DataFrame):
    # Split the YYYYMM competence column into year and month.
    time_col = pd.to_datetime(df['#ID_CMPT_MOVEL'], format='%Y%m')
    df['ano'] = time_col.dt.year
    df['mes'] = time_col.dt.month
    del df['#ID_CMPT_MOVEL']
    del df['NM_MUNICIPIO']
    del df['DT_CARGA']

    df.rename(columns={
        'CD_OPERADORA': 'codigo_operadora',
        'NM_RAZAO_SOCIAL': 'razao_social',
        'NR_CNPJ': 'cnpj',
        'MODALIDADE_OPERADORA': 'modalidade_operadora',
        'SG_UF': 'sigla_uf',
        'CD_MUNICIPIO': 'id_municipio_6',
        'TP_SEXO': 'sexo',
        'DE_FAIXA_ETARIA': 'faixa_etaria',
        'DE_FAIXA_ETARIA_REAJ': 'faixa_etaria_reajuste',
        'CD_PLANO': 'codigo_plano',
        'TP_VIGENCIA_PLANO': 'tipo_vigencia_plano',
        'DE_CONTRATACAO_PLANO': 'contratacao_beneficiario',
        'DE_SEGMENTACAO_PLANO': 'segmentacao_beneficiario',
        'DE_ABRG_GEOGRAFICA_PLANO': 'abrangencia_beneficiario',
        'COBERTURA_ASSIST_PLAN': 'cobertura_assistencia_beneficiario',
        'TIPO_VINCULO': 'tipo_vinculo',
        'QT_BENEFICIARIO_ATIVO': 'quantidade_beneficiario_ativo',
        'QT_BENEFICIARIO_ADERIDO': 'quantidade_beneficiario_aderido',
        'QT_BENEFICIARIO_CANCELADO': 'quantidade_beneficiario_cancelado'
    }, inplace=True)

    # Left-pad the CNPJ to its full 14 digits.
    df['cnpj'] = df['cnpj'].str.zfill(14)

    # Using parquet, don't need external dictionary
    # Series.replace returns a new Series, so the result must be assigned back.
    df['tipo_vigencia_plano'] = df['tipo_vigencia_plano'].replace({
        'P': 'Posterior à Lei 9656/1998 ou planos adaptados à lei',
        'A': 'Anterior à Lei 9656/1998'
    })

    return df

if __name__ == '__main__':
    for month_path, month_date in host_months_path():
        for state_path in host_list(month_path):
            state = state_path.split("_")[-1].split(".")[0]

            output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'sigla_uf={state}' / f'ben{month_date.strftime("%Y%m")}-{state}.parquet'

            if output_path.exists():
                logger.info(f"Skipping {output_path}: already processed")
                continue

            input_path = Path('../input') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'ben{month_date.strftime("%Y%m")}_{state}.csv'

            if input_path.exists():
                logger.debug(f"reading input in {input_path}")
                df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE, index_col=0)
            else:
                # Why is there a directory here? Skip anything that is not a file.
                # https://dadosabertos.ans.gov.br/FTP/PDA/informacoes_consolidadas_de_beneficiarios/201602/201607/
                if not host.path.isfile(state_path):
                    continue

                df = read_csv_zip_to_dataframe(state_path)
                input_path.parent.mkdir(parents=True, exist_ok=True)
                df.to_csv(input_path, encoding="utf-8")

            logger.info("Cleaning dataset")
            df = process(df)

            output_path.parent.mkdir(parents=True, exist_ok=True)

            # delete partition columns
            del df['ano']
            del df['mes']
            del df['sigla_uf']

            logger.info(f"Writing to output {output_path.as_posix()}")
            df.to_parquet(output_path, index=False)
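
The main loop above derives three things for each FTP entry: the month being processed, the state abbreviation taken from the file name, and a partitioned output path of the form `ano=/mes=/sigla_uf=`. The sketch below isolates that logic with the standard library only, so it can be exercised without an FTP connection. The names `iter_months`, `state_from_ftp_name`, and `partition_path` are hypothetical, and the example file name `ben201405_AC.zip` is an assumption inferred from the pipeline's input-file naming, not something confirmed against the ANS server.

```python
from datetime import date
from pathlib import Path
from typing import Iterator


def iter_months(start: date, stop: date) -> Iterator[date]:
    """Yield the first day of each month from start to stop, inclusive."""
    year, month = start.year, start.month
    while (year, month) <= (stop.year, stop.month):
        yield date(year, month, 1)
        # Advance one month, rolling over to January of the next year.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


def state_from_ftp_name(name: str) -> str:
    """Take the text after the last underscore, without the extension."""
    return name.rsplit("_", 1)[-1].split(".")[0]


def partition_path(root: Path, month: date, state: str) -> Path:
    """Build the ano=/mes=/sigla_uf= output path in the layout the pipeline uses."""
    stamp = month.strftime("%Y%m")
    return (
        root
        / f"ano={month:%Y}"
        / f"mes={month:%m}"
        / f"sigla_uf={state}"
        / f"ben{stamp}-{state}.parquet"
    )
```

Keeping these pieces as small pure functions makes it possible to unit-test the path layout separately from the download and cleaning steps.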
@@ -0,0 +1,2 @@
python-dateutil
ftputil
Binary file not shown.
@@ -0,0 +1,2 @@
[InternetShortcut]
URL=https://docs.google.com/spreadsheets/d/11hTzsBQja-bpM9dac9NkmGvSI39xqiW7TC8amQUUvzc/edit#gid=0
43 changes: 43 additions & 0 deletions bases/br_ans_beneficiario/informacao_consolidada/publish.sql
@@ -0,0 +1,43 @@
/*
Query to publish the table.

This is the place to:
    - modify column names, order, and types
    - join with other tables
    - create extra columns (e.g. logs, proportions, etc.)

Every column defined here must also exist in `table_config.yaml`.

# In addition, feel free to change obscure names
# to something a little more explicit.

TYPES:
    - To change column types, just replace STRING with another valid type.
    - Example: `SAFE_CAST(column_name AS NUMERIC) column_name`
    - More details: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
*/

CREATE VIEW basedosdados.br_ans_beneficiario.informacao_consolidada AS
SELECT
    SAFE_CAST(ano AS INT64) ano,
    SAFE_CAST(mes AS INT64) mes,
    SAFE_CAST(codigo_operadora AS STRING) codigo_operadora,
    SAFE_CAST(razao_social AS STRING) razao_social,
    SAFE_CAST(cnpj AS STRING) cnpj,
    SAFE_CAST(modalidade_operadora AS STRING) modalidade_operadora,
    SAFE_CAST(sigla_uf AS STRING) sigla_uf,
    SAFE_CAST(id_municipio_6 AS STRING) id_municipio_6,
    SAFE_CAST(sexo AS STRING) sexo,
    SAFE_CAST(faixa_etaria AS STRING) faixa_etaria,
    SAFE_CAST(faixa_etaria_reajuste AS STRING) faixa_etaria_reajuste,
    SAFE_CAST(codigo_plano AS STRING) codigo_plano,
    SAFE_CAST(tipo_vigencia_plano AS STRING) tipo_vigencia_plano,
    SAFE_CAST(contratacao_beneficiario AS STRING) contratacao_beneficiario,
    SAFE_CAST(segmentacao_beneficiario AS STRING) segmentacao_beneficiario,
    SAFE_CAST(abrangencia_beneficiario AS STRING) abrangencia_beneficiario,
    SAFE_CAST(cobertura_assistencia_beneficiario AS STRING) cobertura_assistencia_beneficiario,
    SAFE_CAST(tipo_vinculo AS STRING) tipo_vinculo,
    SAFE_CAST(quantidade_beneficiario_ativo AS INT64) quantidade_beneficiario_ativo,
    SAFE_CAST(quantidade_beneficiario_aderido AS INT64) quantidade_beneficiario_aderido,
    SAFE_CAST(quantidade_beneficiario_cancelado AS INT64) quantidade_beneficiario_cancelado
FROM basedosdados-dev.br_ans_beneficiario_staging.informacao_consolidada AS t
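
The template comment requires every column in this query to also exist in `table_config.yaml`. One way to keep the two in step is to render the `SAFE_CAST` list from a single column-to-type mapping. The sketch below is only an illustration: `render_select` is a hypothetical helper, not part of the basedosdados tooling, and it assumes the mapping has already been loaded from wherever the schema is kept.

```python
from typing import Mapping


def render_select(columns: Mapping[str, str], source: str) -> str:
    """Render a SELECT with one SAFE_CAST per column (hypothetical helper)."""
    casts = ",\n".join(
        f"    SAFE_CAST({name} AS {sql_type}) {name}"
        for name, sql_type in columns.items()
    )
    return f"SELECT\n{casts}\nFROM {source} AS t"
```

Passing `{"ano": "INT64", "cnpj": "STRING"}` and a staging table name would produce the first and fifth cast lines of the query above in the same shape.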