diff --git a/README.md b/README.md index c1a53f13..f8fcb32f 100644 --- a/README.md +++ b/README.md @@ -337,6 +337,7 @@ var jsonSchema string = ` * Parquet-go reads data as an object in Golang and every field must be a public field, which start with an upper letter. This field name we call it `InName`. Field name in parquet file we call it `ExName`. Function `common.HeadToUpper` converts `ExName` to `InName`. There are some restriction: 1. It's not allowed if two field names are only different at their first letter case. Such as `name` and `Name`. 2. `PARGO_PREFIX_` is a reserved string, which you'd better not use it as a name prefix. ([#294](https://github.com/xitongsys/parquet-go/issues/294)) +3. Use `\x01` as the delimiter of fields to support `.` in field names. ([dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go), [#349](https://github.com/xitongsys/parquet-go/issues/349)) ## Concurrency @@ -368,6 +369,7 @@ func NewCSVWriter(md []string, pfile ParquetFile.ParquetFile, np int64) (*CSVWri |[type_alias.go](https://github.com/xitongsys/parquet-go/blob/master/example/type_alias.go)|example for type alias| |[writer.go](https://github.com/xitongsys/parquet-go/blob/master/example/writer.go)|create ParquetWriter from io.Writer| |[keyvalue_metadata.go](https://github.com/xitongsys/parquet-go/blob/master/example/keyvalue_metadata.go)|write keyvalue metadata| +|[dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go)|`.` in field name| diff --git a/common/common.go b/common/common.go index 43a3ea2a..a4d83b04 100644 --- a/common/common.go +++ b/common/common.go @@ -915,17 +915,24 @@ func SizeOf(val reflect.Value) int64 { return 4 } +const PAR_GO_PATH_DELIMITER = "\x01" + +// . 
-> \x01 +func ReformPathStr(pathStr string) string { + return strings.ReplaceAll(pathStr, ".", "\x01") +} + //Convert path slice to string func PathToStr(path []string) string { - return strings.Join(path, ".") + return strings.Join(path, PAR_GO_PATH_DELIMITER) } //Convert string to path slice func StrToPath(str string) []string { - return strings.Split(str, ".") + return strings.Split(str, PAR_GO_PATH_DELIMITER) } //Get the pathStr index in a path func PathStrIndex(str string) int { - return len(strings.Split(str, ".")) + return len(strings.Split(str, PAR_GO_PATH_DELIMITER)) } diff --git a/example/column_read.go b/example/column_read.go index 47a1d17d..35651167 100644 --- a/example/column_read.go +++ b/example/column_read.go @@ -5,6 +5,7 @@ import ( "time" "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/writer" ) @@ -72,15 +73,15 @@ func main() { } num = int64(pr.GetNumRows()) - pr.SkipRowsByPath("parquet_go_root.name", 5) //skip the first five rows - names, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.name", num) + pr.SkipRowsByPath(common.ReformPathStr("parquet_go_root.name"), 5) //skip the first five rows + names, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.name"), num) log.Println("name", names, rls, dls, err) - classes, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.class.list.element", num) + classes, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.class.list.element"), num) log.Println("class", classes, rls, dls, err) - scores_key, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.key", num) - scores_value, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.value", num) + scores_key, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.key"), num) + scores_value, rls, dls, err = 
pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.value"), num) log.Println("parquet_go_root.scores_key", scores_key, err) log.Println("parquet_go_root.scores_value", scores_value, err) diff --git a/example/dot_in_name.go b/example/dot_in_name.go new file mode 100644 index 00000000..10c492cd --- /dev/null +++ b/example/dot_in_name.go @@ -0,0 +1,103 @@ +package main + +import ( + "log" + + "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/reader" + "github.com/xitongsys/parquet-go/writer" +) + +type A struct { + V1 int32 `parquet:"name=b.c, type=INT32, encoding=PLAIN"` + V2 B `parquet:"name=b"` + V3 int32 `parquet:"name=c, type=INT32, encoding=PLAIN"` +} + +type B struct { + C int32 `parquet:"name=c, type=INT32, encoding=PLAIN"` +} + +func main() { + var err error + fw, err := local.NewLocalFileWriter("a.parquet") + if err != nil { + log.Println("Can't create local file", err) + return + } + + //write + pw, err := writer.NewParquetWriter(fw, new(A), 4) + if err != nil { + log.Println("Can't create parquet writer", err) + return + } + + pw.RowGroupSize = 128 * 1024 * 1024 //128M + pw.PageSize = 8 * 1024 //8K + pw.CompressionType = parquet.CompressionCodec_SNAPPY + num := 10 + for i := 0; i < num; i++ { + o := A{ + V1: 1, + V2: B{ + C: 2, + }, + V3: 3, + } + if err = pw.Write(o); err != nil { + log.Println("Write error", err) + } + } + if err = pw.WriteStop(); err != nil { + log.Println("WriteStop error", err) + return + } + log.Println("Write Finished") + fw.Close() + + ///read all + fr, err := local.NewLocalFileReader("a.parquet") + if err != nil { + log.Println("Can't open file") + return + } + + pr, err := reader.NewParquetReader(fr, new(A), 4) + if err != nil { + log.Println("Can't create parquet reader", err) + return + } + num = int(pr.GetNumRows()) + os := make([]A, num) + + if err = pr.Read(&os); err != nil { + log.Println("Read error", err) + } + 
log.Println(os) + + pr.ReadStop() + fr.Close() + + ///read column by path + fr, err = local.NewLocalFileReader("a.parquet") + if err != nil { + log.Println("Can't open file") + return + } + + pr, err = reader.NewParquetReader(fr, new(A), 4) + if err != nil { + log.Println("Can't create parquet reader", err) + return + } + cn := pr.GetNumRows() + v1, _, _, _ := pr.ReadColumnByPath("parquet_go_root\x01b.c",cn) + v2, _, _, _ := pr.ReadColumnByPath("parquet_go_root\x01b\x01c",cn) + v3, _, _, _ := pr.ReadColumnByPath("parquet_go_root\x01c", cn) + log.Println(v1,v2,v3) + + pr.ReadStop() + fr.Close() +} diff --git a/example/read_partial2.go b/example/read_partial2.go index bbd8a624..25db8d63 100644 --- a/example/read_partial2.go +++ b/example/read_partial2.go @@ -5,9 +5,10 @@ import ( "time" "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/common" + "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/writer" - "github.com/xitongsys/parquet-go/parquet" ) type Student struct { @@ -80,7 +81,7 @@ func main() { num = int(pr.GetNumRows()) //only read scores scores := make([]map[string]int32, num) - pr.ReadPartial(&scores, "parquet_go_root.scores") + pr.ReadPartial(&scores, common.ReformPathStr("parquet_go_root.scores")) log.Println(scores) pr.ReadStop() diff --git a/example/read_partial_without_schema_predefined.go b/example/read_partial_without_schema_predefined.go index b54c5527..c6555617 100644 --- a/example/read_partial_without_schema_predefined.go +++ b/example/read_partial_without_schema_predefined.go @@ -5,9 +5,10 @@ import ( "time" "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/common" + "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/writer" - "github.com/xitongsys/parquet-go/parquet" ) type Student struct { @@ -79,7 +80,7 @@ func main() { num = int(pr.GetNumRows()) //only 
read scores - res, err := pr.ReadPartialByNumber(num, "parquet_go_root.scores") + res, err := pr.ReadPartialByNumber(num, common.ReformPathStr("parquet_go_root.scores")) if err != nil { log.Println("Can't read", err) return diff --git a/layout/page.go b/layout/page.go index daceddd7..3e1076a3 100644 --- a/layout/page.go +++ b/layout/page.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math/bits" - "strings" "github.com/apache/thrift/lib/go/thrift" "github.com/xitongsys/parquet-go/common" @@ -510,7 +509,7 @@ func (self *Page) GetRLDLFromRawData(schemaHandler *schema.SchemaHandler) (int64 table := new(Table) table.Path = self.Path - name := strings.Join(self.Path, ".") + name := common.PathToStr(self.Path) table.RepetitionType = schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetRepetitionType() table.MaxRepetitionLevel = maxRepetitionLevel table.MaxDefinitionLevel = maxDefinitionLevel @@ -576,7 +575,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error numNulls++ } } - name := strings.Join(self.DataTable.Path, ".") + name := common.PathToStr(self.DataTable.Path) var values []interface{} var ct parquet.ConvertedType = -1 if schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].IsSetConvertedType() { @@ -768,7 +767,7 @@ func ReadPage(thriftReader *thrift.TBufferedTransport, schemaHandler *schema.Sch path := make([]string, 0) path = append(path, schemaHandler.GetRootInName()) path = append(path, colMetaData.GetPathInSchema()...) - name := strings.Join(path, ".") + name := common.PathToStr(path) if pageHeader.GetType() == parquet.PageType_DICTIONARY_PAGE { page = NewDictPage() diff --git a/marshal/csv.go b/marshal/csv.go index aeeff1a3..611df5db 100644 --- a/marshal/csv.go +++ b/marshal/csv.go @@ -15,7 +15,7 @@ func MarshalCSV(records []interface{}, schemaHandler *schema.SchemaHandler) (*ma } for i := 0; i < len(records[0].([]interface{})); i++ { - pathStr := schemaHandler.GetRootInName() + "." 
+ schemaHandler.Infos[i+1].InName + pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName table := layout.NewEmptyTable() res[pathStr] = table table.Path = common.StrToPath(pathStr) diff --git a/marshal/json.go b/marshal/json.go index 3c94493f..a294d72e 100644 --- a/marshal/json.go +++ b/marshal/json.go @@ -86,11 +86,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map keys := node.Val.MapKeys() if schema.GetConvertedType() == parquet.ConvertedType_MAP { //real map - pathStr = pathStr + ".Key_value" + pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "Key_value" if len(keys) <= 0 { for key, table := range res { if strings.HasPrefix(key, node.PathMap.Path) && - (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){ + (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){ table.Values = append(table.Values, nil) table.DefinitionLevels = append(table.DefinitionLevels, node.DL) table.RepetitionLevels = append(table.RepetitionLevels, node.RL) @@ -160,7 +160,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map newPathStr := node.PathMap.Children[key].Path for path, table := range res { if strings.HasPrefix(path, newPathStr) && - (len(path) == len(newPathStr) || path[len(newPathStr)] == '.') { + (len(path) == len(newPathStr) || path[len(newPathStr)] == common.PAR_GO_PATH_DELIMITER[0]) { table.Values = append(table.Values, nil) table.DefinitionLevels = append(table.DefinitionLevels, node.DL) @@ -175,11 +175,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map ln := node.Val.Len() if schema.GetConvertedType() == parquet.ConvertedType_LIST { // real LIST - pathStr = pathStr + ".List" + ".Element" + pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element" if ln <= 0 { for key, table := range res { if 
strings.HasPrefix(key, node.PathMap.Path) && - (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){ + (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){ table.Values = append(table.Values, nil) table.DefinitionLevels = append(table.DefinitionLevels, node.DL) table.RepetitionLevels = append(table.RepetitionLevels, node.RL) @@ -213,7 +213,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map if ln <= 0 { for key, table := range res { if strings.HasPrefix(key, node.PathMap.Path) && - (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){ + (len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){ table.Values = append(table.Values, nil) table.DefinitionLevels = append(table.DefinitionLevels, node.DL) table.RepetitionLevels = append(table.RepetitionLevels, node.RL) diff --git a/marshal/marshal.go b/marshal/marshal.go index b7cbc59e..0d7f0190 100644 --- a/marshal/marshal.go +++ b/marshal/marshal.go @@ -130,7 +130,7 @@ func (p *ParquetSlice) Marshal(node *Node, nodeBuf *NodeBufType) []*Node { path := node.PathMap.Path if *p.schemaHandler.SchemaElements[p.schemaHandler.MapIndex[node.PathMap.Path]].RepetitionType != parquet.FieldRepetitionType_REPEATED { pathMap = pathMap.Children["List"].Children["Element"] - path += ".List" + ".Element" + path = path + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element" } if ln <= 0 { return nodes @@ -158,7 +158,7 @@ type ParquetMap struct { func (p *ParquetMap) Marshal(node *Node, nodeBuf *NodeBufType) []*Node { nodes := make([]*Node, 0) - path := node.PathMap.Path + ".Key_value" + path := node.PathMap.Path + common.PAR_GO_PATH_DELIMITER + "Key_value" keys := node.Val.MapKeys() if len(keys) <= 0 { return nodes @@ -281,7 +281,7 @@ func Marshal(srcInterface []interface{}, schemaHandler *schema.SchemaHandler) (t if numChildren > int32(0) { 
for key, table := range res { if strings.HasPrefix(key, path) && - (len(key) == len(path) || key[len(path)] == '.') { + (len(key) == len(path) || key[len(path)] == common.PAR_GO_PATH_DELIMITER[0]) { table.Values = append(table.Values, nil) table.DefinitionLevels = append(table.DefinitionLevels, node.DL) table.RepetitionLevels = append(table.RepetitionLevels, node.RL) diff --git a/schema/schemahandler.go b/schema/schemahandler.go index f2b294d0..1c9a642b 100644 --- a/schema/schemahandler.go +++ b/schema/schemahandler.go @@ -38,7 +38,7 @@ func (self *PathMapType) Add(path []string) { } c := path[1] if _, ok := self.Children[c]; !ok { - self.Children[c] = NewPathMap(self.Path + "." + c) + self.Children[c] = NewPathMap(self.Path + common.PAR_GO_PATH_DELIMITER + c) } self.Children[c].Add(path[1:]) }