Skip to content

Commit c7cf551

Browse files
committed
Add basic transaction db
1 parent a2ecab7 commit c7cf551

5 files changed

Lines changed: 458 additions & 0 deletions

File tree

rocksdb/_rocksdb.pyx

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ from . cimport env
2929
from . cimport table_factory
3030
from . cimport memtablerep
3131
from . cimport universal_compaction
32+
from . cimport transaction_db
3233

3334
# Enums are the only exception for direct imports
3435
# Their name als already unique enough
@@ -1499,6 +1500,87 @@ cdef class Options(ColumnFamilyOptions):
14991500
self.py_row_cache = value
15001501
self.opts.row_cache = self.py_row_cache.get_cache()
15011502

1503+
1504+
cdef class TransactionDBOptions(object):
1505+
cdef transaction_db.TransactionDBOptions* opts
1506+
1507+
def __cinit__(self):
1508+
self.opts = new transaction_db.TransactionDBOptions()
1509+
1510+
def __dealloc__(self):
1511+
if not self.opts == NULL:
1512+
del self.opts
1513+
1514+
def __init__(self, **kwargs):
1515+
for key, value in kwargs.items():
1516+
setattr(self, key, value)
1517+
1518+
property max_num_locks:
1519+
def __get__(self):
1520+
return self.opts.max_num_locks
1521+
def __set__(self, value):
1522+
self.opts.max_num_locks = value
1523+
1524+
property max_num_deadlocks:
1525+
def __get__(self):
1526+
return self.opts.max_num_deadlocks
1527+
def __set__(self, value):
1528+
self.opts.max_num_deadlocks = value
1529+
1530+
property num_stripes:
1531+
def __get__(self):
1532+
return self.opts.num_stripes
1533+
def __set__(self, value):
1534+
self.opts.num_stripes = value
1535+
1536+
property transaction_lock_timeout:
1537+
def __get__(self):
1538+
return self.opts.transaction_lock_timeout
1539+
def __set__(self, value):
1540+
self.opts.transaction_lock_timeout = value
1541+
1542+
property default_lock_timeout:
1543+
def __get__(self):
1544+
return self.opts.default_lock_timeout
1545+
def __set__(self, value):
1546+
self.opts.default_lock_timeout = value
1547+
1548+
# TODO property custom_mutex_factory
1549+
property write_policy:
1550+
def __get__(self):
1551+
if self.opts.write_policy == transaction_db.WRITE_COMMITTED:
1552+
return 'write_committed'
1553+
if self.opts.write_policy == transaction_db.WRITE_PREPARED:
1554+
return 'write_prepared'
1555+
if self.opts.write_policy == transaction_db.WRITE_UNPREPARED:
1556+
return 'write_unprepared'
1557+
raise Exception("Unknown write policy")
1558+
1559+
def __set__(self, str value):
1560+
if value == 'write_committed':
1561+
self.opts.write_policy = transaction_db.WRITE_COMMITTED
1562+
elif value == 'write_prepared':
1563+
self.opts.write_policy = transaction_db.WRITE_PREPARED
1564+
elif value == 'write_unprepared':
1565+
self.opts.write_policy = transaction_db.WRITE_UNPREPARED
1566+
else:
1567+
raise Exception("Unknown write policy")
1568+
1569+
property rollback_merge_operands:
1570+
def __get__(self):
1571+
return self.opts.rollback_merge_operands
1572+
def __set__(self, value):
1573+
self.opts.rollback_merge_operands = value
1574+
property skip_concurrency_control:
1575+
def __get__(self):
1576+
return self.opts.skip_concurrency_control
1577+
def __set__(self, value):
1578+
self.opts.skip_concurrency_control = value
1579+
property default_write_batch_flush_threshold:
1580+
def __get__(self):
1581+
return self.opts.default_write_batch_flush_threshold
1582+
def __set__(self, value):
1583+
self.opts.default_write_batch_flush_threshold = value
15021584

15031585
# Forward declaration
15041586
cdef class Snapshot
@@ -2271,6 +2353,159 @@ def list_column_families(db_name, Options opts):
22712353

22722354
return column_families
22732355

