-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-49376: [Python][Parquet] Add ability to write Bloom filters from pyarrow #49377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a9ca4d7
c0fb063
176fe77
c74fa69
c67af90
1b39f04
965729a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,10 @@ cimport cpython as cp | |
| _DEFAULT_ROW_GROUP_SIZE = 1024*1024 | ||
| _MAX_ROW_GROUP_SIZE = 64*1024*1024 | ||
|
|
||
| # from definition of BloomFilterOptions struct | ||
| _DEFAULT_BLOOM_FILTER_NDV = 1024*1024 | ||
| _DEFAULT_BLOOM_FILTER_FPP = 0.05 | ||
|
|
||
|
|
||
| cdef Type _unwrap_list_type(obj) except *: | ||
| if obj is ListType: | ||
|
|
@@ -1992,13 +1996,15 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( | |
| write_page_checksum=False, | ||
| sorting_columns=None, | ||
| store_decimal_as_integer=False, | ||
| use_content_defined_chunking=False) except *: | ||
| use_content_defined_chunking=False, | ||
| bloom_filter_options=None) except *: | ||
|
|
||
| """General writer properties""" | ||
| cdef: | ||
| shared_ptr[WriterProperties] properties | ||
| WriterProperties.Builder props | ||
| CdcOptions cdc_options | ||
| BloomFilterOptions bloom_opts | ||
|
|
||
| # data_page_version | ||
|
|
||
|
|
@@ -2122,6 +2128,48 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( | |
| raise TypeError( | ||
| "'column_encoding' should be a dictionary or a string") | ||
|
|
||
| # bloom filters | ||
| if bloom_filter_options is not None: | ||
| if isinstance(bloom_filter_options, dict): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we factor this out into a helper function? This function is becoming much too large IMHO. |
||
| # for each entry in bloom_filter_options, {"path": {"ndv": ndv, "fpp": fpp}} | ||
| # convert (ndv,fpp) to BloomFilterOptions struct and pass to props | ||
| for column, _bloom_opts in bloom_filter_options.items(): | ||
| # set defaults | ||
| bloom_opts.ndv = _DEFAULT_BLOOM_FILTER_NDV | ||
| bloom_opts.fpp = _DEFAULT_BLOOM_FILTER_FPP | ||
| if isinstance(_bloom_opts, dict): | ||
| if "ndv" in _bloom_opts: | ||
| ndv = _bloom_opts["ndv"] | ||
| if isinstance(ndv, int): | ||
| if ndv < 0: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ndv <= 0?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if NDV can be 0 or not...I can't seem to find validation in either the C++ nor rust implementations. This test just ensures it's a positive value. Honestly, I don't see why you'd need a bloom filter for a column with no values, so I'm fine following your suggestion. |
||
| raise ValueError( | ||
| f"'ndv' for column '{column}' must be positive, got {ndv}") | ||
| bloom_opts.ndv = ndv | ||
| else: | ||
| raise TypeError( | ||
| f"'ndv' for column '{column}' must be an int") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we notice they're in bloom filter options? |
||
| if "fpp" in _bloom_opts: | ||
| fpp = _bloom_opts["fpp"] | ||
| if isinstance(fpp, float): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know whether casting to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. We don't want to accept strings, for example. |
||
| if fpp <= 0.0 or fpp >= 1.0: | ||
| raise ValueError( | ||
| f"'fpp' for column '{column}' must be in (0.0, 1,0), got {fpp}") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1,0 typo here? |
||
| bloom_opts.fpp = fpp | ||
| else: | ||
| raise TypeError( | ||
| f"'fpp' for column '{column}' must be a float") | ||
| elif isinstance(_bloom_opts, bool): | ||
| if not _bloom_opts: | ||
|
mapleFU marked this conversation as resolved.
Outdated
|
||
| props.disable_bloom_filter(tobytes(column)) | ||
| continue | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that bloom filter is disabled by default but should we be explicit here calling
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I agree it would be better to be explicit here. Thanks! |
||
| else: | ||
| raise TypeError( | ||
| f"'bloom_filter_options:{column}' must be a boolean or a dictionary") | ||
|
|
||
| props.enable_bloom_filter(tobytes(column), bloom_opts) | ||
| else: | ||
| raise TypeError("'bloom_filter_options' must be a dictionary") | ||
|
|
||
| # size limits | ||
| if data_page_size is not None: | ||
| props.data_pagesize(data_page_size) | ||
|
|
@@ -2317,7 +2365,8 @@ cdef class ParquetWriter(_Weakrefable): | |
| sorting_columns=None, | ||
| store_decimal_as_integer=False, | ||
| use_content_defined_chunking=False, | ||
| write_time_adjusted_to_utc=False): | ||
| write_time_adjusted_to_utc=False, | ||
| bloom_filter_options=None): | ||
| cdef: | ||
| shared_ptr[WriterProperties] properties | ||
| shared_ptr[ArrowWriterProperties] arrow_properties | ||
|
|
@@ -2353,7 +2402,8 @@ cdef class ParquetWriter(_Weakrefable): | |
| write_page_checksum=write_page_checksum, | ||
| sorting_columns=sorting_columns, | ||
| store_decimal_as_integer=store_decimal_as_integer, | ||
| use_content_defined_chunking=use_content_defined_chunking | ||
| use_content_defined_chunking=use_content_defined_chunking, | ||
| bloom_filter_options=bloom_filter_options | ||
| ) | ||
| arrow_properties = _create_arrow_writer_properties( | ||
| use_deprecated_int96_timestamps=use_deprecated_int96_timestamps, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -951,6 +951,30 @@ def _sanitize_table(table, new_schema, flavor): | |
| are expressed in reference to midnight in the UTC timezone. | ||
| If False (the default), the TIME columns are assumed to be expressed | ||
| in reference to midnight in an unknown, presumably local, timezone. | ||
| bloom_filter_options : dict, default None | ||
| Create Bloom filters for the columns specified by the provided `dict`. | ||
|
|
||
| Bloom filters can be configured with two parameters: number of distinct values | ||
| (NDV), and false-positive probability (FPP). | ||
|
|
||
| Bloom filters are most effective for high-cardinality columns. A good default | ||
| is to set NDV equal to the number of rows. Lower values reduce disk usage but | ||
| may not be worthwhile for very small NDVs. Increasing NDV (without increasing FPP) | ||
| increases disk and memory usage. | ||
|
|
||
| Lower FPP values require more disk and memory space. For a fixed NDV, the | ||
| space requirement grows roughly proportional to log(1/FPP). Recommended | ||
| values are 0.1, 0.05, or 0.01. Very small values are counterproductive as | ||
| the bitset may exceed the size of the actual data. Set NDV appropriately | ||
| to minimize space usage. | ||
|
|
||
| The keys of the `dict` are column paths. For each path, the value can be either: | ||
|
|
||
| - A boolean, with ``True`` indicating that a Bloom filter should be produced with | ||
| the default values of `NDV=1048576` and `FPP=0.05`. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lower case? |
||
| - A dictionary, with keys `ndv` and `fpp`. `ndv` must be a positive integer, and | ||
| `fpp` must be a float between 0.0 and 1.0. Default values will be used for any | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are missing `ndv` and `fpp` keys treated as the defaults 1048576/0.05?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I've reworked the docs to hopefully make this clearer. |
||
| missing keys. | ||
| """ | ||
|
|
||
| _parquet_writer_example_doc = """\ | ||
|
|
@@ -1980,6 +2004,7 @@ def write_table(table, where, row_group_size=None, version='2.6', | |
| store_decimal_as_integer=False, | ||
| write_time_adjusted_to_utc=False, | ||
| max_rows_per_page=None, | ||
| bloom_filter_options=None, | ||
| **kwargs): | ||
| # Implementor's note: when adding keywords here / updating defaults, also | ||
| # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions | ||
|
|
@@ -2013,6 +2038,7 @@ def write_table(table, where, row_group_size=None, version='2.6', | |
| store_decimal_as_integer=store_decimal_as_integer, | ||
| write_time_adjusted_to_utc=write_time_adjusted_to_utc, | ||
| max_rows_per_page=max_rows_per_page, | ||
| bloom_filter_options=bloom_filter_options, | ||
| **kwargs) as writer: | ||
| writer.write_table(table, row_group_size=row_group_size) | ||
| except Exception: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.