-
Notifications
You must be signed in to change notification settings - Fork 688
/
Copy pathremote_file_output_reader.go
137 lines (118 loc) · 4.1 KB
/
remote_file_output_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package ioutils
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
type RemoteFileOutputReader struct {
outPath io.OutputFilePaths
store storage.ComposedProtobufStore
maxPayloadSize int64
}
func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error) {
metadata, err := r.store.Head(ctx, r.outPath.GetErrorPath())
if err != nil {
return false, errors.Wrapf(err, "failed to read error file @[%s]", r.outPath.GetErrorPath())
}
if metadata.Exists() {
if metadata.Size() > r.maxPayloadSize {
return false, errors.Wrapf(err, "error file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetErrorPath(), metadata.Size(), r.maxPayloadSize)
}
return true, nil
}
return false, nil
}
func (r RemoteFileOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
errorDoc := &core.ErrorDocument{}
err := r.store.ReadProtobuf(ctx, r.outPath.GetErrorPath(), errorDoc)
if err != nil {
if storage.IsNotFound(err) {
return io.ExecutionError{
IsRecoverable: true,
ExecutionError: &core.ExecutionError{
Code: "ErrorFileNotFound",
Message: err.Error(),
Kind: core.ExecutionError_SYSTEM,
},
}, nil
}
return io.ExecutionError{}, errors.Wrapf(err, "failed to read error data from task @[%s]", r.outPath.GetErrorPath())
}
if errorDoc.Error == nil {
return io.ExecutionError{
IsRecoverable: true,
ExecutionError: &core.ExecutionError{
Code: "ErrorFileBadFormat",
Message: fmt.Sprintf("error not formatted correctly, nil error @path [%s]", r.outPath.GetErrorPath()),
Kind: core.ExecutionError_SYSTEM,
},
}, nil
}
ee := io.ExecutionError{
ExecutionError: &core.ExecutionError{
Code: errorDoc.Error.Code,
Message: errorDoc.Error.Message,
Kind: errorDoc.Error.Origin,
},
}
if errorDoc.Error.Kind == core.ContainerError_RECOVERABLE {
ee.IsRecoverable = true
}
return ee, nil
}
func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error) {
md, err := r.store.Head(ctx, r.outPath.GetOutputPath())
if err != nil {
return false, err
}
if md.Exists() {
if md.Size() > r.maxPayloadSize {
return false, errors.Errorf("output file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetOutputPath(), md.Size(), r.maxPayloadSize)
}
return true, nil
}
return false, nil
}
func (r RemoteFileOutputReader) Read(ctx context.Context) (*core.LiteralMap, *io.ExecutionError, error) {
d := &core.LiteralMap{}
if err := r.store.ReadProtobuf(ctx, r.outPath.GetOutputPath(), d); err != nil {
// TODO change flytestdlib to return protobuf unmarshal errors separately. As this can indicate malformed output and we should catch that
return nil, nil, fmt.Errorf("failed to read data from dataDir [%v]. Error: %v", r.outPath.GetOutputPath(), err)
}
if d.Literals == nil {
return nil, &io.ExecutionError{
IsRecoverable: true,
ExecutionError: &core.ExecutionError{
Code: "No outputs produced",
Message: fmt.Sprintf("outputs not found at [%s]", r.outPath.GetOutputPath()),
},
}, nil
}
return d, nil, nil
}
func (r RemoteFileOutputReader) IsFile(ctx context.Context) bool {
return true
}
func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
md, err := r.store.Head(ctx, r.outPath.GetDeckPath())
if err != nil {
return false, err
}
return md.Exists(), nil
}
func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
// Note: even though the data store retrieval checks against GetLimitMegabytes, there might be external
// storage implementations, so we keep this check here as well.
maxPayloadSize := maxDatasetSize
if maxPayloadSize == 0 {
maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024
}
return RemoteFileOutputReader{
outPath: outPaths,
store: store,
maxPayloadSize: maxPayloadSize,
}
}