-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloader.go
167 lines (136 loc) · 3.45 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package bqloader
import (
"bytes"
"context"
"encoding/csv"
"io"
"log"
"sync"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"golang.org/x/text/transform"
)
// BQLoader loads data from Cloud Storage to BigQuery table.
type BQLoader interface {
	// AddHandler registers a handler; each incoming event is matched
	// against every registered handler by object name.
	AddHandler(*Handler)
	// Handle processes one Cloud Storage event, running every handler
	// whose pattern matches the event's object name. It returns the
	// first handler error encountered, or nil.
	Handle(context.Context, Event) error
}
// Event is an event from Cloud Storage.
type Event struct {
	// Name is the object name within the bucket.
	Name string `json:"name"`
	// Bucket is the Cloud Storage bucket the object belongs to.
	Bucket string `json:"bucket"`

	// for test
	// source lets tests inject object contents without Cloud Storage;
	// NOTE(review): not read anywhere in this file — presumably consumed
	// by an extractor defined elsewhere. Verify before removing.
	source io.Reader
}
// New builds a new BQLoader with no handlers registered.
func New() BQLoader {
	// The zero value of sync.RWMutex is ready to use, so only the
	// handler slice needs an explicit initializer.
	return &bqloader{
		handlers: []*Handler{},
	}
}
// bqloader is the default BQLoader implementation: a list of handlers
// guarded by a read-write mutex.
type bqloader struct {
	handlers []*Handler
	mu       sync.RWMutex // guards handlers
}
// AddHandler registers h so that subsequent Handle calls consider it.
// It is safe for concurrent use with other AddHandler calls.
func (l *bqloader) AddHandler(h *Handler) {
	l.mu.Lock()
	l.handlers = append(l.handlers, h)
	l.mu.Unlock()
}
// Handle processes one Cloud Storage event: every registered handler
// whose pattern matches the object name is run in registration order.
// It returns the first handler error encountered, or nil when all
// matching handlers succeed.
func (l *bqloader) Handle(ctx context.Context, e Event) error {
	log.Printf("loader started")
	defer log.Printf("loader finished")
	log.Printf("file name = %s", e.Name)

	// Snapshot the handler list under the read lock. The original read
	// l.handlers with no locking, racing with AddHandler; copying also
	// avoids holding the lock across the (potentially slow) handler runs.
	l.mu.RLock()
	handlers := make([]*Handler, len(l.handlers))
	copy(handlers, l.handlers)
	l.mu.RUnlock()

	for _, h := range handlers {
		log.Printf("handler = %+v", h)
		if h.match(e.Name) {
			log.Printf("handler matches")
			if err := l.handle(ctx, e, h); err != nil {
				log.Printf("error: %v", err)
				return err
			}
		}
	}

	return nil
}
// handle runs a single matched handler against the event: it obtains the
// object data (via the handler's extractor or directly from Cloud
// Storage), optionally decodes it, parses it into rows, projects each
// row, and loads the result into the handler's BigQuery table (via the
// handler's loader when set, otherwise via a CSV load job).
func (l *bqloader) handle(ctx context.Context, e Event, h *Handler) error {
	var r io.Reader

	if h.extractor != nil {
		// If extractor is specified, prefer to use it.
		er, err := h.extractor.extract(ctx, e)
		if err != nil {
			return err
		}
		r = er
	} else {
		// If extractor is not specified, use the default extractor.
		// TODO: Make following process to get data from cloud storage an extractor.
		sc, err := storage.NewClient(ctx)
		if err != nil {
			return err
		}
		// Close the client to release its connections; the original
		// code leaked it. All reads from objr happen before return.
		defer sc.Close()

		obj := sc.Bucket(e.Bucket).Object(e.Name)
		objr, err := obj.NewReader(ctx)
		if err != nil {
			log.Printf("[%s] failed to initialize object reader: %v", h.Name, err)
			return err
		}
		defer objr.Close()
		log.Printf("[%s] DEBUG objr = %+v", h.Name, objr)
		r = objr
	}

	if h.Encoding != nil {
		r = transform.NewReader(r, h.Encoding.NewDecoder())
	}

	source, err := h.Parser(ctx, r)
	if err != nil {
		log.Printf("[%s] failed to parse object: %v", h.Name, err)
		return err
	}
	// Guard the header skip: the original sliced unconditionally, which
	// panics when SkipLeadingRows exceeds the number of parsed rows.
	if int(h.SkipLeadingRows) >= len(source) {
		source = nil
	} else {
		source = source[h.SkipLeadingRows:]
	}

	records := make([][]string, len(source))
	// TODO: Make this loop parallel.
	for i, row := range source {
		record, err := h.Projector(row)
		if err != nil {
			log.Printf("[%s] failed to project row %d: %v", h.Name, i+h.SkipLeadingRows, err)
			return err
		}
		records[i] = record
	}
	log.Printf("[%s] DEBUG records = %+v", h.Name, records)

	// If loader is specified, prefer to use Loader.
	if h.loader != nil {
		return h.loader.load(ctx, records)
	}

	// TODO: Make following process to load a Loader.
	// TODO: Make output format more efficient. e.g. gzip.
	buf := &bytes.Buffer{}
	if err := csv.NewWriter(buf).WriteAll(records); err != nil {
		log.Printf("[%s] failed to write csv: %v", h.Name, err)
		return err
	}

	bq, err := bigquery.NewClient(ctx, h.Project)
	if err != nil {
		return err
	}
	// Close the BigQuery client when done; the original code leaked it.
	defer bq.Close()

	table := bq.Dataset(h.Dataset).Table(h.Table)
	rs := bigquery.NewReaderSource(buf)
	loader := table.LoaderFrom(rs)

	job, err := loader.Run(ctx)
	if err != nil {
		log.Printf("[%s] failed to run bigquery load job: %v", h.Name, err)
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Printf("[%s] failed to wait job: %v", h.Name, err)
		return err
	}
	if status.Err() != nil {
		log.Printf("[%s] failed to load csv: %v", h.Name, status.Errors)
		return status.Err()
	}

	return nil
}