Skip to content

Commit 2ab47cf

Browse files
authored
Fast list prefix update (#28)
* Updated the prefix to apply to the start range of the first process.
* Fixed a slow-listing bug when using prefixes.
* Added a comment explaining the prefix removal.
* Updated to include safety for None input.
* Added verbose test output.
* Updated fake_gcs.
* Reduced the e2e object count.
* Fixed a removeprefix version error.
1 parent 972407d commit 2ab47cf

7 files changed

Lines changed: 42 additions & 21 deletions

File tree

dataflux_core/benchmarking/dataflux_client_bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def parse_args():
3030
parser.add_argument("--bucket-file-size", type=int, default=None)
3131
parser.add_argument("--num-workers", type=int, default=10)
3232
parser.add_argument("--max-compose-bytes", type=int, default=100000000)
33-
parser.add_argument("--prefix", type=str, default=None)
33+
parser.add_argument("--prefix", type=str, default="")
3434
return parser.parse_args()
3535

3636

dataflux_core/benchmarking/dataflux_client_parallel_bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def parse_args():
3131
parser.add_argument("--num-workers", type=int, default=10)
3232
parser.add_argument("--max-compose-bytes", type=int, default=100000000)
3333
parser.add_argument("--parallelization", type=int, default=20)
34-
parser.add_argument("--prefix", type=str, default=None)
34+
parser.add_argument("--prefix", type=str, default="")
3535
return parser.parse_args()
3636

3737

dataflux_core/benchmarking/dataflux_client_threaded_bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def parse_args():
3131
parser.add_argument("--num-workers", type=int, default=10)
3232
parser.add_argument("--max-compose-bytes", type=int, default=100000000)
3333
parser.add_argument("--threads", type=int, default=20)
34-
parser.add_argument("--prefix", type=str, default=None)
34+
parser.add_argument("--prefix", type=str, default="")
3535
return parser.parse_args()
3636

3737

dataflux_core/fast_list.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,22 @@
2929
DEFAULT_ALLOWED_CLASS = ["STANDARD"]
3030

3131

32+
def remove_prefix(text: str, prefix: str):
    """Strip ``prefix`` from the front of ``text`` when present.

    Args:
        text: String to trim a prefix from.
        prefix: Leading substring to remove from ``text``.

    Returns:
        ``text`` with ``prefix`` removed when ``text`` starts with it;
        otherwise ``text`` unchanged.
    """
    # str.removeprefix is built in from Python 3.9; this helper keeps
    # older interpreters working.
    return text[len(prefix):] if text.startswith(prefix) else text
46+
47+
3248
class ListWorker(object):
3349
"""Worker that lists a range of objects from a GCS bucket.
3450
@@ -75,7 +91,7 @@ def __init__(
7591
client: storage.Client = None,
7692
skip_compose: bool = True,
7793
list_directory_objects: bool = False,
78-
prefix: str = None,
94+
prefix: str = "",
7995
allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS,
8096
max_retries: int = 5,
8197
):
@@ -98,7 +114,7 @@ def __init__(
98114
self.default_alph = "a"
99115
self.skip_compose = skip_compose
100116
self.list_directory_objects = list_directory_objects
101-
self.prefix = prefix
117+
self.prefix = prefix if prefix else ""
102118
self.allowed_storage_classes = allowed_storage_classes
103119
self.api_call_count = 0
104120
self.max_retries = max_retries
@@ -163,8 +179,10 @@ def run(self) -> None:
163179
try:
164180
list_blob_args = {
165181
"max_results": self.max_results,
166-
"start_offset": self.start_range,
167-
"end_offset": self.end_range,
182+
"start_offset": self.prefix + self.start_range,
183+
"end_offset": (
184+
"" if not self.end_range else self.prefix + self.end_range
185+
),
168186
}
169187
if self.prefix:
170188
list_blob_args["prefix"] = self.prefix
@@ -184,7 +202,10 @@ def run(self) -> None:
184202
and blob.storage_class in self.allowed_storage_classes
185203
):
186204
self.results.add((blob.name, blob.size))
187-
self.start_range = blob.name
205+
# Remove the prefix from the name so that range calculations remain prefix-agnostic.
206+
# This is necessary due to the unbounded end-range when splitting string namespaces
207+
# of unknown size.
208+
self.start_range = remove_prefix(blob.name, self.prefix)
188209
if i == self.max_results:
189210
# Only allow work stealing when paging.
190211
has_results = True
@@ -237,7 +258,7 @@ def run_list_worker(
237258
end_range: str,
238259
client: storage.Client = None,
239260
skip_compose: bool = True,
240-
prefix: str = None,
261+
prefix: str = "",
241262
allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS,
242263
) -> None:
243264
"""Helper function to execute a ListWorker.
@@ -253,7 +274,7 @@ def run_list_worker(
253274
unidle_queue: Multiprocessing queue pushed to when the worker has successfully stolen work.
254275
results_queue: Multiprocessing queue on which the worker pushes its listing results onto.
255276
metadata_queue: Multiprocessing queue on which the worker pushes tracking metadata.
256-
start_range: Stirng start range worker will begin listing from.
277+
start_range: String start range worker will begin listing from.
257278
end_range: String end range worker will list until.
258279
client: The GCS storage client. When not provided, will be derived from background auth.
259280
skip_compose: When true, skip listing files with the composed object prefix.
@@ -303,7 +324,7 @@ def __init__(
303324
bucket: str,
304325
sort_results: bool = False,
305326
skip_compose: bool = True,
306-
prefix: str = None,
327+
prefix: str = "",
307328
allowed_storage_classes: list[str] = DEFAULT_ALLOWED_CLASS,
308329
):
309330
# The maximum number of threads utilized in the fast list operation.

dataflux_core/tests/fake_gcs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def list_blobs(
3838
max_results: int = 0,
3939
start_offset: str = "",
4040
end_offset: str = "",
41-
prefix=None,
41+
prefix: str = "",
4242
) -> list[Blob]:
4343
results = []
4444
for name in sorted(self.blobs):
@@ -47,7 +47,7 @@ def list_blobs(
4747
if (not start_offset or name >= start_offset) and (
4848
not end_offset or name < end_offset
4949
):
50-
if not prefix or name.startswith(prefix):
50+
if name.startswith(prefix):
5151
results.append(self.blobs[name])
5252
return results
5353

dataflux_core/tests/test_fast_list.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_single_worker(self):
3131
"compose_obj_count": 1,
3232
"prefix_obj_count": 0,
3333
"archive_obj_count": 0,
34-
"prefix": None,
34+
"prefix": "",
3535
"object_size": 10,
3636
"directory_obj_count": 10,
3737
"skip_compose": True,
@@ -45,7 +45,7 @@ def test_single_worker(self):
4545
"compose_obj_count": 1,
4646
"prefix_obj_count": 0,
4747
"archive_obj_count": 0,
48-
"prefix": None,
48+
"prefix": "",
4949
"object_size": 10,
5050
"directory_obj_count": 0,
5151
"skip_compose": False,
@@ -59,7 +59,7 @@ def test_single_worker(self):
5959
"compose_obj_count": 5000,
6060
"prefix_obj_count": 0,
6161
"archive_obj_count": 0,
62-
"prefix": None,
62+
"prefix": "",
6363
"object_size": 10,
6464
"directory_obj_count": 0,
6565
"skip_compose": True,
@@ -87,7 +87,7 @@ def test_single_worker(self):
8787
"compose_obj_count": 0,
8888
"prefix_obj_count": 0,
8989
"archive_obj_count": 0,
90-
"prefix": None,
90+
"prefix": "",
9191
"object_size": 10,
9292
"directory_obj_count": 10,
9393
"skip_compose": True,
@@ -101,7 +101,7 @@ def test_single_worker(self):
101101
"compose_obj_count": 0,
102102
"prefix_obj_count": 0,
103103
"archive_obj_count": 1000,
104-
"prefix": None,
104+
"prefix": "",
105105
"object_size": 10,
106106
"directory_obj_count": 0,
107107
"skip_compose": True,
@@ -260,9 +260,9 @@ def test_list_controller_e2e(self):
260260
client = fake_gcs.Client()
261261
bucket_name = "test_bucket"
262262
bucket = client.bucket(bucket_name)
263-
object_count = 100000
263+
object_count = 1000
264264
object_size = 10
265-
for i in range(100000):
265+
for i in range(object_count):
266266
bucket._add_file(str(i), "aaaaaaaaaa")
267267
controller = fast_list.ListingController(1, "", bucket_name, True)
268268
controller.client = client

kokoro/build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function install_requirements() {
3232

3333
function run_unit_tests() {
3434
echo Running unit tests.
35-
python -m pytest dataflux_core/tests --junit-xml="${KOKORO_ARTIFACTS_DIR}/unit_tests/sponge_log.xml"
35+
python -m pytest dataflux_core/tests -vvv --junit-xml="${KOKORO_ARTIFACTS_DIR}/unit_tests/sponge_log.xml"
3636
}
3737

3838
install_requirements

0 commit comments

Comments (0)