Skip to content

Commit

Permalink
Merge pull request #350 from xitongsys/dev
Browse files Browse the repository at this point in the history
use x01 instead of . as the delimiter
  • Loading branch information
xitongsys authored Jan 29, 2021
2 parents c7174e0 + 37a1ffc commit a403f20
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 27 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ var jsonSchema string = `
* Parquet-go reads data into Go objects, so every field must be exported, i.e. its name must start with an upper-case letter. We call this Go field name the `InName`, and the field name in the parquet file the `ExName`. The function `common.HeadToUpper` converts an `ExName` to an `InName`. There are some restrictions:
1. Two field names must not differ only in the case of their first letter, such as `name` and `Name`.
2. `PARGO_PREFIX_` is a reserved string; avoid using it as a name prefix. ([#294](https://github.com/xitongsys/parquet-go/issues/294))
3. `\x01` is used as the delimiter between path fields so that field names may contain `.`. ([dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go), [#349](https://github.com/xitongsys/parquet-go/issues/349))

## Concurrency

Expand Down Expand Up @@ -368,6 +369,7 @@ func NewCSVWriter(md []string, pfile ParquetFile.ParquetFile, np int64) (*CSVWri
|[type_alias.go](https://github.com/xitongsys/parquet-go/blob/master/example/type_alias.go)|example for type alias|
|[writer.go](https://github.com/xitongsys/parquet-go/blob/master/example/writer.go)|create ParquetWriter from io.Writer|
|[keyvalue_metadata.go](https://github.com/xitongsys/parquet-go/blob/master/example/keyvalue_metadata.go)|write keyvalue metadata|
|[dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go)|`.` in field name|



Expand Down
13 changes: 10 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,24 @@ func SizeOf(val reflect.Value) int64 {
return 4
}

// PAR_GO_PATH_DELIMITER separates the elements of a schema path string.
// It is the control character \x01 rather than "." so that a field name
// may itself contain a dot.
const PAR_GO_PATH_DELIMITER = "\x01"

// ReformPathStr converts a dot-separated path string into the internal form
// by replacing every "." with PAR_GO_PATH_DELIMITER. Because every dot is
// replaced, it must not be used for paths whose field names contain dots.
func ReformPathStr(pathStr string) string {
	return strings.ReplaceAll(pathStr, ".", PAR_GO_PATH_DELIMITER)
}

// PathToStr joins the elements of path into a single path string, placing
// PAR_GO_PATH_DELIMITER between consecutive elements.
func PathToStr(path []string) string {
	return strings.Join(path, PAR_GO_PATH_DELIMITER)
}

// StrToPath splits a path string on PAR_GO_PATH_DELIMITER and returns the
// resulting elements. It is the inverse of PathToStr.
func StrToPath(str string) []string {
	return strings.Split(str, PAR_GO_PATH_DELIMITER)
}

// PathStrIndex returns the number of elements in the path string str, i.e.
// the number of delimiters plus one. An empty string counts as one element,
// matching len(StrToPath(str)).
func PathStrIndex(str string) int {
	// Counting delimiters gives the same result as len(strings.Split(...))
	// without allocating the intermediate slice.
	return strings.Count(str, PAR_GO_PATH_DELIMITER) + 1
}
11 changes: 6 additions & 5 deletions example/column_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
)
Expand Down Expand Up @@ -72,15 +73,15 @@ func main() {
}
num = int64(pr.GetNumRows())

pr.SkipRowsByPath("parquet_go_root.name", 5) //skip the first five rows
names, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.name", num)
pr.SkipRowsByPath(common.ReformPathStr("parquet_go_root.name"), 5) //skip the first five rows
names, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.name"), num)
log.Println("name", names, rls, dls, err)

classes, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.class.list.element", num)
classes, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.class.list.element"), num)
log.Println("class", classes, rls, dls, err)

scores_key, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.key", num)
scores_value, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.value", num)
scores_key, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.key"), num)
scores_value, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.value"), num)
log.Println("parquet_go_root.scores_key", scores_key, err)
log.Println("parquet_go_root.scores_value", scores_value, err)

Expand Down
103 changes: 103 additions & 0 deletions example/dot_in_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"log"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
)

// A is the row type written to and read back from a.parquet in this example.
// Its parquet names are chosen so that a dot-separated path would be
// ambiguous: V1 is a leaf literally named "b.c", while V2 is a group named
// "b" of type B.
type A struct {
// V1 is an INT32 leaf whose parquet name itself contains a dot.
V1 int32 `parquet:"name=b.c, type=INT32, encoding=PLAIN"`
// V2 is a nested group stored under the parquet name "b".
V2 B `parquet:"name=b"`
// V3 is a top-level INT32 leaf named "c".
V3 int32 `parquet:"name=c, type=INT32, encoding=PLAIN"`
}

// B is the nested group type used by field V2 of A (parquet name "b").
// It holds a single INT32 leaf.
type B struct {
// C is stored under the parquet name "c".
C int32 `parquet:"name=c, type=INT32, encoding=PLAIN"`
}

// main demonstrates field names that contain a dot. It writes ten A rows to
// a.parquet, reads them all back into a slice, and then reads three leaf
// columns one at a time by path, using "\x01" as the path delimiter.
//
// Failures that make further work impossible (creating or opening a file,
// creating the writer or reader, WriteStop) are logged and end the program;
// per-row write errors and read errors are logged and execution continues.
func main() {
	// closeAndLog closes a file handle and reports a failure instead of
	// silently dropping it. It is also used on early-return paths so the
	// handle is not leaked.
	closeAndLog := func(c interface{ Close() error }) {
		if cerr := c.Close(); cerr != nil {
			log.Println("Close error", cerr)
		}
	}

	fw, err := local.NewLocalFileWriter("a.parquet")
	if err != nil {
		log.Println("Can't create local file", err)
		return
	}

	// write
	pw, err := writer.NewParquetWriter(fw, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet writer", err)
		closeAndLog(fw)
		return
	}

	pw.RowGroupSize = 128 * 1024 * 1024 // 128M
	pw.PageSize = 8 * 1024              // 8K
	pw.CompressionType = parquet.CompressionCodec_SNAPPY
	num := 10
	for i := 0; i < num; i++ {
		o := A{
			V1: 1,
			V2: B{
				C: 2,
			},
			V3: 3,
		}
		if err = pw.Write(o); err != nil {
			log.Println("Write error", err)
		}
	}
	if err = pw.WriteStop(); err != nil {
		log.Println("WriteStop error", err)
		closeAndLog(fw)
		return
	}
	log.Println("Write Finished")
	// The file is closed before it is reopened for reading below.
	closeAndLog(fw)

	// read all
	fr, err := local.NewLocalFileReader("a.parquet")
	if err != nil {
		log.Println("Can't open file", err)
		return
	}

	pr, err := reader.NewParquetReader(fr, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		closeAndLog(fr)
		return
	}
	num = int(pr.GetNumRows())
	// Named rows rather than os to avoid shadowing the standard package name.
	rows := make([]A, num)

	if err = pr.Read(&rows); err != nil {
		log.Println("Read error", err)
	}
	log.Println(rows)

	pr.ReadStop()
	closeAndLog(fr)

	// read column by path
	fr, err = local.NewLocalFileReader("a.parquet")
	if err != nil {
		log.Println("Can't open file", err)
		return
	}

	pr, err = reader.NewParquetReader(fr, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		closeAndLog(fr)
		return
	}
	cn := pr.GetNumRows()

	// With "\x01" as the delimiter the three paths below are distinct:
	// the leaf named "b.c", the leaf "c" inside group "b", and the
	// top-level leaf "c".
	v1, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01b.c", cn)
	if err != nil {
		log.Println("ReadColumnByPath error", err)
	}
	v2, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01b\x01c", cn)
	if err != nil {
		log.Println("ReadColumnByPath error", err)
	}
	v3, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01c", cn)
	if err != nil {
		log.Println("ReadColumnByPath error", err)
	}
	log.Println(v1, v2, v3)

	pr.ReadStop()
	closeAndLog(fr)
}
5 changes: 3 additions & 2 deletions example/read_partial2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
"github.com/xitongsys/parquet-go/parquet"
)

type Student struct {
Expand Down Expand Up @@ -80,7 +81,7 @@ func main() {
num = int(pr.GetNumRows())
//only read scores
scores := make([]map[string]int32, num)
pr.ReadPartial(&scores, "parquet_go_root.scores")
pr.ReadPartial(&scores, common.ReformPathStr("parquet_go_root.scores"))
log.Println(scores)

pr.ReadStop()
Expand Down
5 changes: 3 additions & 2 deletions example/read_partial_without_schema_predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
"github.com/xitongsys/parquet-go/parquet"
)

type Student struct {
Expand Down Expand Up @@ -79,7 +80,7 @@ func main() {

num = int(pr.GetNumRows())
//only read scores
res, err := pr.ReadPartialByNumber(num, "parquet_go_root.scores")
res, err := pr.ReadPartialByNumber(num, common.ReformPathStr("parquet_go_root.scores"))
if err != nil {
log.Println("Can't read", err)
return
Expand Down
7 changes: 3 additions & 4 deletions layout/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"math/bits"
"strings"

"github.com/apache/thrift/lib/go/thrift"
"github.com/xitongsys/parquet-go/common"
Expand Down Expand Up @@ -510,7 +509,7 @@ func (self *Page) GetRLDLFromRawData(schemaHandler *schema.SchemaHandler) (int64

table := new(Table)
table.Path = self.Path
name := strings.Join(self.Path, ".")
name := common.PathToStr(self.Path)
table.RepetitionType = schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetRepetitionType()
table.MaxRepetitionLevel = maxRepetitionLevel
table.MaxDefinitionLevel = maxDefinitionLevel
Expand Down Expand Up @@ -576,7 +575,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error
numNulls++
}
}
name := strings.Join(self.DataTable.Path, ".")
name := common.PathToStr(self.DataTable.Path)
var values []interface{}
var ct parquet.ConvertedType = -1
if schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].IsSetConvertedType() {
Expand Down Expand Up @@ -768,7 +767,7 @@ func ReadPage(thriftReader *thrift.TBufferedTransport, schemaHandler *schema.Sch
path := make([]string, 0)
path = append(path, schemaHandler.GetRootInName())
path = append(path, colMetaData.GetPathInSchema()...)
name := strings.Join(path, ".")
name := common.PathToStr(path)

if pageHeader.GetType() == parquet.PageType_DICTIONARY_PAGE {
page = NewDictPage()
Expand Down
2 changes: 1 addition & 1 deletion marshal/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func MarshalCSV(records []interface{}, schemaHandler *schema.SchemaHandler) (*ma
}

for i := 0; i < len(records[0].([]interface{})); i++ {
pathStr := schemaHandler.GetRootInName() + "." + schemaHandler.Infos[i+1].InName
pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName
table := layout.NewEmptyTable()
res[pathStr] = table
table.Path = common.StrToPath(pathStr)
Expand Down
12 changes: 6 additions & 6 deletions marshal/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
keys := node.Val.MapKeys()

if schema.GetConvertedType() == parquet.ConvertedType_MAP { //real map
pathStr = pathStr + ".Key_value"
pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "Key_value"
if len(keys) <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down Expand Up @@ -160,7 +160,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
newPathStr := node.PathMap.Children[key].Path
for path, table := range res {
if strings.HasPrefix(path, newPathStr) &&
(len(path) == len(newPathStr) || path[len(newPathStr)] == '.') {
(len(path) == len(newPathStr) || path[len(newPathStr)] == common.PAR_GO_PATH_DELIMITER[0]) {

table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
Expand All @@ -175,11 +175,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
ln := node.Val.Len()

if schema.GetConvertedType() == parquet.ConvertedType_LIST { // real LIST
pathStr = pathStr + ".List" + ".Element"
pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element"
if ln <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down Expand Up @@ -213,7 +213,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
if ln <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down
6 changes: 3 additions & 3 deletions marshal/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (p *ParquetSlice) Marshal(node *Node, nodeBuf *NodeBufType) []*Node {
path := node.PathMap.Path
if *p.schemaHandler.SchemaElements[p.schemaHandler.MapIndex[node.PathMap.Path]].RepetitionType != parquet.FieldRepetitionType_REPEATED {
pathMap = pathMap.Children["List"].Children["Element"]
path += ".List" + ".Element"
path = path + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element"
}
if ln <= 0 {
return nodes
Expand Down Expand Up @@ -158,7 +158,7 @@ type ParquetMap struct {

func (p *ParquetMap) Marshal(node *Node, nodeBuf *NodeBufType) []*Node {
nodes := make([]*Node, 0)
path := node.PathMap.Path + ".Key_value"
path := node.PathMap.Path + common.PAR_GO_PATH_DELIMITER + "Key_value"
keys := node.Val.MapKeys()
if len(keys) <= 0 {
return nodes
Expand Down Expand Up @@ -281,7 +281,7 @@ func Marshal(srcInterface []interface{}, schemaHandler *schema.SchemaHandler) (t
if numChildren > int32(0) {
for key, table := range res {
if strings.HasPrefix(key, path) &&
(len(key) == len(path) || key[len(path)] == '.') {
(len(key) == len(path) || key[len(path)] == common.PAR_GO_PATH_DELIMITER[0]) {
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down
2 changes: 1 addition & 1 deletion schema/schemahandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (self *PathMapType) Add(path []string) {
}
c := path[1]
if _, ok := self.Children[c]; !ok {
self.Children[c] = NewPathMap(self.Path + "." + c)
self.Children[c] = NewPathMap(self.Path + common.PAR_GO_PATH_DELIMITER + c)
}
self.Children[c].Add(path[1:])
}
Expand Down

0 comments on commit a403f20

Please sign in to comment.