-
Notifications
You must be signed in to change notification settings - Fork 123
Expand file tree
/
Copy pathstate.py
More file actions
96 lines (81 loc) · 2.98 KB
/
state.py
File metadata and controls
96 lines (81 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from dataclasses import dataclass
from typing import Any, Optional

from pandas import DataFrame
from pyarrow import Table

from .schema import Schema, AttributeType
from .schema.attribute_type import FROM_PYOBJECT_MAPPING
# NOTE(review): the class declares no dataclass fields, so ``@dataclass`` only
# contributes generated dunder methods. In particular the generated ``__eq__``
# compares empty field tuples, which makes any two ``State`` instances compare
# equal, and ``__hash__`` is set to ``None``. Kept as-is for compatibility --
# confirm whether callers rely on this before changing it.
@dataclass
class State:
    """A bag of key/value pairs whose keys are tracked in a ``Schema``.

    Every entry lives directly in the instance ``__dict__``. Attribute
    assignment (``state.x = 1``) and item assignment (``state["x"] = 1``) are
    both routed through :meth:`add`, which registers previously unseen keys in
    ``self.schema``. Two bookkeeping entries share the same ``__dict__``:
    ``schema`` (the ``Schema`` object itself) and ``passToAllDownstream``.
    """

    def __init__(
        self, table: Optional[Table] = None, pass_to_all_downstream: bool = True
    ):
        """Create an empty state, or load one from the first row of ``table``.

        Args:
            table: Optional Arrow table. When given, the values of its first
                row are copied into this state and the schema is rebuilt from
                ``table.schema``. ``.iloc[0]`` is used, so a table without
                rows fails with pandas' out-of-bounds ``IndexError``.
            pass_to_all_downstream: Stored as ``passToAllDownstream``.
        """
        # Both assignments go through __setattr__ -> add(). "schema" itself is
        # never registered as a column, but "passToAllDownstream" is.
        self.schema = Schema()
        self.passToAllDownstream = pass_to_all_downstream
        if table is not None:
            # Bypass add(): the table carries its own schema, installed below.
            first_row = table.to_pandas().iloc[0].to_dict()
            self.__dict__.update(first_row)
            self.schema = Schema(table.schema)

    @classmethod
    def from_tuple(cls, tuple, schema) -> "State":
        """Build a state from ``tuple.as_dict()`` and an explicit ``schema``.

        The values are written straight into ``__dict__`` (no per-key schema
        registration); ``schema`` then replaces the default one.
        """
        obj = cls()
        obj.__dict__.update(tuple.as_dict())
        obj.schema = schema
        return obj

    @classmethod
    def from_dict(cls, dictionary) -> "State":
        """Build a state by calling :meth:`add` for every item of a mapping."""
        obj = cls()
        for key, value in dictionary.items():
            obj.add(key, value)
        return obj

    def add(
        self, key: str, value: Any, value_type: Optional[AttributeType] = None
    ) -> None:
        """Store ``value`` under ``key``, registering new keys in the schema.

        Args:
            key: Entry name.
            value: Entry value.
            value_type: Explicit attribute type for a new key. When omitted,
                the type is looked up in ``FROM_PYOBJECT_MAPPING`` by the
                exact ``type(value)``.

        Only the first write of a key touches the schema; later writes
        overwrite the stored value and leave the schema entry unchanged.
        """
        if key not in self.__dict__:
            if value_type is not None:
                self.schema.add(key, value_type)
            elif key != "schema":
                # The Schema object itself is stored as an attribute but must
                # not become a column of its own schema.
                # presumably FROM_PYOBJECT_MAPPING is a dict keyed by Python
                # type, so an unmapped type raises KeyError -- confirm.
                self.schema.add(key, FROM_PYOBJECT_MAPPING[type(value)])
        self.__dict__[key] = value

    def get(self, key: str) -> Any:
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` has not been set.
        """
        return self.__dict__[key]

    def to_table(self) -> Table:
        """Convert this state into an Arrow table built from a one-row frame.

        The frame is built from the full ``__dict__`` and converted using
        ``self.schema.as_arrow_schema()``.
        """
        # NOTE(review): __dict__ also holds the "schema" entry; presumably
        # pyarrow ignores it because it is absent from the arrow schema --
        # confirm.
        return Table.from_pandas(
            df=DataFrame([self.__dict__]),
            schema=self.schema.as_arrow_schema(),
        )

    def to_dict(self) -> dict:
        """Return a shallow copy of the entries without the bookkeeping keys.

        ``passToAllDownstream`` and ``schema`` are removed from the copy.
        """
        dictionary = self.__dict__.copy()
        del dictionary["passToAllDownstream"]
        del dictionary["schema"]
        return dictionary

    def __setattr__(self, key: str, value: Any) -> None:
        """Route ``state.key = value`` through :meth:`add`."""
        self.add(key, value)

    def __setitem__(self, key: str, value: Any) -> None:
        """Route ``state[key] = value`` through :meth:`add`."""
        self.add(key, value)

    def __getitem__(self, key: str) -> Any:
        """Return ``state[key]``; raises ``KeyError`` for unknown keys."""
        return self.get(key)

    def __str__(self) -> str:
        """Render every entry except ``schema`` as ``State[k: v, ...]``."""
        content = ", ".join(
            f"{key!r}: {value!r}"
            for key, value in self.__dict__.items()
            if key != "schema"
        )
        return f"State[{content}]"

    __repr__ = __str__