diff --git a/scripts/cache_data.json b/scripts/cache_data.json
index b1f1df3e..3e3b7ee1 100644
--- a/scripts/cache_data.json
+++ b/scripts/cache_data.json
@@ -20,7 +20,8 @@
             "pyarrow.decimal32",
             "pyarrow.decimal64",
             "pyarrow.decimal128"
-        ]
+        ],
+        "required": false
     },
     "pyarrow.dataset": {
         "type": "module",
@@ -29,7 +30,8 @@
         "children": [
             "pyarrow.dataset.Scanner",
            "pyarrow.dataset.Dataset"
-        ]
+        ],
+        "required": false
     },
     "pyarrow.dataset.Scanner": {
         "type": "attribute",
diff --git a/src/duckdb_py/arrow/arrow_array_stream.cpp b/src/duckdb_py/arrow/arrow_array_stream.cpp
index 5a0e0004..5d3358cb 100644
--- a/src/duckdb_py/arrow/arrow_array_stream.cpp
+++ b/src/duckdb_py/arrow/arrow_array_stream.cpp
@@ -66,6 +66,45 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 	py::handle arrow_obj_handle(factory->arrow_object);
 	auto arrow_object_type = DuckDBPyConnection::GetArrowType(arrow_obj_handle);
 
+	if (arrow_object_type == PyArrowObjectType::PyCapsuleInterface) {
+		py::object capsule_obj = arrow_obj_handle.attr("__arrow_c_stream__")();
+		auto capsule = py::reinterpret_borrow<py::capsule>(capsule_obj);
+		auto stream = capsule.get_pointer<ArrowArrayStream>();
+		if (!stream->release) {
+			throw InvalidInputException(
+			    "The __arrow_c_stream__() method returned a released stream. "
+			    "If this object is single-use, implement __arrow_c_schema__() or expose a .schema attribute "
+			    "with _export_to_c() so that DuckDB can extract the schema without consuming the stream.");
+		}
+
+		auto &import_cache_check = *DuckDBPyConnection::ImportCache();
+		if (import_cache_check.pyarrow.dataset()) {
+			// Tier A: full pushdown via pyarrow.dataset
+			// Import as RecordBatchReader, feed through Scanner.from_batches for projection/filter pushdown.
+			auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib");
+			auto import_func = pyarrow_lib_module.attr("RecordBatchReader").attr("_import_from_c");
+			py::object reader = import_func(reinterpret_cast<uint64_t>(stream));
+			// _import_from_c takes ownership of the stream; null out to prevent capsule double-free
+			stream->release = nullptr;
+			auto &import_cache = *DuckDBPyConnection::ImportCache();
+			py::object arrow_batch_scanner = import_cache.pyarrow.dataset.Scanner().attr("from_batches");
+			py::handle reader_handle = reader;
+			auto scanner = ProduceScanner(arrow_batch_scanner, reader_handle, parameters, factory->client_properties);
+			auto record_batches = scanner.attr("to_reader")();
+			auto res = make_uniq<ArrowArrayStreamWrapper>();
+			auto export_to_c = record_batches.attr("_export_to_c");
+			export_to_c(reinterpret_cast<uint64_t>(&res->arrow_array_stream));
+			return res;
+		} else {
+			// Tier B: no pyarrow.dataset, return raw stream (no pushdown)
+			// DuckDB applies projection/filter post-scan via arrow_scan_dumb
+			auto res = make_uniq<ArrowArrayStreamWrapper>();
+			res->arrow_array_stream = *stream;
+			stream->release = nullptr;
+			return res;
+		}
+	}
+
 	if (arrow_object_type == PyArrowObjectType::PyCapsule) {
 		auto res = make_uniq<ArrowArrayStreamWrapper>();
 		auto capsule = py::reinterpret_borrow<py::capsule>(arrow_obj_handle);
@@ -78,21 +117,12 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 		return res;
 	}
 
+	// Scanner and Dataset: require pyarrow.dataset for pushdown
+	VerifyArrowDatasetLoaded();
 	auto &import_cache = *DuckDBPyConnection::ImportCache();
 	py::object scanner;
 	py::object arrow_batch_scanner = import_cache.pyarrow.dataset.Scanner().attr("from_batches");
 	switch (arrow_object_type) {
-	case PyArrowObjectType::Table: {
-		auto arrow_dataset = import_cache.pyarrow.dataset().attr("dataset");
-		auto dataset = arrow_dataset(arrow_obj_handle);
-		py::object arrow_scanner = dataset.attr("__class__").attr("scanner");
-		scanner = ProduceScanner(arrow_scanner, dataset, parameters, factory->client_properties);
-		break;
-	}
-	case PyArrowObjectType::RecordBatchReader: {
-		scanner = ProduceScanner(arrow_batch_scanner, arrow_obj_handle, parameters, factory->client_properties);
-		break;
-	}
 	case PyArrowObjectType::Scanner: {
 		// If it's a scanner we have to turn it to a record batch reader, and then a scanner again since we can't stack
 		// scanners on arrow Otherwise pushed-down projections and filters will disappear like tears in the rain
@@ -119,37 +149,29 @@ unique_ptr<ArrowArrayStreamWrapper> PythonTableArrowArrayStreamFactory::Produce(
 }
 
 void PythonTableArrowArrayStreamFactory::GetSchemaInternal(py::handle arrow_obj_handle, ArrowSchemaWrapper &schema) {
+	// PyCapsule (from bare capsule Produce path)
 	if (py::isinstance<py::capsule>(arrow_obj_handle)) {
 		auto capsule = py::reinterpret_borrow<py::capsule>(arrow_obj_handle);
 		auto stream = capsule.get_pointer<ArrowArrayStream>();
 		if (!stream->release) {
 			throw InvalidInputException("This ArrowArrayStream has already been consumed and cannot be scanned again.");
 		}
-		stream->get_schema(stream, &schema.arrow_schema);
-		return;
-	}
-
-	auto table_class = py::module::import("pyarrow").attr("Table");
-	if (py::isinstance(arrow_obj_handle, table_class)) {
-		auto obj_schema = arrow_obj_handle.attr("schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema.arrow_schema));
+		if (stream->get_schema(stream, &schema.arrow_schema)) {
+			throw InvalidInputException("Failed to get Arrow schema from stream: %s",
+			                            stream->get_last_error ? stream->get_last_error(stream) : "unknown error");
+		}
 		return;
 	}
 
+	// Scanner: use projected_schema; everything else (RecordBatchReader, Dataset): use .schema
 	VerifyArrowDatasetLoaded();
 	auto &import_cache = *DuckDBPyConnection::ImportCache();
-	auto scanner_class = import_cache.pyarrow.dataset.Scanner();
-
-	if (py::isinstance(arrow_obj_handle, scanner_class)) {
+	if (py::isinstance(arrow_obj_handle, import_cache.pyarrow.dataset.Scanner())) {
 		auto obj_schema = arrow_obj_handle.attr("projected_schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema));
+		obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
 	} else {
 		auto obj_schema = arrow_obj_handle.attr("schema");
-		auto export_to_c = obj_schema.attr("_export_to_c");
-		export_to_c(reinterpret_cast<uint64_t>(&schema));
+		obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
 	}
 }
 
@@ -158,6 +180,36 @@ void PythonTableArrowArrayStreamFactory::GetSchema(uintptr_t factory_ptr, ArrowS
 	auto factory = static_cast<PythonTableArrowArrayStreamFactory *>(reinterpret_cast<void *>(factory_ptr)); // NOLINT
 	D_ASSERT(factory->arrow_object);
 	py::handle arrow_obj_handle(factory->arrow_object);
+
+	auto type = DuckDBPyConnection::GetArrowType(arrow_obj_handle);
+	if (type == PyArrowObjectType::PyCapsuleInterface) {
+		// Use __arrow_c_schema__ if it exists
+		if (py::hasattr(arrow_obj_handle, "__arrow_c_schema__")) {
+			auto schema_capsule = arrow_obj_handle.attr("__arrow_c_schema__")();
+			auto capsule = py::reinterpret_borrow<py::capsule>(schema_capsule);
+			auto arrow_schema = capsule.get_pointer<ArrowSchema>();
+			schema.arrow_schema = *arrow_schema;
+			arrow_schema->release = nullptr; // take ownership
+			return;
+		}
+		// Otherwise try to use .schema with _export_to_c
+		if (py::hasattr(arrow_obj_handle, "schema")) {
+			auto obj_schema = arrow_obj_handle.attr("schema");
+			if (py::hasattr(obj_schema, "_export_to_c")) {
+				obj_schema.attr("_export_to_c")(reinterpret_cast<uint64_t>(&schema.arrow_schema));
+				return;
+			}
+		}
+		// Fallback: create a temporary stream just for the schema (consumes single-use streams!)
+		auto stream_capsule = arrow_obj_handle.attr("__arrow_c_stream__")();
+		auto capsule = py::reinterpret_borrow<py::capsule>(stream_capsule);
+		auto stream = capsule.get_pointer<ArrowArrayStream>();
+		if (stream->get_schema(stream, &schema.arrow_schema)) {
+			throw InvalidInputException("Failed to get Arrow schema from stream: %s",
+			                            stream->get_last_error ? stream->get_last_error(stream) : "unknown error");
+		}
+		return; // stream_capsule goes out of scope, stream released by capsule destructor
+	}
 	GetSchemaInternal(arrow_obj_handle, schema);
 }
 
diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp
index 3534f918..7e6e732f 100644
--- a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp
+++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp
@@ -51,16 +51,7 @@ class Table : public py::object {
 
 } // namespace pyarrow
 
-enum class PyArrowObjectType {
-	Invalid,
-	Table,
-	RecordBatchReader,
-	Scanner,
-	Dataset,
-	PyCapsule,
-	PyCapsuleInterface,
-	MessageReader
-};
+enum class PyArrowObjectType { Invalid, Table, Scanner, Dataset, PyCapsule, PyCapsuleInterface, MessageReader };
 
 void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches);
 
diff --git a/src/duckdb_py/include/duckdb_python/import_cache/modules/pyarrow_module.hpp b/src/duckdb_py/include/duckdb_python/import_cache/modules/pyarrow_module.hpp
index ff9d9ebc..642262e8 100644
--- a/src/duckdb_py/include/duckdb_python/import_cache/modules/pyarrow_module.hpp
+++ b/src/duckdb_py/include/duckdb_python/import_cache/modules/pyarrow_module.hpp
@@ -46,6 +46,11 @@ struct PyarrowDatasetCacheItem : public PythonImportCacheItem {
 
 	PythonImportCacheItem Scanner;
 	PythonImportCacheItem Dataset;
+
+protected:
+	bool IsRequired() const override final {
+		return false;
+	}
 };
 
 struct PyarrowCacheItem : public PythonImportCacheItem {
@@ -80,6 +85,11 @@ struct PyarrowCacheItem : public PythonImportCacheItem {
 	PythonImportCacheItem decimal32;
 	PythonImportCacheItem decimal64;
 	PythonImportCacheItem decimal128;
+
+protected:
+	bool IsRequired() const override final {
+		return false;
+	}
 };
 
 } // namespace duckdb
diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp
index c786421f..dae3b0ee 100644
--- a/src/duckdb_py/pyconnection.cpp
+++ b/src/duckdb_py/pyconnection.cpp
@@ -2383,26 +2383,16 @@ PyArrowObjectType DuckDBPyConnection::GetArrowType(const py::handle &obj) {
 	if (ModuleIsLoaded<PyarrowCacheItem>()) {
 		auto &import_cache = *DuckDBPyConnection::ImportCache();
-		// First Verify Lib Types
-		auto table_class = import_cache.pyarrow.Table();
-		auto record_batch_reader_class = import_cache.pyarrow.RecordBatchReader();
-		auto message_reader_class = import_cache.pyarrow.ipc.MessageReader();
-		if (py::isinstance(obj, table_class)) {
-			return PyArrowObjectType::Table;
-		} else if (py::isinstance(obj, record_batch_reader_class)) {
-			return PyArrowObjectType::RecordBatchReader;
-		} else if (py::isinstance(obj, message_reader_class)) {
+		// MessageReader requires nanoarrow and a separate scan function
+		if (py::isinstance(obj, import_cache.pyarrow.ipc.MessageReader())) {
 			return PyArrowObjectType::MessageReader;
 		}
 
 		if (ModuleIsLoaded<PyarrowDatasetCacheItem>()) {
-			// Then Verify dataset types
-			auto dataset_class = import_cache.pyarrow.dataset.Dataset();
-			auto scanner_class = import_cache.pyarrow.dataset.Scanner();
-
-			if (py::isinstance(obj, scanner_class)) {
+			// Scanner/Dataset don't have __arrow_c_stream__, need dedicated handling
+			if (py::isinstance(obj, import_cache.pyarrow.dataset.Scanner())) {
 				return PyArrowObjectType::Scanner;
-			} else if (py::isinstance(obj, dataset_class)) {
+			} else if (py::isinstance(obj, import_cache.pyarrow.dataset.Dataset())) {
 				return PyArrowObjectType::Dataset;
 			}
 		}
 
diff --git a/src/duckdb_py/python_replacement_scan.cpp b/src/duckdb_py/python_replacement_scan.cpp
index 18338650..e55804fb 100644
--- a/src/duckdb_py/python_replacement_scan.cpp
+++ b/src/duckdb_py/python_replacement_scan.cpp
@@ -51,11 +51,6 @@ static void CreateArrowScan(const string &name, py::object entry, TableFunctionR
 		auto dependency_item = PythonDependencyItem::Create(stream_messages);
 		external_dependency->AddDependency("replacement_cache", std::move(dependency_item));
 	} else {
-		if (type == PyArrowObjectType::PyCapsuleInterface) {
-			entry = entry.attr("__arrow_c_stream__")();
-			type = PyArrowObjectType::PyCapsule;
-		}
-
 		auto stream_factory = make_uniq<PythonTableArrowArrayStreamFactory>(entry.ptr(), client_properties);
 		auto stream_factory_produce = PythonTableArrowArrayStreamFactory::Produce;
 		auto stream_factory_get_schema = PythonTableArrowArrayStreamFactory::GetSchema;
@@ -66,8 +61,17 @@ static void CreateArrowScan(const string &name, py::object entry, TableFunctionR
 		    make_uniq<ConstantExpression>(Value::POINTER(CastPointerToValue(stream_factory_get_schema))));
 
 		if (type == PyArrowObjectType::PyCapsule) {
-			// Disable projection+filter pushdown
+			// Disable projection+filter pushdown for bare capsules (single-use, no PyArrow wrapper)
 			table_function.function = make_uniq<FunctionExpression>("arrow_scan_dumb", std::move(children));
+		} else if (type == PyArrowObjectType::PyCapsuleInterface) {
+			// Try to load pyarrow.dataset for pushdown support
+			auto &cache = *DuckDBPyConnection::ImportCache();
+			if (!cache.pyarrow.dataset()) {
+				// No pyarrow.dataset: scan without pushdown, DuckDB handles projection/filter post-scan
+				table_function.function = make_uniq<FunctionExpression>("arrow_scan_dumb", std::move(children));
+			} else {
+				table_function.function = make_uniq<FunctionExpression>("arrow_scan", std::move(children));
+			}
 		} else {
 			table_function.function = make_uniq<FunctionExpression>("arrow_scan", std::move(children));
 		}
@@ -141,6 +145,9 @@ unique_ptr<TableRef> PythonReplacementScan::TryReplacementObject(const py::objec
 		subquery->external_dependency = std::move(dependency);
 		return std::move(subquery);
 	} else if (PolarsDataFrame::IsDataFrame(entry)) {
+		// Polars DataFrames always go through a one-time .to_arrow() materialization.
+		// Polars's __arrow_c_stream__() serializes from its internal layout on every call,
+		// which is expensive for repeated scans. The .to_arrow() path converts once.
auto arrow_dataset = entry.attr("to_arrow")(); CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table, *context.db); @@ -149,9 +156,8 @@ unique_ptr PythonReplacementScan::TryReplacementObject(const py::objec auto arrow_dataset = materialized.attr("to_arrow")(); CreateArrowScan(name, arrow_dataset, *table_function, children, client_properties, PyArrowObjectType::Table, *context.db); - } else if (DuckDBPyConnection::GetArrowType(entry) != PyArrowObjectType::Invalid && - !(DuckDBPyConnection::GetArrowType(entry) == PyArrowObjectType::MessageReader && !relation)) { - arrow_type = DuckDBPyConnection::GetArrowType(entry); + } else if ((arrow_type = DuckDBPyConnection::GetArrowType(entry)) != PyArrowObjectType::Invalid && + !(arrow_type == PyArrowObjectType::MessageReader && !relation)) { CreateArrowScan(name, entry, *table_function, children, client_properties, arrow_type, *context.db); } else if (DuckDBPyConnection::IsAcceptedNumpyObject(entry) != NumpyObjectType::INVALID) { numpytype = DuckDBPyConnection::IsAcceptedNumpyObject(entry); diff --git a/tests/fast/arrow/test_arrow_pycapsule.py b/tests/fast/arrow/test_arrow_pycapsule.py index 7a6f346f..1f825799 100644 --- a/tests/fast/arrow/test_arrow_pycapsule.py +++ b/tests/fast/arrow/test_arrow_pycapsule.py @@ -29,21 +29,24 @@ def __arrow_c_stream__(self, requested_schema=None) -> object: obj = MyObject(df) # Call the __arrow_c_stream__ from within DuckDB + # MyObject has no __arrow_c_schema__, so GetSchema() falls back to __arrow_c_stream__ (1 call), + # then Produce() calls __arrow_c_stream__ again (1 call) = 2 calls minimum per scan. res = duckdb_cursor.sql("select * from obj") assert res.fetchall() == [(1, 5), (2, 6), (3, 7), (4, 8)] - assert obj.count == 1 + count_after_first = obj.count + assert count_after_first >= 2 # Call the __arrow_c_stream__ method and pass in the capsule instead capsule = obj.__arrow_c_stream__() res = duckdb_cursor.sql("select * from capsule") assert res.fetchall() == [(1, 5), (2, 6), (3, 7), (4, 8)] - assert obj.count == 2 + assert obj.count == count_after_first + 1 # Ensure __arrow_c_stream__ accepts a requested_schema argument as noop capsule = obj.__arrow_c_stream__(requested_schema="foo") # noqa: F841 res = duckdb_cursor.sql("select * from capsule") assert res.fetchall() == [(1, 5), (2, 6), (3, 7), (4, 8)] - assert obj.count == 3 + assert obj.count == count_after_first + 2 def test_capsule_roundtrip(self, duckdb_cursor): def create_capsule(): diff --git a/tests/fast/arrow/test_arrow_stream_scan.py b/tests/fast/arrow/test_arrow_stream_scan.py new file mode 100644 index 00000000..2370ec13 --- /dev/null +++ b/tests/fast/arrow/test_arrow_stream_scan.py @@ -0,0 +1,414 @@ +import contextlib +import subprocess +import sys + +import pytest + +import duckdb + +pa = pytest.importorskip("pyarrow") +ds = pytest.importorskip("pyarrow.dataset") + + +class ArrowStream: + """Minimal PyCapsuleInterface wrapper around a PyArrow table. + + This represents any third-party library (not Polars, not PyArrow) that + implements the Arrow PyCapsule interface. DuckDB's replacement scan + handles Polars and PyArrow types explicitly before falling through to + PyCapsuleInterface detection via GetArrowType(), so we need a wrapper + like this to exercise that code path. 
+ """ + + def __init__(self, tbl) -> None: + self.tbl = tbl + self.stream_count = 0 + + def __arrow_c_stream__(self, requested_schema=None): # noqa: ANN204 + self.stream_count += 1 + return self.tbl.__arrow_c_stream__(requested_schema=requested_schema) + + +class ArrowStreamWithSchema(ArrowStream): + """PyCapsuleInterface wrapper that also exposes __arrow_c_schema__.""" + + def __arrow_c_schema__(self): # noqa: ANN204 + return self.tbl.schema.__arrow_c_schema__() + + +class ArrowStreamWithDotSchema(ArrowStream): + """PyCapsuleInterface wrapper that exposes .schema (pyarrow schema with _export_to_c).""" + + def __init__(self, tbl) -> None: + super().__init__(tbl) + self.schema = tbl.schema + + +class SingleUseArrowStream: + """PyCapsuleInterface that can only produce one stream, but exposes .schema.""" + + def __init__(self, tbl) -> None: + self.tbl = tbl + self.schema = tbl.schema + self.stream_count = 0 + + def __arrow_c_stream__(self, requested_schema=None): # noqa: ANN204 + self.stream_count += 1 + if self.stream_count > 1: + msg = "Stream already consumed" + raise RuntimeError(msg) + return self.tbl.__arrow_c_stream__(requested_schema=requested_schema) + + +class TestPyCapsuleInterfaceMultiScan: + """Issue #70: queries requiring multiple scans of an arrow stream. + + PyCapsuleInterface objects support multi-scan because each call to + __arrow_c_stream__() produces a fresh stream. + """ + + def test_union_all(self, duckdb_cursor): + """UNION ALL scans the same PyCapsuleInterface twice in one query.""" + obj = ArrowStream(pa.table({"id": [1, 2, 3, 4, 5]})) # noqa: F841 + result = duckdb_cursor.sql("SELECT id FROM obj UNION ALL SELECT id + 1 FROM obj").fetchall() + ids = sorted(r[0] for r in result) + assert ids == sorted([1, 2, 3, 4, 5, 2, 3, 4, 5, 6]) + + def test_rescan_across_queries(self, duckdb_cursor): + """PyCapsuleInterface scanned in two consecutive queries.""" + obj = ArrowStream(pa.table({"id": [1, 2, 3]})) # noqa: F841 + r1 = duckdb_cursor.sql("SELECT * FROM obj").fetchall() + r2 = duckdb_cursor.sql("SELECT * FROM obj").fetchall() + assert r1 == r2 == [(1,), (2,), (3,)] + + def test_register(self, duckdb_cursor): + """PyCapsuleInterface registered via register() supports multi-scan.""" + obj = ArrowStream(pa.table({"id": [1, 2, 3]})) + duckdb_cursor.register("my_stream", obj) + result = duckdb_cursor.sql("SELECT id FROM my_stream UNION ALL SELECT id FROM my_stream").fetchall() + assert len(result) == 6 + + def test_from_arrow(self, duckdb_cursor): + """PyCapsuleInterface passed to from_arrow() supports multi-scan.""" + obj = ArrowStream(pa.table({"id": [1, 2, 3]})) + rel = duckdb_cursor.from_arrow(obj) + r1 = rel.fetchall() + r2 = rel.fetchall() + assert r1 == r2 == [(1,), (2,), (3,)] + + def test_self_join(self, duckdb_cursor): + """Self-join on PyCapsuleInterface requires two scans.""" + obj = ArrowStream(pa.table({"id": [1, 2, 3], "val": [10, 20, 30]})) # noqa: F841 + result = duckdb_cursor.sql("SELECT a.id, b.val FROM obj a JOIN obj b ON a.id = b.id").fetchall() + assert sorted(result) == [(1, 10), (2, 20), (3, 30)] + + +class TestPyCapsuleInterfacePushdown: + """PyCapsuleInterface objects get projection and filter pushdown via arrow_scan.""" + + def test_projection_pushdown(self, duckdb_cursor): + """Selecting a subset of columns only reads those columns.""" + obj = ArrowStream(pa.table({"a": [1, 2, 3], "b": [10, 20, 30], "c": ["x", "y", "z"]})) # noqa: F841 + result = duckdb_cursor.sql("SELECT a FROM obj").fetchall() + assert result == [(1,), (2,), (3,)] + + def 
test_filter_pushdown(self, duckdb_cursor): + """Filters are pushed down to the arrow scanner.""" + obj = ArrowStream(pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]})) # noqa: F841 + result = duckdb_cursor.sql("SELECT a, b FROM obj WHERE a > 3").fetchall() + assert sorted(result) == [(4, 40), (5, 50)] + + def test_combined_pushdown(self, duckdb_cursor): + """Projection + filter pushdown combined.""" + obj = ArrowStream(pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]})) # noqa: F841 + result = duckdb_cursor.sql("SELECT b FROM obj WHERE a <= 2").fetchall() + assert sorted(result) == [(10,), (20,)] + + +class TestPyCapsuleInterfaceSchemaOptimization: + """GetSchema() uses __arrow_c_schema__ when available to avoid allocating a stream.""" + + def test_arrow_c_schema_avoids_stream_call(self, duckdb_cursor): + """When __arrow_c_schema__ is available, GetSchema() does not call __arrow_c_stream__.""" + obj = ArrowStreamWithSchema(pa.table({"a": [1, 2, 3]})) + duckdb_cursor.sql("SELECT * FROM obj").fetchall() + # With __arrow_c_schema__: only Produce() calls __arrow_c_stream__ (1 call). + # Without it: GetSchema() fallback + Produce() = 2 calls. + assert obj.stream_count == 1 + + def test_without_arrow_c_schema_uses_stream_fallback(self, duckdb_cursor): + """Without __arrow_c_schema__, GetSchema() falls back to __arrow_c_stream__.""" + obj = ArrowStream(pa.table({"a": [1, 2, 3]})) + duckdb_cursor.sql("SELECT * FROM obj").fetchall() + # GetSchema() fallback (1) + Produce() (1) = 2 calls minimum + assert obj.stream_count >= 2 + + def test_dot_schema_avoids_stream_call(self, duckdb_cursor): + """When .schema with _export_to_c is available, GetSchema() uses it instead of __arrow_c_stream__.""" + obj = ArrowStreamWithDotSchema(pa.table({"a": [1, 2, 3]})) + result = duckdb_cursor.sql("SELECT * FROM obj").fetchall() + assert result == [(1,), (2,), (3,)] + # With .schema: only Produce() calls __arrow_c_stream__ (1 call). + assert obj.stream_count == 1 + + def test_schema_via_dotschema_preserves_stream(self, duckdb_cursor): + """A SingleUseArrowStream can be scanned because GetSchema uses .schema.""" + obj = SingleUseArrowStream(pa.table({"a": [1, 2, 3], "b": [10, 20, 30]})) + result = duckdb_cursor.sql("SELECT a, b FROM obj").fetchall() + assert sorted(result) == [(1, 10), (2, 20), (3, 30)] + # Only 1 call to __arrow_c_stream__ (from Produce), schema came from .schema + assert obj.stream_count == 1 + + def test_schema_fallback_order(self, duckdb_cursor): + """Schema extraction priority: __arrow_c_schema__ > .schema._export_to_c > __arrow_c_stream__.""" + # Object with __arrow_c_schema__ — should use that, not .schema or stream + obj_with_capsule_schema = ArrowStreamWithSchema(pa.table({"x": [1]})) + duckdb_cursor.sql("SELECT * FROM obj_with_capsule_schema").fetchall() + assert obj_with_capsule_schema.stream_count == 1 # only Produce + + # Object with .schema — should use that, not stream + obj_with_dot_schema = ArrowStreamWithDotSchema(pa.table({"x": [1]})) + duckdb_cursor.sql("SELECT * FROM obj_with_dot_schema").fetchall() + assert obj_with_dot_schema.stream_count == 1 # only Produce + + # Object with neither — falls back to stream + obj_bare = ArrowStream(pa.table({"x": [1]})) + duckdb_cursor.sql("SELECT * FROM obj_bare").fetchall() + assert obj_bare.stream_count >= 2 # GetSchema + Produce + + +class TestPyArrowTableUnifiedPath: + """PyArrow Table now enters via __arrow_c_stream__ (PyCapsuleInterface path). 
+ + This verifies that Table gets multi-scan, pushdown, and correct results + through the unified path instead of the old dedicated Table branch. + """ + + def test_pyarrow_table_scan(self, duckdb_cursor): + """Basic scan of a PyArrow Table through the unified path.""" + tbl = pa.table({"a": [1, 2, 3], "b": [10, 20, 30]}) # noqa: F841 + result = duckdb_cursor.sql("SELECT * FROM tbl").fetchall() + assert sorted(result) == [(1, 10), (2, 20), (3, 30)] + + def test_pyarrow_table_projection(self, duckdb_cursor): + """Projection pushdown on a PyArrow Table.""" + tbl = pa.table({"a": [1, 2, 3], "b": [10, 20, 30], "c": ["x", "y", "z"]}) # noqa: F841 + result = duckdb_cursor.sql("SELECT a FROM tbl").fetchall() + assert result == [(1,), (2,), (3,)] + + def test_pyarrow_table_filter(self, duckdb_cursor): + """Filter pushdown on a PyArrow Table.""" + tbl = pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}) # noqa: F841 + result = duckdb_cursor.sql("SELECT a, b FROM tbl WHERE a > 3").fetchall() + assert sorted(result) == [(4, 40), (5, 50)] + + def test_pyarrow_table_combined_pushdown(self, duckdb_cursor): + """Projection + filter pushdown on a PyArrow Table.""" + tbl = pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}) # noqa: F841 + result = duckdb_cursor.sql("SELECT b FROM tbl WHERE a <= 2").fetchall() + assert sorted(result) == [(10,), (20,)] + + def test_pyarrow_table_union_all(self, duckdb_cursor): + """Table scanned twice in one query via UNION ALL.""" + tbl = pa.table({"id": [1, 2, 3]}) # noqa: F841 + result = duckdb_cursor.sql("SELECT id FROM tbl UNION ALL SELECT id FROM tbl").fetchall() + assert sorted(r[0] for r in result) == [1, 1, 2, 2, 3, 3] + + def test_pyarrow_table_rescan(self, duckdb_cursor): + """Table can be scanned across multiple queries.""" + tbl = pa.table({"id": [1, 2, 3]}) # noqa: F841 + r1 = duckdb_cursor.sql("SELECT * FROM tbl").fetchall() + r2 = duckdb_cursor.sql("SELECT * FROM tbl").fetchall() + assert r1 == r2 == [(1,), (2,), (3,)] + + +class TestRecordBatchReaderSingleUse: + """RecordBatchReaders are inherently single-use streams. + + After the first scan consumes the reader, subsequent scans return empty results. + This is correct behavior — RecordBatchReaders represent forward-only streams + (e.g., reading from a socket or file). 
+ """ + + def test_second_scan_empty(self, duckdb_cursor): + """Second scan of a RecordBatchReader returns empty results.""" + reader = pa.RecordBatchReader.from_batches( # noqa: F841 + pa.schema([("id", pa.int64())]), + [pa.record_batch([pa.array([1, 2, 3])], names=["id"])], + ) + r1 = duckdb_cursor.sql("SELECT * FROM reader").fetchall() + assert r1 == [(1,), (2,), (3,)] + r2 = duckdb_cursor.sql("SELECT * FROM reader").fetchall() + assert r2 == [] + + def test_register_second_scan_empty(self, duckdb_cursor): + """Registered RecordBatchReader is also single-use.""" + reader = pa.RecordBatchReader.from_batches( + pa.schema([("id", pa.int64())]), + [pa.record_batch([pa.array([1, 2, 3])], names=["id"])], + ) + duckdb_cursor.register("my_reader", reader) + r1 = duckdb_cursor.sql("SELECT * FROM my_reader").fetchall() + assert r1 == [(1,), (2,), (3,)] + r2 = duckdb_cursor.sql("SELECT * FROM my_reader").fetchall() + assert r2 == [] + + def test_has_pushdown(self, duckdb_cursor): + """RecordBatchReader gets projection/filter pushdown (not materialized).""" + reader = pa.RecordBatchReader.from_batches( # noqa: F841 + pa.schema([("a", pa.int64()), ("b", pa.int64())]), + [pa.record_batch([pa.array([1, 2, 3]), pa.array([10, 20, 30])], names=["a", "b"])], + ) + result = duckdb_cursor.sql("SELECT b FROM reader WHERE a > 1").fetchall() + assert sorted(result) == [(20,), (30,)] + + +class TestPyCapsuleConsumed: + """Issue #105: scanning a bare PyCapsule twice. + + Bare PyCapsules are single-use (the capsule IS the stream, not a stream factory). + The fix ensures a clear InvalidInputException instead of InternalException. + """ + + def test_error_type(self, duckdb_cursor): + """Consumed PyCapsule raises InvalidInputException, not InternalException.""" + tbl = pa.table({"a": [1]}) + capsule = tbl.__arrow_c_stream__() # noqa: F841 + duckdb_cursor.sql("SELECT * FROM capsule").fetchall() + # Error thrown by GetArrowType() in pyconnection.cpp when it detects the released stream. + with pytest.raises(duckdb.InvalidInputException, match="The ArrowArrayStream was already released"): + duckdb_cursor.sql("SELECT * FROM capsule") + + def test_pycapsule_interface_not_affected(self, duckdb_cursor): + """Scanning through the PyCapsuleInterface object (not the capsule) works repeatedly.""" + obj = ArrowStream(pa.table({"a": [1, 2, 3]})) # noqa: F841 + + # First scan + r1 = duckdb_cursor.sql("SELECT * FROM obj").fetchall() + assert r1 == [(1,), (2,), (3,)] + + # Second scan — works because __arrow_c_stream__() is called lazily each time + r2 = duckdb_cursor.sql("SELECT * FROM obj").fetchall() + assert r2 == [(1,), (2,), (3,)] + + +class TestSameConnectionRecordBatchReader: + """Issue #85: DuckDB-originated RecordBatchReader on the same connection. + + When conn.sql(...).to_arrow_reader() returns a RecordBatchReader backed by + the same connection, scanning it on that connection may deadlock or return + empty results due to lock contention. Run in subprocess to avoid hanging + the test suite. The workaround is to use a different connection for the scan. + """ + + def test_same_connection_no_data(self): + """Same-connection RecordBatchReader scan fails to return data. + + Run in subprocess to prevent hanging the test suite if it deadlocks. 
+ """ + code = """\ +import duckdb +conn = duckdb.connect("") +reader = conn.sql("FROM range(5) T(a)").to_arrow_reader() +result = conn.sql("FROM reader").fetchall() +assert result != [(i,) for i in range(5)], "Expected no data due to lock contention" +""" + with contextlib.suppress(subprocess.TimeoutExpired): + subprocess.run( + [sys.executable, "-c", code], + timeout=5, + capture_output=True, + ) + + def test_different_connection_works(self, duckdb_cursor): + """RecordBatchReader from connection A scanned on connection B works fine.""" + conn_a = duckdb.connect() + conn_b = duckdb.connect() + reader = conn_a.sql("FROM range(5) T(a)").to_arrow_reader() # noqa: F841 + result = conn_b.sql("FROM reader").fetchall() + assert result == [(i,) for i in range(5)] + + def test_arrow_method_different_connection(self, duckdb_cursor): + """The .arrow() method (which returns RecordBatchReader) works cross-connection.""" + conn_a = duckdb.connect() + conn_b = duckdb.connect() + arrow_reader = conn_a.sql("FROM range(5) T(a)").arrow() # noqa: F841 + result = conn_b.sql("FROM arrow_reader").fetchall() + assert result == [(i,) for i in range(5)] + + +class TestPyCapsuleInterfaceNoPyarrowDataset: + """Tier B fallback: PyCapsuleInterface objects are scannable without pyarrow.dataset. + + When pyarrow.dataset is not available, PyCapsuleInterface uses arrow_scan_dumb + (no pushdown). DuckDB handles projection/filter post-scan. + Run in subprocess to avoid polluting the test process's import state. + """ + + def _run_in_subprocess(self, code): + result = subprocess.run( + [sys.executable, "-c", code], + timeout=30, + capture_output=True, + text=True, + ) + if result.returncode != 0: + msg = f"Subprocess failed (rc={result.returncode}):\nstdout: {result.stdout}\nstderr: {result.stderr}" + raise AssertionError(msg) + + def test_pycapsule_interface_no_pyarrow_dataset(self): + """PyCapsuleInterface objects can be scanned without pyarrow.dataset.""" + self._run_in_subprocess("""\ +import pyarrow as pa +import duckdb + +class MyStream: + def __init__(self, tbl): + self.tbl = tbl + def __arrow_c_stream__(self, requested_schema=None): + return self.tbl.__arrow_c_stream__(requested_schema=requested_schema) + def __arrow_c_schema__(self): + return self.tbl.schema.__arrow_c_schema__() + +obj = MyStream(pa.table({"a": [1, 2, 3], "b": [10, 20, 30]})) +result = duckdb.sql("SELECT * FROM obj").fetchall() +assert sorted(result) == [(1, 10), (2, 20), (3, 30)], f"Unexpected: {result}" +""") + + def test_pycapsule_interface_no_pyarrow_dataset_projection(self): + """DuckDB applies projection post-scan when pyarrow.dataset unavailable.""" + self._run_in_subprocess("""\ +import pyarrow as pa +import duckdb + +class MyStream: + def __init__(self, tbl): + self.tbl = tbl + def __arrow_c_stream__(self, requested_schema=None): + return self.tbl.__arrow_c_stream__(requested_schema=requested_schema) + def __arrow_c_schema__(self): + return self.tbl.schema.__arrow_c_schema__() + +obj = MyStream(pa.table({"a": [1, 2, 3], "b": [10, 20, 30], "c": ["x", "y", "z"]})) +result = duckdb.sql("SELECT a FROM obj").fetchall() +assert result == [(1,), (2,), (3,)], f"Unexpected: {result}" +""") + + def test_pycapsule_interface_no_pyarrow_dataset_filter(self): + """DuckDB applies filter post-scan when pyarrow.dataset unavailable.""" + self._run_in_subprocess("""\ +import pyarrow as pa +import duckdb + +class MyStream: + def __init__(self, tbl): + self.tbl = tbl + def __arrow_c_stream__(self, requested_schema=None): + return 
self.tbl.__arrow_c_stream__(requested_schema=requested_schema) + def __arrow_c_schema__(self): + return self.tbl.schema.__arrow_c_schema__() + +obj = MyStream(pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]})) +result = duckdb.sql("SELECT a, b FROM obj WHERE a > 3").fetchall() +assert sorted(result) == [(4, 40), (5, 50)], f"Unexpected: {result}" +""")
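+
+    def test_pycapsule_interface_no_pyarrow_dataset_combined(self):
+        """Sketch combining the projection and filter cases above.
+
+        Assumes the same hypothetical MyStream wrapper and subprocess helper; with
+        pyarrow.dataset unavailable, DuckDB prunes columns and applies the WHERE
+        clause itself after the dumb scan.
+        """
+        self._run_in_subprocess("""\
+import pyarrow as pa
+import duckdb
+
+class MyStream:
+    def __init__(self, tbl):
+        self.tbl = tbl
+    def __arrow_c_stream__(self, requested_schema=None):
+        return self.tbl.__arrow_c_stream__(requested_schema=requested_schema)
+    def __arrow_c_schema__(self):
+        return self.tbl.schema.__arrow_c_schema__()
+
+obj = MyStream(pa.table({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}))
+result = duckdb.sql("SELECT b FROM obj WHERE a <= 2").fetchall()
+assert sorted(result) == [(10,), (20,)], f"Unexpected: {result}"
+""")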