default_stream.py
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from logging import Logger
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
    ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class DefaultStream(AbstractStream):
    """Concrete `AbstractStream` used by concurrent sources. Partition generation,
    schema, cursor, and logging behavior are all injected through the constructor."""

    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[CursorField],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
        block_simultaneous_read: str = "",
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer
        self._block_simultaneous_read = block_simultaneous_read

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field.cursor_field_key if self._cursor_field else None

    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema() if callable(self._json_schema) else self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        """Build the catalog (`AirbyteStream`) representation of this stream."""
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = (
                not self._cursor_field.supports_catalog_defined_cursor_field
            )
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field.cursor_field_key]

        keys = self._primary_key
        if keys:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream
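
    # For illustration (not part of the original module): given a cursor field
    # named "updated_at", the catalog entry built above would advertise both
    # sync modes, e.g.:
    #   supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
    #   default_cursor_field=["updated_at"], is_resumable=True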

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor

    @property
    def block_simultaneous_read(self) -> str:
        """Return the blocking group name for this stream, or an empty string if
        the stream does not block simultaneous reads."""
        return self._block_simultaneous_read

    @block_simultaneous_read.setter
    def block_simultaneous_read(self, value: str) -> None:
        self._block_simultaneous_read = value

    def get_partition_router(self) -> Optional[PartitionRouter]:
        """Return the partition router for this stream, or None if not available."""
        # Note: this reaches into private attributes of the partition generator and
        # its stream slicer, so it only succeeds for declarative per-partition streams.
        if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
            return None

        stream_slicer = self._stream_partition_generator._stream_slicer
        if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
            return None

        return stream_slicer._partition_router

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # NOTE: carried over from the legacy availability check; its relevance here is uncertain:
            # If stream_slices has no `next()` item (note: this is different from stream_slices
            # returning [None]!), this can happen when a substream's `stream_slices` method does a
            # `for record in parent_records: yield <something>` without accounting for the case in
            # which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )
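

# --- Example usage (a minimal sketch, not part of the original module) ---
#
# The constructor only stores its arguments, so lightweight stand-ins are enough
# to show how `as_airbyte_stream()` derives a catalog entry. The MagicMock
# objects below are hypothetical placeholders for a real PartitionGenerator and
# Cursor; any concrete implementations of those interfaces would work the same way.
#
#   from logging import getLogger
#   from unittest.mock import MagicMock
#
#   stream = DefaultStream(
#       partition_generator=MagicMock(spec=PartitionGenerator),
#       name="users",
#       json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
#       primary_key=["id"],
#       cursor_field=None,
#       logger=getLogger(__name__),
#       cursor=MagicMock(spec=Cursor),
#   )
#   entry = stream.as_airbyte_stream()
#   # entry.supported_sync_modes == [SyncMode.full_refresh]
#   # entry.source_defined_primary_key == [["id"]]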