Conversation
Working on proper benchmarking.
The items on the list are the extensions to the SedonaDB vectorized UDFs:
@jiayuasu @paleolimbot I think we can start reviewing the changes and the ideas I am proposing in this MR. What I observed is that, this way, a UDF can be even faster than native Sedona functions like ST_Buffer. ST_Area, for instance, is three times slower, so it depends on the specific function. More importantly, the performance is better than the previous UDFs in Sedona. I would mark this functionality as experimental. I also haven't included a documentation update, as we might decide during the review that this MR needs adjustment.
This piece of code works only with Spark 3.5, but I plan to extend it to Spark 4.0.
I would like to extend it to include user-defined table functions, which will allow us to operate on an entire SedonaDB dataframe.
.github/workflows/pyflink.yml
cd python
uv add apache-flink==1.20.1
uv sync
# uv sync --extra flink
matrix:
os: ['ubuntu-latest', 'windows-latest', 'macos-15']
python: ['3.11', '3.10', '3.9', '3.8']
python: ['3.11', '3.10', '3.9']
I had trouble integrating with Python 3.8, and it has already been a year since it reached EOL. What would you think about removing it, and maybe starting to support Python 3.12 and 3.13?
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
This one is almost a copy-paste of what is in Apache Spark. The only difference is the imported worker function:
from sedona.spark.worker.worker import main as worker_main
I don't know if there is a better approach, such as using the import of functions like manager?
I'm not sure if monkeypatching the worker_main property of PySpark's daemon module after importing it would work, but I still prefer the current approach. Maintaining a fork of daemon.py is fine once we know where the modified pieces are.
Yeah, I agree. I'll add that information in the header.
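For reference, a rough sketch of the monkeypatching alternative mentioned above, assuming pyspark.daemon exposes a module-level worker_main and manager() the way the upstream daemon.py does (not verified in this MR):

```python
# Hypothetical sketch only, not the approach taken in this MR: rebind the
# worker entry point on PySpark's daemon module instead of forking daemon.py.
import pyspark.daemon as spark_daemon

from sedona.spark.worker.worker import main as sedona_worker_main

# Upstream daemon.py calls the module-level name `worker_main` when it forks a
# worker process, so rebinding it would redirect workers to the Sedona version.
spark_daemon.worker_main = sedona_worker_main

if __name__ == "__main__":
    spark_daemon.manager()
```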
crs = self.geom_offsets[arg]
fields.append(
    f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}"
)  # nosec
A theoretical SQL injection, which does not cause any harm here.
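If we ever want to harden it anyway, a minimal sketch could validate the value before interpolation; _safe_epsg is a hypothetical helper and the sample values are made up:

```python
# Hypothetical hardening sketch: force the CRS to an integer EPSG code before
# interpolating it into the SQL fragment, so non-numeric input raises early.
def _safe_epsg(crs) -> int:
    return int(crs)

arg = 0        # made-up argument index
crs = 4326     # made-up CRS value
field = f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{_safe_epsg(crs)}') AS _{arg}"
```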
return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
}

static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
If the Sedona speedup is available, instead of translating to WKB and then loading from WKB with Shapely, we can create Shapely objects directly to speed up the vectorized UDFs.
I'm not sure if it could also be applied to _apply_shapely_series_udf and _apply_geo_series_udf.
Yeah, I think it could be, but with some modifications to the existing Python and Scala code, I suppose.
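As a rough illustration of the idea (generic Shapely 2.x usage, not this MR's code path), geometries can be built directly from coordinate buffers instead of going through WKB; the sample arrays are made up:

```python
# Illustration only: build Shapely geometries straight from coordinate arrays
# (shapely.from_ragged_array) instead of serializing to WKB and parsing it back.
import numpy as np
import shapely
from shapely import GeometryType

# Current-style round trip, roughly: geoms = shapely.from_wkb(wkb_array)

# Direct construction from a ragged coordinate array (three points here):
coords = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 0.5]])
points = shapely.from_ragged_array(GeometryType.POINT, coords)
```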
python/tests/test_base.py
"20",
)
# Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
.config("spark.executor.memory", "10G")
To be removed; I forgot to take it out after testing.
from setuptools import setup
import numpy

setup(
This is needed to make the NumPy C wrappers available.
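Roughly, that amounts to passing NumPy's header directory to the extension build; the module and source names below are placeholders rather than the ones in this MR:

```python
# Sketch of exposing the NumPy C API to a C extension; names are placeholders.
import numpy
from setuptools import Extension, setup

setup(
    ext_modules=[
        Extension(
            "sedona_geom_ext",                    # hypothetical module name
            sources=["sedona_geom_ext.c"],        # hypothetical source file
            include_dirs=[numpy.get_include()],   # makes numpy/arrayobject.h visible
        )
    ]
)
```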
val sedonaArrowStrategy = Try(
  Class
    .forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
    .forName("org.apache.spark.sql.execution.python.SedonaArrowStrategy")
We need some private methods from Spark's execution.python package.
case _ => None
}

schema
Infer the geometry fields by taking the first value.
spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<!-- <version>3.12.0</version>-->
@Kontinuation can you take a look?
I initially thought that this PR was for offloading some ST functions to SedonaDB to accelerate the evaluation of the functions SedonaDB supports, but that does not seem to be the case after taking a closer look at it. The main purpose seems to be delegating UDFs to SedonaDB, so that the vectorized UDFs will be executed by SedonaDB instead of PySpark. I'm not sure if my understanding is correct.
I'm a bit curious why adding another level of indirection results in a performance improvement, and whether SedonaDB is really playing an important role in that improvement.
val geom = rowMatched.get.get(index, GeometryUDT).asInstanceOf[Array[Byte]]
val preambleByte = geom(0) & 0xff
val hasSrid = (preambleByte & 0x01) != 0

var srid = 0
if (hasSrid) {
  val srid2 = (geom(1) & 0xff) << 16
  val srid1 = (geom(2) & 0xff) << 8
  val srid0 = geom(3) & 0xff
  srid = srid2 | srid1 | srid0
}

(index, srid)
We can extract the SRID-parsing code into a static function.
val row = iterator.next()
val rowMatched = row match {
  case generic: GenericInternalRow =>
    Some(generic)
  case _ => None
}
We are only taking the SRID of the geometry value in the first row as the SRID of the entire field; this does not work well with geometry data with mixed SRIDs. SedonaDB added an item-crs data type for such data (apache/sedona-db#410); I think we should always use this to bridge Sedona and SedonaDB.
mem.map(_ / cores)
}

import java.io._
Move this to the top of the file.
import java.io.DataOutputStream
import java.net.Socket

private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
I suggest adding a GitHub link to the comment, noting where most of the code was taken from. This suggestion also applies to SedonaPythonArrowOutput.
eval_type = 6200
if sedona_db_speedup_enabled:
    eval_type = 6201
Define these eval types as constants, such as SQL_SCALAR_SEDONA_DB_UDF. I believe we should follow the same pattern as SEDONA_SCALAR_EVAL_TYPE.
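Something along these lines, as a sketch; the names are only suggestions and mapping 6200/6201 to them is an assumption based on the diff above:

```python
# Suggested constants for the eval types in the diff above; names and the exact
# value-to-name mapping are assumptions, not confirmed Sedona constants.
SEDONA_SCALAR_EVAL_TYPE = 6200      # existing vectorized UDF path (assumed)
SQL_SCALAR_SEDONA_DB_UDF = 6201     # SedonaDB-accelerated path (suggested name)

sedona_db_speedup_enabled = True    # example flag value
eval_type = (
    SQL_SCALAR_SEDONA_DB_UDF if sedona_db_speedup_enabled else SEDONA_SCALAR_EVAL_TYPE
)
```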
PyArrayObject *array = (PyArrayObject *)input_obj;
PyObject **objs = (PyObject **)PyArray_DATA(array);
Do we need to check the type of input_obj using PyArray_Check before casting it to PyArrayObject and calling PyArray_* methods?
df = self.db.create_data_frame(table)
table_name = f"my_table_{index}"

df.to_view(table_name)
Do we need overwrite=True here?
{"from_sedona_func", from_sedona_func, METH_VARARGS,
 "Deserialize bytes-like object to geometry object."},
{"to_sedona_func", to_sedona_func, METH_VARARGS,
 "Deserialize bytes-like object to geometry object."},
These functions are for working with arrays; we should clarify this in the descriptions.
def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
    num_udfs = read_int(infile)

    udf = None
    for _ in range(num_udfs):
        udf = read_udf(infile, pickle_ser)

    return udf
Is it intended to discard all UDFs except the last one?
Yes, it supports only one level of function nesting so far. I would like to extend this functionality in the next MRs so as not to overwhelm the reviews.
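For one of those follow-up MRs, a minimal sketch of keeping every deserialized UDF instead of only the last one, reusing read_int and read_udf from the diff above, might look like this:

```python
# Sketch only: collect all UDFs from the stream rather than keeping the last.
# read_int and read_udf are the helpers already used in the diff above.
def register_sedona_db_udfs(infile, pickle_ser) -> list:
    num_udfs = read_int(infile)
    return [read_udf(infile, pickle_ser) for _ in range(num_udfs)]
```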
#2593 (comment) Yeah, that's a good question. I am not sure whether the monkeypatching would be too hacky; maybe we can add a note in the file header that it is a one-to-one copy of what is in Apache Spark with one changed line.
How does it improve the performance of what we already have:
The current solution mitigates all those issues:
Instead of SedonaDB, we could use GeoPandas, as Apache Spark already does with Pandas. However, we already have SedonaDB in the ecosystem, so why not use it? I expect the Python UDFs in SedonaDB to improve further; in the most optimized version we now run Shapely over the Arrow arrays, which is more efficient than what we already have, to the point where the buffer version is faster than the native one in Sedona.
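As a generic illustration of the "Shapely over Arrow/pandas batches" point (plain PySpark APIs, not the new code path added in this MR):

```python
# Generic illustration, not this MR's API: a vectorized buffer over WKB values
# with a pandas UDF, so Shapely runs once per Arrow batch instead of per row.
import pandas as pd
import shapely
from pyspark.sql.functions import pandas_udf

@pandas_udf("binary")
def buffer_wkb(wkb: pd.Series) -> pd.Series:
    geoms = shapely.from_wkb(wkb.to_numpy())   # one call for the whole batch
    buffered = shapely.buffer(geoms, 1.0)
    return pd.Series(list(shapely.to_wkb(buffered)))
```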
@Kontinuation, I'll fix the errors once we have consensus on the direction. I am planning to add support for GeoPandas alongside SedonaDB.
@Kontinuation I would like this to be perceived as fundamental to making spatial Python UDFs more efficient.

Did you read the Contributor Guide?
Is this PR related to a ticket?
[SEDONA-738] my subject.
What changes were proposed in this PR?
A Sedona vectorized UDF (Apache Arrow exchange) that utilizes SedonaDB. It supports:
How was this patch tested?
unit tests
Did this PR include necessary documentation updates?
TODO