Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .teamcity/test/build_types/RunPlatformTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ object RunPlatformTests : BuildType({
// snapshot(PlatformCppTestsWindows) {} // Always falling, under investigation
snapshot(PlatformDotnetTestsWindows) {}
snapshot(PlatformDotnetTestsLinux) {}
snapshot(PlatformPythonTestsLinux) {}
snapshot(RunPythonTests) {}
}
})
4 changes: 3 additions & 1 deletion .teamcity/test/platform_tests/Project.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ object Project : Project({
id(getId(this::class))
name = "[Platform Tests]"

subProject(test.platform_tests.python_tests.Project)

/**
* List of platform linux tests
*/
Expand All @@ -19,7 +21,7 @@ object Project : Project({
PlatformCppOdbcTestsRpmLinux,
PlatformCppOdbcTestsTgzLinux,
PlatformDotnetTestsLinux,
PlatformPythonTestsLinux
RunPythonTests
).forEach {
buildType(
ApacheIgnite3CustomBuildType.Builder(it)
Expand Down
17 changes: 17 additions & 0 deletions .teamcity/test/platform_tests/RunPythonTests.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package test.platform_tests

import jetbrains.buildServer.configs.kotlin.BuildType
import org.apache.ignite.teamcity.Teamcity.Companion.getId

object RunPythonTests : BuildType({
id(getId(this::class))
name = "> Run :: Python Tests"
description = "Run all Python Tests"
type = Type.COMPOSITE

dependencies {
test.platform_tests.python_tests.Project.buildTypes.forEach{
snapshot(it) {}
}
}
})
27 changes: 27 additions & 0 deletions .teamcity/test/platform_tests/python_tests/Project.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package test.platform_tests.python_tests

import jetbrains.buildServer.configs.kotlin.Project
import org.apache.ignite.teamcity.ApacheIgnite3CustomBuildType
import org.apache.ignite.teamcity.Teamcity.Companion.getId

object Project : Project({
id(getId(this::class))
name = "[Python Tests]"

listOf(
Triple("3.10", "py310", "Python DB API Tests - Python 3.10"),
Triple("3.11", "py311", "Python DB API Tests - Python 3.11"),
Triple("3.12", "py312", "Python DB API Tests - Python 3.12"),
Triple("3.13", "py313", "Python DB API Tests - Python 3.13"),
Triple("3.14", "py314", "Python DB API Tests - Python 3.14"),
Triple("3.14t", "py314t", "Python DB API Tests (No GIL) - Python 3.14"),
).forEach { (ver, toxEnv, name) ->
buildType(
ApacheIgnite3CustomBuildType.Builder(PythonDbApiToxTest(ver, toxEnv, name))
.ignite3VCS().ignite3BuildDependency().setupMavenProxy()
.defaultBuildTypeSettings().requireLinux()
.build().buildType
)
}
})

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package test.platform_tests
package test.platform_tests.python_tests

import jetbrains.buildServer.configs.kotlin.BuildType
import jetbrains.buildServer.configs.kotlin.ParameterDisplay
Expand All @@ -10,17 +10,22 @@ import jetbrains.buildServer.configs.kotlin.failureConditions.failOnText
import org.apache.ignite.teamcity.CustomBuildSteps.Companion.customGradle
import org.apache.ignite.teamcity.Teamcity


object PlatformPythonTestsLinux : BuildType({
id(Teamcity.getId(this::class))
name = "Platform Python Tests (Linux)"
class PythonDbApiToxTest(
private val pythonVersion: String,
private val toxEnv: String,
private val suiteName: String
) : BuildType({
id(Teamcity.getId(this::class, pythonVersion, true))
name = suiteName

params {
text("PATH__WORKING_DIR", """%VCSROOT__IGNITE3%\modules\platforms\python\dbapi""", display = ParameterDisplay.HIDDEN, allowEmpty = true)
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
param("env.CPP_STAGING", """%PATH__WORKING_DIR%\cpp_staging""")
param("TOX_ENV", "py310")
param("PYTHON_VERSION", "3.10")
param("TOX_ENV", toxEnv)
param("PYTHON_VERSION", pythonVersion)
}

requirements {
equals("env.DIND_ENABLED", "true")
}

steps {
Expand All @@ -29,7 +34,18 @@ object PlatformPythonTestsLinux : BuildType({
tasks = ":ignite-runner:integrationTestClasses"
}
script {
name = "Python Client tests"
name = "Update pyenv"
workingDir = "%PATH__WORKING_DIR%"
scriptContent = """
set -x
eval "${'$'}(pyenv init - dash)"

cd "${'$'}(pyenv root)"
git pull
""".trimIndent()
}
script {
name = "Run tox"
workingDir = "%PATH__WORKING_DIR%"
scriptContent = """
#!/usr/bin/env bash
Expand Down Expand Up @@ -84,8 +100,4 @@ object PlatformPythonTestsLinux : BuildType({
reverse = false
}
}

requirements {
equals("env.DIND_ENABLED", "true")
}
})
4 changes: 2 additions & 2 deletions modules/platforms/python/dbapi/pyignite_dbapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
apilevel = '2.0'
"""PEP 249 is supported."""

threadsafety = 1
"""Threads may share the module, but not connections."""
threadsafety = 2
"""Threads may share the module and connections, but not cursors."""

paramstyle = 'qmark'
"""Parameter style is a question mark, e.g. '...WHERE name=?'."""
Expand Down
2 changes: 0 additions & 2 deletions modules/platforms/python/dbapi/requirements/install.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
# these pip packages are necessary for the pyignite_dbapi to run

attrs==23.1.0
6 changes: 2 additions & 4 deletions modules/platforms/python/dbapi/requirements/tests.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# these packages are used for testing

pytest==6.2.5
pytest-cov==2.11.1
teamcity-messages==1.28
pytest==8.2.2
teamcity-messages==1.33
psutil==5.8.0
flake8==3.8.4
dbapi-compliance==1.15.0
13 changes: 10 additions & 3 deletions modules/platforms/python/dbapi/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time

import pyignite_dbapi
import pytest
Expand All @@ -24,20 +25,26 @@

TEST_PAGE_SIZE = 32

TEST_CONNECT_KWARGS = {
"address": server_addresses_basic,
"page_size": TEST_PAGE_SIZE,
"heartbeat_interval": 2,
}

@pytest.fixture()
def table_name(request):
return request.node.originalname
return f"{request.node.originalname}_{int(time.monotonic_ns())}"


@pytest.fixture()
def connection():
conn = pyignite_dbapi.connect(address=server_addresses_basic, page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
conn = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
yield conn
conn.close()

@pytest.fixture()
def service_connection():
conn = pyignite_dbapi.connect(address=server_addresses_basic, page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
conn = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
yield conn
conn.close()

Expand Down
189 changes: 189 additions & 0 deletions modules/platforms/python/dbapi/tests/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import time

import pytest

import pyignite_dbapi
from tests.conftest import TEST_CONNECT_KWARGS
from tests.util import wait_for_condition

NUM_THREADS = 50


@pytest.fixture()
def module_level_threadsafety():
assert pyignite_dbapi.threadsafety >= 1, "Module can not be used concurrently"


@pytest.fixture()
def connection_level_threadsafety(module_level_threadsafety):
assert pyignite_dbapi.threadsafety >= 2, "Connections can not be used concurrently"


@pytest.fixture()
def table(table_name, service_cursor, drop_table_cleanup):
service_cursor.execute(f"CREATE TABLE {table_name} (id int primary key, data varchar)")
yield table_name


def run_threads(fn, n=NUM_THREADS, *args):
barrier = threading.Barrier(n)
errors = []
errors_lock = threading.Lock()

def wrapper(tid):
try:
barrier.wait()
fn(tid, *args)
except Exception as e:
with errors_lock:
errors.append(e)

threads = [threading.Thread(target=wrapper, args=(i,)) for i in range(n)]
for t in threads:
t.start()
for t in threads:
t.join()

if errors:
raise errors[0]


def test_concurrent_module_import(module_level_threadsafety):
import importlib

def task(_):
m = importlib.import_module(pyignite_dbapi.__name__)
assert m.threadsafety > 0, "Module can not be used concurrently"

run_threads(task)


def test_concurrent_connect_use_close(module_level_threadsafety):
def task(_):
c = pyignite_dbapi.connect(**TEST_CONNECT_KWARGS)
with c.cursor() as cur:
cur.execute("SELECT 1")
assert cur.fetchone() is not None
c.close()

run_threads(task)


def test_shared_connection_per_thread_cursors(connection, connection_level_threadsafety):
def task(_):
with connection.cursor() as cur:
cur.execute("SELECT 1")
row = cur.fetchone()
assert row is not None

run_threads(task)


def test_concurrent_inserts_no_lost_writes(table, connection, connection_level_threadsafety):
rows_per_thread = 50

def task(thread_id):
with connection.cursor() as cur:
for i in range(rows_per_thread):
cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", (thread_id * rows_per_thread + i, f"v{thread_id}-{i}"))

run_threads(task)

with connection.cursor() as cur:
cur.execute(f"SELECT COUNT(*) FROM {table}")
count = cur.fetchone()[0]
assert count == NUM_THREADS * rows_per_thread


def test_concurrent_commit_and_rollback(table, module_level_threadsafety):
"""Half the threads commit, half rollback. Only committed rows appear."""
committed_ids = []
lock = threading.Lock()

def task(thread_id):
with pyignite_dbapi.connect(**TEST_CONNECT_KWARGS) as conn:
conn.autocommit = False
with conn.cursor() as cur:
cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", (thread_id, "x"))
if thread_id % 2 == 0:
conn.commit()
with lock:
committed_ids.append(thread_id)
else:
conn.rollback()

run_threads(task)

def get_ids():
with pyignite_dbapi.connect(**TEST_CONNECT_KWARGS) as conn:
with conn.cursor() as cur:
cur.execute(f"SELECT id FROM {table} ORDER BY id")
return {row[0] for row in cur.fetchall()}

# There is currently no mechanism to synchronize the observable timestamp across
# multiple connections, so changes will eventually become visible, but not necessarily immediately.
wait_for_condition(lambda: get_ids() == set(committed_ids), interval=0.5)


def test_concurrent_fetchall_result_integrity(table, connection, connection_level_threadsafety):
rows_num = 200
with connection.cursor() as cur:
cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", [(i, f"val-{i}") for i in range(rows_num)])

def task(_):
with connection.cursor() as cur:
cur.execute(f"SELECT id, data FROM {table} ORDER BY id")
rows = cur.fetchall()
assert len(rows) == rows_num, f"Expected {rows_num} rows, got {len(rows)}"

for idx, (rid, val) in enumerate(rows):
assert val == f"val-{rid}", f"Corrupted row: id={rid}, val={val!r}"

run_threads(task)


def test_cursor_description_thread_safety(table, connection, connection_level_threadsafety):
expected_names = {"ID", "DATA"}

def task(_):
with connection.cursor() as cur:
cur.execute(f"SELECT id, data FROM {table} LIMIT 1")
desc = cur.description
assert desc is not None
col_names = {col[0] for col in desc}
assert col_names == expected_names, f"Unexpected columns: {col_names}"

run_threads(task)


def test_concurrent_executemany(table, connection, connection_level_threadsafety):
rows_per_thread = 20

def task(thread_id):
rows = [(thread_id * 1000 + i, f"{thread_id}-{i}") for i in range(rows_per_thread)]
with connection.cursor() as cur:
cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", rows)

run_threads(task)

with connection.cursor() as cur:
cur.execute(f"SELECT COUNT(*) FROM {table}")
count = cur.fetchone()[0]

assert count == NUM_THREADS * rows_per_thread
Loading