-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfileManager.go
131 lines (120 loc) · 3.32 KB
/
fileManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package db
// fileManager handles all write and read operations on files in the LSM.
// A centralized file manager is required to prevent the 'too many open files'
// error: callers submit requests over the channels below and the file workers
// started by newFileManager serve them.
type fileManager struct {
	// fileWriteChan carries requests that a worker serves with writeNewFile.
	fileWriteChan chan *fileWriteReq

	// fileMmapChan carries requests that a worker serves with mmap.
	fileMmapChan chan *fileMmapReq

	// fileFindChan carries requests that a worker serves with fileFind.
	fileFindChan chan *fileFindReq

	// fileRangeChan carries requests that a worker serves with fileRange.
	fileRangeChan chan *fileRangeReq
}
// fileWriteReq is a single write request handed to a file worker.
type fileWriteReq struct {
	// filename is passed to writeNewFile as the file to write.
	filename string

	// data is the byte slice passed to writeNewFile.
	data []byte

	// errChan receives the error (possibly nil) returned by writeNewFile.
	errChan chan error
}
// fileMmapReq is a single request to load a file's entries via mmap.
type fileMmapReq struct {
	// filename is passed to mmap as the file to read.
	filename string

	// replyChan receives the entries returned by mmap. The worker sends on it
	// after errChan.
	replyChan chan []*Entry

	// errChan receives the error (possibly nil) returned by mmap. The worker
	// sends on it before replyChan.
	errChan chan error
}
// fileFindReq is a single request to look up one key in a file via fileFind.
type fileFindReq struct {
	// filename is passed to fileFind as the file to search.
	filename string

	// key is the key passed to fileFind.
	key string

	// ts is the timestamp passed to fileFind.
	// NOTE(review): its exact meaning is defined by fileFind, which is not
	// visible here.
	ts uint64

	// replyChan receives the entry returned by fileFind. The worker sends on it
	// after errChan.
	replyChan chan *Entry

	// errChan receives the error (possibly nil) returned by fileFind. The
	// worker sends on it before replyChan.
	errChan chan error
}
// fileRangeReq is a single request to collect a key range from a file via
// fileRange.
type fileRangeReq struct {
	// filename is passed to fileRange as the file to scan.
	filename string

	// keyRange is the range passed to fileRange.
	keyRange *keyRange

	// ts is the timestamp passed to fileRange.
	// NOTE(review): its exact meaning is defined by fileRange, which is not
	// visible here.
	ts uint64

	// replyChan receives the entries returned by fileRange. The worker sends on
	// it after errChan.
	replyChan chan []*Entry

	// errChan receives the error (possibly nil) returned by fileRange. The
	// worker sends on it before replyChan.
	errChan chan error
}
// newFileManager builds a fileManager with one unbuffered channel per request
// kind and starts numWorkers file workers, each running spawnFileWorker in its
// own goroutine. The workers are never stopped.
func newFileManager() *fileManager {
	manager := new(fileManager)
	manager.fileWriteChan = make(chan *fileWriteReq)
	manager.fileMmapChan = make(chan *fileMmapReq)
	manager.fileFindChan = make(chan *fileFindReq)
	manager.fileRangeChan = make(chan *fileRangeReq)

	// The size of the worker pool bounds how many file operations run at once.
	for started := 0; started < numWorkers; started++ {
		go manager.spawnFileWorker()
	}

	return manager
}
// spawnFileWorker is the body of a single file worker. It loops forever, taking
// one request at a time from any of the manager's request channels, performing
// the matching file operation, and reporting the outcome on the request's own
// channels. Because only the workers perform these file operations, the number
// of workers caps how many run concurrently, which is how the manager avoids
// the 'too many open files' error.
//
// For requests that carry a reply, the error is sent first and the result
// second.
func (fm *fileManager) spawnFileWorker() {
	for {
		select {
		case writeReq := <-fm.fileWriteChan:
			writeReq.errChan <- writeNewFile(writeReq.filename, writeReq.data)

		case mmapReq := <-fm.fileMmapChan:
			loaded, mmapErr := mmap(mmapReq.filename)
			mmapReq.errChan <- mmapErr
			mmapReq.replyChan <- loaded

		case findReq := <-fm.fileFindChan:
			match, findErr := fileFind(findReq.filename, findReq.key, findReq.ts)
			findReq.errChan <- findErr
			findReq.replyChan <- match

		case rangeReq := <-fm.fileRangeChan:
			matches, rangeErr := fileRange(rangeReq.filename, rangeReq.keyRange, rangeReq.ts)
			rangeReq.errChan <- rangeErr
			rangeReq.replyChan <- matches
		}
	}
}
// Write submits filename and an arbitrarily sized byte slice to a file worker
// and blocks until that worker reports the outcome. It returns the error the
// worker sends back, which is nil on success.
func (fm *fileManager) Write(filename string, data []byte) error {
	// Capacity 1 lets the worker hand over its result without waiting on us.
	done := make(chan error, 1)
	fm.fileWriteChan <- &fileWriteReq{filename: filename, data: data, errChan: done}
	return <-done
}
// MMap asks a file worker to load filename and blocks until the worker
// answers. It returns the slice of entries and the error the worker sends
// back.
func (fm *fileManager) MMap(filename string) ([]*Entry, error) {
	// Both channels have capacity 1 so the worker can deliver the error and
	// the entries without waiting for this goroutine to receive either one.
	request := &fileMmapReq{
		filename:  filename,
		replyChan: make(chan []*Entry, 1),
		errChan:   make(chan error, 1),
	}
	fm.fileMmapChan <- request

	entries := <-request.replyChan
	err := <-request.errChan
	return entries, err
}
// Find asks a file worker to look up key in filename at timestamp ts and
// blocks until the worker answers. It returns the entry and the error the
// worker sends back.
func (fm *fileManager) Find(filename, key string, ts uint64) (*Entry, error) {
	// Capacity 1 on each channel lets the worker deliver both values without
	// waiting for this goroutine.
	var (
		found   = make(chan *Entry, 1)
		failure = make(chan error, 1)
	)

	fm.fileFindChan <- &fileFindReq{
		filename:  filename,
		key:       key,
		ts:        ts,
		replyChan: found,
		errChan:   failure,
	}

	entry := <-found
	return entry, <-failure
}
// Range asks a file worker to collect the entries of filename that fall
// within keyRange at timestamp ts and blocks until the worker answers. It
// returns the entries and the error the worker sends back.
func (fm *fileManager) Range(filename string, keyRange *keyRange, ts uint64) ([]*Entry, error) {
	// Capacity 1 on each channel lets the worker deliver both values without
	// waiting for this goroutine.
	results := make(chan []*Entry, 1)
	failure := make(chan error, 1)

	request := new(fileRangeReq)
	request.filename = filename
	request.keyRange = keyRange
	request.ts = ts
	request.replyChan = results
	request.errChan = failure
	fm.fileRangeChan <- request

	entries := <-results
	return entries, <-failure
}