-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexecutor.rb
More file actions
281 lines (249 loc) · 10.9 KB
/
executor.rb
File metadata and controls
281 lines (249 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# frozen_string_literal: true
require_relative "./executor/execution_scope"
require_relative "./executor/execution_field"
require_relative "./executor/authorization"
require_relative "./executor/hot_paths"
require_relative "./executor/response_hash"
require_relative "./executor/error_formatter"
module GraphQL
module Cardinal
class Executor
include HotPaths
TYPENAME_FIELD = "__typename"
TYPENAME_FIELD_RESOLVER = TypenameResolver.new
attr_reader :exec_count
def initialize(schema, resolvers, document, root_object, variables: {}, context: {}, tracers: [])
@query = GraphQL::Query.new(schema, document: document) # << for schema reference
@resolvers = resolvers
@document = document
@root_object = root_object
@tracers = tracers
@variables = variables
@context = context
@data = {}
@errors = []
@exec_queue = []
@exec_count = 0
@context[:query] = @query
end
def perform
operation = @query.selected_operation
root_scopes = case operation.operation_type
when "query"
# query fields can run in parallel
[
ExecutionScope.new(
parent_type: @query.root_type_for_operation(operation.operation_type),
selections: operation.selections,
sources: [@root_object],
responses: [@data],
)
]
when "mutation"
# each mutation field must run serially as its own scope
mutation_type = @query.root_type_for_operation(operation.operation_type)
execution_fields_by_key(mutation_type, operation.selections).each_value.map do |exec_field|
ExecutionScope.new(
parent_type: mutation_type,
selections: exec_field.nodes,
sources: [@root_object],
responses: [@data],
)
end
else
raise DocumentError.new("Unsupported operation type: #{operation.operation_type}")
end
root_scopes.each do |scope|
@exec_queue << scope
# execute until no more scopes (without using recursion)...
execute_scope(@exec_queue.shift) until @exec_queue.empty?
end
response = { "data" => @errors.empty? ? @data : ErrorFormatter.new(@query, @data, @errors).perform }
response["errors"] = @errors.map(&:to_h) unless @errors.empty?
response
end
private
def execute_scope(exec_scope)
unless exec_scope.fields
exec_scope.fields = execution_fields_by_key(exec_scope.parent_type, exec_scope.selections)
exec_scope.fields.each_value do |exec_field|
parent_type = exec_scope.parent_type
parent_sources = exec_scope.sources
field_name = exec_field.name
exec_field.scope = exec_scope
exec_field.type = @query.get_field(parent_type, field_name).type
value_type = exec_field.type.unwrap
field_resolver = @resolvers.dig(parent_type.graphql_name, field_name)
unless field_resolver
if field_name == TYPENAME_FIELD
field_resolver = TYPENAME_FIELD_RESOLVER
else
raise NotImplementedError, "No field resolver for '#{parent_type.graphql_name}.#{field_name}'"
end
end
exec_field.result = if !field_resolver.authorized?(@context)
@errors << AuthorizationError.new(type_name: parent_type.graphql_name, field_name: field_name, path: exec_field.path, base: true)
Array.new(parent_sources.length, @errors.last)
elsif !Authorization.can_access_type?(value_type, @context)
@errors << AuthorizationError.new(type_name: value_type.graphql_name, path: exec_field.path, base: true)
Array.new(parent_sources.length, @errors.last)
else
begin
@tracers.each { _1.before_resolve_field(parent_type, field_name, parent_sources.length, @context) }
field_resolver.resolve(parent_sources, exec_field.arguments(@variables), @context, exec_scope)
rescue StandardError => e
report_exception(error: e, field: exec_field)
@errors << InternalError.new(path: exec_field.path, base: true)
Array.new(parent_sources.length, @errors.last)
ensure
@tracers.each { _1.after_resolve_field(parent_type, field_name, parent_sources.length, @context) }
@exec_count += 1
end
end
end
end
if exec_scope.lazy_fields?
if exec_scope.lazy_fields_ready?
exec_scope.send(:lazy_exec!) # << noop for loaders that have already run
exec_scope.fields.each_value do |exec_field|
sources = exec_field.result.is_a?(Promise) ? exec_field.result.value : exec_field.result
resolve_execution_field(exec_field, sources)
end
else
# requeue the scope to wait on others that haven't built fields yet
@exec_queue << exec_scope
end
else
exec_scope.fields.each_value do |exec_field|
resolve_execution_field(exec_field, exec_field.result)
end
end
nil
end
def resolve_execution_field(exec_field, resolved_sources)
parent_sources = exec_field.scope.sources
parent_responses = exec_field.scope.responses
field_key = exec_field.key
field_type = exec_field.type
return_type = field_type.unwrap
if resolved_sources.length != parent_sources.length
report_exception("Incorrect number of results resolved. Expected #{parent_sources.length}, got #{resolved_sources.length}", field: exec_field)
resolved_sources = Array.new(parent_sources.length, nil)
end
if return_type.kind.composite?
# build results with child selections
next_sources = []
next_responses = []
resolved_sources.each_with_index do |source, i|
# DANGER: HOT PATH!
parent_responses[i][field_key] = build_composite_response(exec_field, field_type, source, next_sources, next_responses)
end
if return_type.kind.abstract?
type_resolver = @resolvers.dig(return_type.graphql_name, "__type__")
unless type_resolver
raise NotImplementedError, "No type resolver for `#{return_type.graphql_name}`"
end
next_sources_by_type = Hash.new { |h, k| h[k] = [] }
next_responses_by_type = Hash.new { |h, k| h[k] = [] }
next_sources.each_with_index do |source, i|
# DANGER: HOT PATH!
impl_type = type_resolver.call(source, @context)
next_sources_by_type[impl_type] << (exec_field.name == TYPENAME_FIELD ? impl_type.graphql_name : source)
next_responses_by_type[impl_type] << next_responses[i].tap { |r| r.typename = impl_type.graphql_name }
end
loader_cache = {} # << all scopes in the abstract generation share a loader cache
loader_group = []
next_sources_by_type.each do |impl_type, impl_type_sources|
# check concrete type access only once per resolved type...
unless Authorization.can_access_type?(impl_type, @context)
@errors << AuthorizationError.new(type_name: impl_type.graphql_name, path: exec_field.path, base: true)
impl_type_sources = Array.new(impl_type_sources.length, @errors.last)
end
loader_group << ExecutionScope.new(
parent_type: impl_type,
selections: exec_field.selections,
sources: impl_type_sources,
responses: next_responses_by_type[impl_type],
loader_cache: loader_cache,
loader_group: loader_group,
path: exec_field.path,
parent: exec_field.scope,
)
end
@exec_queue.concat(loader_group)
else
@exec_queue << ExecutionScope.new(
parent_type: return_type,
selections: exec_field.selections,
sources: next_sources,
responses: next_responses,
path: exec_field.path,
parent: exec_field.scope,
)
end
else
# build leaf results
resolved_sources.each_with_index do |val, i|
# DANGER: HOT PATH!
parent_responses[i][field_key] = if val.nil? || val.is_a?(StandardError)
build_missing_value(exec_field, field_type, val)
elsif return_type.kind.scalar?
coerce_scalar_value(return_type, val)
elsif return_type.kind.enum?
coerce_enum_value(return_type, val)
else
val
end
end
end
end
def execution_fields_by_key(parent_type, selections, map: Hash.new { |h, k| h[k] = ExecutionField.new(k) })
selections.each do |node|
next if node_skipped?(node)
case node
when GraphQL::Language::Nodes::Field
map[node.alias || node.name].add_node(node)
when GraphQL::Language::Nodes::InlineFragment
fragment_type = node.type ? @query.get_type(node.type.name) : parent_type
if @query.possible_types(fragment_type).include?(parent_type)
execution_fields_by_key(parent_type, node.selections, map: map)
end
when GraphQL::Language::Nodes::FragmentSpread
fragment = @query.fragments[node.name]
fragment_type = @query.get_type(fragment.type.name)
if @query.possible_types(fragment_type).include?(parent_type)
execution_fields_by_key(parent_type, fragment.selections, map: map)
end
else
raise DocumentError.new("Invalid selection node type")
end
end
map
end
def node_skipped?(node)
return false if node.directives.empty?
node.directives.any? do |directive|
if directive.name == "skip"
if_argument?(directive.arguments.first)
elsif directive.name == "include"
!if_argument?(directive.arguments.first)
else
false
end
end
end
def if_argument?(bool_arg)
if bool_arg.value.is_a?(GraphQL::Language::Nodes::VariableIdentifier)
@variables[bool_arg.value.name] || @variables[bool_arg.value.name.to_sym]
else
bool_arg.value
end
end
def report_exception(message = nil, error: nil, field: nil)
# todo: add real error reporting...
puts "Error at #{field.path.join(".")}: #{message || error&.message}" if field
puts error.backtrace.join("\n") if error
end
end
end
end