-
Notifications
You must be signed in to change notification settings - Fork 129
Expand file tree
/
Copy pathcatalog.py
More file actions
155 lines (127 loc) · 5.32 KB
/
catalog.py
File metadata and controls
155 lines (127 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
'''Provides an object model for a Singer Catalog.'''
import json
import sys
from . import metadata as metadata_module
from .logger import get_logger
from .schema import Schema
# Module-level logger, shared by write_catalog and Catalog.get_selected_streams.
LOGGER = get_logger()
def write_catalog(catalog):
    """Print *catalog* to standard output as JSON indented by two spaces.

    Before writing, a warning is logged when ``catalog.streams`` is empty
    (falsy). The payload is whatever ``catalog.to_dict()`` returns.
    """
    streams_present = bool(catalog.streams)
    if not streams_present:
        LOGGER.warning("Catalog being written with no streams.")
    as_dict = catalog.to_dict()
    json.dump(as_dict, sys.stdout, indent=2)
# pylint: disable=too-many-instance-attributes
class CatalogEntry():
    """A single stream entry of a Singer catalog.

    Every field defaults to ``None``. ``to_dict`` omits unset fields when
    serializing the entry.
    """

    def __init__(self, tap_stream_id=None, stream=None,
                 key_properties=None, schema=None, replication_key=None,
                 is_view=None, database=None, table=None, row_count=None,
                 stream_alias=None, metadata=None, replication_method=None):
        self.tap_stream_id = tap_stream_id
        self.stream = stream
        self.key_properties = key_properties
        self.schema = schema
        self.replication_key = replication_key
        self.replication_method = replication_method
        self.is_view = is_view
        self.database = database
        self.table = table
        self.row_count = row_count
        self.stream_alias = stream_alias
        self.metadata = metadata

    def __str__(self):
        return str(self.__dict__)

    def __eq__(self, other):
        # Returning NotImplemented for foreign types (instead of reading
        # other.__dict__) makes ``entry == None`` evaluate to False rather
        # than raising AttributeError.
        if not isinstance(other, CatalogEntry):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def is_selected(self):
        """Return a truthy value when this stream is selected.

        The schema's ``selected`` flag wins when it is truthy. Otherwise the
        ``selected`` value stored under breadcrumb ``()`` in ``metadata`` is
        returned. A missing schema (``None``) is treated as "not selected at
        the schema level" instead of raising.
        """
        # pylint: disable=no-member
        if self.schema is not None and self.schema.selected:
            return self.schema.selected
        # NOTE(review): presumably metadata is the raw list form expected by
        # metadata_module.to_map; a None metadata may not be accepted there.
        mdata = metadata_module.to_map(self.metadata)
        return metadata_module.get(mdata, (), 'selected')

    def to_dict(self):
        """Serialize the entry to a plain dict using catalog key names.

        ``database`` and ``table`` are written as ``database_name`` and
        ``table_name``. ``tap_stream_id``, ``database`` and ``table`` are
        omitted when falsy; every other field is omitted only when ``None``.
        The schema is serialized through its own ``to_dict()``.
        """
        result = {}
        # These three are skipped on any falsy value (e.g. empty string).
        if self.tap_stream_id:
            result['tap_stream_id'] = self.tap_stream_id
        if self.database:
            result['database_name'] = self.database
        if self.table:
            result['table_name'] = self.table
        if self.replication_key is not None:
            result['replication_key'] = self.replication_key
        if self.replication_method is not None:
            result['replication_method'] = self.replication_method
        if self.key_properties is not None:
            result['key_properties'] = self.key_properties
        if self.schema is not None:
            schema = self.schema.to_dict()  # pylint: disable=no-member
            result['schema'] = schema
        if self.is_view is not None:
            result['is_view'] = self.is_view
        if self.stream is not None:
            result['stream'] = self.stream
        if self.row_count is not None:
            result['row_count'] = self.row_count
        if self.stream_alias is not None:
            result['stream_alias'] = self.stream_alias
        if self.metadata is not None:
            result['metadata'] = self.metadata
        return result
class Catalog():
    """A Singer catalog: an ordered list of ``CatalogEntry`` streams."""

    def __init__(self, streams):
        self.streams = streams

    def __str__(self):
        return str(self.__dict__)

    def __eq__(self, other):
        # NotImplemented lets ``catalog == None`` evaluate to False instead of
        # raising AttributeError on other.__dict__.
        if not isinstance(other, Catalog):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @classmethod
    def load(cls, filename):
        """Read a catalog from the JSON file at *filename*."""
        # JSON text is UTF-8 (RFC 8259); do not depend on the platform locale.
        with open(filename, encoding='utf-8') as fp:  # pylint: disable=invalid-name
            return cls.from_dict(json.load(fp))

    @classmethod
    def from_dict(cls, data):
        """Build a catalog from its dict form.

        *data* must contain a ``streams`` list. Each element is mapped onto a
        ``CatalogEntry`` (``database_name``/``table_name`` become
        ``database``/``table``) and its ``schema`` value is parsed with
        ``Schema.from_dict``. Missing keys are read as ``None``.
        """
        # TODO: We may want to store streams as a dict where the key is a
        # tap_stream_id and the value is a CatalogEntry. This will allow
        # faster lookup based on tap_stream_id. This would be a breaking
        # change, since callers typically access the streams property
        # directly.
        streams = []
        for stream in data['streams']:
            streams.append(CatalogEntry(
                tap_stream_id=stream.get('tap_stream_id'),
                stream=stream.get('stream'),
                replication_key=stream.get('replication_key'),
                key_properties=stream.get('key_properties'),
                database=stream.get('database_name'),
                table=stream.get('table_name'),
                schema=Schema.from_dict(stream.get('schema')),
                is_view=stream.get('is_view'),
                stream_alias=stream.get('stream_alias'),
                metadata=stream.get('metadata'),
                replication_method=stream.get('replication_method'),
            ))
        return cls(streams)

    def to_dict(self):
        """Return ``{'streams': [...]}`` with each entry serialized."""
        return {'streams': [stream.to_dict() for stream in self.streams]}

    def dump(self):
        """Write the catalog to standard output as JSON indented by two spaces."""
        json.dump(self.to_dict(), sys.stdout, indent=2)

    def get_stream(self, tap_stream_id):
        """Return the first entry with *tap_stream_id*, or ``None``."""
        return next(
            (stream for stream in self.streams
             if stream.tap_stream_id == tap_stream_id),
            None,
        )

    def _shuffle_streams(self, state):
        """Return the streams rotated so the currently-syncing one is first.

        *state* is only used through ``state.get_currently_syncing()``. When
        that returns ``None``, ``self.streams`` itself is returned. When no
        entry matches, a copy in the original order is returned.
        """
        currently_syncing = state.get_currently_syncing()
        if currently_syncing is None:
            return self.streams

        matching_index = next(
            (i for i, catalog_entry in enumerate(self.streams)
             if catalog_entry.tap_stream_id == currently_syncing),
            0,
        )
        # Rotate: resume at the interrupted stream, then wrap around.
        return self.streams[matching_index:] + self.streams[:matching_index]

    def get_selected_streams(self, state):
        """Yield selected streams in resume order (see ``_shuffle_streams``).

        Unselected streams are logged at INFO level and skipped.
        """
        for stream in self._shuffle_streams(state):
            if not stream.is_selected():
                LOGGER.info('Skipping stream: %s', stream.tap_stream_id)
                continue
            yield stream