Skip to content

Commit

Permalink
chore: disable transaction cache
Browse files Browse the repository at this point in the history
Signed-off-by: TheOneWithTheBraid <[email protected]>
  • Loading branch information
TheOneWithTheBraid committed Jul 19, 2022
1 parent b1ab7b7 commit 8d972e1
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 109 deletions.
171 changes: 67 additions & 104 deletions hive/lib/src/backend/vm/storage_backend_vm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ class StorageBackendVm extends StorageBackend {

bool _compactionScheduled = false;

/// a cache for asynchronouse I/O operations
final Set<Future<void>> _ongoingTransactions = {};

/// Not part of public API
StorageBackendVm(
this._file, this._lockFile, this._crashRecovery, this._cipher)
Expand Down Expand Up @@ -105,30 +102,21 @@ class StorageBackendVm extends StorageBackend {

@override
Future<dynamic> readValue(Frame frame) async {
Future<dynamic> operation() async {
await Future.wait(_ongoingTransactions);
await _sync.syncRead(() async {
await readRaf.setPosition(frame.offset);

var bytes = await readRaf.read(frame.length!);
return await _sync.syncRead(() async {
await readRaf.setPosition(frame.offset);

var reader = BinaryReaderImpl(bytes, registry);
var readFrame = await reader.readFrame(cipher: _cipher, lazy: false);
var bytes = await readRaf.read(frame.length!);

if (readFrame == null) {
throw HiveError(
'Could not read value from box. Maybe your box is corrupted.');
}
var reader = BinaryReaderImpl(bytes, registry);
var readFrame = await reader.readFrame(cipher: _cipher, lazy: false);

return readFrame.value;
});
}
if (readFrame == null) {
throw HiveError(
'Could not read value from box. Maybe your box is corrupted.');
}

final operationFuture = operation.call();
_ongoingTransactions.add(operationFuture);
final result = await operationFuture;
_ongoingTransactions.remove(operationFuture);
return result;
return readFrame.value;
});
}

@override
Expand All @@ -138,26 +126,18 @@ class StorageBackendVm extends StorageBackend {
for (var frame in frames) {
frame.length = await writer.writeFrame(frame, cipher: _cipher);
}
Future<void> operation() async {
// adding to the end of the queue
await Future.wait(_ongoingTransactions);
await _sync.syncWrite(() async {
final bytes = writer.toBytes();

final cachedOffset = writeOffset;
try {
/// TODO(TheOneWithTheBraid): implement real transactions with cache
await writeRaf.writeFrom(bytes);
} catch (e) {
await writeRaf.setPosition(cachedOffset);
rethrow;
}
});
}

final future = operation();
_ongoingTransactions.add(future);
future.then((value) => _ongoingTransactions.remove(future));
await _sync.syncWrite(() async {
final bytes = writer.toBytes();

final cachedOffset = writeOffset;
try {
/// TODO(TheOneWithTheBraid): implement real transactions with cache
await writeRaf.writeFrom(bytes);
} catch (e) {
await writeRaf.setPosition(cachedOffset);
rethrow;
}
});

for (var frame in frames) {
frame.offset = writeOffset;
Expand All @@ -170,63 +150,55 @@ class StorageBackendVm extends StorageBackend {
if (_compactionScheduled) return Future.value();
_compactionScheduled = true;

Future<void> operation() async {
await Future.wait(_ongoingTransactions);
await _sync.syncReadWrite(() async {
await readRaf.setPosition(0);
var reader = BufferedFileReader(readRaf);

var fileDirectory = path.substring(0, path.length - 5);
var compactFile = File('$fileDirectory.hivec');
var compactRaf = await compactFile.open(mode: FileMode.write);
var writer = BufferedFileWriter(compactRaf);

var sortedFrames = frames.toList();
sortedFrames.sort((a, b) => a.offset.compareTo(b.offset));
try {
for (var frame in sortedFrames) {
if (frame.offset == -1) continue; // Frame has not been written yet
if (frame.offset != reader.offset) {
var skip = frame.offset - reader.offset;
if (reader.remainingInBuffer < skip) {
if (await reader.loadBytes(skip) < skip) {
throw HiveError('Could not compact box: Unexpected EOF.');
}
}
reader.skip(skip);
}
await _sync.syncReadWrite(() async {
await readRaf.setPosition(0);
var reader = BufferedFileReader(readRaf);

var fileDirectory = path.substring(0, path.length - 5);
var compactFile = File('$fileDirectory.hivec');
var compactRaf = await compactFile.open(mode: FileMode.write);
var writer = BufferedFileWriter(compactRaf);

if (reader.remainingInBuffer < frame.length!) {
if (await reader.loadBytes(frame.length!) < frame.length!) {
var sortedFrames = frames.toList();
sortedFrames.sort((a, b) => a.offset.compareTo(b.offset));
try {
for (var frame in sortedFrames) {
if (frame.offset == -1) continue; // Frame has not been written yet
if (frame.offset != reader.offset) {
var skip = frame.offset - reader.offset;
if (reader.remainingInBuffer < skip) {
if (await reader.loadBytes(skip) < skip) {
throw HiveError('Could not compact box: Unexpected EOF.');
}
}
await writer.write(reader.viewBytes(frame.length!));
reader.skip(skip);
}
await writer.flush();
} finally {
await compactRaf.close();
}

await readRaf.close();
await writeRaf.close();
await compactFile.rename(path);
await open();

var offset = 0;
for (var frame in sortedFrames) {
if (frame.offset == -1) continue;
frame.offset = offset;
offset += frame.length!;
if (reader.remainingInBuffer < frame.length!) {
if (await reader.loadBytes(frame.length!) < frame.length!) {
throw HiveError('Could not compact box: Unexpected EOF.');
}
}
await writer.write(reader.viewBytes(frame.length!));
}
_compactionScheduled = false;
});
}
await writer.flush();
} finally {
await compactRaf.close();
}

await readRaf.close();
await writeRaf.close();
await compactFile.rename(path);
await open();

final future = operation();
_ongoingTransactions.add(future);
await future;
_ongoingTransactions.remove(future);
var offset = 0;
for (var frame in sortedFrames) {
if (frame.offset == -1) continue;
frame.offset = offset;
offset += frame.length!;
}
_compactionScheduled = false;
});
}

@override
Expand All @@ -248,7 +220,6 @@ class StorageBackendVm extends StorageBackend {

@override
Future<void> close() async {
await Future.wait(_ongoingTransactions);
return _sync.syncReadWrite(_closeInternal);
}

Expand All @@ -262,16 +233,8 @@ class StorageBackendVm extends StorageBackend {

@override
Future<void> flush() async {
Future<void> operation() async {
await Future.wait(_ongoingTransactions);
await _sync.syncWrite(() async {
await writeRaf.flush();
});
}

final future = operation();
_ongoingTransactions.add(future);
await future;
_ongoingTransactions.remove(future);
await _sync.syncWrite(() async {
await writeRaf.flush();
});
}
}
3 changes: 1 addition & 2 deletions hive/lib/src/box_collection/box_collection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,11 @@ class CollectionBox<V> implements implementation.CollectionBox<V> {

@override
Future<void> flush() async {
final box = await _getBox();
// we do *not* await the flushing here. That makes it so that we can execute
// other stuff while the flushing is still in progress. Fortunately, hive
// has a proper read / write queue, meaning that if we do actually want to
// write something again, it'll wait until the flush is completed.
await box.flush();
_getBox().then((box) => box.flush());
}

Future<void> _flushOrMark() async {
Expand Down
3 changes: 0 additions & 3 deletions hive/test/integration/put_many_simultaneously_test.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import 'dart:developer';

import 'package:hive/hive.dart';
import 'package:test/test.dart';

import '../util/is_browser.dart';
Expand Down
2 changes: 2 additions & 0 deletions hive/test/tests/backend/vm/storage_backend_vm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ void main() {
.thenAnswer((i) => Future.value(writeRaf));
when(() => writeRaf.writeFrom(bytes))
.thenAnswer((i) => Future.value(writeRaf));
when(() => writeRaf.flush())
.thenAnswer((i) => Future.value(writeRaf));

var backend = _getBackend(writeRaf: writeRaf)
// The registry needs to be initialized before writing values, and
Expand Down
7 changes: 7 additions & 0 deletions hive/test/tests/box/box_base_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ void main() {
var keystore = MockKeystore();

returnFutureVoid(when(() => backend.clear()));
returnFutureVoid(when(() => backend.flush()));
when(() => keystore.clear()).thenReturn(2);

var box = _openBoxBaseMock(backend: backend, keystore: keystore);
Expand All @@ -310,6 +311,7 @@ void main() {
test('throws if box is closed', () async {
var backend = MockStorageBackend();
returnFutureVoid(when(() => backend.close()));
returnFutureVoid(when(() => backend.flush()));

var box = _openBoxBaseMock(backend: backend);
await box.close();
Expand All @@ -322,6 +324,7 @@ void main() {
var backend = MockStorageBackend();
when(() => backend.supportsCompaction).thenReturn(false);
var box = _openBoxBaseMock(backend: backend);
returnFutureVoid(when(() => backend.flush()));

await box.compact();
verify(() => backend.supportsCompaction);
Expand Down Expand Up @@ -352,6 +355,7 @@ void main() {
when(
() => backend.compact([Frame('key', 1, length: 22, offset: 33)])),
);
returnFutureVoid(when(() => backend.flush()));

var box = _openBoxBaseMock(backend: backend, keystore: keystore);

Expand All @@ -364,6 +368,7 @@ void main() {
test('throws if box is closed', () async {
var backend = MockStorageBackend();
returnFutureVoid(when(() => backend.close()));
returnFutureVoid(when(() => backend.flush()));

var box = _openBoxBaseMock(backend: backend);
await box.close();
Expand All @@ -383,6 +388,7 @@ void main() {
);
returnFutureVoid(when(() => keystore.close()));
returnFutureVoid(when(() => backend.close()));
returnFutureVoid(when(() => backend.flush()));

await box.close();
verifyInOrder([
Expand All @@ -400,6 +406,7 @@ void main() {
var box = _openBoxBaseMock(backend: backend, keystore: keystore);
returnFutureVoid(when(() => keystore.close()));
returnFutureVoid(when(() => backend.close()));
returnFutureVoid(when(() => backend.flush()));
returnFutureVoid(when(() => backend.deleteFromDisk()));

await box.close();
Expand Down

0 comments on commit 8d972e1

Please sign in to comment.