diff --git a/async_postgres/pg_client.nim b/async_postgres/pg_client.nim index f3104a9..e3ebbdf 100644 --- a/async_postgres/pg_client.nim +++ b/async_postgres/pg_client.nim @@ -757,6 +757,12 @@ proc queryEach*( ): Future[int64] {.async.} = ## Execute a query with typed parameters, invoking `callback` once per row. ## Returns the number of rows processed. + ## + ## The `Row` passed to `callback` is only valid for the duration of that + ## single invocation: its backing buffer is reused for the next row as soon + ## as the callback returns. To retain a row beyond the callback, call + ## `row.clone()` to get a detached copy, or extract the column values you + ## need into your own types before returning. var count: int64 withConnTracing( conn, diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index fc38f73..c8a337d 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -662,6 +662,10 @@ proc queryEach*( timeout: Duration = ZeroDuration, ): Future[int64] {.async.} = ## Execute a query with typed parameters using a pooled connection, invoking `callback` once per row. + ## + ## Row lifetime: the `Row` passed to `callback` is only valid for the + ## duration of that single invocation. To retain a row beyond the callback, + ## call `row.clone()` to get a detached copy. let conn = await pool.acquire() try: return await conn.queryEach(sql, params, callback, resultFormat, timeout) diff --git a/async_postgres/pg_protocol.nim b/async_postgres/pg_protocol.nim index 7743a74..ec7497a 100644 --- a/async_postgres/pg_protocol.nim +++ b/async_postgres/pg_protocol.nim @@ -818,6 +818,47 @@ proc reuseRowData*(rd: RowData, numCols: int16): RowData = result.colFormats.setLen(0) result.colTypeOids.setLen(0) +proc clone*(row: Row): Row = + ## Return a deep copy of `row` with an independent `RowData` backing buffer + ## containing only this single row. Use this to retain rows from a + ## `queryEach` callback beyond the callback's lifetime — the original + ## buffer is reused for subsequent rows and would otherwise be overwritten. + if row.data == nil: + return Row(data: nil, rowIdx: 0) + let src = row.data + let numCols = src.numCols + let cellBase = int(row.rowIdx) * int(numCols) * 2 + var total = 0 + for i in 0 ..< int(numCols): + let clen = src.cellIndex[cellBase + i * 2 + 1] + if clen > 0: + total += int(clen) + let rd = RowData( + numCols: numCols, + colFormats: src.colFormats, + colTypeOids: src.colTypeOids, + fields: src.fields, + colMap: src.colMap, + cellIndex: newSeq[int32](int(numCols) * 2), + buf: newSeq[byte](total), + ) + var pos = 0 + for i in 0 ..< int(numCols): + let srcOff = int(src.cellIndex[cellBase + i * 2]) + let clen = src.cellIndex[cellBase + i * 2 + 1] + if clen == -1: + rd.cellIndex[i * 2] = 0'i32 + rd.cellIndex[i * 2 + 1] = -1'i32 + elif clen == 0: + rd.cellIndex[i * 2] = 0'i32 + rd.cellIndex[i * 2 + 1] = 0'i32 + else: + copyMem(addr rd.buf[pos], unsafeAddr src.buf[srcOff], int(clen)) + rd.cellIndex[i * 2] = int32(pos) + rd.cellIndex[i * 2 + 1] = clen + pos += int(clen) + Row(data: rd, rowIdx: 0) + proc buildResultFormats*(fields: openArray[FieldDescription]): seq[int16] = ## Build per-column binary format codes: 1 for known safe types, 0 for others. result = newSeq[int16](fields.len) diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 91e1990..55ac355 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -6196,6 +6196,26 @@ suite "E2E: queryEach": waitFor t() + test "row.clone() retains row beyond callback lifetime": + proc t() {.async.} = + let conn = await connect(plainConfig()) + var saved: seq[Row] = @[] + discard await conn.queryEach( + "SELECT * FROM (VALUES ('a', 1), ('b', 2), ('c', 3)) AS t(s, n)", + callback = proc(row: Row) = + saved.add(row.clone()), + ) + doAssert saved.len == 3 + doAssert saved[0].getStr(0) == "a" + doAssert saved[0].getInt(1) == 1 + doAssert saved[1].getStr(0) == "b" + doAssert saved[1].getInt(1) == 2 + doAssert saved[2].getStr(0) == "c" + doAssert saved[2].getInt(1) == 3 + await conn.close() + + waitFor t() + test "10000 rows": proc t() {.async.} = let conn = await connect(plainConfig()) diff --git a/tests/test_rowdata.nim b/tests/test_rowdata.nim index 5d1832c..3edf58e 100644 --- a/tests/test_rowdata.nim +++ b/tests/test_rowdata.nim @@ -209,3 +209,80 @@ suite "reuseRowData": # The key safety property: rd1 is a different ref object than rd2 let rd2 = rd1.reuseRowData(1) check rd1 != rd2 # Different ref objects + +suite "Row.clone": + test "returns independent RowData backing": + let rd = newRowData(2) + parseDataRowInto(buildDataRowBody(["hello", "world"]), rd) + let row = Row(data: rd, rowIdx: 0) + let cloned = row.clone() + check cloned.data != rd + check cloned.rowIdx == 0 + check cloned.data.numCols == 2 + + test "clone survives original buffer reuse": + let rd = newRowData(2) + parseDataRowInto(buildDataRowBody(["hello", "world"]), rd) + let cloned = Row(data: rd, rowIdx: 0).clone() + + # Simulate queryEach behavior: reset and parse next row into same rd + rd.buf.setLen(0) + rd.cellIndex.setLen(0) + parseDataRowInto(buildDataRowBody(["xxx", "yyy"]), rd) + + check getCell(cloned.data, 0, 0) == "hello" + check getCell(cloned.data, 0, 1) == "world" + + test "clones a specific row out of a multi-row buffer": + let rd = newRowData(2) + parseDataRowInto(buildDataRowBody(["a0", "b0"]), rd) + parseDataRowInto(buildDataRowBody(["a1", "b1"]), rd) + parseDataRowInto(buildDataRowBody(["a2", "b2"]), rd) + + let cloned = Row(data: rd, rowIdx: 1).clone() + check cloned.data.numCols == 2 + check cloned.data.cellIndex.len == 4 # just one row worth + check getCell(cloned.data, 0, 0) == "a1" + check getCell(cloned.data, 0, 1) == "b1" + + test "preserves NULL cells": + let rd = newRowData(3) + parseDataRowInto(buildDataRowBody(["val", "\xFF", "end"]), rd) + let cloned = Row(data: rd, rowIdx: 0).clone() + check getCell(cloned.data, 0, 0) == "val" + check isCellNull(cloned.data, 0, 1) + check getCell(cloned.data, 0, 2) == "end" + + test "preserves empty string vs NULL distinction": + let rd = newRowData(2) + var body: seq[byte] + body.addInt16(2) + body.addInt32(0) # empty string + body.addInt32(-1) # NULL + parseDataRowInto(body, rd) + + let cloned = Row(data: rd, rowIdx: 0).clone() + check not isCellNull(cloned.data, 0, 0) + check getCell(cloned.data, 0, 0) == "" + check isCellNull(cloned.data, 0, 1) + + test "copies colFormats, colTypeOids, and fields": + let rd = + newRowData(2, colFormats = @[1'i16, 0'i16], colTypeOids = @[23'i32, 25'i32]) + rd.fields = @[ + FieldDescription(name: "id", typeOid: 23, formatCode: 1), + FieldDescription(name: "name", typeOid: 25, formatCode: 0), + ] + parseDataRowInto(buildDataRowBody(["x", "y"]), rd) + + let cloned = Row(data: rd, rowIdx: 0).clone() + check cloned.data.colFormats == @[1'i16, 0'i16] + check cloned.data.colTypeOids == @[23'i32, 25'i32] + check cloned.data.fields.len == 2 + check cloned.data.fields[0].name == "id" + check cloned.data.fields[1].name == "name" + + test "clone of nil data is safe": + let row = Row(data: nil, rowIdx: 0) + let cloned = row.clone() + check cloned.data == nil