-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathtest_transactions.py
41 lines (31 loc) · 1.08 KB
/
test_transactions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import objectbox
from tests.model import TestEntity
from tests.common import *
def test_transactions(test_store):
    """Check commit, rollback and read-only behaviour of store transactions.

    Three cases are exercised in order:

    1. Objects put inside a write transaction are counted afterwards.
    2. An exception raised inside a write transaction reaches the caller
       and the objects put in that transaction are not counted afterwards.
    3. Putting an object inside a read transaction raises an error whose
       message mentions the read-only transaction.

    Args:
        test_store: Store under test. Presumably supplied by a pytest
            fixture defined outside this file -- confirm. This test closes
            it before returning, whether the test passes or fails.
    """
    # One outer try/finally guarantees the store is closed even when an early
    # step fails, not only when the final read-TX check is reached.
    try:
        box = test_store.box(TestEntity)
        assert box.is_empty()

        with test_store.write_tx():
            box.put(TestEntity(str="first"))
            box.put(TestEntity(str="second"))
        assert box.count() == 2

        try:
            with test_store.write_tx():
                box.put(TestEntity(str="third"))
                box.put(TestEntity(str="fourth"))
                raise Exception("mission abort!")
        except Exception as err:
            assert str(err) == "mission abort!"
        else:
            # Raised from `else` so the broad `except Exception` above cannot
            # catch it and replace it with a misleading message mismatch.
            raise AssertionError(
                "exception was not propagated out of the write transaction"
            )

        # changes have been rolled back
        assert box.count() == 2

        # can't write in a read TX
        try:
            with test_store.read_tx():
                box.put(TestEntity(str="third"))
        except Exception as err:
            assert "Cannot start a write transaction inside a read only transaction" in str(err)
        else:
            # Same reasoning as above: report the missing error directly.
            raise AssertionError(
                "put() inside a read transaction did not raise"
            )
    finally:
        test_store.close()