Skip to content

Commit

Permalink
DatastoreExportが失敗した場合、リトライを行うようにした refs #48
Browse files Browse the repository at this point in the history
  • Loading branch information
sinmetal committed Oct 5, 2019
1 parent c1d798a commit 26468a9
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 146 deletions.
18 changes: 18 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"fmt"
"log"
"net/http"
)

func WriteError(w http.ResponseWriter, statusCode int, message string, err error) {
msg := fmt.Sprintln(message, " : ", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(statusCode)
_, err = w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
}
6 changes: 5 additions & 1 deletion bqload_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestBQLoadService_InsertBigQueryLoadJob(t *testing.T) {
if err != nil {
t.Fatal(err)
}
bqljcQ, err := NewBQLoadJobCheckQueue("localhost:8080", TasksClient)
if err != nil {
t.Fatal(err)
}

const ds2bqJobID = "helloJob"
{
Expand All @@ -38,7 +42,7 @@ func TestBQLoadService_InsertBigQueryLoadJob(t *testing.T) {
}
}

ls := NewBQLoadService(s)
ls := NewBQLoadService(s, bqljcQ)
if err := ls.InsertBigQueryLoadJob(ctx, ds2bqJobID, "gs://datastore-backup-gcpugjp-dev/2019-07-25T10:35:08_16520"); err != nil {
t.Fatal(err)
}
Expand Down
108 changes: 41 additions & 67 deletions datastore_export_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/morikuni/failure"
)

const DefaultSeparateKindCount = 30

type DatastoreExportRequest struct {
ProjectID string `json:"projectId"`
AllKinds bool `json:"allKinds"`
Expand All @@ -33,94 +35,62 @@ type DS2BQJobIDWithDatastoreExportJobID struct {
DatastoreExportJobID string `json:"datastoreExportJobId"`
}

func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
queue, err := NewDatastoreExportJobCheckQueue(r.Host, TasksClient)
if err != nil {
msg := fmt.Sprintf("failed NewDatastoreExportJobCheckQueue.err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
return
type DatastoreExportAPI struct {
DatastoreExportJobCheckQueue *DatastoreExportJobCheckQueue
DSExportJobStore *DSExportJobStore
BQLoadJobStore *BQLoadJobStore
}

func NewDatastoreExportAPI(queue *DatastoreExportJobCheckQueue, dseJS *DSExportJobStore, bqlJS *BQLoadJobStore) *DatastoreExportAPI {
return &DatastoreExportAPI{
queue, dseJS, bqlJS,
}
}

func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
msg := fmt.Sprintf("failed ioutil.Read(request.Body).err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusBadRequest, "failed ioutil.Read(request.Body)", err)
return
}

form := &DatastoreExportRequest{}
if err := json.Unmarshal(body, form); err != nil {
msg := fmt.Sprintf("failed json.Unmarshal(request.Body).err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed json.Unmarshal(request.Body) body=%v", string(body)), err)
return
}

log.Printf("%s\n", string(body))

kinds, err := GetDatastoreKinds(r.Context(), form)
if err != nil {
msg := fmt.Sprintf("failed GetDatastoreKinds form=%+v.err=%+v", form, err)
log.Println(msg)
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed GetDatastoreKinds form=%+v", form), err)
return
}
efs, err := BuildEntityFilter(r.Context(), form.NamespaceIDs, kinds, 30)
efs, err := BuildEntityFilter(r.Context(), form.NamespaceIDs, kinds, DefaultSeparateKindCount)
if err != nil {
msg := fmt.Sprintf("failed BuildEntityFilter form=%+v.err=%+v", form, err)
log.Println(msg)
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed BuildEntityFilter form=%+v", form), err)
return
}

queue, err := NewDatastoreExportJobCheckQueue(r.Host, TasksClient)
if err != nil {
WriteError(w, http.StatusBadRequest, "failed NewDatastoreExportJobCheckQueue", err)
return
}

dsexportJobStore, err := NewDSExportJobStore(r.Context(), DatastoreClient)
if err != nil {
msg := fmt.Sprintf("failed NewDSExportJobStore() form=%+v.err=%+v", form, err)
log.Println(msg)
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusInternalServerError, fmt.Sprintf("failed NewDSExportJobStore() form=%+v", form), err)
return
}

bqloadJobStore, err := NewBQLoadJobStore(r.Context(), DatastoreClient)
if err != nil {
msg := fmt.Sprintf("failed NewBQLoadJobStore() form=%+v.err=%+v", form, err)
log.Println(msg)
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
WriteError(w, http.StatusInternalServerError, fmt.Sprintf("failed NewBQLoadJobStore() form=%+v", form), err)
return
}
api := NewDatastoreExportAPI(queue, dsexportJobStore, bqloadJobStore)

res := &DatastoreExportResponse{
[]*DS2BQJobIDWithDatastoreExportJobID{},
Expand All @@ -129,7 +99,7 @@ func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
var dsExportJobID string
ds2bqJobID := dsexportJobStore.NewDS2BQJobID(r.Context())
bqLoadKinds := BuildBQLoadKinds(ef, form.IgnoreBQLoadKinds)
dsExportJobID, err := CreateDatastoreExportJob(r.Context(), dsexportJobStore, bqloadJobStore, queue, ds2bqJobID, string(body), form, bqLoadKinds, ef)
dsExportJobID, err := api.StartDS2BQJob(r.Context(), ds2bqJobID, string(body), form, form.NamespaceIDs, bqLoadKinds, ef)
if err != nil {
msg := fmt.Sprintf("failed CreateDatastoreExportJob ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
log.Println(msg)
Expand All @@ -153,41 +123,45 @@ func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
}
}

func CreateDatastoreExportJob(ctx context.Context, dsexportJobStore *DSExportJobStore, bqloadJobStore *BQLoadJobStore, queue *DatastoreExportJobCheckQueue, ds2bqJobID string, body string, form *DatastoreExportRequest, kinds []string, ef *datastore.EntityFilter) (string, error) {
_, err := dsexportJobStore.Create(ctx, ds2bqJobID, body, kinds)
func (api *DatastoreExportAPI) StartDS2BQJob(ctx context.Context, ds2bqJobID string, body string, form *DatastoreExportRequest, namespaceIDs []string, kinds []string, ef *datastore.EntityFilter) (string, error) {
_, err := api.DSExportJobStore.Create(ctx, ds2bqJobID, body, form.ProjectID, namespaceIDs, kinds)
if err != nil {
return "", fmt.Errorf("failed DSExportJobStore.Create() ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
}

_, err = bqloadJobStore.PutMulti(ctx, BuildBQLoadJobPutMultiForm(ds2bqJobID, kinds, form))
_, err = api.BQLoadJobStore.PutMulti(ctx, BuildBQLoadJobPutMultiForm(ds2bqJobID, kinds, form))
if err != nil {
return "", fmt.Errorf("failed BQLoadJobStore.PutMulti() ds2bqJobID=%v,bqLoadKinds=%+v.err=%+v", ds2bqJobID, kinds, err)
}

ope, err := datastore.Export(ctx, form.ProjectID, form.OutputGCSFilePath, ef)
return api.CreateDatastoreExportJob(ctx, ds2bqJobID, form.ProjectID, form.OutputGCSFilePath, ef)
}

func (api *DatastoreExportAPI) CreateDatastoreExportJob(ctx context.Context, ds2bqJobID string, projectID string, outputGCSFilePath string, ef *datastore.EntityFilter) (string, error) {
ope, err := datastore.Export(ctx, projectID, outputGCSFilePath, ef)
if err != nil {
return "", fmt.Errorf("failed datastore.Export() form=%+v.err=%+v", form, err)
return "", fmt.Errorf("failed datastore.Export() err=%+v", err)
}
switch ope.HTTPStatusCode {
case http.StatusOK:
log.Printf("%+v", ope)

if _, err := dsexportJobStore.StartExportJob(ctx, ds2bqJobID, ope.Name); err != nil {
if _, err := api.DSExportJobStore.StartExportJob(ctx, ds2bqJobID, ope.Name, 0); err != nil {
return "", fmt.Errorf("failed DSExportJobStore.StartExportJob. ds2bqJobID=%v,jobName=%s.err=%+v", ds2bqJobID, ope.Name, err)
}

if err := queue.AddTask(ctx, &DatastoreExportJobCheckRequest{
if err := api.DatastoreExportJobCheckQueue.AddTask(ctx, &DatastoreExportJobCheckRequest{
DS2BQJobID: ds2bqJobID,
DatastoreExportJobID: ope.Name,
}); err != nil {
return "", fmt.Errorf("failed queue.AddTask. jobName=%s.err=%+v", ope.Name, err)
}
return ope.Name, nil
default:
if _, err := dsexportJobStore.FinishExportJob(ctx, ds2bqJobID, DSExportJobStatusFailed, fmt.Sprintf("failed DatastoreExportJob.INSERT(). Code=%v,Message=%v", ope.Error.Code, ope.Error.Message)); err != nil {
if _, err := api.DSExportJobStore.FinishExportJob(ctx, ds2bqJobID, DSExportJobStatusFailed, "", fmt.Sprintf("failed DatastoreExportJob.INSERT(). Code=%v,Message=%v", ope.Error.Code, ope.Error.Message)); err != nil {
return "", fmt.Errorf("failed DSExportJobStore.FinishExportJob. ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
}
return "", fmt.Errorf("failed DatastoreExportJob.INSERT(). form=%+v.ope.Error=%+v", form, ope.Error)
return "", fmt.Errorf("failed DatastoreExportJob.INSERT(). ds2bqJobID=%v,ope.Error=%+v", ds2bqJobID, ope.Error)
}
}

Expand Down
Loading

0 comments on commit 26468a9

Please sign in to comment.