From 38322b3a38d45005804721d4fe4ce9796470c2d4 Mon Sep 17 00:00:00 2001 From: "Adisa Mubarak (AdMub)" <99817240+AdMub@users.noreply.github.com> Date: Thu, 22 Jan 2026 06:14:27 +0000 Subject: [PATCH 1/3] docs: Add warning to first_value about usage in select vs aggregate Clarifies that aggregate functions like first_value must be used within .aggregate() and not .select(). Closes #1300. --- python/datafusion/functions.py | 5 ++++ reproduce_1300.py | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 reproduce_1300.py diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 7ae59c000..89645104a 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -2268,6 +2268,11 @@ def first_value( ) -> Expr: """Returns the first value in a group of values. + .. note:: + This is an aggregate function. It must be used within + :py:meth:`~datafusion.dataframe.DataFrame.aggregate` and generally cannot be + used with :py:meth:`~datafusion.dataframe.DataFrame.select`. + This aggregate function will return the first value in the partition. If using the builder functions described in ref:`_aggregation` this function ignores diff --git a/reproduce_1300.py b/reproduce_1300.py new file mode 100644 index 000000000..55285aa2b --- /dev/null +++ b/reproduce_1300.py @@ -0,0 +1,45 @@ +import datafusion as dfn +from datafusion import lit, col, functions as F +from datafusion.expr import Window, WindowFrame + +def main() -> None: + # Create the context and data + ctx = dfn.SessionContext() + df = ctx.from_pydict( + {"any_row": list(range(10))}, + ) + + # Add a column of ones + df = df.select( + "any_row", + lit(1).alias("ones"), + ) + + # Perform Window functions + df = df.select( + "any_row", + F.sum(col("ones"))\ + .over(Window(window_frame=WindowFrame("rows", None, 0), order_by=col("any_row").sort(ascending=True))) \ + .alias("forward_row_sum"), + F.sum(col("ones"))\ + .over(Window(window_frame=WindowFrame("rows", None, 0), order_by=col("any_row").sort(ascending=False))) \ + .alias("reverse_row_sum"), + ) + + # Collect the intermediate window results (this should work) + print("Collecting Window Results...") + df.collect() + + # THIS IS THE FIX TEST + print("Attempting to use .aggregate() instead of .select() ...") + + # We use an empty list [] for group_by, and the function for the aggregate + df.aggregate( + [], + [F.first_value(col("forward_row_sum"), order_by=col("any_row"))] + ).collect() + + print("Success! .aggregate() worked.") + +if __name__ == "__main__": + main() \ No newline at end of file From d7b3ab504f4e9f3d4d54039df9a458a54a598ecd Mon Sep 17 00:00:00 2001 From: "Adisa Mubarak (AdMub)" <99817240+AdMub@users.noreply.github.com> Date: Thu, 22 Jan 2026 06:16:33 +0000 Subject: [PATCH 2/3] chore: remove temporary reproduction script --- reproduce_1300.py | 45 --------------------------------------------- 1 file changed, 45 deletions(-) delete mode 100644 reproduce_1300.py diff --git a/reproduce_1300.py b/reproduce_1300.py deleted file mode 100644 index 55285aa2b..000000000 --- a/reproduce_1300.py +++ /dev/null @@ -1,45 +0,0 @@ -import datafusion as dfn -from datafusion import lit, col, functions as F -from datafusion.expr import Window, WindowFrame - -def main() -> None: - # Create the context and data - ctx = dfn.SessionContext() - df = ctx.from_pydict( - {"any_row": list(range(10))}, - ) - - # Add a column of ones - df = df.select( - "any_row", - lit(1).alias("ones"), - ) - - # Perform Window functions - df = df.select( - "any_row", - F.sum(col("ones"))\ - .over(Window(window_frame=WindowFrame("rows", None, 0), order_by=col("any_row").sort(ascending=True))) \ - .alias("forward_row_sum"), - F.sum(col("ones"))\ - .over(Window(window_frame=WindowFrame("rows", None, 0), order_by=col("any_row").sort(ascending=False))) \ - .alias("reverse_row_sum"), - ) - - # Collect the intermediate window results (this should work) - print("Collecting Window Results...") - df.collect() - - # THIS IS THE FIX TEST - print("Attempting to use .aggregate() instead of .select() ...") - - # We use an empty list [] for group_by, and the function for the aggregate - df.aggregate( - [], - [F.first_value(col("forward_row_sum"), order_by=col("any_row"))] - ).collect() - - print("Success! .aggregate() worked.") - -if __name__ == "__main__": - main() \ No newline at end of file From c7b3663b5a83030d30689e56c0eb201136c61c8f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 11 Feb 2026 13:48:40 -0500 Subject: [PATCH 3/3] Update all aggregate functions to have an example usage that is correct --- python/datafusion/functions.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 89645104a..2aed9dd39 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1779,7 +1779,7 @@ def array_agg( For example:: - df.select(array_agg(col("a"), order_by="b")) + df.aggregate([], array_agg(col("a"), order_by="b")) """ order_by_raw = sort_list_to_raw_sort_list(order_by) filter_raw = filter.expr if filter is not None else None @@ -1941,7 +1941,7 @@ def median( def min(expression: Expr, filter: Expr | None = None) -> Expr: - """Returns the minimum value of the argument. + """Aggregate function that returns the minimum value of the argument. If using the builder functions described in ref:`_aggregation` this function ignores the options ``order_by``, ``null_treatment``, and ``distinct``. @@ -2268,11 +2268,6 @@ def first_value( ) -> Expr: """Returns the first value in a group of values. - .. note:: - This is an aggregate function. It must be used within - :py:meth:`~datafusion.dataframe.DataFrame.aggregate` and generally cannot be - used with :py:meth:`~datafusion.dataframe.DataFrame.select`. - This aggregate function will return the first value in the partition. If using the builder functions described in ref:`_aggregation` this function ignores @@ -2287,7 +2282,7 @@ def first_value( For example:: - df.select(first_value(col("a"), order_by="ts")) + df.aggregate([], first_value(col("a"), order_by="ts")) """ order_by_raw = sort_list_to_raw_sort_list(order_by) filter_raw = filter.expr if filter is not None else None @@ -2324,7 +2319,7 @@ def last_value( For example:: - df.select(last_value(col("a"), order_by="ts")) + df.aggregate([], last_value(col("a"), order_by="ts")) """ order_by_raw = sort_list_to_raw_sort_list(order_by) filter_raw = filter.expr if filter is not None else None @@ -2363,7 +2358,7 @@ def nth_value( For example:: - df.select(nth_value(col("a"), 2, order_by="ts")) + df.aggregate([], nth_value(col("a"), 2, order_by="ts")) """ order_by_raw = sort_list_to_raw_sort_list(order_by) filter_raw = filter.expr if filter is not None else None @@ -2848,7 +2843,7 @@ def string_agg( For example:: - df.select(string_agg(col("a"), ",", order_by="b")) + df.aggregate([], string_agg(col("a"), ",", order_by="b")) """ order_by_raw = sort_list_to_raw_sort_list(order_by) filter_raw = filter.expr if filter is not None else None