2356+
@cython.no_gc_clear
2357+
cdef class TransactionDB(object):
2358+
cdef Options opts
2359+
cdef transaction_db.TransactionDB* db
2360+
cdef list cf_handles
2361+
cdef list cf_options
2362+
2363+
def __cinit__(self, db_name, Options opts, TransactionDBOptions tdb_opts, dict column_families=None):
2364+
cdef Status st
2365+
cdef string db_path
2366+
cdef vector[db.ColumnFamilyDescriptor] column_family_descriptors
2367+
cdef vector[db.ColumnFamilyHandle*] column_family_handles
2368+
cdef bytes default_cf_name = db.kDefaultColumnFamilyName
2369+
self.db = NULL
2370+
self.opts = None
2371+
self.cf_handles = []
2372+
self.cf_options = []
2373+
2374+
if opts.in_use:
2375+
raise Exception("Options object is already used by another DB")
2376+
2377+
db_path = path_to_string(db_name)
2378+
if not column_families or default_cf_name not in column_families:
2379+
# Always add the default column family
2380+
column_family_descriptors.push_back(
2381+
db.ColumnFamilyDescriptor(
2382+
db.kDefaultColumnFamilyName,
2383+
options.ColumnFamilyOptions(deref(opts.opts))
2384+
)
2385+
)
2386+
self.cf_options.append(None) # Since they are the same as db
2387+
if column_families:
2388+
for cf_name, cf_options in column_families.items():
2389+
if not isinstance(cf_name, bytes):
2390+
raise TypeError(
2391+
f"column family name {cf_name!r} is not of type {bytes}!"
2392+
)
2393+
if not isinstance(cf_options, ColumnFamilyOptions):
2394+
raise TypeError(
2395+
f"column family options {cf_options!r} is not of type "
2396+
f"{ColumnFamilyOptions}!"
2397+
)
2398+
if (<ColumnFamilyOptions>cf_options).in_use:
2399+
raise Exception(
2400+
f"ColumnFamilyOptions object for {cf_name} is already "
2401+
"used by another Column Family"
2402+
)
2403+
(<ColumnFamilyOptions>cf_options).in_use = True
2404+
column_family_descriptors.push_back(
2405+
db.ColumnFamilyDescriptor(
2406+
cf_name,
2407+
deref((<ColumnFamilyOptions>cf_options).copts)
2408+
)
2409+
)
2410+
self.cf_options.append(cf_options)
2411+
if column_families:
2412+
for cf_name, cf_options in column_families.items():
2413+
if not isinstance(cf_name, bytes):
2414+
raise TypeError(
2415+
f"column family name {cf_name!r} is not of type {bytes}!"
2416+
)
2417+
if not isinstance(cf_options, ColumnFamilyOptions):
2418+
raise TypeError(
2419+
f"column family options {cf_options!r} is not of type "
2420+
f"{ColumnFamilyOptions}!"
2421+
)
2422+
if (<ColumnFamilyOptions>cf_options).in_use:
2423+
raise Exception(
2424+
f"ColumnFamilyOptions object for {cf_name} is already "
2425+
"used by another Column Family"
2426+
)
2427+
(<ColumnFamilyOptions>cf_options).in_use = True
2428+
column_family_descriptors.push_back(
2429+
db.ColumnFamilyDescriptor(
2430+
cf_name,
2431+
deref((<ColumnFamilyOptions>cf_options).copts)
2432+
)
2433+
)
2434+
self.cf_options.append(cf_options)
2435+
2436+
with nogil:
2437+
st = transaction_db.TransactionDB_Open_ColumnFamilies(
2438+
deref(opts.opts),
2439+
deref(tdb_opts.opts),
2440+
db_path,
2441+
column_family_descriptors,
2442+
&column_family_handles,
2443+
&self.db)
2444+
check_status(st)
2445+
2446+
for handle in column_family_handles:
2447+
wrapper = _ColumnFamilyHandle.from_handle_ptr(handle)
2448+
self.cf_handles.append(wrapper)
2449+
2450+
# Inject the loggers into the python callbacks
2451+
cdef shared_ptr[logger.Logger] info_log = self.db.GetOptions(
2452+
self.db.DefaultColumnFamily()).info_log
2453+
if opts.py_comparator is not None:
2454+
opts.py_comparator.set_info_log(info_log)
2455+
2456+
if opts.py_table_factory is not None:
2457+
opts.py_table_factory.set_info_log(info_log)
2458+
2459+
if opts.prefix_extractor is not None:
2460+
opts.py_prefix_extractor.set_info_log(info_log)
2461+
2462+
cdef ColumnFamilyOptions copts
2463+
for idx, copts in enumerate(self.cf_options):
2464+
if not copts:
2465+
continue
2466+
2467+
info_log = self.db.GetOptions(column_family_handles[idx]).info_log
2468+
2469+
if copts.py_comparator is not None:
2470+
copts.py_comparator.set_info_log(info_log)
2471+
2472+
if copts.py_table_factory is not None:
2473+
copts.py_table_factory.set_info_log(info_log)
2474+
2475+
if copts.prefix_extractor is not None:
2476+
copts.py_prefix_extractor.set_info_log(info_log)
2477+
2478+
self.opts = opts
2479+
self.opts.in_use = True
2480+
2481+
def close(self, safe=True):
2482+
cdef ColumnFamilyOptions copts
2483+
cdef cpp_bool c_safe = safe
2484+
cdef Status st
2485+
if self.db != NULL:
2486+
# We need stop backround compactions
2487+
with nogil:
2488+
db.CancelAllBackgroundWork(self.db, c_safe)
2489+
# We have to make sure we delete the handles so rocksdb doesn't
2490+
# assert when we delete the db
2491+
del self.cf_handles[:]
2492+
for copts in self.cf_options:
2493+
if copts:
2494+
copts.in_use = False
2495+
del self.cf_options[:]
2496+
with nogil:
2497+
st = self.db.Close()
2498+
self.db = NULL
2499+
if self.opts is not None:
2500+
self.opts.in_use = False
2501+
2502+
def __dealloc__(self):
2503+
self.close()
2504+
2505+
property options:
2506+
def __get__(self):
2507+
return self.opts
2508+
22742509

