[infra] Adds support for Avro and Parquet (cont.) #1145

Merged 4 commits on Mar 11, 2022
24 changes: 20 additions & 4 deletions python-package/basedosdados/upload/datatypes.py
@@ -1,5 +1,7 @@
 from google.cloud import bigquery
 import csv
+import pandas as pd
+import pandavro


 class Datatype:
@@ -19,11 +21,19 @@ def __init__(
     def header(self, data_sample_path):

         if self.source_format == "csv":

             return next(csv.reader(open(data_sample_path, "r", encoding="utf-8")))

+        elif self.source_format == "avro":
+            dataframe = pandavro.read_avro(str(data_sample_path))
+            return list(dataframe.columns.values)
+
+        elif self.source_format == "parquet":
+            dataframe = pd.read_parquet(str(data_sample_path))
+            return list(dataframe.columns.values)
+
         else:
             raise NotImplementedError(
-                "Base dos Dados just supports comma separated csv files"
+                "Base dos Dados just supports comma separated csv, avro and parquet files"
             )

     def partition():
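
The new branches lean entirely on pandas/pandavro schema inference: the sample file is read into a DataFrame and its column names become the header. A minimal standalone sketch of the same extraction, assuming hypothetical local sample files:

    import pandas as pd
    import pandavro

    # Read only to discover column names; the DataFrame contents are discarded.
    # "sample.avro" and "sample.parquet" are hypothetical paths.
    avro_header = list(pandavro.read_avro("sample.avro").columns.values)
    parquet_header = list(pd.read_parquet("sample.parquet").columns.values)

    print(avro_header)
    print(parquet_header)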
@@ -48,12 +58,18 @@ def external_config(self):
             _external_config.autodetect = False
             _external_config.schema = self.table_obj._load_schema(self.mode)

+        # You can add new formats here
+        elif self.source_format == "avro":
+
+            _external_config = bigquery.ExternalConfig("AVRO")
+
+        elif self.source_format == "parquet":
+
+            _external_config = bigquery.ExternalConfig("PARQUET")
+
         else:

             raise NotImplementedError(
-                "Base dos Dados just supports comma separated csv files"
+                "Base dos Dados just supports comma separated csv, avro and parquet files"
             )

         _external_config.source_uris = f"gs://{self.table_obj.bucket_name}/staging/{self.table_obj.dataset_id}/{self.table_obj.table_id}/*"
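
For context, this ExternalConfig is what makes BigQuery treat the staging files in GCS as an external table. Avro and Parquet files carry their own schema, which is presumably why these branches skip the autodetect/schema handling the CSV branch needs. A minimal sketch of the equivalent raw client calls, with hypothetical project, dataset and bucket names:

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes ambient GCP credentials

    # Parquet/Avro are self-describing, so no schema or autodetect settings.
    external_config = bigquery.ExternalConfig("PARQUET")
    external_config.source_uris = ["gs://my-bucket/staging/my_dataset/my_table/*"]  # hypothetical

    table = bigquery.Table("my-project.my_dataset_staging.my_table")  # hypothetical
    table.external_data_configuration = external_config
    client.create_table(table)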
21 changes: 12 additions & 9 deletions python-package/basedosdados/upload/table.py
@@ -52,10 +52,7 @@ def _get_table_obj(self, mode):
     def _is_partitioned(self):
         ## check if the table is partitioned; the split was needed because of a change in the type of partitions in pydantic
         partitions = self.table_config["partitions"]
-        if partitions:
-            partitions = partitions.split(",")
-
-        if partitions is None:
+        if not partitions:
             return False

         elif isinstance(partitions, list):
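
The collapsed check works because after the pydantic change `partitions` may arrive as None, an empty string, or an empty list, and all of those are falsy. A quick illustration:

    # None, "" and [] are all falsy, so a single truthiness test replaces
    # the separate None check, and the string split is no longer needed here.
    for partitions in (None, "", []):
        assert not partitions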
@@ -417,7 +414,8 @@ def init(
         Args:
             data_sample_path (str, pathlib.PosixPath): Optional.
                 Data sample path to auto-complete column names
-                It supports Comma Delimited CSV.
+                It supports Comma Delimited CSV, Apache Avro and
+                Apache Parquet.
             if_folder_exists (str): Optional.
                 What to do if table folder exists

@@ -431,7 +429,8 @@ def init(
                 * 'replace' : Replace files with blank template
                 * 'pass' : Do nothing
             source_format (str): Optional
-                Data source format. Only 'csv' is supported. Defaults to 'csv'.
+                Data source format. Only 'csv', 'avro' and 'parquet'
+                are supported. Defaults to 'csv'.

             columns_config_url (str): google sheets URL.
                 The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
@@ -479,7 +478,7 @@ def init(
             data_sample_path = [
                 f
                 for f in data_sample_path.glob("**/*")
-                if f.is_file() and f.suffix == ".csv"
+                if f.is_file() and f.suffix == f".{source_format}"
             ][0]

             partition_columns = [
@@ -558,6 +557,8 @@ def create(
         It currently supports the types:

         - Comma Delimited CSV
+        - Apache Avro
+        - Apache Parquet

         Data can also be partitioned following the hive partitioning scheme
         `<key1>=<value1>/<key2>=<value2>` - for instance,
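
As an illustration of that hive layout (bucket, keys, values and file name are all hypothetical):

    gs://my-bucket/staging/my_dataset/my_table/year=2022/country=BR/data.parquet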
@@ -588,7 +589,8 @@ def create(
                 * 'replace' : Replace table
                 * 'pass' : Do nothing
             source_format (str): Optional
-                Data source format. Only 'csv' is supported. Defaults to 'csv'.
+                Data source format. Only 'csv', 'avro' and 'parquet'
+                are supported. Defaults to 'csv'.

             columns_config_url (str): google sheets URL.
                 The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
@@ -639,6 +641,7 @@ def create(
                 if_folder_exists="replace",
                 if_table_config_exists=if_table_config_exists,
                 columns_config_url=columns_config_url,
+                source_format=source_format,
             )

             table = bigquery.Table(self.table_full_name["staging"])
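
With `source_format` now forwarded to `init`, the flow can presumably be driven end to end from `Table.create`. A hypothetical usage sketch (the IDs and path are made up; the keywords follow the signatures shown in this diff):

    from basedosdados import Table

    # Hypothetical dataset/table IDs and local sample file; source_format
    # now reaches both init() and the BigQuery ExternalConfig.
    tb = Table(dataset_id="my_dataset", table_id="my_table")
    tb.create(
        path="data/my_table.parquet",
        source_format="parquet",
    )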
@@ -675,9 +678,9 @@ def create(

         self.client["bigquery_staging"].create_table(table)

-
     def update(self, mode="all", not_found_ok=True):
         """Updates BigQuery schema and description.
+
         Args:
             mode (str): Optional.
                 Which table to update [prod|staging|all]