From 43af0b6a2f381a1f273a4eb84b02e0542cd67076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Rom=C3=A1n?= Date: Wed, 21 Jan 2026 01:20:35 +0000 Subject: [PATCH] refactor(system): add arguments to pollPartition and pollTopic functions part of the kafka module --- src/system/kafka.py | 16 ++++++++++++++-- stubs/stubs/system/kafka.pyi | 4 ++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/system/kafka.py b/src/system/kafka.py index 5c76ce6..d63414c 100644 --- a/src/system/kafka.py +++ b/src/system/kafka.py @@ -76,6 +76,8 @@ def pollPartition( partition, # type: int offset, # type: Union[str, unicode] options=None, # type: Optional[Dict[Union[str, unicode], Any]] + sizeCutoff=None, # type: Optional[int] + timeoutMs=None, # type: Optional[long] ): # type: (...) -> List[Any] """Polls a specific partition of a topic. @@ -87,11 +89,15 @@ def pollPartition( offset: The position of offset to start the poll at. options: Custom options specific to the consumer, with key value string pairs. Optional + sizeCutoff: The total record count allowed before polling will + be stopped. Optional. + timeoutMs: The amount of time in milliseconds before polling + will be stopped. Optional. Returns: A list of records polled from a specified partition. """ - print(connector, topic, partition, offset, options) + print(connector, topic, partition, offset, options, sizeCutoff, timeoutMs) return [] @@ -100,6 +106,8 @@ def pollTopic( topic, # type: Union[str, unicode] groupId, # type: Union[str, unicode] options=None, # type: Optional[Dict[Union[str, unicode], Any]] + sizeCutoff=None, # type: Optional[int] + timeoutMs=None, # type: Optional[long] ): # type: (...) -> List[Any] """Returns a list of records from the specified topic. @@ -111,11 +119,15 @@ def pollTopic( belongs to. options: Custom options specific to the consumer, with key value string pairs. Optional + sizeCutoff: The total record count allowed before polling will + be stopped. Optional. + timeoutMs: The amount of time in milliseconds before polling + will be stopped. Optional. Returns: A list of records from the specified topic. """ - print(connector, topic, groupId, options) + print(connector, topic, groupId, options, sizeCutoff, timeoutMs) return [] diff --git a/stubs/stubs/system/kafka.pyi b/stubs/stubs/system/kafka.pyi index 675de5d..a609a5b 100644 --- a/stubs/stubs/system/kafka.pyi +++ b/stubs/stubs/system/kafka.pyi @@ -14,12 +14,16 @@ def pollPartition( partition: int, offset: Union[str, unicode], options: Optional[Dict[Union[str, unicode], Any]] = ..., + sizeCutoff: Optional[int] = ..., + timeoutMs: Optional[long] = ..., ) -> List[Any]: ... def pollTopic( connector: Union[str, unicode], topic: Union[str, unicode], groupId: Union[str, unicode], options: Optional[Dict[Union[str, unicode], Any]] = ..., + sizeCutoff: Optional[int] = ..., + timeoutMs: Optional[long] = ..., ) -> List[Any]: ... def seekLatest( connector: Union[str, unicode],