22752510
@cython.no_gc_clear
22762511
@cython.internal

rocksdb/stackable_db.pxd

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from . cimport options
2+
from libc.stdint cimport uint64_t, uint32_t
3+
from .status cimport Status
4+
from libcpp cimport bool as cpp_bool
5+
from libcpp.string cimport string
6+
from libcpp.vector cimport vector
7+
from libcpp.map cimport map
8+
from libcpp.unordered_map cimport unordered_map
9+
from libcpp.memory cimport shared_ptr
10+
from .types cimport SequenceNumber
11+
from .slice_ cimport Slice
12+
from .snapshot cimport Snapshot
13+
from .iterator cimport Iterator
14+
from .env cimport Env
15+
from .metadata cimport ColumnFamilyMetaData
16+
from .metadata cimport LiveFileMetaData
17+
from .metadata cimport ExportImportFilesMetaData
18+
from .table_properties cimport TableProperties
19+
from .db cimport DB
20+
21+
cdef extern from "rocksdb/utilities/stackable_db.h" namespace "rocksdb":
22+
cdef cppclass StackableDB(DB):
23+
StackableDB(DB*) nogil except+
24+
StackableDB(shared_ptr[DB] db) nogil except+
25+
DB* GetBaseDB() nogil except+
26+

rocksdb/tests/test_stackable_db.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import os
2+
import sys
3+
import shutil
4+
import gc
5+
import unittest
6+
import rocksdb
7+
from itertools import takewhile
8+
import struct
9+
import tempfile
10+
from rocksdb.merge_operators import UintAddOperator, StringAppendOperator
11+
12+
from .test_db import TestHelper
13+
14+
class TestStackableDB(TestHelper):
15+
def setUp(self):
16+
TestHelper.setUp(self)
17+
opts = rocksdb.Options(create_if_missing=True)
18+
self.db = rocksdb.StackableDB(os.path.join(self.db_loc, "test"), opts)
19+
20+
def test_options_used_twice(self):
21+
if sys.version_info[0] == 3:
22+
assertRaisesRegex = self.assertRaisesRegex
23+
else:
24+
assertRaisesRegex = self.assertRaisesRegexp
25+
expected = "Options object is already used by another DB"
26+
with assertRaisesRegex(Exception, expected):
27+
rocksdb.DB(os.path.join(self.db_loc, "test2"), self.db.options)
28+
29+
def test_unicode_path(self):
30+
name = os.path.join(self.db_loc, b'M\xc3\xbcnchen'.decode('utf8'))
31+
rocksdb.DB(name, rocksdb.Options(create_if_missing=True))
32+
self.addCleanup(shutil.rmtree, name)
33+
self.assertTrue(os.path.isdir(name))
34+
35+
def test_get_none(self):
36+
self.assertIsNone(self.db.get(b'xxx'))
37+
38+
def test_put_get(self):
39+
self.db.put(b"a", b"b")
40+
self.assertEqual(b"b", self.db.get(b"a"))
41+
42+
def test_multi_get(self):
43+
self.db.put(b"a", b"1")
44+
self.db.put(b"b", b"2")
45+
self.db.put(b"c", b"3")
46+
47+
ret = self.db.multi_get([b'a', b'b', b'c'])
48+
ref = {b'a': b'1', b'c': b'3', b'b': b'2'}
49+
self.assertEqual(ref, ret)
50+
51+
def test_delete(self):
52+
self.db.put(b"a", b"b")
53+
self.assertEqual(b"b", self.db.get(b"a"))
54+
self.db.delete(b"a")
55+
self.assertIsNone(self.db.get(b"a"))
56+
57+
def test_write_batch(self):
58+
batch = rocksdb.WriteBatch()
59+
batch.put(b"key", b"v1")
60+
batch.delete(b"key")
61+
batch.put(b"key", b"v2")
62+
batch.put(b"key", b"v3")
63+
batch.put(b"a", b"b")
64+
65+
self.db.write(batch)
66+
ref = {b'a': b'b', b'key': b'v3'}
67+
ret = self.db.multi_get([b'key', b'a'])
68+
self.assertEqual(ref, ret)
69+
70+
def test_write_batch_iter(self):
71+
batch = rocksdb.WriteBatch()
72+
self.assertEqual([], list(batch))
73+
74+
batch.put(b"key1", b"v1")
75+
batch.put(b"key2", b"v2")
76+
batch.put(b"key3", b"v3")
77+
batch.delete(b'a')
78+
batch.delete(b'key1')
79+
batch.merge(b'xxx', b'value')
80+
81+
it = iter(batch)
82+
del batch
83+
ref = [
84+
('Put', b'key1', b'v1'),
85+
('Put', b'key2', b'v2'),
86+
('Put', b'key3', b'v3'),
87+
('Delete', b'a', b''),
88+
('Delete', b'key1', b''),
89+
('Merge', b'xxx', b'value')
90+
]
91+
self.assertEqual(ref, list(it))
92+
93+
94+

0 commit comments

Comments
 (0)