
SEDONA-738 Add sedonadb worker #2593

Open
Imbruced wants to merge 6 commits into master from add-sedona-worker-daemon-mode

Conversation

@Imbruced (Member) commented Jan 18, 2026

Did you read the Contributor Guide?

Is this PR related to a ticket?

  • Yes, and the PR name follows the format [SEDONA-738] my subject.

What changes were proposed in this PR?

Sedona vectorized UDFs (Apache Arrow exchange), utilizing SedonaDB. Supports:

  • scalar functions
  • daemon mode

How was this patch tested?

unit tests

Did this PR include necessary documentation updates?

TODO

@github-actions bot added the sedona-python, sedona-spark, github_actions, and root labels Jan 18, 2026
@Imbruced (Member Author)

[image]

@Imbruced (Member Author)

Working on proper benchmarking

@Imbruced (Member Author)

The items on this list are planned extensions to the SedonaDB vectorized UDFs:

  • support for Spark 4.0
  • vectorized table functions (a SedonaDB table object would be the input to the function)
  • implementing an additional serialization method in SedonaDB to reduce the amount of transformation for table functions
  • aggregate functions?
  • adding GeoPandas as an alternative method

@Imbruced (Member Author)

@jiayuasu @paleolimbot I think we can start reviewing the changes and the ideas I am proposing in this PR. What I observed is that a UDF implemented this way can be even faster than native Sedona functions like ST_Buffer; ST_Area, on the other hand, is three times slower, so I guess it depends on the specific function. More importantly, the performance is better than the previous UDFs in Sedona. I would mark this functionality as experimental.

Also, I haven't included a documentation update, as we might decide during the review that this PR needs adjustment.

@Imbruced Imbruced marked this pull request as ready for review January 25, 2026 21:27
@Imbruced Imbruced requested a review from jiayuasu as a code owner January 25, 2026 21:27
@Imbruced (Member Author)

This piece of code works only with Spark 3.5, but I plan to extend it to Spark 4.0.

@Imbruced (Member Author)

I would like to extend it to include user-defined table functions, which would allow us to operate on an entire SedonaDB dataframe.

cd python
uv add apache-flink==1.20.1
uv sync
# uv sync --extra flink
Member Author

to remove

matrix:
os: ['ubuntu-latest', 'windows-latest', 'macos-15']
python: ['3.11', '3.10', '3.9', '3.8']
python: ['3.11', '3.10', '3.9']
Member Author

I had trouble integrating with Python 3.8; it has been a year since it reached EOL. What would you think about removing it, and maybe starting to support Python 3.12 and 3.13?

# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

Member Author

This one is almost a copy-paste of what is in Apache Spark. The only difference is the imported worker function:

from sedona.spark.worker.worker import main as worker_main

I don't know whether there is a better approach, such as importing functions like manager instead?

Member

I'm not sure if monkeypatching the worker_main property of PySpark's daemon module after importing it would work, but I still prefer the current approach. Maintaining a fork of daemon.py is fine once we know where the modified pieces are.
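For illustration, the monkeypatching variant would look roughly like this (untested; it assumes pyspark.daemon resolves the module-level worker_main at call time):

# Hypothetical alternative to forking daemon.py: swap the worker entry
# point on PySpark's daemon module after importing it.
import pyspark.daemon as daemon
from sedona.spark.worker.worker import main as worker_main

daemon.worker_main = worker_main  # replace Spark's worker with Sedona's
daemon.manager()                  # then run the stock daemon loop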

Member Author

Yeah, I agree. I'll add that information in the header.

crs = self.geom_offsets[arg]
fields.append(
f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}"
) # nosec
Member Author

A theoretical SQL injection, which causes no harm here.
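If we ever want to silence the warning anyway, coercing the CRS code to an int before interpolation would close it (a sketch, assuming geom_offsets holds numeric EPSG codes):

crs = int(self.geom_offsets[arg])  # raises on non-numeric input, so nothing can be injected
fields.append(f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}")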

return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
}

static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
Member Author

If the Sedona speedup is available, instead of translating to WKB and then loading from WKB with shapely, we can create shapely objects directly to speed up vectorized UDFs.
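Conceptually, the dispatch would look like this (a sketch; sedona_speedup and the array variables are placeholder names, with from_sedona_func being the C entry point shown above):

try:
    from sedona_speedup import from_sedona_func  # hypothetical import of the C extension
    geoms = from_sedona_func(sedona_bytes)       # shapely objects straight from Sedona serde
except ImportError:
    from shapely import from_wkb
    geoms = from_wkb(wkb_bytes)                  # slower path: Sedona -> WKB -> shapely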

Member

I'm not sure if it could also be applied to _apply_shapely_series_udf and _apply_geo_series_udf.

Member Author

Yeah, I suppose it could be, but with some modifications to the existing Python and Scala code.

"20",
)
# Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
.config("spark.executor.memory", "10G")
Member Author

To remove; I forgot to delete it after testing.

from setuptools import setup
import numpy

setup(
Member Author

This is needed to make the numpy C wrappers available to the extension build.
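For context, numpy's C API is exposed to an extension build via numpy.get_include(); a minimal sketch with placeholder names:

from setuptools import Extension, setup
import numpy

setup(
    ext_modules=[
        Extension(
            "sedona_speedup",                    # placeholder extension name
            sources=["src/sedona_speedup.c"],    # placeholder source path
            include_dirs=[numpy.get_include()],  # makes numpy's C headers available
        )
    ]
)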

val sedonaArrowStrategy = Try(
Class
.forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
.forName("org.apache.spark.sql.execution.python.SedonaArrowStrategy")
Member Author

The class needs some private methods from Spark's execution.python package, hence the package change.

case _ => None
}

schema
Member Author

Infers the schema for geometry fields by taking the first value.

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<!-- <version>3.12.0</version>-->
Member Author

to remove

@jiayuasu jiayuasu requested a review from Kontinuation January 28, 2026 07:56
@jiayuasu (Member)

@Kontinuation can you take a look?

@jiayuasu jiayuasu added this to the sedona-1.9.0 milestone Feb 6, 2026
@Kontinuation (Member) left a comment

I initially thought this PR was about offloading some ST functions to SedonaDB to accelerate the evaluation of functions SedonaDB supports, but after taking a closer look, that does not seem to be the case. The main purpose seems to be delegating UDFs to SedonaDB, so that vectorized UDFs are executed by SedonaDB instead of PySpark. I'm not sure if my understanding is correct.

I'm a bit curious why adding another level of indirection results in a performance improvement, and whether SedonaDB really plays an important role in it.

Comment on lines +88 to +100
val geom = rowMatched.get.get(index, GeometryUDT).asInstanceOf[Array[Byte]]
val preambleByte = geom(0) & 0xff
val hasSrid = (preambleByte & 0x01) != 0

var srid = 0
if (hasSrid) {
val srid2 = (geom(1) & 0xff) << 16
val srid1 = (geom(2) & 0xff) << 8
val srid0 = geom(3) & 0xff
srid = srid2 | srid1 | srid0
}

(index, srid)
Member

We can extract the code for parsing the SRID in

checkBufferSize(buffer, 8);
int preambleByte = buffer.getByte(0) & 0xFF;
int wkbType = preambleByte >> 4;
CoordinateType coordType = CoordinateType.valueOf((preambleByte & 0x0F) >> 1);
boolean hasSrid = (preambleByte & 0x01) != 0;
buffer.setCoordinateType(coordType);
int srid = 0;
if (hasSrid) {
int srid2 = (buffer.getByte(1) & 0xFF) << 16;
int srid1 = (buffer.getByte(2) & 0xFF) << 8;
int srid0 = buffer.getByte(3) & 0xFF;
srid = (srid2 | srid1 | srid0);
}
to a static function
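For reference, the duplicated bit arithmetic is equivalent to this small function (a Python sketch; the actual extraction would be a static method on the JVM side):

def parse_preamble_srid(geom: bytes) -> int:
    """Read the SRID from a Sedona-serialized geometry preamble."""
    preamble = geom[0] & 0xFF
    if (preamble & 0x01) == 0:  # the low bit flags SRID presence
        return 0
    return ((geom[1] & 0xFF) << 16) | ((geom[2] & 0xFF) << 8) | (geom[3] & 0xFF)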

Comment on lines +72 to +78
val row = iterator.next()

val rowMatched = row match {
case generic: GenericInternalRow =>
Some(generic)
case _ => None
}
Member

We are only taking the SRID of the geometry value in the first row as the SRID of the entire field, which does not work well with geometry data that has mixed SRIDs. SedonaDB added an item-crs data type for such data (apache/sedona-db#410); I think we should always use it to bridge Sedona and SedonaDB.

mem.map(_ / cores)
}

import java.io._
Member

Move to the top of the file

import java.io.DataOutputStream
import java.net.Socket

private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
Member

I suggest adding a GitHub link to the comment noting where most of the code was taken from. This suggestion also applies to SedonaPythonArrowOutput.

Comment on lines +207 to +209
eval_type = 6200
if sedona_db_speedup_enabled:
eval_type = 6201
Member

Define these eval types as constants, such as SQL_SCALAR_SEDONA_DB_UDF. I believe we should follow the same pattern as SEDONA_SCALAR_EVAL_TYPE.
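Something like this (constant names are suggestions following the existing pattern):

# Suggested constants mirroring the SEDONA_SCALAR_EVAL_TYPE pattern
SEDONA_SCALAR_ARROW_UDF = 6200   # plain vectorized path (suggested name)
SQL_SCALAR_SEDONA_DB_UDF = 6201  # SedonaDB speedup path

eval_type = SQL_SCALAR_SEDONA_DB_UDF if sedona_db_speedup_enabled else SEDONA_SCALAR_ARROW_UDF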

Comment on lines +275 to +276
PyArrayObject *array = (PyArrayObject *)input_obj;
PyObject **objs = (PyObject **)PyArray_DATA(array);
Member

Do we need to check the type of input_obj using PyArray_Check before casting it to PyArrayObject and calling PyArray_* methods?


df = self.db.create_data_frame(table)
table_name = f"my_table_{index}"

df.to_view(table_name)
Member

Do we need overwrite=True here?
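That is, something like the following, assuming to_view accepts the flag; otherwise a daemon worker that reuses table names could hit a name clash:

df.to_view(table_name, overwrite=True)  # hypothetical flag; guards against re-registering the same view name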

Comment on lines 379 to 382
{"from_sedona_func", from_sedona_func, METH_VARARGS,
"Deserialize bytes-like object to geometry object."},
{"to_sedona_func", to_sedona_func, METH_VARARGS,
"Deserialize bytes-like object to geometry object."},
Member

These functions work on arrays; we should clarify that in the descriptions.

Member Author

good point

Comment on lines +200 to +207
def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
num_udfs = read_int(infile)

udf = None
for _ in range(num_udfs):
udf = read_udf(infile, pickle_ser)

return udf
Member

Is it intended to discard all UDFs except the last one?

Member Author

Yes, it supports only one level of function nesting so far. I would like to extend this functionality in follow-up PRs to avoid overwhelming the review.
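In the meantime, the single-UDF assumption could be made explicit, e.g.:

# A sketch making the current single-UDF limitation explicit
def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
    num_udfs = read_int(infile)
    if num_udfs != 1:
        raise NotImplementedError(f"expected exactly 1 UDF, got {num_udfs}")
    return read_udf(infile, pickle_ser)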

@Imbruced (Member Author)

#2593 (comment) Yeah, that's a good question. I am not sure whether monkeypatching would be too hacky; maybe we can note in the file header that it is a one-to-one copy of what is in Apache Spark, with one changed line.

@Imbruced (Member Author)

@Kontinuation
#2593 (review)

How this improves the performance of what we already have:

  • A standard Python UDF transfers data row by row, so it suffers from Python object serialization.
  • A standard Python UDF with C serialization code still suffers from the penalty of sending data over the network row by row.
  • The vectorized UDFs we already have use WKB as the internal transfer format, so they suffer from the WKB-to-Sedona and Sedona-to-WKB translations.

The current solution mitigates all of those issues by:

  • using Arrow and sending data in batches (see the sketch below)
  • using Sedona's C serde code to convert the data directly to and from shapely

Instead of SedonaDB we could use GeoPandas, as Apache Spark already does with Pandas. However, we already have SedonaDB in the ecosystem, so why not use it? I expect the Python UDFs in SedonaDB to improve further; in the most optimized version we now run shapely over the Arrow arrays, which is efficient enough that the buffer version is faster than the native ST_Buffer in Sedona.
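To illustrate the batching point with plain pyarrow (a generic sketch of Arrow IPC transfer, not this PR's actual transport code):

import pyarrow as pa

# One Arrow IPC stream carries a whole batch of geometries in a single
# contiguous buffer, replacing N per-row pickle round-trips.
batch = pa.record_batch([pa.array([b"wkb-1", b"wkb-2", b"wkb-3"])], names=["geom"])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
payload = sink.getvalue()  # a single buffer to send over the worker socket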

@Imbruced (Member Author)

@Kontinuation, I'll fix the errors once we have consensus on the direction. I am planning to add support for GeoPandas alongside SedonaDB.

@Imbruced (Member Author)

@Kontinuation I see this as fundamental to making spatial Python UDFs more efficient.
