@@ -16,20 +16,15 @@ import (
16
16
17
17
func TestManager_Append (t * testing.T ) {
18
18
m , err := NewManager (Config {
19
+ MaxAge : 30 * time .Second ,
19
20
MaxSegments : 1 ,
20
21
MaxSegmentSize : 1024 , // 1KB
21
22
}, NewMetrics (nil ))
22
23
require .NoError (t , err )
23
24
24
25
// Append some data.
25
- lbs := labels.Labels {{
26
- Name : "foo" ,
27
- Value : "bar" ,
28
- }}
29
- entries := []* logproto.Entry {{
30
- Timestamp : time .Now (),
31
- Line : strings .Repeat ("a" , 1024 ),
32
- }}
26
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
27
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
33
28
res , err := m .Append (AppendRequest {
34
29
TenantID : "1" ,
35
30
Labels : lbs ,
@@ -46,28 +41,30 @@ func TestManager_Append(t *testing.T) {
46
41
default :
47
42
}
48
43
49
- // Flush the data and broadcast that the flush is successful.
50
- it := m .NextPending ()
51
- require .NotNil (t , it )
52
- it .Result .SetDone (nil )
44
+ // Pretend to flush the data.
45
+ res .SetDone (nil )
53
46
54
- // Should be able to read from the Done() as it is closed.
47
+ // Should be able to read from Done() as it is closed.
55
48
select {
56
49
case <- res .Done ():
57
50
default :
58
51
t .Fatal ("expected closed Done()" )
59
52
}
60
53
require .NoError (t , res .Err ())
54
+ }
61
55
62
- // Return the segment to be written to again.
63
- m .Put (it )
56
+ func TestManager_AppendFailed (t * testing.T ) {
57
+ m , err := NewManager (Config {
58
+ MaxAge : 30 * time .Second ,
59
+ MaxSegments : 1 ,
60
+ MaxSegmentSize : 1024 , // 1KB
61
+ }, NewMetrics (nil ))
62
+ require .NoError (t , err )
64
63
65
- // Append some more data.
66
- entries = []* logproto.Entry {{
67
- Timestamp : time .Now (),
68
- Line : strings .Repeat ("b" , 1024 ),
69
- }}
70
- res , err = m .Append (AppendRequest {
64
+ // Append some data.
65
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
66
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
67
+ res , err := m .Append (AppendRequest {
71
68
TenantID : "1" ,
72
69
Labels : lbs ,
73
70
LabelsStr : lbs .String (),
@@ -76,12 +73,11 @@ func TestManager_Append(t *testing.T) {
76
73
require .NoError (t , err )
77
74
require .NotNil (t , res )
78
75
79
- // Flush the data, but this time broadcast an error that the flush failed.
80
- it = m .NextPending ()
81
- require .NotNil (t , it )
82
- it .Result .SetDone (errors .New ("failed to flush" ))
76
+ // Pretend that the flush failed.
77
+ res .SetDone (errors .New ("failed to flush" ))
83
78
84
- // Should be able to read from the Done() as it is closed.
79
+ // Should be able to read from the Done() as it is closed and assert
80
+ // that the error is the expected error.
85
81
select {
86
82
case <- res .Done ():
87
83
default :
@@ -90,25 +86,103 @@ func TestManager_Append(t *testing.T) {
90
86
require .EqualError (t , res .Err (), "failed to flush" )
91
87
}
92
88
93
- // This test asserts that Append operations return ErrFull if all segments
94
- // are full and waiting to be flushed.
95
- func TestManager_Append_ErrFull (t * testing.T ) {
89
+ func TestManager_AppendMaxAge (t * testing.T ) {
96
90
m , err := NewManager (Config {
91
+ MaxAge : 100 * time .Millisecond ,
92
+ MaxSegments : 1 ,
93
+ MaxSegmentSize : 8 * 1024 * 1024 , // 8MB
94
+ }, NewMetrics (nil ))
95
+ require .NoError (t , err )
96
+
97
+ // Append 1B of data.
98
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
99
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : "c" }}
100
+ res , err := m .Append (AppendRequest {
101
+ TenantID : "1" ,
102
+ Labels : lbs ,
103
+ LabelsStr : lbs .String (),
104
+ Entries : entries ,
105
+ })
106
+ require .NoError (t , err )
107
+ require .NotNil (t , res )
108
+
109
+ // The segment that was just appended to has neither reached the maximum
110
+ // age nor maximum size to be flushed.
111
+ require .Equal (t , 1 , m .available .Len ())
112
+ require .Equal (t , 0 , m .pending .Len ())
113
+
114
+ // Wait 100ms and append some more data.
115
+ time .Sleep (100 * time .Millisecond )
116
+ entries = []* logproto.Entry {{Timestamp : time .Now (), Line : "c" }}
117
+ res , err = m .Append (AppendRequest {
118
+ TenantID : "1" ,
119
+ Labels : lbs ,
120
+ LabelsStr : lbs .String (),
121
+ Entries : entries ,
122
+ })
123
+ require .NoError (t , err )
124
+ require .NotNil (t , res )
125
+
126
+ // The segment has reached the maximum age and should have been moved to
127
+ // pending list to be flushed.
128
+ require .Equal (t , 0 , m .available .Len ())
129
+ require .Equal (t , 1 , m .pending .Len ())
130
+ }
131
+
132
+ func TestManager_AppendMaxSize (t * testing.T ) {
133
+ m , err := NewManager (Config {
134
+ MaxAge : 30 * time .Second ,
135
+ MaxSegments : 1 ,
136
+ MaxSegmentSize : 1024 , // 1KB
137
+ }, NewMetrics (nil ))
138
+ require .NoError (t , err )
139
+
140
+ // Append 512B of data.
141
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
142
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 512 )}}
143
+ res , err := m .Append (AppendRequest {
144
+ TenantID : "1" ,
145
+ Labels : lbs ,
146
+ LabelsStr : lbs .String (),
147
+ Entries : entries ,
148
+ })
149
+ require .NoError (t , err )
150
+ require .NotNil (t , res )
151
+
152
+ // The segment that was just appended to has neither reached the maximum
153
+ // age nor maximum size to be flushed.
154
+ require .Equal (t , 1 , m .available .Len ())
155
+ require .Equal (t , 0 , m .pending .Len ())
156
+
157
+ // Append another 512B of data.
158
+ entries = []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 512 )}}
159
+ res , err = m .Append (AppendRequest {
160
+ TenantID : "1" ,
161
+ Labels : lbs ,
162
+ LabelsStr : lbs .String (),
163
+ Entries : entries ,
164
+ })
165
+ require .NoError (t , err )
166
+ require .NotNil (t , res )
167
+
168
+ // The segment has reached the maximum size and should have been moved to
169
+ // pending list to be flushed.
170
+ require .Equal (t , 0 , m .available .Len ())
171
+ require .Equal (t , 1 , m .pending .Len ())
172
+ }
173
+
174
+ func TestManager_AppendWALFull (t * testing.T ) {
175
+ m , err := NewManager (Config {
176
+ MaxAge : 30 * time .Second ,
97
177
MaxSegments : 10 ,
98
178
MaxSegmentSize : 1024 , // 1KB
99
179
}, NewMetrics (nil ))
100
180
require .NoError (t , err )
101
181
102
- // Should be able to write to all 10 segments of 1KB each.
103
- lbs := labels.Labels {{
104
- Name : "foo" ,
105
- Value : "bar" ,
106
- }}
182
+ // Should be able to write 100KB of data, 10KB per segment.
183
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
107
184
for i := 0 ; i < 10 ; i ++ {
108
- entries := []* logproto.Entry {{
109
- Timestamp : time .Now (),
110
- Line : strings .Repeat ("a" , 1024 ),
111
- }}
185
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
112
186
res , err := m .Append (AppendRequest {
113
187
TenantID : "1" ,
114
188
Labels : lbs ,
@@ -119,12 +193,9 @@ func TestManager_Append_ErrFull(t *testing.T) {
119
193
require .NotNil (t , res )
120
194
}
121
195
122
- // Append more data should fail as all segments are full and waiting to be
123
- // flushed.
124
- entries := []* logproto.Entry {{
125
- Timestamp : time .Now (),
126
- Line : strings .Repeat ("b" , 1024 ),
127
- }}
196
+ // However, appending more data should fail as all segments are full and
197
+ // waiting to be flushed.
198
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
128
199
res , err := m .Append (AppendRequest {
129
200
TenantID : "1" ,
130
201
Labels : lbs ,
@@ -137,76 +208,88 @@ func TestManager_Append_ErrFull(t *testing.T) {
137
208
138
209
func TestManager_NextPending (t * testing.T ) {
139
210
m , err := NewManager (Config {
140
- MaxAge : DefaultMaxAge ,
211
+ MaxAge : 30 * time . Second ,
141
212
MaxSegments : 1 ,
142
213
MaxSegmentSize : 1024 , // 1KB
143
214
}, NewMetrics (nil ))
144
215
require .NoError (t , err )
145
216
146
- // There should be no items as no data has been written.
217
+ // There should be no segments waiting to be flushed as no data has been
218
+ // written.
147
219
it := m .NextPending ()
148
220
require .Nil (t , it )
149
221
150
- // Append 512B of data. There should still be no items to as the segment is
151
- // not full (1KB).
152
- lbs := labels.Labels {{
153
- Name : "foo" ,
154
- Value : "bar" ,
155
- }}
156
- entries := []* logproto.Entry {{
157
- Timestamp : time .Now (),
158
- Line : strings .Repeat ("b" , 512 ),
159
- }}
222
+ // Append 1KB of data.
223
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
224
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
160
225
_ , err = m .Append (AppendRequest {
161
226
TenantID : "1" ,
162
227
Labels : lbs ,
163
228
LabelsStr : lbs .String (),
164
229
Entries : entries ,
165
230
})
166
231
require .NoError (t , err )
232
+
233
+ // There should be a segment waiting to be flushed.
234
+ it = m .NextPending ()
235
+ require .NotNil (t , it )
236
+
237
+ // There should be no more segments waiting to be flushed.
167
238
it = m .NextPending ()
168
239
require .Nil (t , it )
240
+ }
169
241
170
- // Write another 512B of data. There should be an item waiting to be flushed.
171
- entries = []* logproto.Entry {{
172
- Timestamp : time .Now (),
173
- Line : strings .Repeat ("b" , 512 ),
174
- }}
175
- _ , err = m .Append (AppendRequest {
242
+ func TestManager_NexPendingMaxAge (t * testing.T ) {
243
+ m , err := NewManager (Config {
244
+ MaxAge : 100 * time .Millisecond ,
245
+ MaxSegments : 1 ,
246
+ MaxSegmentSize : 1024 , // 1KB
247
+ }, NewMetrics (nil ))
248
+ require .NoError (t , err )
249
+
250
+ // Append 1B of data.
251
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
252
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : "c" }}
253
+ res , err := m .Append (AppendRequest {
176
254
TenantID : "1" ,
177
255
Labels : lbs ,
178
256
LabelsStr : lbs .String (),
179
257
Entries : entries ,
180
258
})
181
259
require .NoError (t , err )
182
- it = m .NextPending ()
183
- require .NotNil (t , it )
260
+ require .NotNil (t , res )
184
261
185
- // Should not get the same item more than once.
186
- it = m .NextPending ()
262
+ // The segment that was just appended to has neither reached the maximum
263
+ // age nor maximum size to be flushed.
264
+ it := m .NextPending ()
187
265
require .Nil (t , it )
266
+ require .Equal (t , 1 , m .available .Len ())
267
+ require .Equal (t , 0 , m .pending .Len ())
268
+
269
+ // Wait 100ms. The segment that was just appended to should have reached
270
+ // the maximum age.
271
+ time .Sleep (100 * time .Millisecond )
272
+ it = m .NextPending ()
273
+ require .NotNil (t , it )
274
+ require .Equal (t , 0 , m .available .Len ())
275
+ require .Equal (t , 0 , m .pending .Len ())
188
276
}
189
277
190
278
func TestManager_Put (t * testing.T ) {
191
279
m , err := NewManager (Config {
192
- MaxSegments : 10 ,
280
+ MaxAge : 30 * time .Second ,
281
+ MaxSegments : 1 ,
193
282
MaxSegmentSize : 1024 , // 1KB
194
283
}, NewMetrics (nil ))
195
284
require .NoError (t , err )
196
285
197
- // There should be 10 available segments, and 0 pending.
198
- require .Equal (t , 10 , m .available .Len ())
286
+ // There should be 1 available and 0 pending segments .
287
+ require .Equal (t , 1 , m .available .Len ())
199
288
require .Equal (t , 0 , m .pending .Len ())
200
289
201
290
// Append 1KB of data.
202
- lbs := labels.Labels {{
203
- Name : "foo" ,
204
- Value : "bar" ,
205
- }}
206
- entries := []* logproto.Entry {{
207
- Timestamp : time .Now (),
208
- Line : strings .Repeat ("b" , 1024 ),
209
- }}
291
+ lbs := labels.Labels {{Name : "a" , Value : "b" }}
292
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c" , 1024 )}}
210
293
_ , err = m .Append (AppendRequest {
211
294
TenantID : "1" ,
212
295
Labels : lbs ,
@@ -215,23 +298,23 @@ func TestManager_Put(t *testing.T) {
215
298
})
216
299
require .NoError (t , err )
217
300
218
- // 1 segment is full, so there should now be 9 available segments,
219
- // and 1 pending segment.
220
- require .Equal (t , 9 , m .available .Len ())
301
+ // The segment is full, so there should now be 0 available segments and 1
302
+ // pending segment.
303
+ require .Equal (t , 0 , m .available .Len ())
221
304
require .Equal (t , 1 , m .pending .Len ())
222
305
223
306
// Getting the pending segment should remove it from the list.
224
307
it := m .NextPending ()
225
308
require .NotNil (t , it )
226
- require .Equal (t , 9 , m .available .Len ())
309
+ require .Equal (t , 0 , m .available .Len ())
227
310
require .Equal (t , 0 , m .pending .Len ())
228
311
229
312
// The segment should contain 1KB of data.
230
313
require .Equal (t , int64 (1024 ), it .Writer .InputSize ())
231
314
232
315
// Putting it back should add it to the available list.
233
316
m .Put (it )
234
- require .Equal (t , 10 , m .available .Len ())
317
+ require .Equal (t , 1 , m .available .Len ())
235
318
require .Equal (t , 0 , m .pending .Len ())
236
319
237
320
// The segment should be reset.
@@ -265,8 +348,8 @@ wal_segments_pending 0
265
348
require .NoError (t , testutil .CollectAndCompare (r , strings .NewReader (expected ), metricNames ... ))
266
349
267
350
// Appending 1KB of data.
268
- lbs := labels.Labels {{Name : "foo " , Value : "bar " }}
269
- entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("b " , 1024 )}}
351
+ lbs := labels.Labels {{Name : "a " , Value : "b " }}
352
+ entries := []* logproto.Entry {{Timestamp : time .Now (), Line : strings .Repeat ("c " , 1024 )}}
270
353
_ , err = m .Append (AppendRequest {
271
354
TenantID : "1" ,
272
355
Labels : lbs ,
0 commit comments