From d77723da3fd6baf6f4aaf7d3815e35932d04b7ae Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Wed, 1 Mar 2023 23:35:25 -0300 Subject: [PATCH 01/16] feat: add code br_ans_beneficiario --- .../informacao_consolidada/.gitignore | 2 + .../informacao_consolidada/code/pipeline.py | 161 ++++++++++++++++++ .../code/requirements.txt | 2 + .../extra/architecture/_table_id_.xlsx | Bin 0 -> 5051 bytes .../architecture/informacao_consolidada.url | 2 + 5 files changed, 167 insertions(+) create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/.gitignore create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/code/requirements.txt create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/_table_id_.xlsx create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/informacao_consolidada.url diff --git a/bases/br_ans_beneficiario/informacao_consolidada/.gitignore b/bases/br_ans_beneficiario/informacao_consolidada/.gitignore new file mode 100644 index 000000000..1aafc3b01 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/.gitignore @@ -0,0 +1,2 @@ +input +output diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py new file mode 100644 index 000000000..fd5ab1155 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -0,0 +1,161 @@ +from typing import Optional, Tuple +from dateutil.relativedelta import relativedelta +import pandas as pd +from multiprocessing import Pool +from datetime import datetime +from loguru import logger +from pathlib import Path +from ftputil import FTPHost +from datetime import datetime +from io import BytesIO +import tempfile +import zipfile +from functools import reduce + +host = FTPHost('ftp.dadosabertos.ans.gov.br', 'anonymous') + +TABLE_NAME = 'informacoes_consolidadas_de_beneficiarios' +FTP_PATH = 'FTP/PDA/informacoes_consolidadas_de_beneficiarios' +RAW_COLLUNS_TYPE = { + '#ID_CMPT_MOVEL': str, + 'CD_OPERADORA': str, + 'NM_RAZAO_SOCIAL': str, + 'NR_CNPJ': str, + 'MODALIDADE_OPERADORA': str, + 'SG_UF': str, + 'CD_MUNICIPIO': str, + 'NM_MUNICIPIO': str, + 'TP_SEXO': str, + 'DE_FAIXA_ETARIA': str, + 'DE_FAIXA_ETARIA_REAJ': str, + 'CD_PLANO': str, + 'TP_VIGENCIA_PLANO': str, + 'DE_CONTRATACAO_PLANO': str, + 'DE_SEGMENTACAO_PLANO': str, + 'DE_ABRG_GEOGRAFICA_PLANO': str, + 'COBERTURA_ASSIST_PLAN': str, + 'TIPO_VINCULO': str, + 'QT_BENEFICIARIO_ATIVO': int, + 'QT_BENEFICIARIO_ADERIDO': int, + 'QT_BENEFICIARIO_CANCELADO': int, + 'QT_BENEFICIARIO_CANCELADO': int, + 'DT_CARGA': str, +} + + +def range_year_month(start: datetime, stop = datetime.now(), step = relativedelta(months=1)): + if (start > stop): + return + + yield start + yield from range_year_month(start + step, stop) + +def host_months_path(): + for date in range_year_month(datetime(day=1, month=5, year=2014)): + month_path = date.strftime("%Y%m") + yield FTP_PATH + '/' + month_path, date + + + +def host_list(basepath: str): + for path in host.listdir(basepath)[::-1]: + complete_path = host.path.join(basepath, str(path)) + yield complete_path + +def host_read(path: str) -> BytesIO: + filename, file_extension = host.path.splitext(host.path.basename(path)) + with tempfile.NamedTemporaryFile(prefix=filename, suffix=file_extension) as tmp: + logger.info(f'ftp downloading {path}') + host.download(path, tmp.name) + return BytesIO(tmp.read()) + +def read_csv_zip_to_dataframe(path) -> pd.DataFrame: + zfile = host_read(path) + with zipfile.ZipFile(zfile) as zref: + for filename in zref.namelist(): + if not filename.endswith('.csv'): + continue + path = host.path.join(host.path.dirname(path), filename) + logger.debug(f"load {path}") + + filecontent = BytesIO(zref.read(filename)) + + return pd.read_csv(filecontent, sep=';', encoding='cp1252', dtype=RAW_COLLUNS_TYPE) + raise Exception(f"CSV not foun in {path}") + +def process(df: pd.DataFrame): + time_col = pd.to_datetime(df['#ID_CMPT_MOVEL'], format='%Y%m') + df['ano'] = time_col.dt.year + df['mes'] = time_col.dt.month + del df['#ID_CMPT_MOVEL'] + del df['NM_MUNICIPIO'] + del df['DT_CARGA'] + + df.rename(columns={ + 'CD_OPERADORA': 'codigo_operadora', + 'NM_RAZAO_SOCIAL': 'razao_social', + 'NR_CNPJ': 'cnpj', + 'MODALIDADE_OPERADORA': 'modalidade_operadora', + 'SG_UF': 'sigla_uf', + 'CD_MUNICIPIO': 'id_municipio_6', + 'TP_SEXO': 'sexo', + 'DE_FAIXA_ETARIA': 'faixa_etaria', + 'DE_FAIXA_ETARIA_REAJ': 'faixa_etaria_reajuste', + 'CD_PLANO': 'codigo_plano', + 'TP_VIGENCIA_PLANO': 'tipo_vigencia_plano', + 'DE_CONTRATACAO_PLANO': 'contratacao_beneficiario', + 'DE_SEGMENTACAO_PLANO': 'segmentacao_beneficiario', + 'DE_ABRG_GEOGRAFICA_PLANO': 'abrangencia_beneficiario', + 'COBERTURA_ASSIST_PLAN': 'cobertura_assistencia_beneficiario', + 'TIPO_VINCULO': 'tipo_vinculo', + 'QT_BENEFICIARIO_ATIVO': 'qtd_beneficiario_ativo', + 'QT_BENEFICIARIO_ADERIDO': 'qtd_beneficiario_aderido', + 'QT_BENEFICIARIO_CANCELADO': 'qtd_beneficiario_cancelado' + }, inplace=True) + + # df['cnpj'] = df['cnpj'].str.zfill(14) + # df['cnpj'] = df['cnpj'].str.zfill(14) + + df['tipo_vigencia_plano'].replace({ + 'P': 'Posterior à Lei 9656/1998 ou planos adaptados à lei', + 'A': 'Anterior à Lei 9656/1998' + }) + + return df + +if __name__ == '__main__': + for month_path, month_date in host_months_path(): + output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'ben{month_date.strftime("%Y%m")}.parquet' + + if output_path.exists(): + logger.info(f"Jumping path {output_path}. Already download") + continue + + dfs = [] + for state_path in host_list(month_path): + state = state_path.split("_")[-1].split(".")[0] + input_path = Path('../input') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'ben{month_date.strftime("%Y%m")}_{state}.csv' + + if input_path.exists(): + logger.debug(f"reading input in {input_path}") + state_df = pd.read_csv(input_path, encoding="utf-8") + else: + state_df = read_csv_zip_to_dataframe(state_path) + input_path.parent.mkdir(parents=True, exist_ok=True) + state_df.to_csv(input_path, encoding="utf-8") + dfs.append(state_df) + + logger.info("Concat states dataframes") + df = pd.concat(dfs) + + logger.info("Cleaning dataset") + df = process(df) + + output_path.parent.mkdir(parents=True, exist_ok=True) + + # delete partition columns + del df['ano'] + del df['mes'] + + logger.info(f"Writing to output {output_path.as_posix()}") + df.to_parquet(output_path) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/requirements.txt b/bases/br_ans_beneficiario/informacao_consolidada/code/requirements.txt new file mode 100644 index 000000000..474aa6563 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/requirements.txt @@ -0,0 +1,2 @@ +python-dateutil +ftputil diff --git a/bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/_table_id_.xlsx b/bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/_table_id_.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..2d316bb7d94d0c4adc0d3d1f9fabca7d41e4c296 GIT binary patch literal 5051 zcmai22UJtrwxtt#fKa6hNbgOM-lT(oNReJbO%zb7NH0MIAt0bAHFTu+BE2aBf=H1L z3P=e^5h46U@B4E7@7}-87|A(fWY3&E_gZt$wGDM}@jzGr001jDz|IuwLeQh{{XB#n zz3hBlAz<(S+=vSKd3t1ySV1~PNC|hTzmngabG4wSRs~s)=!=jqa6v9TNf;#0oedt< zmU0cmX7LZaAK^5hLM_m@$rt-22Vx(W**O0BSA(CW^h=#T9n+}(3sP8EOG6K6BUS3(+ z=)lduUEW9OO-9s75+5mNGM!vykD#?b!9Q|dC@mr^zaejUirxD#OdN(2EtSgGu*o~6 zsd6SqbQQ&T@TuRZZE(V-{}ZSWo7tdCpiwoTKf3aYbz8ENhVOQ0y$ddL=um&d)#K$) zm3#wIwLJVDmDY1*Z>lfX+ZgH)0toG0!|}1Pun2Lnunhk{Dt)0|?%pm=PCnkkKfiua znKGZ0Ap+`snYHu!j_~;n$B@izi{?W&FnGoq%&x7WlExX+_CPfTke{my<&4-l%6ylc zt7ZVc?ZU0eaM`JJCMw^(LRTwlP_^{@^rH}tl>-x;fk%_b;S47o1rLg4R7ntsPIT|g za;$veZ5@di7-O=3O={F(BsWD3!=|hMW?KbQNz?++AsozgsBuj;GF3G{`z4;SU)xWIl*6B^`lw}J76nInFDJ9&i1*WEG zB4dfWg~e_`y&P8fUXd4B%fW2k1Uy;M$Q ztLM~=Z7XwE?iosLJcjd0O;E0unwxbNRojjD2{VcFJyMdf?k4)%waCZhGEnI^oMM6c zLc#1GD>^LP`;L;LY z+lHf|V;C~9_bw~koX=54ckS3WJ;mNsbxoNNrw6a=mAB%w1^ecuC3%>$d*rz2K2&MGdnNwd}!?iowcGmo1?%Wi6zg1 zQDCehR@0D&zsMPh(05!+M#>u7MC;;UVQ~`vTSjvHD((vzY2)SOaWOB2FwgPtbcj1e zfCLNFe$gU<$sZzPB(^jnAo(n|ng|LH7Ea{o%(M8Mh0fJC?}EB)Bq_wJFGXIJ;m=;d z7pWRtA0Ve1{&cq$mzLThzPg&zE*qfNTC6YOFerWZd5|MR<&Dm?qvZ-&&kt5hUjb>Lw zN#UG#P}8klNnlzvtJqht18NKhWrdr1)#w@Wp)2#BtCIQ`2R<%No=(C)KYteF+gme` zSvKHV7z%f(wPUiCxA{iZvzx3IH#h1w#4C~c9UpO|Tv2WGfu2D+InEqTHaN@T(cl1hR?M)F4JEBVQT zYkuML%x;a1Qx;ai^*w>OEt^)tLG|HB7A=ZYn;zgP=9c*-aF-ugx_PTKd@?Hn@ijQ4 z&U1bq;_ibR>vgi~{{(Sm611fpH73a%R@zy{{5T9lQZe;TsFM>|unS(2d3o3#Rn>6+}jQemvjfT-g`d*uPOw~|-KfcZNSzbZ`%jQ+-`zb2s z=HUIqM5%89v?RtMue5B0JIzY@JDDj(eQmfq>xT2~WpeE86bo<|Zr&ioU#N0Fw$2B{ zU>Q@T-YY&en2-<5FG!jqn_(b=u4k5wF;n*5--Lm?!YI=l<)D)>P#A913u646m0mdOLcNUVZ9=7?|-g=GZcETkY4JHd$MLvnt z&St-dkIR2l8SjiecVXKfu>5+K(ULvLd}}pf?%q)3LC{mbKvj`E!QTAF9Sg@6{)5sZgPDPXrNz(=IvtV<>YAW zFryt7x~qgZQF7mbr=vJk-~E)L{zlSSzYw$HiB}rxU{Oq*)a1( zDUG5no=gEz<5gDP>my=obAxcoFwz*t>wz|t6jC^qML`+nBz1;h#5)EXR75%D2ICg} zWhv9k`e-m4Y zp%oE!k;vkkpdYBm?M$+n3|8epA05E5>kYA3yDL+DNp+Ig8<%VX3PzBG%y95Ay8Ks-Ut%#fa!5SiI*gC zjjN+CA=wD*dRpP29$aP4wJ9wq>8NXFuWlk8uPWlP2IxM~;hCl81Yz?u@mY;j<8CYI zQ8=A|F4dbS(!{whQ>EPJ4%ZBIEHl-8k{vu&NbWqIYd=qg}iXQni> zYiDow3>s0B1VTr7sdULWkA^h7995$v(*-s=innV5sTLV^kq?doIW>ArHHCd@m_68S zQ@4V#bMKv}4+Udma!fhhIGGpi3Hblj6HxrhF^ng$heF+d`uwCyNVo_v3T;k9TDYV5 z%8Wq$MmGnI?zwy(tW2=&0dve`uWSXfhrpecGk2%jb`;VdR&R}P3La-=3YyjFn&X(t z=5J-&+Ps9uN$RM8Oo^5SKwn8iJ%^&-ChYgMdiO<~c=DJrJ0yFvIFj*nj!D|C=XtTH z=Z&%YxyS)%+(Kfa6cOv<*Rutisk{`U7mKxZ{O;{V*e=s7b;M{MW`EG5cm4Qj(NJe6 ztYpbZmPyl*O~!{RAp0O5I4GmRbBY`n08AzJSYKj(*E$*DN^*Zsij_W2Yk4gB{b`H3 zxb=3`gxpSZU(lkk^r6+^H$|h=J%JzL7O_9-(_V3{sCD|MttXID@)k5a@Uw;l1k<-Y z_0b@9E(xEzWE)GP=>#$fa#vq(ot@?-mevU>cU|;#>xt>Qo*1TWX}B}a;9^6}@k2R? z$r6QwS%z$@`^VboGRG+9Kid+axG?RvMSj~JNCIA zkK+u25IFWxtdfgeZ~EjgQ))pLs6uBQd){+;KH^e+#u$JM<@N30FN9Iy1yFl@w#46w ze)@wOm|j!gM8QJyD9z4t-qX5rvRi!Wo+t@9(ma#{_Rh$8`OZiqfKq;5f}JeG3}LF_ z`M62Fk#FdF`Wl0wjU%k}X5wKT&Ru#A6!$d=J@6g6H?B$sxA-`b0$y4Af=OXE{!%4j zT}swqGr@sG*RS8nxMQl@Znj(3%&kWp##0IV>5JAwJLCk2_8G=Ruc@8jrkApsC?ID0yS1^)`m70J=y&~!{ z0`j2QY*pxq@cipOfbp!Bs!)iJ6U4{H#Q%Z~(Tp?TVG;}*2ZZ(eFyvxy zk2wtad;HVU$e$<|`?0^H=%D`>jdGy}`+X|#C(6Z+2GhI!j!$SY`8&#g?d1Lhyy)C8 zN%cDx(Nv%T|7?r?1i0v1Fb3~;Xajx%{2#mbC&I;D3FB{mM?BFlg#UFre-6E7)uckWZr Zlld>@ZK#7!fI%llUm0jAB%#DS{SOsKG4ucc literal 0 HcmV?d00001 diff --git a/bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/informacao_consolidada.url b/bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/informacao_consolidada.url new file mode 100644 index 000000000..20379cb81 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/extra/architecture/informacao_consolidada.url @@ -0,0 +1,2 @@ +[InternetShortcut] +URL=https://docs.google.com/spreadsheets/d/11hTzsBQja-bpM9dac9NkmGvSI39xqiW7TC8amQUUvzc/edit#gid=0 From c5b26d11647fd2b445e3f02c8516af7d4901165e Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Thu, 2 Mar 2023 02:56:44 -0300 Subject: [PATCH 02/16] feat: add metadata --- .vscode/launch.json | 16 + bases/br_ans_beneficiario/dataset_config.yaml | 41 ++ .../informacao_consolidada/code/pipeline.py | 1 + .../informacao_consolidada/publish.sql | 43 ++ .../informacao_consolidada/table_config.yaml | 446 ++++++++++++++++++ run.py | 11 + 6 files changed, 558 insertions(+) create mode 100644 .vscode/launch.json create mode 100644 bases/br_ans_beneficiario/dataset_config.yaml create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/publish.sql create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml create mode 100644 run.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..4aec051fb --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use o IntelliSense para saber mais sobre os atributos possíveis. + // Focalizar para exibir as descrições dos atributos existentes. + // Para obter mais informações, acesse: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/bases/br_ans_beneficiario/dataset_config.yaml b/bases/br_ans_beneficiario/dataset_config.yaml new file mode 100644 index 000000000..c89f1e843 --- /dev/null +++ b/bases/br_ans_beneficiario/dataset_config.yaml @@ -0,0 +1,41 @@ + +# Nome (slug) do conjunto no CKAN +# Exemplos: br-ibge-populacao, br-tse-eleicoes +name: br-ans-beneficiarios + +# Título do conjunto, a ser exibido no mecanismo de busca. +# Exemplo: População brasileira +title: Beneficiarios de plano de Saúde + +# Qual organização disponibiliza os dados originais? +# Opções: escolher dessa lista -> https://basedosdados.org/api/3/action/organization_list +# Se a organização não estiver na lista acima ou o nome não estiver conforme o manual de estilo +# criar ou renomear a organização em https://basedosdados.org/organization/ +# Exemplos: br-ibge, br-tse, br-rj-gov +organization: br-ans + +# Descrição do conjunto +notes: + +# Quais temas caracterizam a base? +# Opções: escolher dessa lista -> https://basedosdados.org/api/3/action/group_list +# Importante: preencher com a chave, e não o valor. +groups: + - saude + +# Quais etiquetas caracterizam a base? +# Opções: escolher dessa lista -> https://basedosdados.org/api/3/action/tag_list +# Exemplos: +# - fertilidade +# - preco +# - desmatamento +# Caso crie etiquetas novas, as regras são: +# - letras minúsculas +# - sem acentos +# - sempre no singular +# - não repita nomes de grupos (ex. educacao, saude, meio ambiente, economia, etc.) +tags: + +# Não altere esse campo. +# Data da última modificação dos metadados gerada automaticamente pelo CKAN. +metadata_modified: diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index fd5ab1155..9694b3ce2 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -116,6 +116,7 @@ def process(df: pd.DataFrame): # df['cnpj'] = df['cnpj'].str.zfill(14) # df['cnpj'] = df['cnpj'].str.zfill(14) + # Using parquet, don't need external dictionary df['tipo_vigencia_plano'].replace({ 'P': 'Posterior à Lei 9656/1998 ou planos adaptados à lei', 'A': 'Anterior à Lei 9656/1998' diff --git a/bases/br_ans_beneficiario/informacao_consolidada/publish.sql b/bases/br_ans_beneficiario/informacao_consolidada/publish.sql new file mode 100644 index 000000000..eac22da73 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/publish.sql @@ -0,0 +1,43 @@ +/* +Query para publicar a tabela. + +Esse é o lugar para: + - modificar nomes, ordem e tipos de colunas + - dar join com outras tabelas + - criar colunas extras (e.g. logs, proporções, etc.) + +Qualquer coluna definida aqui deve também existir em `table_config.yaml`. + +# Além disso, sinta-se à vontade para alterar alguns nomes obscuros +# para algo um pouco mais explícito. + +TIPOS: + - Para modificar tipos de colunas, basta substituir STRING por outro tipo válido. + - Exemplo: `SAFE_CAST(column_name AS NUMERIC) column_name` + - Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types +*/ + +CREATE VIEW basedosdados-379403.br_ans_beneficiario.informacao_consolidada AS +SELECT +SAFE_CAST(ano AS INT64) ano, +SAFE_CAST(mes AS INT64) mes, +SAFE_CAST(codigo_operadora AS STRING) codigo_operadora, +SAFE_CAST(razao_social AS STRING) razao_social, +SAFE_CAST(cnpj AS STRING) cnpj, +SAFE_CAST(modalidade_operadora AS STRING) modalidade_operadora, +SAFE_CAST(sigla_uf AS STRING) sigla_uf, +SAFE_CAST(id_municipio_6 AS STRING) id_municipio_6, +SAFE_CAST(sexo AS STRING) sexo, +SAFE_CAST(faixa_etaria AS STRING) faixa_etaria, +SAFE_CAST(faixa_etaria_reajuste AS STRING) faixa_etaria_reajuste, +SAFE_CAST(codigo_plano AS STRING) codigo_plano, +SAFE_CAST(tipo_vigencia_plano AS STRING) tipo_vigencia_plano, +SAFE_CAST(contratacao_beneficiario AS STRING) contratacao_beneficiario, +SAFE_CAST(segmentacao_beneficiario AS STRING) segmentacao_beneficiario, +SAFE_CAST(abrangencia_beneficiario AS STRING) abrangencia_beneficiario, +SAFE_CAST(cobertura_assistencia_beneficiario AS STRING) cobertura_assistencia_beneficiario, +SAFE_CAST(tipo_vinculo AS STRING) tipo_vinculo, +SAFE_CAST(qtd_beneficiario_ativo AS INT64) qtd_beneficiario_ativo, +SAFE_CAST(qtd_beneficiario_aderido AS INT64) qtd_beneficiario_aderido, +SAFE_CAST(qtd_beneficiario_cancelado AS INT64) qtd_beneficiario_cancelado +FROM basedosdados-staging-379403.br_ans_beneficiario_staging.informacao_consolidada AS t diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml new file mode 100644 index 000000000..04ce171a1 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -0,0 +1,446 @@ + +# Igual ao dataset.name mas como lower case. +# Exemplos: br_ibge_populacao, br_inep_censo_escolar +dataset_id: br_ans_beneficiario + +table_id: informacao_consolidada + +# Título da tabela. +title: Informações consolidadas de Beneficiários + +# Descreva a tabela. Essas são as primeiras frases que um usuário vai ver. +# Você não precisa ser muito conciso. Sinta-se a vontade para dar exemplos de +# como usar os dados. +# Se souber, liste também aplicações: pesquisa, apps, etc. que usem os dados., +description: O conjunto contém Informações consolidadas de beneficiários por competência. Este será alterado para inclusão de dados acima de 5 anos e melhorias na qualidade dos dados + +# As máximas unidades espaciais que a tabela cobre. +# Exemplo: +# - sa.br +# +# - sa.br.sp +# +# - world +spatial_coverage: + - sa.br + +# Anos cobertos pela tabela. +# Exemplos: +# - 1995(1)2019 +# Caso a cobertura não seja contínua: +# - 2002(2)2010 +# - 2016 +# - 2020 +temporal_coverage: + - 2014-08(1)2022-12 + +# A unidade temporal com qual a tabela é atualizada. +# Opções em 'https://basedosdados.org/api/3/action/bd_available_options' +update_frequency: month +# # Observação: as operadoras podem enviar dados que afetam informações das competências passadas. +# # A cada divulgação atualizamos os dados dos últimos 5 anos. +# # Portanto, os dados atualizados de competências já publicadas podem estar diferentes do divulgado anteriormente. + +# Nível de observação da tabela: o que representa cada linha. +# A combinação das colunas aqui deve construir uma chave única da tabelaOpções de entity em 'https://basedosdados.org/api/3/action/bd_available_options' +# Caso a entidade seja espacial incluir a informação de 'country' Exemplos: +# - entity: year +# columns: +# - ano +# - country: br +# entity: state +# columns: +# - sigla_uf +observation_level: other + # O unico valor agregado é o numero de pessoas, todo o resto tá bem granularizado + +last_updated: + metadata: '2023-03-02' + data: '2022-10-11 17:33:30' + release: + +# Versão da tabela. Seguindo o padrão de semantic versioning. +# Exemplos: v1.0, v1.1.3 +version: v1.0 + +# Quem está preenchendo esses metadados? +published_by: + name: Guilherme Salustiano + email: guissalustiano@gmail.com + github_user: guissalustiano + ckan_user: + website: guissalustiano.dev + +# Qual organização/departamento/pessoa tratou os dados? +# As vezes há um ponto intermediário entre os dados originais e subir na Base dos Dados. +# Se essa pessoa é você, preencha abaixo com suas informações. +data_cleaned_by: + name: Guilherme Salustiano + email: guissalustiano@gmail.com + github_user: guissalustiano + ckan_user: + website: guissalustiano.dev + +# Se houve passos de tratamento, limpeza e manipulação de dados, descreva-os aqui. +data_cleaning_description: + A coluna NM_MUNICIPIO, DT_CARGA e ID_CMPT_MOVEL foi apagada, + as demais colunas foram renomeadas em lowercase e snake_case, + A coluna TP_VIGENCIA_PLANO foi expandida + +# Url do código de limpeza dos dados do github. +data_cleaning_code_url: https://github.com/guissalustiano/mais/blob/38200e878adab55b3b0aea1a626ce45b43619057/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py#L86-L123 + +# Organização que ajudou institucionalmente na disponibilização dos dados. +partner_organization: + name: Guilherme Salustiano + organization_id: + +# Url dos dados originais no GCP Storage. +raw_files_url: https://console.cloud.google.com/storage/browser/basedosdados_stag_guissalustiano + +# Url dos arquivos auxiliares no GCP Storage. +auxiliary_files_url: + +# Url da tabela de arquitetura no GCP Storage. +architecture_url: https://docs.google.com/spreadsheets/d/11hTzsBQja-bpM9dac9NkmGvSI39xqiW7TC8amQUUvzc/edit#gid=0 + +source_bucket_name: basedosdados_stag_guissalustiano + +project_id_prod: basedosdados-379403 + +project_id_staging: basedosdados-staging-379403 + +# Liste as colunas da tabela que representam partições. +# Não esqueça de deletar essas colunas nas tabelas .csv na hora de subir para o BigQuery. +# Isso poupará muito tempo e dinheiro às pessoas utilizando essa tabela. +# Se não houver partições, não modifique abaixo. +partitions: + - ano + - mes + +# Quais são as colunas? Certifique-se de escrever uma boa descrição, as pessoas vão gostar +# para saber sobre o que é a coluna. +# Adicionar todas as colunas manualmente pode ser bastante cansativo, por isso, quando +# inicializando este arquivo de configuração, você pode apontar a função para uma amostra de dados que +# preencherá automaticamente as colunas. +# Além disso, você deve adicionar as colunas de partição aqui e definir is_partition como True. +columns: + - name: ano + bigquery_type: int64 + description: Ano da competência dos dados + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: br_bd_diretorios_data_tempo + table_id: ano + column_name: ano + measurement_unit: ano + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: true + - name: mes + bigquery_type: INT64 + description: Mês da competência dos dados + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: br_bd_diretorios_data_tempo + table_id: mes + column_name: mes + measurement_unit: month + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: true + - name: codigo_operadora + bigquery_type: string + description: Código de registro da operadora de plano de saúde na ANS + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: yes + observations: + is_in_staging: + is_partition: + - name: razao_social + bigquery_type: string + description: Razão Social da Operadora + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: cnpj + bigquery_type: string + description: CNPJ da Operadora + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: modalidade_operadora + bigquery_type: string' + description: Classificação das operadoras de planos privados de assistência à saúde de acordo com seu estatuto jurídico + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Administradora', 'Cooperativa Médica', 'Cooperativa odontológica', 'Autogestão', 'Medicina de Grupo', 'Odontologia de Grupo', 'Filantropia', 'Seguradora Especializada em Saúde', 'Seguradora' or 'Administradora de Benefícios'" + is_in_staging: true + is_partition: false + - name: sigla_uf + bigquery_type: string + description: Sigla da Unidade da Federação + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: br_bd_diretorios_brasil + table_id: uf + column_name: sigla + measurement_unit: + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: id_municipio_6 + bigquery_type: string + description: ID do Município IBGE - 6 Dígitos + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: br_bd_diretorios_brasil + table_id: municipio + column_name: id_municipio_6 + measurement_unit: + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: sexo + bigquery_type: string + description: Sexo + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'M' or 'F'" + is_in_staging: true + is_partition: false + - name: faixa_etaria + bigquery_type: 'string' + description: 'Faixa etária do beneficiário' + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: age + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: faixa_etaria_reajuste + bigquery_type: string + description: Faixa etária do beneficiária utilizada para o reajuste do plano definida pela Lei 9.656 + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: age + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: codigo_plano + bigquery_type: string + description: Código do plano registrado ou cadastrado na ANS no qual o beneficiário possui vínculo + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: tipo_vigencia_plano + bigquery_type: string + description: Início da vigência do plano para comercialização + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Anterior à Lei 9656/1998' or 'Posterior à Lei 9656/1998 ou planos adaptados à lei'" + is_in_staging: true + is_partition: false + - name: contratacao_beneficiario + bigquery_type: string + description: Tipo de contratação do plano do beneficiário + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Individual ou familiar', 'Coletivo empresarial', 'Coletivo por adesão', 'Individual ou Familiar + Coletivo Empresarial', 'Individual ou Familiar + Coletivo por Adesão', 'Coletivo Empresarial + Coletivo por Adesão' or 'Individual + Coletivo Empresarial + Coletivo por Adesão'" + is_in_staging: true + is_partition: false + - name: segmentacao_beneficiario + bigquery_type: string + description: Tipo de segmentação assistencial do plano do beneficiário + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Ambulatorial (Ambulatorial)', 'Hospitalar com obstetrícia (Hosp c/ obst)', 'Hospitalar sem obstetrícia (Hosp s/ obst)', 'Odontológico (Odontológico)', 'Referência (Amb + hosp c/ obst)', 'Ambulatorial + Hospitalar com obstetrícia (Amb + hosp c/ obst)', 'Ambulatorial + Hospitalar sem obstetrícia (Amb + hosp s/ obst)', 'Ambulatorial + Odontológico (Ambulatorial)', 'Hosp c/ obstetrícia + Hosp s/ obstetrícia (Hosp c/s obst)', 'Hospitalar com obstetrícia + Odontológico (Hosp c/ obst)', 'Hospitalar sem obstetrícia + Odontológico (Hosp s/ obst)', 'Amb + Hosp c/s Obstetrícia (Amb + hosp c/s obst)', 'Ambulatorial + Hospitalar com obstetrícia + Odontológico (Amb + hosp c/ obst)', 'Ambulatorial + Hospitalar sem obstetrícia + Odontológico (Amb + hosp s/ obst)' or 'Hosp c/s Obstetrícia + Odont (Hosp c/s obst)'" + is_in_staging: true + is_partition: false + - name: abrangencia_beneficiario + bigquery_type: string + description: Tipo de abrangência do plano do beneficiário + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Nacional', 'Grupo de estados', 'Estadual', 'Grupo de municípios', 'Municipal' or 'Outras'" + is_in_staging: true + is_partition: false + - name: cobertura_assistencia_beneficiario + bigquery_type: string + description: Tipo de cobertura de plano do beneficiário + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: + has_sensitive_data: no + observations: "'Médico-hospitalar com ou sem cobertura odontológica' or 'Exclusivamente Odontológica'" + is_in_staging: true + is_partition: false + - name: tipo_vinculo + bigquery_type: string + description: Tipo de vinculo do beneficiário + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: people + has_sensitive_data: no + observations: "'Titular', 'Depedente' or 'Nao Informado'" + is_in_staging: true + is_partition: false + - name: qtd_beneficiario_ativo + bigquery_type: int64 + description: Quantidade de beneficiários ativos na competência + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: people + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: qtd_beneficiario_aderido + bigquery_type: int64 + description: Quantidade de beneficiários aderidos na competência + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: people + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + - name: qtd_beneficiario_cancelado + bigquery_type: int64 + description: Quantidade de beneficiários cancelados na competência + temporal_coverage: + - (1) + covered_by_dictionary: no + directory_column: + dataset_id: + table_id: + column_name: + measurement_unit: people + has_sensitive_data: no + observations: + is_in_staging: true + is_partition: false + +number_rows: + +metadata_modified: diff --git a/run.py b/run.py new file mode 100644 index 000000000..33637a634 --- /dev/null +++ b/run.py @@ -0,0 +1,11 @@ +import basedosdados as bd + +tb = bd.Table(dataset_id='br_ans_beneficiario', table_id='informacao_consolidada') +tb.create( + path='/home/guiss/projects/open_source/mais/bases/br_ans_beneficiario/informacao_consolidada/output', + force_dataset=False, + if_table_exists='replace', + if_storage_data_exists='pass', + if_table_config_exists='pass', + source_format='parquet' +) From 19eab3df99dea4cd767f687f5632015a0d77d461 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Thu, 2 Mar 2023 03:31:41 -0300 Subject: [PATCH 03/16] fix: wrong type when load from input --- .../br_ans_beneficiario/informacao_consolidada/code/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index 9694b3ce2..8ea115e8f 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -139,7 +139,7 @@ def process(df: pd.DataFrame): if input_path.exists(): logger.debug(f"reading input in {input_path}") - state_df = pd.read_csv(input_path, encoding="utf-8") + state_df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE) else: state_df = read_csv_zip_to_dataframe(state_path) input_path.parent.mkdir(parents=True, exist_ok=True) From 8f4f7fd357f9e3fc39f7b646e119b33f987d499f Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Thu, 2 Mar 2023 16:43:18 -0300 Subject: [PATCH 04/16] remove .vscode --- .vscode/launch.json | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 4aec051fb..000000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - // Use o IntelliSense para saber mais sobre os atributos possíveis. - // Focalizar para exibir as descrições dos atributos existentes. - // Para obter mais informações, acesse: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "justMyCode": false - } - ] -} \ No newline at end of file From f6e998b7ca40d3b77a8e5203abacf6e84d050c9b Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Thu, 2 Mar 2023 16:46:03 -0300 Subject: [PATCH 05/16] feat: remove run --- run.py | 11 ----------- 1 file changed, 11 deletions(-) delete mode 100644 run.py diff --git a/run.py b/run.py deleted file mode 100644 index 33637a634..000000000 --- a/run.py +++ /dev/null @@ -1,11 +0,0 @@ -import basedosdados as bd - -tb = bd.Table(dataset_id='br_ans_beneficiario', table_id='informacao_consolidada') -tb.create( - path='/home/guiss/projects/open_source/mais/bases/br_ans_beneficiario/informacao_consolidada/output', - force_dataset=False, - if_table_exists='replace', - if_storage_data_exists='pass', - if_table_config_exists='pass', - source_format='parquet' -) From 1e94764d0db01e4425017a211cfb1b65b5b28f0d Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Fri, 3 Mar 2023 09:28:40 -0300 Subject: [PATCH 06/16] change observation_level to municipality --- .../informacao_consolidada/table_config.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index 04ce171a1..c3f9b5003 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -51,8 +51,7 @@ update_frequency: month # entity: state # columns: # - sigla_uf -observation_level: other - # O unico valor agregado é o numero de pessoas, todo o resto tá bem granularizado +observation_level: municipality last_updated: metadata: '2023-03-02' From 6e0c002f6ab418fb81ef7c89b1b0add2669fcb05 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Fri, 3 Mar 2023 09:31:50 -0300 Subject: [PATCH 07/16] one line cleaning description --- .../informacao_consolidada/table_config.yaml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index c3f9b5003..b3e4dd3de 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -81,10 +81,7 @@ data_cleaned_by: website: guissalustiano.dev # Se houve passos de tratamento, limpeza e manipulação de dados, descreva-os aqui. -data_cleaning_description: - A coluna NM_MUNICIPIO, DT_CARGA e ID_CMPT_MOVEL foi apagada, - as demais colunas foram renomeadas em lowercase e snake_case, - A coluna TP_VIGENCIA_PLANO foi expandida +data_cleaning_description: A coluna NM_MUNICIPIO, DT_CARGA e ID_CMPT_MOVEL foi apagada, as demais colunas foram renomeadas em lowercase e snake_case, A coluna TP_VIGENCIA_PLANO foi expandida # Url do código de limpeza dos dados do github. data_cleaning_code_url: https://github.com/guissalustiano/mais/blob/38200e878adab55b3b0aea1a626ce45b43619057/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py#L86-L123 From 87b66c14198820e19d9808ea3617ffe3fc87b3c5 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Fri, 3 Mar 2023 09:32:51 -0300 Subject: [PATCH 08/16] add release --- .../informacao_consolidada/table_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index b3e4dd3de..352186ea7 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -56,7 +56,7 @@ observation_level: municipality last_updated: metadata: '2023-03-02' data: '2022-10-11 17:33:30' - release: + release: '2022-10-11 17:33:30' # Versão da tabela. Seguindo o padrão de semantic versioning. # Exemplos: v1.0, v1.1.3 From f988e9f3a3a7874a13fad1ccb43e1808f07275a8 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Mon, 6 Mar 2023 12:44:07 -0300 Subject: [PATCH 09/16] add row number --- .../informacao_consolidada/table_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index 352186ea7..17c990074 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -437,6 +437,6 @@ columns: is_in_staging: true is_partition: false -number_rows: +number_rows: 1476900777 metadata_modified: From ed23b175abd76fc3ce4bfde8c8e67e6e1468be1d Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Mon, 6 Mar 2023 12:44:53 -0300 Subject: [PATCH 10/16] update code range --- .../informacao_consolidada/code/pipeline.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index 8ea115e8f..9d76a682c 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -13,6 +13,7 @@ from functools import reduce host = FTPHost('ftp.dadosabertos.ans.gov.br', 'anonymous') +host.keep_alive() TABLE_NAME = 'informacoes_consolidadas_de_beneficiarios' FTP_PATH = 'FTP/PDA/informacoes_consolidadas_de_beneficiarios' @@ -51,7 +52,7 @@ def range_year_month(start: datetime, stop = datetime.now(), step = relativedelt yield from range_year_month(start + step, stop) def host_months_path(): - for date in range_year_month(datetime(day=1, month=5, year=2014)): + for date in range_year_month(datetime(2014, 5, 1), datetime(2022, 12, 31)): month_path = date.strftime("%Y%m") yield FTP_PATH + '/' + month_path, date @@ -113,8 +114,7 @@ def process(df: pd.DataFrame): 'QT_BENEFICIARIO_CANCELADO': 'qtd_beneficiario_cancelado' }, inplace=True) - # df['cnpj'] = df['cnpj'].str.zfill(14) - # df['cnpj'] = df['cnpj'].str.zfill(14) + df['cnpj'] = df['cnpj'].str.zfill(14) # Using parquet, don't need external dictionary df['tipo_vigencia_plano'].replace({ @@ -141,6 +141,11 @@ def process(df: pd.DataFrame): logger.debug(f"reading input in {input_path}") state_df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE) else: + # Wtf, pq tem um repositorio aqui? + # https://dadosabertos.ans.gov.br/FTP/PDA/informacoes_consolidadas_de_beneficiarios/201602/201607/ + if not host.path.isfile(state_path): + continue + state_df = read_csv_zip_to_dataframe(state_path) input_path.parent.mkdir(parents=True, exist_ok=True) state_df.to_csv(input_path, encoding="utf-8") From 8d3f1f85e9944f3f2cfe72c033f28f2d2b966e1d Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Tue, 28 Mar 2023 09:39:55 -0300 Subject: [PATCH 11/16] code partition by state --- .../informacao_consolidada/code/pipeline.py | 40 +++++++++---------- .../informacao_consolidada/table_config.yaml | 2 +- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index 9d76a682c..f7e235ae9 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -126,42 +126,38 @@ def process(df: pd.DataFrame): if __name__ == '__main__': for month_path, month_date in host_months_path(): - output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'ben{month_date.strftime("%Y%m")}.parquet' - - if output_path.exists(): - logger.info(f"Jumping path {output_path}. Already download") - continue - - dfs = [] for state_path in host_list(month_path): state = state_path.split("_")[-1].split(".")[0] + + output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'state={state}' / f'ben{month_date.strftime("%Y%m")}.parquet' + + if output_path.exists(): + logger.info(f"Jumping path {output_path}. Already download") + continue + input_path = Path('../input') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'ben{month_date.strftime("%Y%m")}_{state}.csv' if input_path.exists(): logger.debug(f"reading input in {input_path}") - state_df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE) + df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE) else: # Wtf, pq tem um repositorio aqui? # https://dadosabertos.ans.gov.br/FTP/PDA/informacoes_consolidadas_de_beneficiarios/201602/201607/ if not host.path.isfile(state_path): continue - state_df = read_csv_zip_to_dataframe(state_path) + df = read_csv_zip_to_dataframe(state_path) input_path.parent.mkdir(parents=True, exist_ok=True) - state_df.to_csv(input_path, encoding="utf-8") - dfs.append(state_df) - - logger.info("Concat states dataframes") - df = pd.concat(dfs) + df.to_csv(input_path, encoding="utf-8") - logger.info("Cleaning dataset") - df = process(df) + logger.info("Cleaning dataset") + df = process(df) - output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.parent.mkdir(parents=True, exist_ok=True) - # delete partition columns - del df['ano'] - del df['mes'] + # delete partition columns + del df['ano'] + del df['mes'] - logger.info(f"Writing to output {output_path.as_posix()}") - df.to_parquet(output_path) + logger.info(f"Writing to output {output_path.as_posix()}") + df.to_parquet(output_path) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index 17c990074..b9f642886 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -437,6 +437,6 @@ columns: is_in_staging: true is_partition: false -number_rows: 1476900777 +number_rows: 1490467876 metadata_modified: From b6d2604cc08a9df5eba934f7f9b8311ca255cf2f Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Sun, 2 Apr 2023 14:40:35 -0300 Subject: [PATCH 12/16] remove index and fix uf partition name --- .../informacao_consolidada/code/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index f7e235ae9..c44682fad 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -129,7 +129,7 @@ def process(df: pd.DataFrame): for state_path in host_list(month_path): state = state_path.split("_")[-1].split(".")[0] - output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'state={state}' / f'ben{month_date.strftime("%Y%m")}.parquet' + output_path = Path('../output') / f'ano={month_date.strftime("%Y")}' / f'mes={month_date.strftime("%m")}' / f'sigla_uf={state}' / f'ben{month_date.strftime("%Y%m")}-{state}.parquet' if output_path.exists(): logger.info(f"Jumping path {output_path}. Already download") @@ -160,4 +160,4 @@ def process(df: pd.DataFrame): del df['mes'] logger.info(f"Writing to output {output_path.as_posix()}") - df.to_parquet(output_path) + df.to_parquet(output_path, index=False) From 97dd160235883cb07da5e3e3fef05cfe839e9dfb Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Sun, 2 Apr 2023 14:40:59 -0300 Subject: [PATCH 13/16] add observetion about uf --- .../informacao_consolidada/table_config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index b9f642886..cec4276da 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -223,7 +223,7 @@ columns: column_name: sigla measurement_unit: has_sensitive_data: no - observations: + observations: Os dados originais anonimizados pela ANS fornecem informações onde a sigla_uf="XX" is_in_staging: true is_partition: false - name: id_municipio_6 @@ -238,7 +238,7 @@ columns: column_name: id_municipio_6 measurement_unit: has_sensitive_data: no - observations: + observations: Os dados originais anonimizados pela ANS fornecem informações onde o id_municipio_6="-1" is_in_staging: true is_partition: false - name: sexo From efc497d90bd11b1ee4c9ec41964a5b09f3d39ddd Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Sun, 16 Apr 2023 22:17:09 -0300 Subject: [PATCH 14/16] fix pr comments --- .../informacao_consolidada/code/pipeline.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py index c44682fad..fec903500 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/pipeline.py @@ -109,9 +109,9 @@ def process(df: pd.DataFrame): 'DE_ABRG_GEOGRAFICA_PLANO': 'abrangencia_beneficiario', 'COBERTURA_ASSIST_PLAN': 'cobertura_assistencia_beneficiario', 'TIPO_VINCULO': 'tipo_vinculo', - 'QT_BENEFICIARIO_ATIVO': 'qtd_beneficiario_ativo', - 'QT_BENEFICIARIO_ADERIDO': 'qtd_beneficiario_aderido', - 'QT_BENEFICIARIO_CANCELADO': 'qtd_beneficiario_cancelado' + 'QT_BENEFICIARIO_ATIVO': 'quantidade_beneficiario_ativo', + 'QT_BENEFICIARIO_ADERIDO': 'quantidade_beneficiario_aderido', + 'QT_BENEFICIARIO_CANCELADO': 'quantidade_beneficiario_cancelado' }, inplace=True) df['cnpj'] = df['cnpj'].str.zfill(14) @@ -139,7 +139,7 @@ def process(df: pd.DataFrame): if input_path.exists(): logger.debug(f"reading input in {input_path}") - df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE) + df = pd.read_csv(input_path, encoding="utf-8", dtype=RAW_COLLUNS_TYPE, index_col=0) else: # Wtf, pq tem um repositorio aqui? # https://dadosabertos.ans.gov.br/FTP/PDA/informacoes_consolidadas_de_beneficiarios/201602/201607/ @@ -158,6 +158,7 @@ def process(df: pd.DataFrame): # delete partition columns del df['ano'] del df['mes'] + del df['sigla_uf'] logger.info(f"Writing to output {output_path.as_posix()}") df.to_parquet(output_path, index=False) From c572990b074d8f9999f71e31cd581ccfae419f03 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Wed, 19 Apr 2023 14:15:14 -0300 Subject: [PATCH 15/16] fix table config qtd to quantidade --- .../informacao_consolidada/publish.sql | 10 +++++----- .../informacao_consolidada/table_config.yaml | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bases/br_ans_beneficiario/informacao_consolidada/publish.sql b/bases/br_ans_beneficiario/informacao_consolidada/publish.sql index eac22da73..c35927e39 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/publish.sql +++ b/bases/br_ans_beneficiario/informacao_consolidada/publish.sql @@ -17,7 +17,7 @@ TIPOS: - Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types */ -CREATE VIEW basedosdados-379403.br_ans_beneficiario.informacao_consolidada AS +CREATE VIEW basedosdados.br_ans_beneficiario.informacao_consolidada AS SELECT SAFE_CAST(ano AS INT64) ano, SAFE_CAST(mes AS INT64) mes, @@ -37,7 +37,7 @@ SAFE_CAST(segmentacao_beneficiario AS STRING) segmentacao_beneficiario, SAFE_CAST(abrangencia_beneficiario AS STRING) abrangencia_beneficiario, SAFE_CAST(cobertura_assistencia_beneficiario AS STRING) cobertura_assistencia_beneficiario, SAFE_CAST(tipo_vinculo AS STRING) tipo_vinculo, -SAFE_CAST(qtd_beneficiario_ativo AS INT64) qtd_beneficiario_ativo, -SAFE_CAST(qtd_beneficiario_aderido AS INT64) qtd_beneficiario_aderido, -SAFE_CAST(qtd_beneficiario_cancelado AS INT64) qtd_beneficiario_cancelado -FROM basedosdados-staging-379403.br_ans_beneficiario_staging.informacao_consolidada AS t +SAFE_CAST(quantidade_beneficiario_ativo AS INT64) quantidade_beneficiario_ativo, +SAFE_CAST(quantidade_beneficiario_aderido AS INT64) quantidade_beneficiario_aderido, +SAFE_CAST(quantidade_beneficiario_cancelado AS INT64) quantidade_beneficiario_cancelado +FROM basedosdados-dev.br_ans_beneficiario_staging.informacao_consolidada AS t diff --git a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml index cec4276da..0cb98733d 100644 --- a/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml +++ b/bases/br_ans_beneficiario/informacao_consolidada/table_config.yaml @@ -391,7 +391,7 @@ columns: observations: "'Titular', 'Depedente' or 'Nao Informado'" is_in_staging: true is_partition: false - - name: qtd_beneficiario_ativo + - name: quantidade_beneficiario_ativo bigquery_type: int64 description: Quantidade de beneficiários ativos na competência temporal_coverage: @@ -406,7 +406,7 @@ columns: observations: is_in_staging: true is_partition: false - - name: qtd_beneficiario_aderido + - name: quantidade_beneficiario_aderido bigquery_type: int64 description: Quantidade de beneficiários aderidos na competência temporal_coverage: @@ -421,7 +421,7 @@ columns: observations: is_in_staging: true is_partition: false - - name: qtd_beneficiario_cancelado + - name: quantidade_beneficiario_cancelado bigquery_type: int64 description: Quantidade de beneficiários cancelados na competência temporal_coverage: From d1f4ac88f163485b6a8c7cd8c22b72b86e437117 Mon Sep 17 00:00:00 2001 From: Guilherme Salustiano Date: Fri, 28 Apr 2023 16:25:40 -0300 Subject: [PATCH 16/16] feat: add count lines script --- .../informacao_consolidada/code/count_lines.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 bases/br_ans_beneficiario/informacao_consolidada/code/count_lines.py diff --git a/bases/br_ans_beneficiario/informacao_consolidada/code/count_lines.py b/bases/br_ans_beneficiario/informacao_consolidada/code/count_lines.py new file mode 100644 index 000000000..f8e6a9331 --- /dev/null +++ b/bases/br_ans_beneficiario/informacao_consolidada/code/count_lines.py @@ -0,0 +1,15 @@ +from glob import glob +import pandas as pd +from loguru import logger + +csv_filepaths = glob('../input/**/*.csv', recursive=True) + +count = 0 +for filepath in csv_filepaths: + logger.info(f"reading {filepath}") + df = pd.read_csv(filepath, encoding="utf-8", index_col=0) + size = len(df.index) + logger.debug(f"{filepath} has {size}") + count += size + +logger.info(f"Total: {count}")