-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathtest_003_connection.py
More file actions
5613 lines (4561 loc) · 246 KB
/
test_003_connection.py
File metadata and controls
5613 lines (4561 loc) · 246 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
This file contains tests for the Connection class.
Functions:
- test_connection_string: Check if the connection string is not None.
- test_connection: Check if the database connection is established.
- test_connection_close: Check if the database connection is closed.
- test_commit: Make a transaction and commit.
- test_rollback: Make a transaction and rollback.
- test_invalid_connection_string: Check if initializing with an invalid connection string raises an exception.
- test_connection_pooling_speed: Test connection pooling speed.
- test_connection_pooling_basic: Test basic connection pooling functionality.
- test_autocommit_default: Check if autocommit is False by default.
- test_autocommit_setter: Test setting autocommit mode and its effect on transactions.
- test_set_autocommit: Test the setautocommit method.
- test_construct_connection_string: Check if the connection string is constructed correctly with kwargs.
- test_connection_string_with_attrs_before: Check if the connection string is constructed correctly with attrs_before.
- test_connection_string_with_odbc_param: Check if the connection string is constructed correctly with ODBC parameters.
- test_rollback_on_close: Test that rollback occurs on connection close if autocommit is False.
- test_context_manager_commit: Test that data committed manually inside a context manager persists after the connection is closed on exit.
- test_context_manager_autocommit_mode: Test context manager behavior with autocommit enabled.
- test_context_manager_connection_closes: Test that context manager closes the connection.
"""
import mssql_python
import pytest
import time
from mssql_python import connect, Connection, pooling, SQL_CHAR, SQL_WCHAR
import threading
# Import all exception classes for testing
from mssql_python.exceptions import (
Warning,
Error,
InterfaceError,
DatabaseError,
DataError,
OperationalError,
IntegrityError,
InternalError,
ProgrammingError,
NotSupportedError,
)
import struct
from datetime import datetime, timedelta, timezone
from mssql_python.constants import ConstantsDDBC
@pytest.fixture(autouse=True)
def clean_connection_state(db_connection):
    """Reset the ``db_connection`` fixture before and after every test.

    A trivial ``SELECT 1`` is executed and fully fetched on a throwaway
    cursor, once before the test body runs and once afterwards. Any error
    raised while doing so is deliberately ignored (best-effort cleanup).
    """

    def _drain_trivial_query():
        # Best-effort reset: failures here must never fail the test itself.
        try:
            scratch = db_connection.cursor()
            scratch.execute("SELECT 1")  # Simple query to reset state
            scratch.fetchall()  # Consume all results
            scratch.close()
        except Exception:
            pass  # Ignore errors during cleanup

    _drain_trivial_query()
    yield  # Run the test
    _drain_trivial_query()
from mssql_python.constants import GetInfoConstants as sql_const
def drop_table_if_exists(cursor, table_name):
    """Issue ``DROP TABLE IF EXISTS`` for ``table_name`` on ``cursor``.

    Any exception raised by ``cursor.execute`` is converted into a test
    failure via ``pytest.fail`` that names the table and the error.
    """
    statement = f"DROP TABLE IF EXISTS {table_name}"
    try:
        cursor.execute(statement)
    except Exception as exc:
        pytest.fail(f"Failed to drop table {table_name}: {exc}")
# Helper converter functions (raw column value -> Python object)
def handle_datetimeoffset(dto_value):
    """Converter function for SQL Server's DATETIMEOFFSET type.

    ``dto_value`` is unpacked as little-endian ``6h I 2h``: six 16-bit
    fields (year, month, day, hour, minute, second), one unsigned 32-bit
    fraction, and two 16-bit timezone fields (hours, minutes).

    Returns ``None`` for ``None`` input, otherwise a timezone-aware
    ``datetime``.
    """
    if dto_value is None:
        return None

    # The format depends on the ODBC driver and how it returns binary data;
    # this layout matches SQL Server's format for DATETIMEOFFSET,
    # e.g. (2017, 3, 16, 10, 35, 18, 500000000, -6, 0).
    (
        year, month, day,
        hour, minute, second,
        fraction,
        tz_hours, tz_minutes,
    ) = struct.unpack("<6hI2h", dto_value)

    utc_offset = timedelta(hours=tz_hours, minutes=tz_minutes)
    # The fraction is scaled down by 1000 to fit datetime's microsecond field
    # (presumably it arrives in nanoseconds, as the example above suggests).
    return datetime(
        year, month, day, hour, minute, second, fraction // 1000,
        timezone(utc_offset),
    )
def custom_string_converter(value):
    """Decode ``value`` as UTF-16LE and prepend ``"CONVERTED: "``.

    ``None`` is passed through unchanged.
    """
    if value is not None:
        decoded = value.decode('utf-16-le')  # SQL_WVARCHAR is UTF-16LE encoded
        return "CONVERTED: " + decoded
    return None
def test_connection_string(conn_str):
    """Check that the ``conn_str`` fixture provides a value."""
    is_missing = conn_str is None
    assert not is_missing, "Connection string should not be None"
def test_connection(db_connection):
    """Check that the connection fixture exists and can hand out a cursor."""
    assert db_connection is not None, "Database connection variable should not be None"
    new_cursor = db_connection.cursor()
    assert new_cursor is not None, "Database connection failed - Cursor cannot be None"
def test_construct_connection_string(db_connection):
    """Check the connection string built from friendly keyword arguments.

    Each expected fragment is checked individually first (for a precise
    failure message), then the complete string is compared exactly.
    """
    built = db_connection._construct_connection_string(
        host="localhost",
        user="me",
        password="mypwd",
        database="mydb",
        encrypt="yes",
        trust_server_certificate="yes",
    )

    expected_fragments = (
        "Server=localhost;",
        "Uid=me;",
        "Pwd=mypwd;",
        "Database=mydb;",
        "Encrypt=yes;",
        "TrustServerCertificate=yes;",
        "APP=MSSQL-Python",
        "Driver={ODBC Driver 18 for SQL Server}",
    )
    for fragment in expected_fragments:
        assert fragment in built, f"Connection string should contain '{fragment}'"

    expected_full = "Driver={ODBC Driver 18 for SQL Server};;APP=MSSQL-Python;Server=localhost;Uid=me;Pwd=mypwd;Database=mydb;Encrypt=yes;TrustServerCertificate=yes;"
    assert expected_full == built, "Connection string is incorrect"
def test_connection_string_with_attrs_before(db_connection):
    """Check that ``attrs_before`` does not leak into the connection string.

    The usual fragments must be present, and the textual form of the
    ``attrs_before`` mapping must be absent.
    """
    built = db_connection._construct_connection_string(
        host="localhost",
        user="me",
        password="mypwd",
        database="mydb",
        encrypt="yes",
        trust_server_certificate="yes",
        attrs_before={1256: "token"},
    )

    expected_fragments = (
        "Server=localhost;",
        "Uid=me;",
        "Pwd=mypwd;",
        "Database=mydb;",
        "Encrypt=yes;",
        "TrustServerCertificate=yes;",
        "APP=MSSQL-Python",
        "Driver={ODBC Driver 18 for SQL Server}",
    )
    for fragment in expected_fragments:
        assert fragment in built, f"Connection string should contain '{fragment}'"

    assert "{1256: token}" not in built, "Connection string should not contain '{1256: token}'"
def test_connection_string_with_odbc_param(db_connection):
    """Check the connection string built from ODBC-style keyword names.

    ``server``/``uid``/``pwd`` are passed instead of ``host``/``user``/
    ``password``; the resulting string must match the same expected output.
    """
    built = db_connection._construct_connection_string(
        server="localhost",
        uid="me",
        pwd="mypwd",
        database="mydb",
        encrypt="yes",
        trust_server_certificate="yes",
    )

    expected_fragments = (
        "Server=localhost;",
        "Uid=me;",
        "Pwd=mypwd;",
        "Database=mydb;",
        "Encrypt=yes;",
        "TrustServerCertificate=yes;",
        "APP=MSSQL-Python",
        "Driver={ODBC Driver 18 for SQL Server}",
    )
    for fragment in expected_fragments:
        assert fragment in built, f"Connection string should contain '{fragment}'"

    expected_full = "Driver={ODBC Driver 18 for SQL Server};;APP=MSSQL-Python;Server=localhost;Uid=me;Pwd=mypwd;Database=mydb;Encrypt=yes;TrustServerCertificate=yes;"
    assert expected_full == built, "Connection string is incorrect"
def test_autocommit_default(db_connection):
    """Check that the fixture connection reports ``autocommit`` as False."""
    autocommit_state = db_connection.autocommit
    assert autocommit_state is False, "Autocommit should be False by default"
def test_autocommit_setter(db_connection):
    """Exercise the ``autocommit`` property in both modes.

    Phase 1 enables autocommit, writes a row to a temp table and reads it
    back without committing. Phase 2 disables autocommit, repeats the
    write, and checks the row both before and after an explicit commit.

    The whole test is wrapped in ``try/finally`` so that ``autocommit`` is
    reset to False even when an assertion fails part-way through;
    otherwise a failure in phase 1 would leave the fixture connection in
    autocommit mode for whatever test uses it next.
    """
    try:
        db_connection.autocommit = True
        cursor = db_connection.cursor()
        # Make a transaction and check if it is autocommitted
        drop_table_if_exists(cursor, "#pytest_test_autocommit")
        try:
            cursor.execute("CREATE TABLE #pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
            cursor.execute("INSERT INTO #pytest_test_autocommit (id, value) VALUES (1, 'test');")
            cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
            result = cursor.fetchone()
            assert result is not None, "Autocommit failed: No data found"
            assert result[1] == 'test', "Autocommit failed: Incorrect data"
        except Exception as e:
            pytest.fail(f"Autocommit failed: {e}")
        finally:
            cursor.execute("DROP TABLE #pytest_test_autocommit;")
            db_connection.commit()
        assert db_connection.autocommit is True, "Autocommit should be True"

        db_connection.autocommit = False
        cursor = db_connection.cursor()
        # Make a transaction and check if it is not autocommitted
        drop_table_if_exists(cursor, "#pytest_test_autocommit")
        try:
            cursor.execute("CREATE TABLE #pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
            cursor.execute("INSERT INTO #pytest_test_autocommit (id, value) VALUES (1, 'test');")
            cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
            result = cursor.fetchone()
            assert result is not None, "Autocommit failed: No data found"
            assert result[1] == 'test', "Autocommit failed: Incorrect data"
            db_connection.commit()
            cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
            result = cursor.fetchone()
            assert result is not None, "Autocommit failed: No data found after commit"
            assert result[1] == 'test', "Autocommit failed: Incorrect data after commit"
        except Exception as e:
            pytest.fail(f"Autocommit failed: {e}")
        finally:
            cursor.execute("DROP TABLE #pytest_test_autocommit;")
            db_connection.commit()
    finally:
        # Never leave the fixture connection in autocommit mode.
        db_connection.autocommit = False
def test_set_autocommit(db_connection):
    """Toggle autocommit through ``setautocommit`` and read it back."""
    expectations = (
        (True, "Autocommit should be True"),
        (False, "Autocommit should be False"),
    )
    for requested, message in expectations:
        db_connection.setautocommit(requested)
        assert db_connection.autocommit is requested, message
def test_commit(db_connection):
    """Insert a row into a temp table, commit, and read the row back.

    Any exception (including a failed assertion) is reported through
    ``pytest.fail``; the temp table is dropped and the drop committed in
    all cases.
    """
    cur = db_connection.cursor()
    drop_table_if_exists(cur, "#pytest_test_commit")
    try:
        setup_statements = (
            "CREATE TABLE #pytest_test_commit (id INT PRIMARY KEY, value VARCHAR(50));",
            "INSERT INTO #pytest_test_commit (id, value) VALUES (1, 'test');",
        )
        for statement in setup_statements:
            cur.execute(statement)
        db_connection.commit()

        cur.execute("SELECT * FROM #pytest_test_commit WHERE id = 1;")
        row = cur.fetchone()
        assert row is not None, "Commit failed: No data found"
        assert row[1] == 'test', "Commit failed: Incorrect data"
    except Exception as err:
        pytest.fail(f"Commit failed: {err}")
    finally:
        cur.execute("DROP TABLE #pytest_test_commit;")
        db_connection.commit()
def test_rollback_on_close(conn_str, db_connection):
    """Test that closing a connection without committing discards its work.

    A permanent table is created through ``db_connection`` so that a second
    connection can see it. The second connection inserts a row, verifies it
    is visible inside its own transaction, and is then closed without a
    commit. The row must not be visible from ``db_connection`` afterwards.

    The second connection is closed in a ``finally`` clause: if one of the
    pre-close assertions fails, the connection (and its uncommitted insert)
    would otherwise still be open when the cleanup ``DROP TABLE`` runs on
    ``db_connection`` - presumably able to stall that cleanup on a lock.
    """
    cursor = db_connection.cursor()
    drop_table_if_exists(cursor, "pytest_test_rollback_on_close")
    try:
        # Create a permanent table for testing
        cursor.execute("CREATE TABLE pytest_test_rollback_on_close (id INT PRIMARY KEY, value VARCHAR(50));")
        db_connection.commit()

        # This simulates a scenario where the connection is closed without
        # committing and checks if the rollback occurs
        temp_conn = connect(conn_str)
        try:
            temp_cursor = temp_conn.cursor()
            temp_cursor.execute("INSERT INTO pytest_test_rollback_on_close (id, value) VALUES (1, 'test');")

            # Verify data is visible within the same transaction
            temp_cursor.execute("SELECT * FROM pytest_test_rollback_on_close WHERE id = 1;")
            result = temp_cursor.fetchone()
            assert result is not None, "Rollback on close failed: No data found before close"
            assert result[1] == 'test', "Rollback on close failed: Incorrect data before close"
        finally:
            # Close the temporary connection without committing (also on failure)
            temp_conn.close()

        # Now check if the data is rolled back
        cursor.execute("SELECT * FROM pytest_test_rollback_on_close WHERE id = 1;")
        result = cursor.fetchone()
        assert result is None, "Rollback on close failed: Data found after rollback"
    except Exception as e:
        pytest.fail(f"Rollback on close failed: {e}")
    finally:
        drop_table_if_exists(cursor, "pytest_test_rollback_on_close")
        db_connection.commit()
def test_rollback(db_connection):
    """Commit one row, insert a second, roll back, and check only the first remains.

    Any exception (including a failed assertion) is reported through
    ``pytest.fail``; the temp table is dropped and the drop committed in
    all cases.
    """
    cur = db_connection.cursor()
    drop_table_if_exists(cur, "#pytest_test_rollback")
    try:
        # Baseline: a committed row with id = 1
        cur.execute("CREATE TABLE #pytest_test_rollback (id INT PRIMARY KEY, value VARCHAR(50));")
        cur.execute("INSERT INTO #pytest_test_rollback (id, value) VALUES (1, 'test');")
        db_connection.commit()

        cur.execute("SELECT * FROM #pytest_test_rollback WHERE id = 1;")
        committed_row = cur.fetchone()
        assert committed_row is not None, "Rollback failed: No data found before rollback"
        assert committed_row[1] == 'test', "Rollback failed: Incorrect data"

        # Uncommitted row with id = 2, discarded by the rollback
        cur.execute("INSERT INTO #pytest_test_rollback (id, value) VALUES (2, 'test');")
        db_connection.rollback()

        cur.execute("SELECT * FROM #pytest_test_rollback WHERE id = 2;")
        discarded_row = cur.fetchone()
        assert discarded_row is None, "Rollback failed: Data found after rollback"
    except Exception as err:
        pytest.fail(f"Rollback failed: {err}")
    finally:
        cur.execute("DROP TABLE #pytest_test_rollback;")
        db_connection.commit()
def test_invalid_connection_string():
    """Constructing ``Connection`` from a nonsense string must raise."""
    bogus = "invalid_connection_string"
    with pytest.raises(Exception):
        Connection(bogus)
def test_connection_close(conn_str):
    """Open a dedicated connection and close it; any exception fails the test."""
    dedicated = connect(conn_str)
    dedicated.close()
def test_connection_pooling_speed(conn_str):
    """Test that connection pooling provides performance benefits over multiple iterations.

    Ten connect/close cycles are timed with pooling disabled and ten with
    pooling enabled; the medians are compared and the pooled median must be
    at most 70% of the unpooled one. Pooling is disabled again before the
    final assertion so a failure does not leave it enabled.
    """
    import statistics

    def _time_connect_close(rounds):
        """Return the wall-clock duration of each of ``rounds`` connect/close cycles."""
        durations = []
        for _ in range(rounds):
            began = time.perf_counter()
            handle = connect(conn_str)
            handle.close()
            finished = time.perf_counter()
            durations.append(finished - began)
        return durations

    # Warm up to eliminate cold start effects
    for _ in range(3):
        warmup = connect(conn_str)
        warmup.close()

    # Baseline: pooling disabled
    pooling(enabled=False)
    unpooled_samples = _time_connect_close(10)

    # Candidate: pooling enabled
    pooling(max_size=5, idle_timeout=30)
    pooled_samples = _time_connect_close(10)

    # Use median times to reduce impact of outliers
    baseline_median = statistics.median(unpooled_samples)
    pooled_median = statistics.median(pooled_samples)

    # Allow for some variance - pooling should be at least 30% faster on average
    max_ratio = 0.7  # Pool should be <= 70% of no-pool time

    print(f"No pool median: {baseline_median:.6f}s")
    print(f"Pool median: {pooled_median:.6f}s")
    print(f"Improvement ratio: {pooled_median/baseline_median:.2f}")

    # Clean up - disable pooling for other tests
    pooling(enabled=False)

    assert pooled_median <= baseline_median * max_ratio, \
        f"Expected pooling to be at least 30% faster. No-pool: {baseline_median:.6f}s, Pool: {pooled_median:.6f}s"
def test_connection_pooling_reuse_spid(conn_str):
    """Test that connections are actually reused from the pool.

    With a pool of size one, two consecutive connect/close cycles must
    report the same ``@@SPID``.

    Pooling is disabled again in a ``finally`` clause (the original ended
    with a bare "Clean up" comment and left pooling enabled for every test
    that followed), and each connection is closed even if its query fails.
    """

    def _spid_of_fresh_connection():
        """Open a connection, read its SQL Server process ID, and close it."""
        conn = connect(conn_str)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT @@SPID")  # Get SQL Server process ID
            return cursor.fetchone()[0]
        finally:
            conn.close()

    # Enable pooling
    pooling(max_size=1, idle_timeout=30)
    try:
        spid1 = _spid_of_fresh_connection()
        # Second connection - should be the same one from pool
        spid2 = _spid_of_fresh_connection()

        # The SPID should be the same, indicating connection reuse
        assert spid1 == spid2, "Connections not reused - different SPIDs"
    finally:
        # Clean up - disable pooling for other tests
        pooling(enabled=False)
def test_pool_exhaustion_max_size_1(conn_str):
    """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested.

    One connection is held open while a worker thread requests a second.
    The worker may succeed, raise a pool/timeout error, or block until the
    first connection is released - all three are accepted.

    Compared with the original: the worker is joined a second time after
    the held connection is closed (otherwise a blocking pool raced with the
    ``assert results`` check), and pooling is disabled in ``finally`` so a
    failure cannot leave it enabled.
    """
    pooling(max_size=1, idle_timeout=30)
    try:
        conn1 = connect(conn_str)
        results = []

        def try_connect():
            try:
                conn2 = connect(conn_str)
                results.append("success")
                conn2.close()
            except Exception as e:
                results.append(str(e))

        # Start a thread that will attempt to get a second connection while the first is open
        t = threading.Thread(target=try_connect)
        t.start()
        try:
            t.join(timeout=2)
        finally:
            conn1.close()

        # If the pool blocked the second request, it can only complete now
        # that conn1 has been released; returns immediately if already done.
        t.join(timeout=10)

        # Depending on implementation, either blocks, raises, or times out
        assert results, "Second connection attempt did not complete"
        assert results[0] == "success" or "pool" in results[0].lower() or "timeout" in results[0].lower(), \
            f"Unexpected pool exhaustion result: {results[0]}"
    finally:
        pooling(enabled=False)
def test_pool_idle_timeout_removes_connections(conn_str):
    """Test that idle_timeout removes connections from the pool after the timeout.

    A connection's ``@@SPID`` is recorded, the connection is returned to a
    pool with ``idle_timeout=2``, and after sleeping 3 seconds a new
    connection must report a different ``@@SPID``.

    Compared with the original: the unused ``spid_list`` is gone, each
    connection is closed even if its query fails, and pooling is disabled
    in ``finally`` (it was previously left enabled for subsequent tests).
    """

    def _spid_of_fresh_connection():
        """Open a connection, read its SQL Server process ID, and close it."""
        conn = connect(conn_str)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT @@SPID")
            return cursor.fetchone()[0]
        finally:
            conn.close()

    pooling(max_size=2, idle_timeout=2)
    try:
        spid1 = _spid_of_fresh_connection()

        # Wait for longer than idle_timeout
        time.sleep(3)

        # Get a new connection, which should not reuse the previous SPID
        spid2 = _spid_of_fresh_connection()

        assert spid1 != spid2, "Idle timeout did not remove connection from pool"
    finally:
        pooling(enabled=False)
def test_connection_timeout_invalid_password(conn_str):
    """Test that connecting with an invalid password raises an exception quickly (timeout).

    The password in ``conn_str`` is corrupted by inserting
    ``wrongpassword`` directly after the ``Pwd=`` / ``Password=`` key. The
    test is skipped when neither key is present.
    """
    for key in ("Pwd=", "Password="):
        if key in conn_str:
            bad_conn_str = conn_str.replace(key, key + "wrongpassword")
            break
    else:
        pytest.skip("No password found in connection string to modify")

    began = time.perf_counter()
    with pytest.raises(Exception):
        connect(bad_conn_str)
    elapsed = time.perf_counter() - began

    # Should fail quickly (within 10 seconds)
    assert elapsed < 10, f"Connection with invalid password took too long: {elapsed:.2f}s"
def test_connection_timeout_invalid_host(conn_str):
    """Test that connecting to an invalid host fails with a timeout.

    The server value in ``conn_str`` is replaced (key kept, value swapped)
    with a host name that should not resolve. The original implementation
    only inserted ``invalidhost12345;`` after the key, which left the real
    host behind as a dangling ``;value;`` segment and so produced a
    malformed connection string rather than a clean "bad host" one.

    The test is skipped when neither ``Server=`` nor ``host=`` is present.
    """
    import re

    # Replace the whole value (everything up to the next ';') of the key.
    if "Server=" in conn_str:
        bad_conn_str = re.sub(r"Server=[^;]*", "Server=invalidhost12345", conn_str)
    elif "host=" in conn_str:
        bad_conn_str = re.sub(r"host=[^;]*", "host=invalidhost12345", conn_str)
    else:
        pytest.skip("No server/host found in connection string to modify")

    start = time.perf_counter()
    with pytest.raises(Exception):
        connect(bad_conn_str)
    elapsed = time.perf_counter() - start

    # Should fail within a reasonable time (30s). This may vary with network
    # conditions; a much longer delay may indicate a misconfiguration or
    # network issue.
    assert elapsed < 30, f"Connection to invalid host took too long: {elapsed:.2f}s"
def test_pool_removes_invalid_connections(conn_str):
    """Test that the pool removes connections that become invalid (simulate by closing underlying connection).

    The private ``_conn`` attribute is closed behind the wrapper's back,
    the wrapper is then closed normally, and a fresh ``connect`` must still
    return a working connection. The test is skipped when ``_conn`` (or its
    ``close``) is not available.

    Compared with the original: pooling is disabled in ``finally`` and the
    first connection is closed on every path, so neither a skip nor a
    failure leaves pooling enabled or the connection open.
    """
    pooling(max_size=1, idle_timeout=30)
    try:
        conn = connect(conn_str)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT 1")

            # Forcibly closing the underlying connection is implementation-
            # specific; if not possible, skip.
            can_force_close = hasattr(conn, "_conn") and hasattr(conn._conn, "close")
            if not can_force_close:
                pytest.skip("Cannot forcibly close underlying connection for this driver")

            # Simulate invalidation at the driver level (best-effort: an
            # error here still leaves the connection unusable or untouched).
            try:
                conn._conn.close()
            except Exception:
                pass
        finally:
            # Safely close the connection, ignoring errors due to forced invalidation
            try:
                conn.close()
            except RuntimeError as e:
                if "not initialized" not in str(e):
                    raise

        # Now, get a new connection from the pool and ensure it works
        new_conn = connect(conn_str)
        try:
            new_cursor = new_conn.cursor()
            new_cursor.execute("SELECT 1")
            result = new_cursor.fetchone()
            assert result is not None and result[0] == 1, "Pool did not remove invalid connection"
        finally:
            new_conn.close()
    finally:
        pooling(enabled=False)
def test_pool_recovery_after_failed_connection(conn_str):
    """Test that the pool recovers after a failed connection attempt.

    A connect with a corrupted password must raise; a subsequent connect
    with the original string must succeed and answer ``SELECT 1``.

    Compared with the original: the skip decision is made before pooling is
    enabled, and pooling/connection cleanup happens in ``finally`` so that
    neither a skip nor a failure leaves pooling enabled.
    """
    # Build the bad connection string first so a skip has nothing to undo.
    if "Pwd=" in conn_str:
        bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
    elif "Password=" in conn_str:
        bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
    else:
        pytest.skip("No password found in connection string to modify")

    pooling(max_size=1, idle_timeout=30)
    try:
        # First, try to connect with a bad password (should fail)
        with pytest.raises(Exception):
            connect(bad_conn_str)

        # Now, connect with the correct string and ensure it works
        conn = connect(conn_str)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            assert result is not None and result[0] == 1, "Pool did not recover after failed connection"
        finally:
            conn.close()
    finally:
        pooling(enabled=False)
def test_pool_capacity_limit_and_overflow(conn_str):
    """Test that pool does not grow beyond max_size and handles overflow gracefully.

    Two connections are held (``max_size=2``) while a worker thread
    requests a third. The worker may succeed, raise a pool/timeout error,
    or block - all three are accepted.

    Compared with the original: when the worker is still blocked after the
    first join, the held connections are released and the worker is joined
    again, so the "block" outcome the test claims to accept no longer fails
    the ``overflow_result`` check. Pooling is disabled even if closing a
    connection raises.
    """
    pooling(max_size=2, idle_timeout=30)
    conns = []
    try:
        # Open up to max_size connections
        conns.append(connect(conn_str))
        conns.append(connect(conn_str))

        # Try to open a third connection, which should fail or block
        overflow_result = []

        def try_overflow():
            try:
                c = connect(conn_str)
                overflow_result.append("success")
                c.close()
            except Exception as e:
                overflow_result.append(str(e))

        t = threading.Thread(target=try_overflow)
        t.start()
        t.join(timeout=2)

        if t.is_alive():
            # The pool is blocking the overflow request: free capacity so it
            # can finish. Popping keeps the finally-block from closing twice.
            while conns:
                conns.pop().close()
            t.join(timeout=10)

        assert overflow_result, "Overflow connection attempt did not complete"
        # Accept either block, error, or success if pool implementation allows overflow
        assert overflow_result[0] == "success" or "pool" in overflow_result[0].lower() or "timeout" in overflow_result[0].lower(), \
            f"Unexpected pool overflow result: {overflow_result[0]}"
    finally:
        try:
            for c in conns:
                c.close()
        finally:
            pooling(enabled=False)
def test_connection_pooling_basic(conn_str):
    """Test basic connection pooling functionality.

    Two connections are opened with ``max_size=2``; a third attempt is
    allowed either to succeed or to raise (the exception is only printed).

    Compared with the original:
    - the ``conn3 is not None`` assertion is evaluated outside the
      ``except Exception`` handler, which previously swallowed the
      ``AssertionError`` and reported it as an "expected" failure;
    - the first two connections are closed and pooling is disabled in
      ``finally``, so pooling no longer stays enabled for later tests.
    """
    # Enable pooling with small pool size
    pooling(max_size=2, idle_timeout=5)
    opened = []
    try:
        conn1 = connect(conn_str)
        opened.append(conn1)
        conn2 = connect(conn_str)
        opened.append(conn2)
        assert conn1 is not None
        assert conn2 is not None

        try:
            conn3 = connect(conn_str)
        except Exception as e:
            # Tolerated: the pool may refuse a connection beyond max_size
            print(f"Expected: Could not open third connection due to max_size=2: {e}")
        else:
            assert conn3 is not None, "Third connection failed — pooling is not working or limit is too strict"
            conn3.close()
    finally:
        try:
            for held in opened:
                if held is not None:
                    held.close()
        finally:
            pooling(enabled=False)
def test_context_manager_commit(conn_str):
    """Test that context manager closes connection on normal exit

    A row is inserted and committed manually inside a ``with connect(...)``
    block; a separate connection then verifies the row is present. The
    permanent test table is dropped through yet another connection in all
    cases.
    """
    # Create a permanent table for testing across connections
    table_setup = connect(conn_str)
    ddl_cursor = table_setup.cursor()
    drop_table_if_exists(ddl_cursor, "pytest_context_manager_test")

    try:
        ddl_cursor.execute("CREATE TABLE pytest_context_manager_test (id INT PRIMARY KEY, value VARCHAR(50));")
        table_setup.commit()
        table_setup.close()

        # Work inside the context manager; the commit is explicit
        with connect(conn_str) as managed:
            assert managed.autocommit is False, "Autocommit should be False by default"
            writer = managed.cursor()
            writer.execute("INSERT INTO pytest_context_manager_test (id, value) VALUES (1, 'context_test');")
            managed.commit()  # Manual commit now required
        # Connection should be closed here

        # Verify data was committed manually
        checker = connect(conn_str)
        reader = checker.cursor()
        reader.execute("SELECT * FROM pytest_context_manager_test WHERE id = 1;")
        row = reader.fetchone()
        assert row is not None, "Manual commit failed: No data found"
        assert row[1] == 'context_test', "Manual commit failed: Incorrect data"
        checker.close()
    except Exception as err:
        pytest.fail(f"Context manager test failed: {err}")
    finally:
        # Cleanup through a dedicated connection
        janitor = connect(conn_str)
        janitor_cursor = janitor.cursor()
        drop_table_if_exists(janitor_cursor, "pytest_context_manager_test")
        janitor.commit()
        janitor.close()
def test_context_manager_connection_closes(conn_str):
    """Test that context manager closes the connection

    Inside the ``with`` block the connection must answer ``SELECT 1``;
    after the block its ``_closed`` flag must be truthy and ``cursor()``
    must raise ``InterfaceError``.
    """
    managed = None
    try:
        with connect(conn_str) as managed:
            probe = managed.cursor()
            probe.execute("SELECT 1")
            row = probe.fetchone()
            assert row[0] == 1, "Connection should work inside context manager"

        # Leaving the block must have closed the connection
        assert managed._closed, "Connection should be closed after exiting context manager"

        # A closed connection must refuse to create cursors
        with pytest.raises(InterfaceError):
            managed.cursor()
    except Exception as err:
        pytest.fail(f"Context manager connection close test failed: {err}")
def test_close_with_autocommit_true(conn_str):
    """Test that connection.close() with autocommit=True doesn't trigger rollback.

    A row is inserted on an autocommit connection which is then closed
    without an explicit commit; a separate connection must still find the
    row. The permanent test table is dropped through another connection in
    all cases.
    """
    writer = None
    auto_conn = None

    try:
        # Create the permanent table through its own connection
        table_setup = connect(conn_str)
        ddl_cursor = table_setup.cursor()
        drop_table_if_exists(ddl_cursor, "pytest_autocommit_close_test")
        ddl_cursor.execute("CREATE TABLE pytest_autocommit_close_test (id INT PRIMARY KEY, value VARCHAR(50));")
        table_setup.commit()
        table_setup.close()

        # Connection under test, switched to autocommit
        auto_conn = connect(conn_str)
        auto_conn.autocommit = True
        assert auto_conn.autocommit is True, "Autocommit should be True"

        # Insert data
        writer = auto_conn.cursor()
        writer.execute("INSERT INTO pytest_autocommit_close_test (id, value) VALUES (1, 'test_autocommit');")

        # Close the connection without explicitly committing
        auto_conn.close()

        # Verify the data was committed automatically despite connection.close()
        checker = connect(conn_str)
        reader = checker.cursor()
        reader.execute("SELECT * FROM pytest_autocommit_close_test WHERE id = 1;")
        row = reader.fetchone()

        # Data should be present if autocommit worked and wasn't affected by close()
        assert row is not None, "Autocommit failed: Data not found after connection close"
        assert row[1] == 'test_autocommit', "Autocommit failed: Incorrect data after connection close"

        checker.close()
    except Exception as err:
        pytest.fail(f"Test failed: {err}")
    finally:
        # Clean up
        janitor = connect(conn_str)
        janitor_cursor = janitor.cursor()
        drop_table_if_exists(janitor_cursor, "pytest_autocommit_close_test")
        janitor.commit()
        janitor.close()
def test_setencoding_default_settings(db_connection):
    """Test that default encoding settings are correct."""
    current = db_connection.getencoding()
    expectations = (
        ('encoding', 'utf-16le', "Default encoding should be utf-16le"),
        ('ctype', -8, "Default ctype should be SQL_WCHAR (-8)"),
    )
    for key, expected, message in expectations:
        assert current[key] == expected, message
def test_setencoding_basic_functionality(db_connection):
    """Test basic setencoding functionality.

    Each scenario calls ``setencoding`` with the given keyword arguments
    and checks the ``encoding`` and ``ctype`` reported by ``getencoding``.
    """
    scenarios = (
        # UTF-8 with no explicit ctype
        (
            {'encoding': 'utf-8'},
            'utf-8', "Encoding should be set to utf-8",
            1, "ctype should default to SQL_CHAR (1) for utf-8",
        ),
        # UTF-16LE with explicit ctype
        (
            {'encoding': 'utf-16le', 'ctype': -8},
            'utf-16le', "Encoding should be set to utf-16le",
            -8, "ctype should be SQL_WCHAR (-8)",
        ),
    )
    for kwargs, want_encoding, encoding_msg, want_ctype, ctype_msg in scenarios:
        db_connection.setencoding(**kwargs)
        current = db_connection.getencoding()
        assert current['encoding'] == want_encoding, encoding_msg
        assert current['ctype'] == want_ctype, ctype_msg
def test_setencoding_automatic_ctype_detection(db_connection):
    """Test automatic ctype detection based on encoding.

    UTF-16 variants are expected to report ctype -8, the other encodings
    ctype 1, when no ctype is passed to ``setencoding``.
    """
    groups = (
        # UTF-16 variants should default to SQL_WCHAR
        (['utf-16', 'utf-16le', 'utf-16be'], -8, "SQL_WCHAR (-8)"),
        # Other encodings should default to SQL_CHAR
        (['utf-8', 'latin-1', 'ascii'], 1, "SQL_CHAR (1)"),
    )
    for names, expected_ctype, label in groups:
        for name in names:
            db_connection.setencoding(encoding=name)
            current = db_connection.getencoding()
            assert current['ctype'] == expected_ctype, f"{name} should default to {label}"
def test_setencoding_explicit_ctype_override(db_connection):
    """Check that a ctype passed to setencoding() is the one getencoding() reports."""
    # utf-8 paired with ctype -8.
    db_connection.setencoding(encoding='utf-8', ctype=-8)
    utf8_state = db_connection.getencoding()
    assert utf8_state['encoding'] == 'utf-8', "Encoding should be utf-8"
    assert utf8_state['ctype'] == -8, "ctype should be SQL_WCHAR (-8) when explicitly set"

    # utf-16le paired with ctype 1.
    db_connection.setencoding(encoding='utf-16le', ctype=1)
    utf16_state = db_connection.getencoding()
    assert utf16_state['encoding'] == 'utf-16le', "Encoding should be utf-16le"
    assert utf16_state['ctype'] == 1, "ctype should be SQL_CHAR (1) when explicitly set"
def test_setencoding_none_parameters(db_connection):
    """Check that passing None to setencoding() yields utf-16le with ctype -8."""
    # Only the encoding is None; ctype is left out of the call entirely.
    db_connection.setencoding(encoding=None)
    encoding_only = db_connection.getencoding()
    assert encoding_only['encoding'] == 'utf-16le', "encoding=None should use default utf-16le"
    assert encoding_only['ctype'] == -8, "ctype should be SQL_WCHAR for utf-16le"

    # Both arguments are passed as None.
    db_connection.setencoding(encoding=None, ctype=None)
    both_none = db_connection.getencoding()
    assert both_none['encoding'] == 'utf-16le', "encoding=None should use default utf-16le"
    assert both_none['ctype'] == -8, "ctype=None should use default SQL_WCHAR"
def test_setencoding_invalid_encoding(db_connection):
    """Check that an unknown encoding name raises ProgrammingError with a descriptive message."""
    with pytest.raises(ProgrammingError) as exc_info:
        db_connection.setencoding(encoding='invalid-encoding-name')

    # Inspect the message outside the context manager, once the exception is captured.
    message = str(exc_info.value)
    assert "Unsupported encoding" in message, "Should raise ProgrammingError for invalid encoding"
    assert "invalid-encoding-name" in message, "Error message should include the invalid encoding name"
def test_setencoding_invalid_ctype(db_connection):
    """Check that ctype=999 raises ProgrammingError and that the message names the value."""
    with pytest.raises(ProgrammingError) as exc_info:
        db_connection.setencoding(encoding='utf-8', ctype=999)

    # Inspect the message outside the context manager, once the exception is captured.
    message = str(exc_info.value)
    assert "Invalid ctype" in message, "Should raise ProgrammingError for invalid ctype"
    assert "999" in message, "Error message should include the invalid ctype value"
def test_setencoding_closed_connection(conn_str):
    """Check that setencoding() on an already-closed connection raises InterfaceError."""
    # Open a throwaway connection and close it straight away.
    closed_conn = connect(conn_str)
    closed_conn.close()

    with pytest.raises(InterfaceError) as exc_info:
        closed_conn.setencoding(encoding='utf-8')

    message = str(exc_info.value)
    assert "Connection is closed" in message, "Should raise InterfaceError for closed connection"
def test_setencoding_constants_access():
    """Check that mssql_python exposes SQL_CHAR (1) and SQL_WCHAR (-8)."""
    import mssql_python as driver

    # Presence of both names is verified before either value is read.
    char_present = hasattr(driver, 'SQL_CHAR')
    assert char_present, "SQL_CHAR constant should be available"
    wchar_present = hasattr(driver, 'SQL_WCHAR')
    assert wchar_present, "SQL_WCHAR constant should be available"

    assert driver.SQL_CHAR == 1, "SQL_CHAR should have value 1"
    assert driver.SQL_WCHAR == -8, "SQL_WCHAR should have value -8"
def test_setencoding_with_constants(db_connection):
    """Check that setencoding() accepts the module-level SQL_CHAR / SQL_WCHAR constants as ctype."""
    import mssql_python as driver

    # utf-8 paired with the SQL_CHAR constant.
    db_connection.setencoding(encoding='utf-8', ctype=driver.SQL_CHAR)
    char_state = db_connection.getencoding()
    assert char_state['ctype'] == driver.SQL_CHAR, "Should accept SQL_CHAR constant"

    # utf-16le paired with the SQL_WCHAR constant.
    db_connection.setencoding(encoding='utf-16le', ctype=driver.SQL_WCHAR)
    wchar_state = db_connection.getencoding()
    assert wchar_state['ctype'] == driver.SQL_WCHAR, "Should accept SQL_WCHAR constant"
def test_setencoding_common_encodings(db_connection):
    """Test setencoding with various common encodings.

    Each encoding name in the list is passed to setencoding(); getencoding()
    must then report exactly that name.

    Only the setencoding() call is guarded by the ``try``: an exception there
    is reported through pytest.fail() as a failure to set a valid encoding.
    The follow-up assertion is deliberately left outside the ``try`` so that a
    value mismatch surfaces as its own AssertionError instead of being
    re-labelled as a failure to set the encoding.
    """
    common_encodings = [
        'utf-8',
        'utf-16le',
        'utf-16be',
        'utf-16',
        'latin-1',
        'ascii',
        'cp1252',
    ]
    for encoding in common_encodings:
        try:
            db_connection.setencoding(encoding=encoding)
        except Exception as e:
            pytest.fail(f"Failed to set valid encoding {encoding}: {e}")

        settings = db_connection.getencoding()
        assert settings['encoding'] == encoding, f"Failed to set encoding {encoding}"
def test_setencoding_persistence_across_cursors(db_connection):
    """Test that encoding settings persist across cursor operations.

    The connection is switched to utf-8 with ctype 1, then getencoding() is
    sampled after each of two cursor() calls. Both samples must be equal and
    must still show the values that were set.

    Each cursor is closed in a ``finally`` block so that a failing assertion,
    or a failure while opening the second cursor, does not leave a cursor
    open on the connection.
    """
    # Set custom encoding
    db_connection.setencoding(encoding='utf-8', ctype=1)

    # Create cursors and verify encoding persists
    cursor1 = db_connection.cursor()
    try:
        settings1 = db_connection.getencoding()
        cursor2 = db_connection.cursor()
        try:
            settings2 = db_connection.getencoding()
            assert settings1 == settings2, "Encoding settings should persist across cursor creation"
            assert settings1['encoding'] == 'utf-8', "Encoding should remain utf-8"
            assert settings1['ctype'] == 1, "ctype should remain SQL_CHAR"
        finally:
            cursor2.close()
    finally:
        cursor1.close()
@pytest.mark.skip("Skipping Unicode data tests till we have support for Unicode")
def test_setencoding_with_unicode_data(db_connection):
    """Test setencoding with actual Unicode data operations.

    With the connection set to utf-8, each sample string is inserted into an
    NVARCHAR column, selected back by value, compared for equality, and then
    deleted so the next sample starts from an empty table.

    Any exception raised in the main body (including a failed assertion) is
    reported through pytest.fail(). Cleanup drops the table on a best-effort
    basis and always closes the cursor.
    """
    # Test UTF-8 encoding with Unicode data
    db_connection.setencoding(encoding='utf-8')
    cursor = db_connection.cursor()
    try:
        # Create test table
        cursor.execute("CREATE TABLE #test_encoding_unicode (text_col NVARCHAR(100))")

        # Test various Unicode strings
        test_strings = [
            "Hello, World!",
            "Hello, 世界!",  # Chinese
            "Привет, мир!",  # Russian
            "مرحبا بالعالم",  # Arabic
            "🌍🌎🌏",  # Emoji
        ]
        for test_string in test_strings:
            # Insert data
            cursor.execute("INSERT INTO #test_encoding_unicode (text_col) VALUES (?)", test_string)

            # Retrieve and verify
            cursor.execute("SELECT text_col FROM #test_encoding_unicode WHERE text_col = ?", test_string)
            result = cursor.fetchone()
            assert result is not None, f"Failed to retrieve Unicode string: {test_string}"
            assert result[0] == test_string, f"Unicode string mismatch: expected {test_string}, got {result[0]}"

            # Clear for next test
            cursor.execute("DELETE FROM #test_encoding_unicode")
    except Exception as e:
        pytest.fail(f"Unicode data test failed with UTF-8 encoding: {e}")
    finally:
        try:
            cursor.execute("DROP TABLE #test_encoding_unicode")
        except Exception:
            # Best-effort cleanup: the table may not exist if CREATE TABLE
            # itself failed. Only ordinary errors are ignored, so
            # KeyboardInterrupt and similar BaseExceptions still propagate.
            pass
        finally:
            cursor.close()
def test_setencoding_before_and_after_operations(db_connection):
    """Check that setencoding() can be called both before and after running queries.

    Any exception raised inside the body, including a failed assertion, is
    reported through pytest.fail(); the cursor is closed in every case.
    """
    cur = db_connection.cursor()
    try:
        # Start on utf-16le and run a first query.
        db_connection.setencoding(encoding='utf-16le')
        cur.execute("SELECT 'Initial test' as message")
        first_row = cur.fetchone()
        assert first_row[0] == 'Initial test', "Initial operation failed"

        # Switch to utf-8 now that a statement has already been executed.
        db_connection.setencoding(encoding='utf-8')
        current = db_connection.getencoding()
        assert current['encoding'] == 'utf-8', "Failed to change encoding after operation"

        # Run a second query under the new setting.
        cur.execute("SELECT 'Changed encoding test' as message")
        second_row = cur.fetchone()
        assert second_row[0] == 'Changed encoding test', "Operation after encoding change failed"
    except Exception as exc:
        pytest.fail(f"Encoding change test failed: {exc}")
    finally:
        cur.close()
def test_getencoding_default(conn_str):
    """Check the shape and values of getencoding() on a freshly opened connection."""
    fresh_conn = connect(conn_str)
    try:
        info = fresh_conn.getencoding()

        # Shape: a dict carrying both expected keys.
        assert isinstance(info, dict)
        assert 'encoding' in info
        assert 'ctype' in info

        # Values: utf-16le paired with SQL_WCHAR.
        assert info['encoding'] == 'utf-16le'
        assert info['ctype'] == SQL_WCHAR
    finally:
        fresh_conn.close()
def test_getencoding_returns_copy(conn_str):
    """Check that each getencoding() call hands back an independent dict."""
    fresh_conn = connect(conn_str)
    try:
        first = fresh_conn.getencoding()
        second = fresh_conn.getencoding()

        # Same contents, distinct objects.
        assert first == second
        assert first is not second

        # Mutating the first result must leave the second untouched.
        first['encoding'] = 'modified'
        assert second['encoding'] != 'modified'
    finally:
        fresh_conn.close()
def test_getencoding_closed_connection(conn_str):
    """Check that getencoding() on a closed connection raises InterfaceError."""
    # Open a connection and close it straight away.
    closed_conn = connect(conn_str)
    closed_conn.close()

    with pytest.raises(InterfaceError, match="Connection is closed"):
        closed_conn.getencoding()
def test_setencoding_getencoding_consistency(conn_str):
    """Check that getencoding() reflects each value previously passed to setencoding()."""
    # (encoding passed in, ctype expected back)
    expectations = (
        ('utf-8', SQL_CHAR),
        ('utf-16le', SQL_WCHAR),
        ('latin-1', SQL_CHAR),
        ('ascii', SQL_CHAR),
    )
    fresh_conn = connect(conn_str)
    try:
        for name, wanted_ctype in expectations:
            fresh_conn.setencoding(name)
            info = fresh_conn.getencoding()
            # The reported name is compared against the lower-cased input.
            assert info['encoding'] == name.lower()
            assert info['ctype'] == wanted_ctype
    finally:
        fresh_conn.close()
def test_setencoding_default_encoding(conn_str):
    """Check that setencoding() with no arguments yields utf-16le / SQL_WCHAR."""
    fresh_conn = connect(conn_str)
    try:
        # No arguments at all: both values are left for the connection to choose.
        fresh_conn.setencoding()
        info = fresh_conn.getencoding()
        assert info['encoding'] == 'utf-16le'
        assert info['ctype'] == SQL_WCHAR
    finally:
        fresh_conn.close()
def test_setencoding_utf8(conn_str):
    """Check that setencoding('utf-8') is reported back as utf-8 / SQL_CHAR."""
    fresh_conn = connect(conn_str)
    try:
        # Encoding passed positionally, ctype omitted.
        fresh_conn.setencoding('utf-8')
        info = fresh_conn.getencoding()
        assert info['encoding'] == 'utf-8'
        assert info['ctype'] == SQL_CHAR
    finally:
        fresh_conn.close()
def test_setencoding_latin1(conn_str):
    """Check that setencoding('latin-1') is reported back as latin-1 / SQL_CHAR."""
    fresh_conn = connect(conn_str)
    try:
        # Encoding passed positionally, ctype omitted.
        fresh_conn.setencoding('latin-1')
        info = fresh_conn.getencoding()
        assert info['encoding'] == 'latin-1'
        assert info['ctype'] == SQL_CHAR
    finally:
        fresh_conn.close()
def test_setencoding_with_explicit_ctype_sql_char(conn_str):
    """Check that setencoding('utf-8', SQL_CHAR) is reported back unchanged."""
    fresh_conn = connect(conn_str)
    try:
        # Both encoding and ctype are passed positionally.
        fresh_conn.setencoding('utf-8', SQL_CHAR)
        info = fresh_conn.getencoding()
        assert info['encoding'] == 'utf-8'
        assert info['ctype'] == SQL_CHAR
    finally:
        fresh_conn.close()
def test_setencoding_with_explicit_ctype_sql_wchar(conn_str):
    """Check that setencoding('utf-16le', SQL_WCHAR) is reported back unchanged."""
    fresh_conn = connect(conn_str)
    try:
        # Both encoding and ctype are passed positionally.
        fresh_conn.setencoding('utf-16le', SQL_WCHAR)
        info = fresh_conn.getencoding()
        assert info['encoding'] == 'utf-16le'
        assert info['ctype'] == SQL_WCHAR
    finally:
        fresh_conn.close()
def test_setencoding_invalid_ctype_error(conn_str):
"""Test setencoding with invalid ctype raises ProgrammingError"""