Skip to content

Commit 00fe94e

Browse files
isapegoashishm07
authored andcommitted
IGNITE-27373 DB API Driver 3: Add tests with GIL disabled (apache#7683)
1 parent 108175c commit 00fe94e

11 files changed

Lines changed: 284 additions & 29 deletions

File tree

.teamcity/test/build_types/RunPlatformTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ object RunPlatformTests : BuildType({
1818
// snapshot(PlatformCppTestsWindows) {} // Always falling, under investigation
1919
snapshot(PlatformDotnetTestsWindows) {}
2020
snapshot(PlatformDotnetTestsLinux) {}
21-
snapshot(PlatformPythonTestsLinux) {}
21+
snapshot(RunPythonTests) {}
2222
}
2323
})

.teamcity/test/platform_tests/Project.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ object Project : Project({
99
id(getId(this::class))
1010
name = "[Platform Tests]"
1111

12+
subProject(test.platform_tests.python_tests.Project)
13+
1214
/**
1315
* List of platform linux tests
1416
*/
@@ -19,7 +21,7 @@ object Project : Project({
1921
PlatformCppOdbcTestsRpmLinux,
2022
PlatformCppOdbcTestsTgzLinux,
2123
PlatformDotnetTestsLinux,
22-
PlatformPythonTestsLinux
24+
RunPythonTests
2325
).forEach {
2426
buildType(
2527
ApacheIgnite3CustomBuildType.Builder(it)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package test.platform_tests
2+
3+
import jetbrains.buildServer.configs.kotlin.BuildType
4+
import org.apache.ignite.teamcity.Teamcity.Companion.getId
5+
6+
object RunPythonTests : BuildType({
7+
id(getId(this::class))
8+
name = "> Run :: Python Tests"
9+
description = "Run all Python Tests"
10+
type = Type.COMPOSITE
11+
12+
dependencies {
13+
test.platform_tests.python_tests.Project.buildTypes.forEach{
14+
snapshot(it) {}
15+
}
16+
}
17+
})
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package test.platform_tests.python_tests
2+
3+
import jetbrains.buildServer.configs.kotlin.Project
4+
import org.apache.ignite.teamcity.ApacheIgnite3CustomBuildType
5+
import org.apache.ignite.teamcity.Teamcity.Companion.getId
6+
7+
object Project : Project({
8+
id(getId(this::class))
9+
name = "[Python Tests]"
10+
11+
listOf(
12+
Triple("3.10", "py310", "Python DB API Tests - Python 3.10"),
13+
Triple("3.11", "py311", "Python DB API Tests - Python 3.11"),
14+
Triple("3.12", "py312", "Python DB API Tests - Python 3.12"),
15+
Triple("3.13", "py313", "Python DB API Tests - Python 3.13"),
16+
Triple("3.14", "py314", "Python DB API Tests - Python 3.14"),
17+
Triple("3.14t", "py314t", "Python DB API Tests (No GIL) - Python 3.14"),
18+
).forEach { (ver, toxEnv, name) ->
19+
buildType(
20+
ApacheIgnite3CustomBuildType.Builder(PythonDbApiToxTest(ver, toxEnv, name))
21+
.ignite3VCS().ignite3BuildDependency().setupMavenProxy()
22+
.defaultBuildTypeSettings().requireLinux()
23+
.build().buildType
24+
)
25+
}
26+
})
27+

.teamcity/test/platform_tests/PlatformPythonTestsLinux.kt renamed to .teamcity/test/platform_tests/python_tests/PythonDbApiToxTest.kt

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package test.platform_tests
1+
package test.platform_tests.python_tests
22

33
import jetbrains.buildServer.configs.kotlin.BuildType
44
import jetbrains.buildServer.configs.kotlin.ParameterDisplay
@@ -10,17 +10,22 @@ import jetbrains.buildServer.configs.kotlin.failureConditions.failOnText
1010
import org.apache.ignite.teamcity.CustomBuildSteps.Companion.customGradle
1111
import org.apache.ignite.teamcity.Teamcity
1212

13-
14-
object PlatformPythonTestsLinux : BuildType({
15-
id(Teamcity.getId(this::class))
16-
name = "Platform Python Tests (Linux)"
13+
class PythonDbApiToxTest(
14+
private val pythonVersion: String,
15+
private val toxEnv: String,
16+
private val suiteName: String
17+
) : BuildType({
18+
id(Teamcity.getId(this::class, pythonVersion, true))
19+
name = suiteName
1720

1821
params {
1922
text("PATH__WORKING_DIR", """%VCSROOT__IGNITE3%\modules\platforms\python\dbapi""", display = ParameterDisplay.HIDDEN, allowEmpty = true)
20-
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
21-
param("env.CPP_STAGING", """%PATH__WORKING_DIR%\cpp_staging""")
22-
param("TOX_ENV", "py310")
23-
param("PYTHON_VERSION", "3.10")
23+
param("TOX_ENV", toxEnv)
24+
param("PYTHON_VERSION", pythonVersion)
25+
}
26+
27+
requirements {
28+
equals("env.DIND_ENABLED", "true")
2429
}
2530

2631
steps {
@@ -29,7 +34,18 @@ object PlatformPythonTestsLinux : BuildType({
2934
tasks = ":ignite-runner:integrationTestClasses"
3035
}
3136
script {
32-
name = "Python Client tests"
37+
name = "Update pyenv"
38+
workingDir = "%PATH__WORKING_DIR%"
39+
scriptContent = """
40+
set -x
41+
eval "${'$'}(pyenv init - dash)"
42+
43+
cd "${'$'}(pyenv root)"
44+
git pull
45+
""".trimIndent()
46+
}
47+
script {
48+
name = "Run tox"
3349
workingDir = "%PATH__WORKING_DIR%"
3450
scriptContent = """
3551
#!/usr/bin/env bash
@@ -84,8 +100,4 @@ object PlatformPythonTestsLinux : BuildType({
84100
reverse = false
85101
}
86102
}
87-
88-
requirements {
89-
equals("env.DIND_ENABLED", "true")
90-
}
91103
})

modules/platforms/python/dbapi/pyignite_dbapi/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
apilevel = '2.0'
2828
"""PEP 249 is supported."""
2929

30-
threadsafety = 1
31-
"""Threads may share the module, but not connections."""
30+
threadsafety = 2
31+
"""Threads may share the module and connections, but not cursors."""
3232

3333
paramstyle = 'qmark'
3434
"""Parameter style is a question mark, e.g. '...WHERE name=?'."""
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1 @@
11
# these pip packages are necessary for the pyignite_dbapi to run
2-
3-
attrs==23.1.0
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
# these packages are used for testing
22

3-
pytest==6.2.5
4-
pytest-cov==2.11.1
5-
teamcity-messages==1.28
3+
pytest==8.2.2
4+
teamcity-messages==1.33
65
psutil==5.8.0
7-
flake8==3.8.4
86
dbapi-compliance==1.15.0

modules/platforms/python/dbapi/tests/conftest.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
import logging
16+
import time
1617

1718
import pyignite_dbapi
1819
import pytest
@@ -24,20 +25,26 @@
2425

2526
TEST_PAGE_SIZE = 32
2627

28+
TEST_CONNECT_KWARGS = {
29+
"address": server_addresses_basic,
30+
"page_size": TEST_PAGE_SIZE,
31+
"heartbeat_interval": 2,
32+
}
33+
2734
@pytest.fixture()
2835
def table_name(request):
29-
return request.node.originalname
36+
return f"{request.node.originalname}_{int(time.monotonic_ns())}"
3037

3138

3239
@pytest.fixture()
3340
def connection():
34-
conn = pyignite_dbapi.connect(address=server_addresses_basic, page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
41+
conn = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
3542
yield conn
3643
conn.close()
3744

3845
@pytest.fixture()
3946
def service_connection():
40-
conn = pyignite_dbapi.connect(address=server_addresses_basic, page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
47+
conn = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
4148
yield conn
4249
conn.close()
4350

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import threading
17+
import time
18+
19+
import pytest
20+
21+
import pyignite_dbapi
22+
from tests.conftest import TEST_CONNECT_KWARGS
23+
from tests.util import wait_for_condition
24+
25+
NUM_THREADS = 50
26+
27+
28+
@pytest.fixture()
29+
def module_level_threadsafety():
30+
assert pyignite_dbapi.threadsafety >= 1, "Module can not be used concurrently"
31+
32+
33+
@pytest.fixture()
34+
def connection_level_threadsafety(module_level_threadsafety):
35+
assert pyignite_dbapi.threadsafety >= 2, "Connections can not be used concurrently"
36+
37+
38+
@pytest.fixture()
39+
def table(table_name, service_cursor, drop_table_cleanup):
40+
service_cursor.execute(f"CREATE TABLE {table_name} (id int primary key, data varchar)")
41+
yield table_name
42+
43+
44+
def run_threads(fn, n=NUM_THREADS, *args):
45+
barrier = threading.Barrier(n)
46+
errors = []
47+
errors_lock = threading.Lock()
48+
49+
def wrapper(tid):
50+
try:
51+
barrier.wait()
52+
fn(tid, *args)
53+
except Exception as e:
54+
with errors_lock:
55+
errors.append(e)
56+
57+
threads = [threading.Thread(target=wrapper, args=(i,)) for i in range(n)]
58+
for t in threads:
59+
t.start()
60+
for t in threads:
61+
t.join()
62+
63+
if errors:
64+
raise errors[0]
65+
66+
67+
def test_concurrent_module_import(module_level_threadsafety):
68+
import importlib
69+
70+
def task(_):
71+
m = importlib.import_module(pyignite_dbapi.__name__)
72+
assert m.threadsafety > 0, "Module can not be used concurrently"
73+
74+
run_threads(task)
75+
76+
77+
def test_concurrent_connect_use_close(module_level_threadsafety):
78+
def task(_):
79+
c = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
80+
with c.cursor() as cur:
81+
cur.execute("SELECT 1")
82+
assert cur.fetchone() is not None
83+
c.close()
84+
85+
run_threads(task)
86+
87+
88+
def test_shared_connection_per_thread_cursors(connection, connection_level_threadsafety):
89+
def task(_):
90+
with connection.cursor() as cur:
91+
cur.execute("SELECT 1")
92+
row = cur.fetchone()
93+
assert row is not None
94+
95+
run_threads(task)
96+
97+
98+
def test_concurrent_inserts_no_lost_writes(table, connection, connection_level_threadsafety):
99+
rows_per_thread = 50
100+
101+
def task(thread_id):
102+
with connection.cursor() as cur:
103+
for i in range(rows_per_thread):
104+
cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", (thread_id * rows_per_thread + i, f"v{thread_id}-{i}"))
105+
106+
run_threads(task)
107+
108+
with connection.cursor() as cur:
109+
cur.execute(f"SELECT COUNT(*) FROM {table}")
110+
count = cur.fetchone()[0]
111+
assert count == NUM_THREADS * rows_per_thread
112+
113+
114+
def test_concurrent_commit_and_rollback(table, module_level_threadsafety):
115+
"""Half the threads commit, half rollback. Only committed rows appear."""
116+
committed_ids = []
117+
lock = threading.Lock()
118+
119+
def task(thread_id):
120+
with pyignite_dbapi.connect(**TEST_CONNECT_KWARGS) as conn:
121+
conn.autocommit = False
122+
with conn.cursor() as cur:
123+
cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", (thread_id, "x"))
124+
if thread_id % 2 == 0:
125+
conn.commit()
126+
with lock:
127+
committed_ids.append(thread_id)
128+
else:
129+
conn.rollback()
130+
131+
run_threads(task)
132+
133+
def get_ids():
134+
with pyignite_dbapi.connect(**TEST_CONNECT_KWARGS) as conn:
135+
with conn.cursor() as cur:
136+
cur.execute(f"SELECT id FROM {table} ORDER BY id")
137+
return {row[0] for row in cur.fetchall()}
138+
139+
# There is currently no mechanism to synchronize the observable timestamp across
140+
# multiple connections, so changes will eventually become visible, but not necessarily immediately.
141+
wait_for_condition(lambda: get_ids() == set(committed_ids), interval=0.5)
142+
143+
144+
def test_concurrent_fetchall_result_integrity(table, connection, connection_level_threadsafety):
145+
rows_num = 200
146+
with connection.cursor() as cur:
147+
cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", [(i, f"val-{i}") for i in range(rows_num)])
148+
149+
def task(_):
150+
with connection.cursor() as cur:
151+
cur.execute(f"SELECT id, data FROM {table} ORDER BY id")
152+
rows = cur.fetchall()
153+
assert len(rows) == rows_num, f"Expected {rows_num} rows, got {len(rows)}"
154+
155+
for idx, (rid, val) in enumerate(rows):
156+
assert val == f"val-{rid}", f"Corrupted row: id={rid}, val={val!r}"
157+
158+
run_threads(task)
159+
160+
161+
def test_cursor_description_thread_safety(table, connection, connection_level_threadsafety):
162+
expected_names = {"ID", "DATA"}
163+
164+
def task(_):
165+
with connection.cursor() as cur:
166+
cur.execute(f"SELECT id, data FROM {table} LIMIT 1")
167+
desc = cur.description
168+
assert desc is not None
169+
col_names = {col[0] for col in desc}
170+
assert col_names == expected_names, f"Unexpected columns: {col_names}"
171+
172+
run_threads(task)
173+
174+
175+
def test_concurrent_executemany(table, connection, connection_level_threadsafety):
176+
rows_per_thread = 20
177+
178+
def task(thread_id):
179+
rows = [(thread_id * 1000 + i, f"{thread_id}-{i}") for i in range(rows_per_thread)]
180+
with connection.cursor() as cur:
181+
cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", rows)
182+
183+
run_threads(task)
184+
185+
with connection.cursor() as cur:
186+
cur.execute(f"SELECT COUNT(*) FROM {table}")
187+
count = cur.fetchone()[0]
188+
189+
assert count == NUM_THREADS * rows_per_thread

0 commit comments

Comments
 (0)