-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjournals.py
More file actions
154 lines (124 loc) · 4.79 KB
/
journals.py
File metadata and controls
154 lines (124 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from urllib.parse import urljoin
from mpt_api_client.http import AsyncService, Service
from mpt_api_client.http.mixins import (
AsyncCollectionMixin,
AsyncManagedResourceMixin,
CollectionMixin,
ManagedResourceMixin,
)
from mpt_api_client.http.types import FileTypes
from mpt_api_client.models import Model
from mpt_api_client.resources.billing.journal_attachments import (
AsyncJournalAttachmentsService,
JournalAttachmentsService,
)
from mpt_api_client.resources.billing.journal_charges import (
AsyncJournalChargesService,
JournalChargesService,
)
from mpt_api_client.resources.billing.journal_sellers import (
AsyncJournalSellersService,
JournalSellersService,
)
from mpt_api_client.resources.billing.mixins import AsyncRegeneratableMixin, RegeneratableMixin
class Journal(Model):
    """Model representing a journal resource."""
class JournalsServiceConfig:
    """Configuration shared by the sync and async journals services."""

    # Model class used to build results, and the endpoint the services target.
    _model_class = Journal
    _endpoint = "/public/v1/billing/journals"
    _collection_key = "data"

    # Keys the ``upload`` methods use when filling the ``files`` mapping:
    # one for the journal ID, one for the optional file.
    _upload_data_key = "id"
    _upload_file_key = "file"
class JournalsService(
    RegeneratableMixin[Journal],
    ManagedResourceMixin[Journal],
    CollectionMixin[Journal],
    Service[Journal],
    JournalsServiceConfig,
):
    """Synchronous service for journals."""

    def upload(self, journal_id: str, file: FileTypes | None = None) -> Journal:  # noqa: WPS110
        """Post a journal file to the journal's ``upload`` path.

        Args:
            journal_id: ID of the journal the file belongs to.
            file: File to send; left out of the request when falsy.

        Returns:
            Journal: Model built from the response.
        """
        # The file entry is optional; the journal ID entry is always sent.
        payload = {self._upload_file_key: file} if file else {}
        payload[self._upload_data_key] = journal_id

        upload_path = urljoin(f"{self.path}/", f"{journal_id}/upload")
        response = self.http_client.request(
            "post",
            upload_path,
            files=payload,
            force_multipart=True,
        )
        return self._model_class.from_response(response)

    def attachments(self, journal_id: str) -> JournalAttachmentsService:
        """Build the attachments service for the given journal."""
        client = self.http_client
        return JournalAttachmentsService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )

    def sellers(self, journal_id: str) -> JournalSellersService:
        """Build the sellers service for the given journal."""
        client = self.http_client
        return JournalSellersService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )

    def charges(self, journal_id: str) -> JournalChargesService:
        """Build the charges service for the given journal."""
        client = self.http_client
        return JournalChargesService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )
class AsyncJournalsService(
    AsyncRegeneratableMixin[Journal],
    AsyncManagedResourceMixin[Journal],
    AsyncCollectionMixin[Journal],
    AsyncService[Journal],
    JournalsServiceConfig,
):
    """Asynchronous service for journals."""

    async def upload(self, journal_id: str, file: FileTypes | None = None) -> Journal:  # noqa: WPS110
        """Post a journal file to the journal's ``upload`` path.

        Args:
            journal_id: ID of the journal the file belongs to.
            file: File to send; left out of the request when falsy.

        Returns:
            Journal: Model built from the response.
        """
        # The file entry is optional; the journal ID entry is always sent.
        payload = {self._upload_file_key: file} if file else {}
        payload[self._upload_data_key] = journal_id

        upload_path = urljoin(f"{self.path}/", f"{journal_id}/upload")
        response = await self.http_client.request(
            "post",
            upload_path,
            files=payload,
            force_multipart=True,
        )
        return self._model_class.from_response(response)

    def attachments(self, journal_id: str) -> AsyncJournalAttachmentsService:
        """Build the async attachments service for the given journal."""
        client = self.http_client
        return AsyncJournalAttachmentsService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )

    def sellers(self, journal_id: str) -> AsyncJournalSellersService:
        """Build the async sellers service for the given journal."""
        client = self.http_client
        return AsyncJournalSellersService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )

    def charges(self, journal_id: str) -> AsyncJournalChargesService:
        """Build the async charges service for the given journal."""
        client = self.http_client
        return AsyncJournalChargesService(
            http_client=client,
            endpoint_params={"journal_id": journal_id},
        )