.. currentmodule:: pyspark.sql
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This is currently most beneficial to Python users who work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility. This guide gives a high-level description of how to use Arrow in Spark and highlights any differences when working with Arrow-enabled data.
To use Apache Arrow in PySpark, the recommended version of PyArrow
should be installed.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command ``pip install "pyspark[sql]"``. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes.
You can install it using pip or conda from the conda-forge channel. See PyArrow
installation for details.
From Spark 4.0, you can create a Spark DataFrame from a PyArrow Table with :meth:`SparkSession.createDataFrame`, and you can convert a Spark DataFrame to a PyArrow Table with :meth:`DataFrame.toArrow`.
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 37-52
:dedent: 4
Note that :meth:`DataFrame.toArrow` results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. Not all Spark and Arrow data types are currently supported and an error can be raised if a column has an unsupported type.
Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls,
the Spark configuration spark.sql.execution.arrow.pyspark.enabled must be set to true. This is enabled by default.
In addition, optimizations enabled by spark.sql.execution.arrow.pyspark.enabled can fall back automatically
to the non-Arrow implementation if an error occurs before the actual computation within Spark.
This can be controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 56-71
:dedent: 4
Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled.
Note that even with Arrow, :meth:`DataFrame.toPandas` results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. Not all Spark data types are currently supported and an error can be raised if a column has an unsupported type. If an error occurs during :meth:`SparkSession.createDataFrame`, Spark will fall back to create the DataFrame without Arrow.
.. currentmodule:: pyspark.sql.functions
Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using :meth:`pandas_udf` as a decorator or to wrap the function, and no additional configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.
Before Spark 3.0, Pandas UDFs were defined with pyspark.sql.functions.PandasUDFType. From Spark 3.0
with Python 3.6+, you can also use Python type hints.
Using Python type hints is preferred, and pyspark.sql.functions.PandasUDFType will be deprecated in
a future release.
.. currentmodule:: pyspark.sql.types
Note that the type hint should use pandas.Series in all cases, with one exception:
pandas.DataFrame should be used as the input or output type hint when the corresponding input
or output column is of :class:`StructType`. The following example shows a Pandas UDF that takes a long
column, a string column and a struct column, and outputs a struct column. It requires the function to
specify the type hints of pandas.Series and pandas.DataFrame as below:
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 75-99
:dedent: 4
The following sections describe the supported combinations of type hints. For simplicity,
the pandas.DataFrame variant is omitted.
.. currentmodule:: pyspark.sql.functions
The type hint can be expressed as pandas.Series, ... -> pandas.Series.
Using :func:`pandas_udf` with a function that has these type hints creates a Pandas UDF where the given
function takes one or more pandas.Series and outputs one pandas.Series. The output of the function should
always be of the same length as the input. Internally, PySpark executes a Pandas UDF by splitting
columns into batches, calling the function for each batch as a subset of the data, and then concatenating
the results.
The following example shows how to create this Pandas UDF that computes the product of two columns.
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 103-133
:dedent: 4
For detailed usage, please see :func:`pandas_udf`.
.. currentmodule:: pyspark.sql.functions
The type hint can be expressed as Iterator[pandas.Series] -> Iterator[pandas.Series].
Using :func:`pandas_udf` with a function that has these type hints creates a Pandas UDF where the given
function takes an iterator of pandas.Series and outputs an iterator of pandas.Series. The
length of the entire output from the function should be the same as the length of the entire input; therefore, the function can
prefetch data from the input iterator as long as the lengths are the same.
In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator
of Series.
This variant is also useful when the UDF execution requires initializing some state, although internally it works identically to the Series to Series case. The pseudocode below illustrates the example.
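.. code-block:: python

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("long")
    def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Do some expensive initialization with a state.
        # very_expensive_initialization and calculate_with_state are placeholders.
        state = very_expensive_initialization()
        for x in iterator:
            # Use that state for the whole iterator.
            yield calculate_with_state(x, state)

    df.select(calculate("value")).show()

The following example shows how to create this Pandas UDF: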
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 137-159
:dedent: 4
For detailed usage, please see :func:`pandas_udf`.
.. currentmodule:: pyspark.sql.functions
The type hint can be expressed as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].
Using :func:`pandas_udf` with a function that has these type hints creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple pandas.Series and outputs an iterator of pandas.Series.
In this case, the created Pandas UDF requires as many input columns as there are series in the tuple
when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series
to Iterator of Series case.
The following example shows how to create this Pandas UDF:
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 163-186
:dedent: 4
For detailed usage, please see :func:`pandas_udf`.
.. currentmodule:: pyspark.sql.functions
The type hint can be expressed as pandas.Series, ... -> Any.
Using :func:`pandas_udf` with a function that has these type hints creates a Pandas UDF similar
to PySpark's aggregate functions. The given function takes one or more pandas.Series and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a Python
primitive type, e.g., int or float, or a NumPy data type, e.g., numpy.int64 or numpy.float64.
Ideally, Any should be replaced by the specific scalar type.
.. currentmodule:: pyspark.sql
This UDF can also be used with :meth:`GroupedData.agg` and Window.
It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series
represents a column within the group or window.
Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also, only unbounded windows are currently supported with Grouped aggregate Pandas UDFs. The following example shows how to use this type of UDF to compute the mean with group-by and window operations:
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 190-231
:dedent: 4
.. currentmodule:: pyspark.sql.functions
For detailed usage, please see :func:`pandas_udf`.
.. currentmodule:: pyspark.sql
Pandas Function APIs can directly apply a Python native function against the whole :class:`DataFrame` by using Pandas instances. Internally, they work similarly to Pandas UDFs by using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function API behaves as a regular API under PySpark :class:`DataFrame` instead of :class:`Column`, and Python type hints in Pandas Function APIs are optional and currently do not affect how they work internally, although they might be required in the future.
.. currentmodule:: pyspark.sql.functions
Since Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API,
DataFrame.groupby().applyInPandas(). It is still possible to use it with pyspark.sql.functions.PandasUDFType
and DataFrame.groupby().apply() as before; however, it is preferred to use
DataFrame.groupby().applyInPandas() directly. pyspark.sql.functions.PandasUDFType will be deprecated
in the future.
.. currentmodule:: pyspark.sql
Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas(),
which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame.
Each group is passed to the Python function as one pandas.DataFrame.
This API implements the "split-apply-combine" pattern which consists of three steps:
- Split the data into groups by using :meth:`DataFrame.groupBy`.
- Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
- Combine the results into a new PySpark :class:`DataFrame`.
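The three steps can be sketched in plain Python, without Spark or Pandas, to show what each stage contributes. This is only an analogy under stated assumptions: ``subtract_group_mean`` and the ``(key, value)`` row layout are hypothetical, and the real API hands each group to the function as a pandas.DataFrame.

```python
from collections import defaultdict
from statistics import mean


def subtract_group_mean(rows):
    """Plain-Python analogy of split-apply-combine over (key, value) rows.

    Hypothetical helper for illustration; not part of PySpark.
    """
    # Split: collect the values of every key. Each group is held fully in memory.
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)

    # Apply, then combine: run the per-group computation and gather all
    # per-group outputs into one flat result.
    result = []
    for key, values in groups.items():
        group_mean = mean(values)
        result.extend((key, value - group_mean) for value in values)
    return result


rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
centered = subtract_group_mean(rows)
print(centered)
```

Because the apply step sees a whole group at once, a single very large group dominates memory use in this sketch as well.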
To use DataFrame.groupBy().applyInPandas(), the user needs to define the following:
- A Python function that defines the computation for each group.
- A StructType object or a string that defines the schema of the output PySpark :class:`DataFrame`.
The column labels of the returned pandas.DataFrame must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See pandas.DataFrame
on how to label columns when constructing a pandas.DataFrame.
Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.
The following example shows how to use DataFrame.groupby().applyInPandas() to subtract the mean from each value
in the group.
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 235-253
:dedent: 4
For detailed usage, please see :meth:`GroupedData.applyInPandas`.
Map operations with Pandas instances are supported by :meth:`DataFrame.mapInPandas`, which applies a function
to an iterator of pandas.DataFrames that represents the current
PySpark :class:`DataFrame` and returns the result as a PySpark :class:`DataFrame`. The function takes and outputs
an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some
Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.
The following example shows how to use :meth:`DataFrame.mapInPandas`:
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 257-268
:dedent: 4
For detailed usage, please see :meth:`DataFrame.mapInPandas`.
.. currentmodule:: pyspark.sql
Co-grouped map operations with Pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas(), which
allows two PySpark :class:`DataFrame` objects to be cogrouped by a common key and a Python function to be applied to each
cogroup. It consists of the following steps:
- Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
- Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
- Combine the pandas.DataFrames from all groups into a new PySpark :class:`DataFrame`.
To use groupBy().cogroup().applyInPandas(), the user needs to define the following:
- A Python function that defines the computation for each cogroup.
- A StructType object or a string that defines the schema of the output PySpark :class:`DataFrame`.
The column labels of the returned pandas.DataFrame must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See pandas.DataFrame
on how to label columns when constructing a pandas.DataFrame.
Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
The following example shows how to use DataFrame.groupby().cogroup().applyInPandas() to perform an asof join between two datasets.
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 272-294
:dedent: 4
For detailed usage, please see :meth:`PandasCogroupedOps.applyInPandas`.
Arrow Python UDFs are user defined functions that are executed row-by-row, utilizing Arrow for efficient batch data
transfer and serialization. To define an Arrow Python UDF, you can use the :meth:`udf` decorator or wrap the function
with the :meth:`udf` method, ensuring the useArrow parameter is set to True. Additionally, you can enable Arrow
optimization for Python UDFs throughout the entire SparkSession by setting the Spark configuration
spark.sql.execution.pythonUDF.arrow.enabled to true. It's important to note that the Spark configuration takes
effect only when useArrow is either not set or set to None.
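The precedence between the per-UDF useArrow parameter and the session-wide configuration can be summarized by the sketch below. ``resolve_use_arrow`` is a hypothetical helper written for illustration, not a PySpark function; it only mirrors the rule stated above.

```python
from typing import Optional


def resolve_use_arrow(use_arrow: Optional[bool], conf_enabled: bool) -> bool:
    """Decide whether a Python UDF runs with Arrow optimization.

    Hypothetical helper for illustration; not part of PySpark.
    use_arrow    -- the value passed as useArrow (None when it is not set)
    conf_enabled -- the value of spark.sql.execution.pythonUDF.arrow.enabled
    """
    # An explicit True or False on the UDF always wins.
    if use_arrow is not None:
        return use_arrow
    # Otherwise the session-wide configuration decides.
    return conf_enabled


# The configuration enables Arrow, but the UDF opts out explicitly.
opted_out = resolve_use_arrow(False, conf_enabled=True)
# The UDF does not set useArrow, so the configuration applies.
inherited = resolve_use_arrow(None, conf_enabled=True)
```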
The type hints for Arrow Python UDFs should be specified in the same way as for default, pickled Python UDFs.
Here's an example that demonstrates the usage of both a default, pickled Python UDF and an Arrow Python UDF:
.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py
:language: python
:lines: 298-316
:dedent: 4
Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. UDF type coercion poses challenges when the Python instances returned by UDFs do not align with the user-specified return type. The default, pickled Python UDFs' type coercion has certain limitations, such as relying on None as a fallback for type mismatches, leading to potential ambiguity and data loss. Additionally, converting date, datetime, and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the other hand, leverage Arrow's capabilities to standardize type coercion and address these issues effectively.
Type coercion differences are introduced by the following changes:
- Since Spark 4.2, Arrow optimization is enabled by default for regular Python UDFs. The full type coercion difference is summarized in the tables here. To disable Arrow optimization, set spark.sql.execution.pythonUDF.arrow.enabled to false.
- Since Spark 4.1, unnecessary conversion to pandas instances in Arrow-optimized Python UDFs is removed in the serializer when spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled is disabled. The behavior difference is summarized in the tables here. To restore the legacy behavior, set spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled to true.
.. currentmodule:: pyspark.sql.types
Currently, all Spark SQL data types are supported by Arrow-based conversion except :class:`ArrayType` of :class:`TimestampType`. :class:`MapType` and :class:`ArrayType` of nested :class:`StructType` are only supported when using PyArrow 2.0.0 and above.
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf spark.sql.execution.arrow.maxRecordsPerBatch
to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch. If the number of columns is large, the value should be adjusted
accordingly. Using this limit, each data partition will be made into 1 or more record batches for
processing.
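As a rough illustration of how the limit divides a partition, the sketch below computes the batch sizes for a given row count. ``record_batch_sizes`` is a hypothetical helper, not a Spark API, and it assumes a positive limit and that batches are filled in order.

```python
def record_batch_sizes(num_rows, max_records_per_batch=10_000):
    """Row counts of the batches a partition of num_rows rows would be cut into.

    Hypothetical helper for illustration; not part of Spark.
    """
    if num_rows < 0 or max_records_per_batch <= 0:
        raise ValueError("expected num_rows >= 0 and a positive batch limit")
    full_batches, remainder = divmod(num_rows, max_records_per_batch)
    sizes = [max_records_per_batch] * full_batches
    if remainder:
        sizes.append(remainder)  # the last batch holds the leftover rows
    return sizes


# A partition of 25,000 rows with the default limit of 10,000.
sizes = record_batch_sizes(25_000)
# The same partition with a smaller limit, e.g. when rows are very wide.
smaller = record_batch_sizes(25_000, max_records_per_batch=2_000)
print(len(sizes), len(smaller))
```

Lowering the limit produces more, smaller batches, which reduces the peak memory needed for any single batch.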
.. currentmodule:: pyspark.sql
Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the configuration spark.sql.session.timeZone and will
default to the JVM system local time zone if not set. Pandas uses a datetime64 type with nanosecond
resolution, datetime64[ns], with optional time zone on a per-column basis.
When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and each column will be converted to the Spark session time zone then localized to that time
zone, which removes the time zone and displays values as local time. This will occur
when calling :meth:`DataFrame.toPandas()` or pandas_udf with timestamp columns.
When timestamp data is transferred from Spark to a PyArrow Table, it will remain in microsecond resolution with the UTC time zone. This occurs when calling :meth:`DataFrame.toArrow()` with timestamp columns.
When timestamp data is transferred from Pandas or PyArrow to Spark, it will be converted to UTC
microseconds. This occurs when calling :meth:`SparkSession.createDataFrame` with a Pandas DataFrame
or PyArrow Table, or when returning a timestamp from a pandas_udf. These conversions are done
automatically to ensure Spark will have data in the expected format, so it is not necessary to do
any of these conversions yourself. Any nanosecond values will be truncated.
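The semantics described above can be illustrated with the standard library alone. This is a sketch of the arithmetic, not Spark's implementation: both helper names are hypothetical, the session time zone is assumed to be a fixed UTC-8 offset (real session time zones are usually region IDs with daylight-saving rules), and only non-negative nanosecond counts are considered.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def naive_local_to_utc_micros(naive, session_tz):
    """Read a zone-less timestamp as local time in session_tz and return
    microseconds since the epoch in UTC. Hypothetical helper for illustration."""
    aware = naive.replace(tzinfo=session_tz)
    return (aware - EPOCH) // timedelta(microseconds=1)


def nanos_to_micros(epoch_nanos):
    """Drop the sub-microsecond digits of a non-negative nanosecond count.
    Hypothetical helper for illustration."""
    return epoch_nanos // 1_000


session_tz = timezone(timedelta(hours=-8))  # assumed fixed-offset session time zone
local_midnight = datetime(2024, 1, 1, 0, 0, 0)  # no time zone attached
as_utc_micros = naive_local_to_utc_micros(local_midnight, session_tz)
truncated = nanos_to_micros(1_234_567_891)
```

Midnight local time at UTC-8 lands eight hours later on the UTC timeline, and the trailing ``891`` nanoseconds are discarded rather than rounded.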
Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in pandas_udfs to get the best performance, see
here for details.
For usage with pyspark.sql, the minimum supported version of Pandas is 2.2.0 and of PyArrow is 11.0.0. Higher versions may be used; however, compatibility and data correctness cannot be guaranteed and should be verified by the user.
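One way to check an environment against these minimums is sketched below, using only the standard library. ``parse_release`` and ``check_minimums`` are hypothetical helpers, and the version parsing is deliberately simple (it reads only the leading numeric components), so treat it as a starting point rather than a complete version comparison.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Minimum versions stated above.
MINIMUMS = {"pandas": (2, 2, 0), "pyarrow": (11, 0, 0)}


def parse_release(text):
    """Leading numeric components of a version string, padded to three parts.
    Hypothetical helper for illustration."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        raise ValueError(f"unrecognized version string: {text!r}")
    parts = tuple(int(part) for part in match.group().split("."))
    return parts + (0,) * (3 - len(parts))


def check_minimums(minimums=MINIMUMS):
    """Return {package: problem} for packages that are missing or too old."""
    problems = {}
    for name, minimum in minimums.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            problems[name] = "not installed"
            continue
        if parse_release(installed) < minimum:
            required = ".".join(str(part) for part in minimum)
            problems[name] = f"{installed} is older than {required}"
    return problems


problems = check_minimums()
print(problems or "pandas and pyarrow meet the minimum versions")
```

Since PyArrow must be available on all cluster nodes, a check like this is only meaningful when run in the Python environment of each node, not just on the driver.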
Since Spark 3.2, the Spark configuration spark.sql.execution.arrow.pyspark.selfDestruct.enabled
can be used to enable PyArrow's self_destruct feature, which can save memory when creating a
Pandas DataFrame via toPandas by freeing Arrow-allocated memory while building the Pandas
DataFrame. This option can also save memory when creating a PyArrow Table via toArrow.
This option is experimental. When used with toPandas, some operations may fail on the resulting
Pandas DataFrame due to immutable backing arrays. Typically, you would see the error
ValueError: buffer source array is read-only. Newer versions of Pandas may fix these errors by
improving support for such cases. You can work around this error by copying the column(s)
beforehand. Additionally, this conversion may be slower because it is single-threaded.