-
-
Notifications
You must be signed in to change notification settings - Fork 562
Expand file tree
/
Copy pathtest_models.py
More file actions
186 lines (146 loc) · 5.1 KB
/
test_models.py
File metadata and controls
186 lines (146 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Copyright 2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from odoo import api, fields, models
from odoo.addons.queue_job.delay import chain
from odoo.addons.queue_job.exception import RetryableJobError
from odoo.addons.queue_job.job import identity_exact
# Test-only extension of the ``queue.job`` model: one extra Char field plus
# three helper methods whose names start with ``testing_related``.
# NOTE(review): documentation is kept in ``#`` comments rather than docstrings
# so that no ``__doc__`` attribute changes at runtime.
class QueueJob(models.Model):
    _inherit = "queue.job"

    # Free-form text column.
    # NOTE(review): presumably filled through ``_job_store_values`` of
    # ``test.queue.job`` -- confirm against queue_job.
    additional_info = fields.Char()

    # Hand back the recordset and the received keyword arguments as a tuple.
    def testing_related_method(self, **kwargs):
        echoed = (self, kwargs)
        return echoed

    # Accept any keyword arguments and ignore them; the result is always None.
    def testing_related__none(self, **kwargs):
        return

    # Build an ``ir.actions.act_url`` action dict. The ``url`` keyword is a
    # format template; its ``{subject}`` placeholder is filled with the first
    # positional argument stored on the job (``self.args[0]``).
    def testing_related__url(self, **kwargs):
        assert "url" in kwargs, "url required"
        subject = self.args[0]
        target_url = kwargs["url"].format(subject=subject)
        action = {
            "type": "ir.actions.act_url",
            "target": "new",
            "url": target_url,
        }
        return action
# Test model exposing a collection of small methods that the test suite
# enqueues, patches or calls directly.
# NOTE(review): methods are documented with ``#`` comments instead of
# docstrings on purpose -- names such as ``no_description`` suggest that the
# job framework looks at method docstrings; confirm before adding any.
class ModelTestQueueJob(models.Model):
    _name = "test.queue.job"
    _description = "Test model for queue.job"

    name = fields.Char()

    # to test the context is serialized/deserialized properly
    @api.model
    def _job_prepare_context_before_enqueue_keys(self):
        return ("tz", "lang")

    def testing_method(self, *args, **kwargs):
        """Method used for tests
        Return always the arguments and keyword arguments received
        """
        # ``raise_retry`` takes precedence over ``return_context``.
        if kwargs.get("raise_retry"):
            raise RetryableJobError("Must be retried later")
        if kwargs.get("return_context"):
            return self.env.context
        return args, kwargs

    # Create and return one ``ir.logging`` record carrying ``message``.
    # ``level`` is accepted but is not written into the created record.
    def create_ir_logging(self, message, level="info"):
        return self.env["ir.logging"].create(
            {
                "name": "test_queue_job",
                "type": "server",
                "dbname": self.env.cr.dbname,
                "message": message,
                "path": "job",
                "func": "create_ir_logging",
                "line": 1,
            }
        )

    # While no matching ``ir.logging`` row exists: delay ``create_ir_logging``
    # and raise ``RetryableJobError`` with the new job passed as
    # ``add_depends``. Once the row exists, return True.
    def job_with_retry_and_new_dependency(self):
        logging_domain = [
            ("name", "=", "test_queue_job"),
            ("message", "=", "job_with_retry_and_new_dependency"),
        ]
        if not self.env["ir.logging"].search_count(logging_domain):
            new_job = self.with_delay().create_ir_logging(
                message="job_with_retry_and_new_dependency"
            )
            raise RetryableJobError(
                "Must be retried after creating the logging",
                add_depends=[new_job],
            )
        return True

    # NOTE(review): intentionally has no docstring (see the method name);
    # presumably tests rely on that -- confirm before documenting it inline.
    def no_description(self):
        return

    # No-op; returns None.
    def job_with_retry_pattern(self):
        return

    # No-op; returns None.
    def job_with_retry_pattern__no_zero(self):
        return

    # Pass-through override: delegates unchanged to the parent ``mapped``.
    def mapped(self, func):
        return super().mapped(func)

    # Mutate both arguments in place (append 2 to the list, set key "b" to 2
    # in the dict) and return them as a tuple.
    def job_alter_mutable(self, mutable_arg, mutable_kwarg=None):
        # The declared default is None; without this guard the item
        # assignment below raised TypeError when the keyword was omitted.
        if mutable_kwarg is None:
            mutable_kwarg = {}
        mutable_arg.append(2)
        mutable_kwarg["b"] = 2
        return mutable_arg, mutable_kwarg

    # Echo back ``arg`` and ``kwarg`` as a tuple. Patched in
    # ``_register_hook``.
    def delay_me(self, arg, kwarg=None):
        return arg, kwarg

    # Return a dict with a fixed ``identity_key``.
    # NOTE(review): presumably looked up by name as the job options for
    # ``delay_me_options`` -- confirm against queue_job.
    def delay_me_options_job_options(self):
        return {
            "identity_key": "my_job_identity",
        }

    # Returns the string "ok". Patched in ``_register_hook``.
    def delay_me_options(self):
        return "ok"

    # Returns the string "ok". Patched in ``_register_hook`` with a
    # ``context_key``.
    def delay_me_context_key(self):
        return "ok"

    # Replace the three ``delay_me*`` methods with the wrappers produced by
    # ``_patch_job_auto_delay``, then continue with the parent hook.
    def _register_hook(self):
        self._patch_method("delay_me", self._patch_job_auto_delay("delay_me"))
        self._patch_method(
            "delay_me_options", self._patch_job_auto_delay("delay_me_options")
        )
        self._patch_method(
            "delay_me_context_key",
            self._patch_job_auto_delay(
                "delay_me_context_key", context_key="auto_delay_delay_me_context_key"
            ),
        )
        return super()._register_hook()

    # Return ``{"additional_info": ...}``; the value gets the suffix
    # "_BUT_FAILED" when ``job.state`` equals "failed".
    def _job_store_values(self, job):
        value = "JUST_TESTING"
        if job.state == "failed":
            value += "_BUT_FAILED"
        return {"additional_info": value}

    # Delay ``testing_method(1, foo=2)`` with an explicit set of options.
    # Returns None.
    def button_that_uses_with_delay(self):
        self.with_delay(
            channel="root.test",
            description="Test",
            eta=15,
            identity_key=identity_exact,
            max_retries=1,
            priority=15,
        ).testing_method(1, foo=2)

    # Build a chain of three delayables (the first with explicit options,
    # the other two with none) and delay the chain. Returns None.
    def button_that_uses_delayable_chain(self):
        delayables = chain(
            self.delayable(
                channel="root.test",
                description="Test",
                eta=15,
                identity_key=identity_exact,
                max_retries=1,
                priority=15,
            ).testing_method(1, foo=2),
            self.delayable().testing_method("x", foo="y"),
            self.delayable().no_description(),
        )
        delayables.delay()
# Test model with three do-nothing methods; each one simply returns None.
# NOTE(review): kept free of docstrings so no ``__doc__`` attribute changes
# at runtime; presumably these methods are bound to job channels elsewhere --
# confirm against the test data.
class ModelTestQueueChannel(models.Model):
    _name = "test.queue.channel"
    _description = "Test model for queue.channel"

    # No-op.
    def job_a(self):
        return None

    # No-op.
    def job_b(self):
        return None

    # No-op.
    def job_sub_channel(self):
        return None
# Test model with four do-nothing methods; each one simply returns None.
# NOTE(review): kept free of docstrings so no ``__doc__`` attribute changes
# at runtime; presumably each method is paired with a related-action
# configuration elsewhere -- confirm against the test data.
class ModelTestRelatedAction(models.Model):
    _name = "test.related.action"
    _description = "Test model for related actions"

    # No-op.
    def testing_related_action__no(self):
        return None

    # No-op.
    def testing_related_action__return_none(self):
        return None

    # No-op.
    def testing_related_action__kwargs(self):
        return None

    # No-op.
    def testing_related_action__store(self):
        return None