From ca29455412de42520c2790a4eac5b9f97c40f7a3 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Wed, 25 Mar 2026 15:10:51 -0400 Subject: [PATCH 01/10] Start supporting subscriptions with exec-next --- lib/graphql/execution/next.rb | 4 +- .../execution/next/field_resolve_step.rb | 53 +++++++++++++------ lib/graphql/execution/next/runner.rb | 33 +++++++----- lib/graphql/query/context.rb | 2 +- lib/graphql/schema/resolver.rb | 8 ++- lib/graphql/schema/subscription.rb | 4 +- .../default_subscription_resolve_extension.rb | 7 ++- lib/graphql/subscriptions/event.rb | 5 +- spec/graphql/schema/subscription_spec.rb | 53 ++++++++++++------- spec/graphql/subscriptions_spec.rb | 2 +- 10 files changed, 115 insertions(+), 56 deletions(-) diff --git a/lib/graphql/execution/next.rb b/lib/graphql/execution/next.rb index ef8c5aad9e..45fdf699d5 100644 --- a/lib/graphql/execution/next.rb +++ b/lib/graphql/execution/next.rb @@ -52,7 +52,7 @@ def self.use(schema, authorization: true) def self.run_all(schema, query_options, context: {}, max_complexity: schema.max_complexity) queries = query_options.map do |opts| - case opts + query = case opts when Hash schema.query_class.new(schema, nil, **opts) when GraphQL::Query, GraphQL::Query::Partial @@ -60,6 +60,8 @@ def self.run_all(schema, query_options, context: {}, max_complexity: schema.max_ else raise "Expected Hash or GraphQL::Query, not #{opts.class} (#{opts.inspect})" end + query.context[:__graphql_execute_next] = true + query end multiplex = Execution::Multiplex.new(schema: schema, queries: queries, context: context, max_complexity: max_complexity) runner = Runner.new(multiplex, **schema.execution_next_options) diff --git a/lib/graphql/execution/next/field_resolve_step.rb b/lib/graphql/execution/next/field_resolve_step.rb index a8abb3af2f..a3660034d3 100644 --- a/lib/graphql/execution/next/field_resolve_step.rb +++ b/lib/graphql/execution/next/field_resolve_step.rb @@ -48,7 +48,7 @@ def append_selection(ast_node) nil end - def coerce_arguments(argument_owner, ast_arguments_or_hash) + def coerce_arguments(argument_owner, ast_arguments_or_hash, run_loads = true) arg_defns = argument_owner.arguments(@selections_step.query.context) if arg_defns.empty? return EmptyObjects::EMPTY_HASH @@ -60,25 +60,26 @@ def coerce_arguments(argument_owner, ast_arguments_or_hash) arg_defn = arg_defns.each_value.find { |a| a.keyword == key || a.graphql_name == (key_s ||= String(key)) } - coerce_argument_value(args_hash, arg_defn, value) + coerce_argument_value(args_hash, arg_defn, value, run_loads) end else ast_arguments_or_hash.each { |arg_node| arg_defn = arg_defns[arg_node.name] - coerce_argument_value(args_hash, arg_defn, arg_node.value) + coerce_argument_value(args_hash, arg_defn, arg_node.value, run_loads) } end # TODO refactor the loop above into this one arg_defns.each do |arg_graphql_name, arg_defn| if arg_defn.default_value? && !args_hash.key?(arg_defn.keyword) - coerce_argument_value(args_hash, arg_defn, arg_defn.default_value) + coerce_argument_value(args_hash, arg_defn, arg_defn.default_value, run_loads) end end args_hash end - def coerce_argument_value(arguments, arg_defn, arg_value, target_keyword: arg_defn.keyword, as_type: nil) + def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyword: run_loads ? arg_defn.keyword : arg_defn.graphql_name, as_type: nil) + p [:coerce, arg_defn.path, arg_value, run_loads, target_keyword] arg_t = as_type || arg_defn.type if arg_t.non_null? arg_t = arg_t.of_type @@ -97,8 +98,6 @@ def coerce_argument_value(arguments, arg_defn, arg_value, target_keyword: arg_de nil elsif arg_value.is_a?(Language::Nodes::Enum) arg_value.name - elsif arg_value.is_a?(Language::Nodes::InputObject) - arg_value.arguments # rubocop:disable Development/ContextIsPassedCop else arg_value end @@ -125,7 +124,8 @@ def coerce_argument_value(arguments, arg_defn, arg_value, target_keyword: arg_de end end elsif arg_t.kind.input_object? - input_obj_args = coerce_arguments(arg_t, arg_value) + input_obj_vals = arg_value.is_a?(Language::Nodes::InputObject) ? arg_value.arguments : arg_value # rubocop:disable Development/ContextIsPassedCop + input_obj_args = coerce_arguments(arg_t, input_obj_vals) arg_t.new(nil, ruby_kwargs: input_obj_args, context: @selections_step.query.context, defaults_used: nil) else raise "Unsupported argument value: #{arg_t.to_type_signature} / #{arg_value.class} (#{arg_value.inspect})" @@ -145,7 +145,7 @@ def coerce_argument_value(arguments, arg_defn, arg_value, target_keyword: arg_de if arg_value.is_a?(GraphQL::Error) @arguments = arg_value - elsif arg_defn.loads && as_type.nil? && !arg_value.nil? + elsif run_loads && arg_defn.loads && as_type.nil? && !arg_value.nil? # This is for legacy compat: load_receiver = if (r = @field_definition.resolver) r.new(field: @field_definition, context: @selections_step.query.context, object: nil) @@ -181,6 +181,7 @@ def coerce_argument_value(arguments, arg_defn, arg_value, target_keyword: arg_de @runner.add_step(loads_step) end else + p [:assign_arg, target_keyword, arg_value] arguments[target_keyword] = arg_value end nil @@ -236,10 +237,15 @@ def call end def add_graphql_error(err) - err.path = path - err.ast_nodes = ast_nodes - @selections_step.query.context.add_error(err) - err + if GraphQL::Execution::SKIP.equal?(err) + # pass + nil + else + err.path = path + err.ast_nodes = ast_nodes + @selections_step.query.context.add_error(err) + err + end end module AlwaysAuthorized @@ -323,6 +329,14 @@ def execute_field is_authed = @field_definition.authorized?(o, @arguments, ctx) if is_authed authorized_objects << o + else + begin + err = GraphQL::UnauthorizedFieldError.new(object: o, type: @parent_type, context: ctx, field: @field_definition) + authorized_objects << query.schema.unauthorized_object(err) + is_authed = true + rescue GraphQL::ExecutionError => exec_err + add_graphql_error(exec_err) + end end is_authed } @@ -638,7 +652,17 @@ def resolve_batch(objects, context, args_hash) objects.map { |o| o[@field_definition.execution_next_mode_key] } when :direct_send if args_hash.empty? - objects.map { |o| o.public_send(@field_definition.execution_next_mode_key) } + objects.map do |o| + o.public_send(@field_definition.execution_next_mode_key) + rescue GraphQL::ExecutionError => err + err + rescue StandardError => stderr + begin + @selections_step.query.handle_or_reraise(stderr) + rescue GraphQL::ExecutionError => ex_err + ex_err + end + end else objects.map { |o| o.public_send(@field_definition.execution_next_mode_key, **args_hash) } end @@ -694,7 +718,6 @@ def resolve_batch(objects, context, args_hash) raise "Batching execution for #{path} not implemented (execution_next_mode: #{@execution_next_mode.inspect}); provide `resolve_static:`, `resolve_batch:`, `hash_key:`, `method:`, or use a compatibility plug-in" end end - end class RawValueFieldResolveStep < FieldResolveStep diff --git a/lib/graphql/execution/next/runner.rb b/lib/graphql/execution/next/runner.rb index eed554807e..a68bda0928 100644 --- a/lib/graphql/execution/next/runner.rb +++ b/lib/graphql/execution/next/runner.rb @@ -112,10 +112,16 @@ def execute root_value = query.root_value end + if query.subscription? && !query.subscription_update? + subs_namespace = query.context.namespace(:subscriptions) + subs_namespace[:events] = [] + subs_namespace[:subscriptions] = {} + end + results << { "data" => data } case selected_operation.operation_type - when nil, "query" + when nil, "query", "subscription" isolated_steps[0] << SelectionsStep.new( parent_type: root_type, selections: selected_operation.selections, @@ -139,8 +145,6 @@ def execute query: query, )] end - when "subscription" - raise ArgumentError, "TODO implement subscriptions" else raise ArgumentError, "Unhandled operation type: #{operation.operation_type.inspect}" end @@ -165,6 +169,10 @@ def execute queries.each_with_index.map do |query, idx| result = results[idx] + if (events = query.context.namespace(:subscriptions)[:events]) && !events.empty? + @schema.subscriptions.write_subscription(query, events) + end + p [:events, events] fin_result = if query.context.errors.empty? result else @@ -241,16 +249,17 @@ def propagate_errors(data, query) paths_to_check.compact! # root-level auth errors currently come without a path # TODO dry with above? # This is also where a query-level "Step" would be used? - selected_operation = query.document.definitions.first # TODO pick a selected operation - root_type = case selected_operation.operation_type - when nil, "query" - query.schema.query - when "mutation" - query.schema.mutation - when "subscription" - raise "Not implemented yet, TODO" + if (selected_operation = query.selected_operation) + root_type = case selected_operation.operation_type + when nil, "query" + query.schema.query + when "mutation" + query.schema.mutation + when "subscription" + query.schema.subscription + end + check_object_result(query, data, root_type, selected_operation.selections, [], [], paths_to_check) end - check_object_result(query, data, root_type, selected_operation.selections, [], [], paths_to_check) end def check_object_result(query, result_h, static_type, ast_selections, current_exec_path, current_result_path, paths_to_check) diff --git a/lib/graphql/query/context.rb b/lib/graphql/query/context.rb index e6eaed0793..67b3e8872a 100644 --- a/lib/graphql/query/context.rb +++ b/lib/graphql/query/context.rb @@ -112,7 +112,7 @@ def [](key) # Return this value to tell the runtime # to exclude this field from the response altogether def skip - GraphQL::Execution::SKIP + self[:__graphql_execute_next] ? nil : GraphQL::Execution::SKIP end # Add error at query-level. diff --git a/lib/graphql/schema/resolver.rb b/lib/graphql/schema/resolver.rb index 46e56565cf..bd35f4e3e7 100644 --- a/lib/graphql/schema/resolver.rb +++ b/lib/graphql/schema/resolver.rb @@ -46,7 +46,7 @@ def initialize(object:, context:, field:) @prepared_arguments = nil end - attr_accessor :exec_result, :exec_index, :field_resolve_step + attr_accessor :exec_result, :exec_index, :field_resolve_step, :raw_arguments # @return [Object] The application object this field is being resolved on attr_accessor :object @@ -80,7 +80,11 @@ def call end q = context.query q.current_trace.end_execute_field(field, @prepared_arguments, trace_objs, q, [result]) - + if q.subscription? && @field.owner == context.schema.subscription + # TODO unify this -- do it in a single pass + @original_arguments = @field_resolve_step.coerce_arguments(@field, @field_resolve_step.ast_node.arguments, false) + Subscriptions::DefaultSubscriptionResolveExtension.write_subscription(@field, result, @original_arguments, context) + end exec_result[exec_index] = result rescue RuntimeError => err exec_result[exec_index] = err diff --git a/lib/graphql/schema/subscription.rb b/lib/graphql/schema/subscription.rb index 8f1c07971c..bbd761661d 100644 --- a/lib/graphql/schema/subscription.rb +++ b/lib/graphql/schema/subscription.rb @@ -15,9 +15,7 @@ class Subscription < GraphQL::Schema::Resolver extend GraphQL::Schema::Resolver::HasPayloadType extend GraphQL::Schema::Member::HasFields NO_UPDATE = :no_update - # The generated payload type is required; If there's no payload, - # propagate null. - null false + null true # @api private def initialize(object:, context:, field:) diff --git a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb index 38e457bea2..d6c1bec90a 100644 --- a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb +++ b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb @@ -18,9 +18,14 @@ def resolve(context:, object:, arguments:) end def after_resolve(value:, context:, object:, arguments:, **rest) + self.class.write_subscription(@field, value, arguments, context) + end + + def self.write_subscription(field, value, arguments, context) + p [field.path, value, arguments] if value.is_a?(GraphQL::ExecutionError) value - elsif @field.resolver&.method_defined?(:subscription_written?) && + elsif field.resolver&.method_defined?(:subscription_written?) && (subscription_namespace = context.namespace(:subscriptions)) && (subscriptions_by_path = subscription_namespace[:subscriptions]) (subscription_instance = subscriptions_by_path[context.current_path]) diff --git a/lib/graphql/subscriptions/event.rb b/lib/graphql/subscriptions/event.rb index d2d8ce7abb..3ac917cc84 100644 --- a/lib/graphql/subscriptions/event.rb +++ b/lib/graphql/subscriptions/event.rb @@ -41,7 +41,9 @@ def self.serialize(_name, arguments, field, scope:, context: GraphQL::Query::Nul subscription = field.resolver || GraphQL::Schema::Subscription arguments = arguments_without_field_extras(field: field, arguments: arguments) normalized_args = stringify_args(field, arguments.to_h, context) - subscription.topic_for(arguments: normalized_args, field: field, scope: scope) + t = subscription.topic_for(arguments: normalized_args, field: field, scope: scope) + p [:topic, t] + t end # @return [String] a logical identifier for this event. (Stable when the query is broadcastable.) @@ -104,6 +106,7 @@ def deep_sort_array_hashes(array_to_inspect) def stringify_args(arg_owner, args, context) arg_owner = arg_owner.respond_to?(:unwrap) ? arg_owner.unwrap : arg_owner # remove list and non-null wrappers + case args when Hash next_args = {} diff --git a/spec/graphql/schema/subscription_spec.rb b/spec/graphql/schema/subscription_spec.rb index 0df9d1953e..dacb6b8ac6 100644 --- a/spec/graphql/schema/subscription_spec.rb +++ b/spec/graphql/schema/subscription_spec.rb @@ -14,13 +14,13 @@ class SubscriptionFieldSchema < GraphQL::Schema USERS = {} class User < GraphQL::Schema::Object - field :handle, String, null: false - field :private, Boolean, null: false + field :handle, String, null: false, hash_key: :handle + field :private, Boolean, null: false, hash_key: :private end class Toot < GraphQL::Schema::Object - field :handle, String, null: false - field :body, String, null: false + field :handle, String, null: false, hash_key: :handle + field :body, String, null: false, hash_key: :body def self.visible?(context) !context[:legacy_schema] @@ -28,7 +28,7 @@ def self.visible?(context) end class LegacyToot < Toot - field :likes_count, Int, null: false + field :likes_count, Int, null: false, hash_key: :likes_count def self.visible?(context) !!context[:legacy_schema] @@ -37,11 +37,15 @@ def self.visible?(context) class Query < GraphQL::Schema::Object field :toots, [Toot], null: false - field :toots, [LegacyToot], null: false + field :toots, [LegacyToot], null: false, resolve_static: true - def toots + def self.toots(context) TOOTS end + + def toots + self.class.toots(context) + end end class BaseSubscription < GraphQL::Schema::Subscription @@ -50,17 +54,17 @@ class BaseSubscription < GraphQL::Schema::Subscription class TootWasTooted < BaseSubscription argument :handle, String, loads: User, as: :user, camelize: false - field :toot, Toot, null: false - field :user, User, null: false + field :toot, Toot, null: false, hash_key: :toot + field :user, User, null: false, hash_key: :user def self.visible?(context) !context[:legacy_schema] end # Can't subscribe to private users - def authorized?(user:, path:, query:) + def authorized?(user:) if user[:private] - context[:last_path] = path + context[:last_path] = context[:current_path] false else true @@ -94,7 +98,7 @@ def update(user:, **args) end class LegacyTootWasTooted < TootWasTooted - field :toot, LegacyToot + field :toot, LegacyToot, hash_key: :toot def self.visible?(context) !!context[:legacy_schema] @@ -103,8 +107,8 @@ def self.visible?(context) class DirectTootWasTooted < BaseSubscription subscription_scope :viewer - field :toot, Toot, null: false - field :user, User, null: false + field :toot, Toot, null: false, hash_key: :toot + field :user, User, null: false, hash_key: :user end class DirectTootWasTootedWithOptionalScope < DirectTootWasTooted @@ -115,6 +119,7 @@ class DirectTootWasTootedWithOptionalScope < DirectTootWasTooted class UsersJoined < BaseSubscription class UsersJoinedManualPayload < GraphQL::Schema::Object field :users, [User], + hash_key: :users, description: "Includes newly-created users, or all users on the initial load" end @@ -134,12 +139,13 @@ def update # to make sure it works without arguments class NewUsersJoined < BaseSubscription field :users, [User], + hash_key: :users, description: "Includes newly-created users, or all users on the initial load" end class Subscription < GraphQL::Schema::Object - field :toot_was_tooted, subscription: TootWasTooted, extras: [:path, :query] - field :toot_was_tooted, subscription: LegacyTootWasTooted, extras: [:path, :query] + field :toot_was_tooted, subscription: TootWasTooted + field :toot_was_tooted, subscription: LegacyTootWasTooted field :direct_toot_was_tooted, subscription: DirectTootWasTooted field :direct_toot_was_tooted_with_optional_scope, subscription: DirectTootWasTootedWithOptionalScope field :users_joined, subscription: UsersJoined @@ -147,21 +153,26 @@ class Subscription < GraphQL::Schema::Object end class Mutation < GraphQL::Schema::Object - field :toot, Toot, null: false do + field :toot, Toot, null: false, resolve_static: true do argument :body, String end - def toot(body:) + def self.toot(context, body:) handle = context[:viewer][:handle] toot = { handle: handle, body: body } TOOTS << toot SubscriptionFieldSchema.trigger(:toot_was_tooted, {handle: handle}, toot) end + + def toot(body:) + self.class.toot(context, body: body) + end end query(Query) mutation(Mutation) subscription(Subscription) + use GraphQL::Execution::Next if TESTING_EXEC_NEXT rescue_from(StandardError) { |err, *rest| if err.is_a?(GraphQL::Subscriptions::SubscriptionScopeMissingError) @@ -233,7 +244,11 @@ def delete_subscription(subscription_id) end def exec_query(*args, **kwargs) - SubscriptionFieldSchema.execute(*args, **kwargs) + if TESTING_EXEC_NEXT + SubscriptionFieldSchema.execute_next(*args, **kwargs) + else + SubscriptionFieldSchema.execute(*args, **kwargs) + end end def in_memory_subscription_count diff --git a/spec/graphql/subscriptions_spec.rb b/spec/graphql/subscriptions_spec.rb index 55c9cbe6cc..98a24434e6 100644 --- a/spec/graphql/subscriptions_spec.rb +++ b/spec/graphql/subscriptions_spec.rb @@ -627,7 +627,7 @@ def str end describe "building topic string when `prepare:` is given" do - it "doesn't apply with a Subscription class" do + it "does apply with a Subscription class" do query_str = <<-GRAPHQL subscription($type: PayloadType = TWO) { eventSubscription(userId: "3", payloadType: $type) { payload { int } } From b0b737c6ea3155b0696ee3d58858b0bced306766 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 11:47:19 -0400 Subject: [PATCH 02/10] Make subscription_spec pass --- lib/graphql/execution.rb | 11 ++++--- lib/graphql/execution/interpreter/runtime.rb | 2 +- lib/graphql/execution/lazy.rb | 2 +- .../execution/next/field_resolve_step.rb | 20 ++++------- lib/graphql/execution/next/runner.rb | 33 +++++++++++-------- lib/graphql/execution_error.rb | 4 +++ lib/graphql/query/context.rb | 2 +- lib/graphql/schema/field.rb | 7 ++-- lib/graphql/schema/resolver.rb | 7 ++++ lib/graphql/schema/subscription.rb | 2 +- .../default_subscription_resolve_extension.rb | 1 - lib/graphql/subscriptions/event.rb | 4 +-- spec/graphql/schema/subscription_spec.rb | 24 +++++++++----- spec/spec_helper.rb | 2 +- 14 files changed, 68 insertions(+), 53 deletions(-) diff --git a/lib/graphql/execution.rb b/lib/graphql/execution.rb index 93ab610206..5088edb604 100644 --- a/lib/graphql/execution.rb +++ b/lib/graphql/execution.rb @@ -10,10 +10,13 @@ module GraphQL module Execution # @api private - class Skip < GraphQL::Error; end + class Skip < GraphQL::RuntimeError + attr_accessor :path + def ast_nodes=(_ignored); end - # Just a singleton for implementing {Query::Context#skip} - # @api private - SKIP = Skip.new + def assign_graphql_result(query, result_data, key) + result_data.delete(key) + end + end end end diff --git a/lib/graphql/execution/interpreter/runtime.rb b/lib/graphql/execution/interpreter/runtime.rb index 4724830817..b5cd59f434 100644 --- a/lib/graphql/execution/interpreter/runtime.rb +++ b/lib/graphql/execution/interpreter/runtime.rb @@ -603,7 +603,7 @@ def continue_value(value, field, is_non_null, ast_node, result_name, selection_r err end continue_value(next_value, field, is_non_null, ast_node, result_name, selection_result) - elsif GraphQL::Execution::SKIP == value + elsif value.is_a?(GraphQL::Execution::Skip) # It's possible a lazy was already written here case selection_result when GraphQLResultHash diff --git a/lib/graphql/execution/lazy.rb b/lib/graphql/execution/lazy.rb index 41849f5f50..3ad9f3e050 100644 --- a/lib/graphql/execution/lazy.rb +++ b/lib/graphql/execution/lazy.rb @@ -38,7 +38,7 @@ def value # (fewer clauses in a hot `case` block), but now it requires special handling here. # I think it's still worth it for the performance win, but if the number of special # cases grows, then maybe it's worth rethinking somehow. - if @value.is_a?(StandardError) && @value != GraphQL::Execution::SKIP + if @value.is_a?(StandardError) && !@value.is_a?(GraphQL::Execution::Skip) raise @value else @value diff --git a/lib/graphql/execution/next/field_resolve_step.rb b/lib/graphql/execution/next/field_resolve_step.rb index a3660034d3..36aa5fce28 100644 --- a/lib/graphql/execution/next/field_resolve_step.rb +++ b/lib/graphql/execution/next/field_resolve_step.rb @@ -79,7 +79,6 @@ def coerce_arguments(argument_owner, ast_arguments_or_hash, run_loads = true) end def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyword: run_loads ? arg_defn.keyword : arg_defn.graphql_name, as_type: nil) - p [:coerce, arg_defn.path, arg_value, run_loads, target_keyword] arg_t = as_type || arg_defn.type if arg_t.non_null? arg_t = arg_t.of_type @@ -110,7 +109,7 @@ def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyw arg_value = Array(arg_value) inner_t = arg_t.of_type result = Array.new(arg_value.size) - arg_value.each_with_index { |v, i| coerce_argument_value(result, arg_defn, v, target_keyword: i, as_type: inner_t) } + arg_value.each_with_index { |v, i| coerce_argument_value(result, arg_defn, v, run_loads, target_keyword: i, as_type: inner_t) } result end elsif arg_t.kind.leaf? @@ -181,7 +180,6 @@ def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyw @runner.add_step(loads_step) end else - p [:assign_arg, target_keyword, arg_value] arguments[target_keyword] = arg_value end nil @@ -237,15 +235,10 @@ def call end def add_graphql_error(err) - if GraphQL::Execution::SKIP.equal?(err) - # pass - nil - else - err.path = path - err.ast_nodes = ast_nodes - @selections_step.query.context.add_error(err) - err - end + err.path = path + err.ast_nodes = ast_nodes + @selections_step.query.context.add_error(err) + err end module AlwaysAuthorized @@ -269,7 +262,8 @@ def build_arguments arguments = coerce_arguments(@field_definition, @ast_node.arguments) # rubocop:disable Development/ContextIsPassedCop @arguments ||= arguments # may have already been set to an error - if @pending_steps.nil? || @pending_steps.size == 0 + if (@pending_steps.nil? || @pending_steps.size == 0) && + @field_results.nil? # Make sure the arguments flow didn't already call through execute_field end end diff --git a/lib/graphql/execution/next/runner.rb b/lib/graphql/execution/next/runner.rb index a68bda0928..ef509884b8 100644 --- a/lib/graphql/execution/next/runner.rb +++ b/lib/graphql/execution/next/runner.rb @@ -172,7 +172,7 @@ def execute if (events = query.context.namespace(:subscriptions)[:events]) && !events.empty? @schema.subscriptions.write_subscription(query, events) end - p [:events, events] + fin_result = if query.context.errors.empty? result else @@ -278,22 +278,26 @@ def check_object_result(query, result_h, static_type, ast_selections, current_ex if (result_type_non_null = result_type.non_null?) result_type = result_type.of_type end + new_result_value = if result_value.is_a?(GraphQL::Error) result_value.path = current_result_path.dup - nil + result_value.assign_graphql_result(query, result_h, key) + result_h.key?(key) ? result_h[key] : :unassigned else if result_type.list? check_list_result(query, result_value, result_type.of_type, ast_selection.selections, current_exec_path, current_result_path, paths_to_check) - elsif result_type.kind.leaf? - result_value - else + elsif !result_type.kind.leaf? check_object_result(query, result_value, result_type, ast_selection.selections, current_exec_path, current_result_path, paths_to_check) + else + result_value end end if new_result_value.nil? && result_type_non_null return nil - else + elsif :unassigned.equal?(new_result_value) + # Do nothing + elsif !new_result_value.equal?(result_value) result_h[key] = new_result_value end end @@ -326,24 +330,25 @@ def check_list_result(query, result_arr, inner_type, ast_selections, current_exe end new_invalid_null = false - result_arr.map!.with_index do |result_item, idx| + result_arr.each_with_index do |result_item, idx| current_result_path << idx new_result = if result_item.is_a?(GraphQL::Error) result_item.path = current_result_path.dup - nil + result_item.assign_graphql_result(query, result_arr, idx) + result_arr[idx] elsif inner_type.list? check_list_result(query, result_item, inner_type.of_type, ast_selections, current_exec_path, current_result_path, paths_to_check) - elsif inner_type.kind.leaf? - result_item - else + elsif !inner_type.kind.leaf? check_object_result(query, result_item, inner_type, ast_selections, current_exec_path, current_result_path, paths_to_check) + else + result_item end if new_result.nil? && inner_type_non_null new_invalid_null = true - nil - else - new_result + result_arr[idx] = nil + elsif !new_result.equal?(result_item) + result_arr[idx] = new_result end ensure current_result_path.pop diff --git a/lib/graphql/execution_error.rb b/lib/graphql/execution_error.rb index ce3f5cf6d3..e3534c9c40 100644 --- a/lib/graphql/execution_error.rb +++ b/lib/graphql/execution_error.rb @@ -36,6 +36,10 @@ def initialize(message, ast_node: nil, ast_nodes: nil, options: nil, extensions: super(message) end + def assign_graphql_result(query, result_data, key) + result_data[key] = nil + end + # @return [Hash] An entry for the response's "errors" key def to_h hash = { diff --git a/lib/graphql/query/context.rb b/lib/graphql/query/context.rb index 67b3e8872a..da454f8c8f 100644 --- a/lib/graphql/query/context.rb +++ b/lib/graphql/query/context.rb @@ -112,7 +112,7 @@ def [](key) # Return this value to tell the runtime # to exclude this field from the response altogether def skip - self[:__graphql_execute_next] ? nil : GraphQL::Execution::SKIP + GraphQL::Execution::Skip.new end # Add error at query-level. diff --git a/lib/graphql/schema/field.rb b/lib/graphql/schema/field.rb index cbe9300983..1fecacdd08 100644 --- a/lib/graphql/schema/field.rb +++ b/lib/graphql/schema/field.rb @@ -665,10 +665,9 @@ def visible?(context) end def authorizes?(context) - method(:authorized?).owner != GraphQL::Schema::Field || ( - (args = context.types.arguments(self)) && - (args.any? { |a| a.authorizes?(context) }) - ) + method(:authorized?).owner != GraphQL::Schema::Field || + ((args = context.types.arguments(self)) && (args.any? { |a| a.authorizes?(context) })) || + (@resolver_class&.authorizes?(context)) end def authorized?(object, args, context) diff --git a/lib/graphql/schema/resolver.rb b/lib/graphql/schema/resolver.rb index bd35f4e3e7..20e25cad13 100644 --- a/lib/graphql/schema/resolver.rb +++ b/lib/graphql/schema/resolver.rb @@ -75,6 +75,9 @@ def call result = if is_authed Schema::Validator.validate!(self.class.validators, object, context, @prepared_arguments, as: @field) call_resolve(@prepared_arguments) + elsif new_return_value.nil? + err = UnauthorizedFieldError.new(object: object, type: @field_resolve_step.parent_type, context: context, field: @field) + context.schema.unauthorized_field(err) else new_return_value end @@ -202,6 +205,10 @@ def authorized?(**inputs) authorize_arguments(args, inputs) end + def self.authorizes?(context) + self.instance_method(:authorized?).owner != GraphQL::Schema::Resolver + end + # Called when an object loaded by `loads:` fails the `.authorized?` check for its resolved GraphQL object type. # # By default, the error is re-raised and passed along to {{Schema.unauthorized_object}}. diff --git a/lib/graphql/schema/subscription.rb b/lib/graphql/schema/subscription.rb index bbd761661d..9b8438a2fb 100644 --- a/lib/graphql/schema/subscription.rb +++ b/lib/graphql/schema/subscription.rb @@ -15,7 +15,7 @@ class Subscription < GraphQL::Schema::Resolver extend GraphQL::Schema::Resolver::HasPayloadType extend GraphQL::Schema::Member::HasFields NO_UPDATE = :no_update - null true + null false # @api private def initialize(object:, context:, field:) diff --git a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb index d6c1bec90a..84e3987b96 100644 --- a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb +++ b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb @@ -22,7 +22,6 @@ def after_resolve(value:, context:, object:, arguments:, **rest) end def self.write_subscription(field, value, arguments, context) - p [field.path, value, arguments] if value.is_a?(GraphQL::ExecutionError) value elsif field.resolver&.method_defined?(:subscription_written?) && diff --git a/lib/graphql/subscriptions/event.rb b/lib/graphql/subscriptions/event.rb index 3ac917cc84..38e4ed0669 100644 --- a/lib/graphql/subscriptions/event.rb +++ b/lib/graphql/subscriptions/event.rb @@ -41,9 +41,7 @@ def self.serialize(_name, arguments, field, scope:, context: GraphQL::Query::Nul subscription = field.resolver || GraphQL::Schema::Subscription arguments = arguments_without_field_extras(field: field, arguments: arguments) normalized_args = stringify_args(field, arguments.to_h, context) - t = subscription.topic_for(arguments: normalized_args, field: field, scope: scope) - p [:topic, t] - t + subscription.topic_for(arguments: normalized_args, field: field, scope: scope) end # @return [String] a logical identifier for this event. (Stable when the query is broadcastable.) diff --git a/spec/graphql/schema/subscription_spec.rb b/spec/graphql/schema/subscription_spec.rb index dacb6b8ac6..3a31e5f316 100644 --- a/spec/graphql/schema/subscription_spec.rb +++ b/spec/graphql/schema/subscription_spec.rb @@ -194,7 +194,8 @@ def self.resolve_type(type, obj, ctx) def self.unauthorized_field(err) path = err.context[:last_path] - raise GraphQL::ExecutionError, "Can't subscribe to private user (#{path})" + p [:raising, err] + raise GraphQL::ExecutionError, "Can't subscribe to private user (#{path || "EXEC_NEXT_NO_PATH"})" end class InMemorySubscriptions < GraphQL::Subscriptions @@ -343,16 +344,16 @@ def in_memory_subscription_count GRAPHQL expected_response = { - "data" => nil, "errors" => [ { "message"=>"No object found for `handle: \"jack\"`", "locations"=>[{"line"=>2, "column"=>9}], "path"=>["tootWasTooted"] } - ] + ], + "data" => nil, } - assert_equal(expected_response, res) + assert_graphql_equal(expected_response, res) assert_equal 0, in_memory_subscription_count end @@ -365,16 +366,16 @@ def in_memory_subscription_count } GRAPHQL expected_response = { - "data"=>nil, "errors"=>[ { - "message"=>"Can't subscribe to private user ([\"tootWasTooted\"])", + "message"=>"Can't subscribe to private user (#{TESTING_EXEC_NEXT ? "EXEC_NEXT_NO_PATH" : "[\"tootWasTooted\"]"})", "locations"=>[{"line"=>2, "column"=>9}], "path"=>["tootWasTooted"] }, ], + "data"=>nil, } - assert_equal(expected_response, res) + assert_graphql_equal(expected_response, res) end it "sends no initial response if :no_response is returned, which is the default" do @@ -611,7 +612,12 @@ def in_memory_subscription_count } GRAPHQL end - expected_message = "Subscription.directTootWasTooted (SubscriptionFieldSchema::DirectTootWasTooted) requires a `scope:` value to trigger updates (Set `subscription_scope ..., optional: true` to disable this requirement)" + plain_expected_message = "Subscription.directTootWasTooted (SubscriptionFieldSchema::DirectTootWasTooted) requires a `scope:` value to trigger updates (Set `subscription_scope ..., optional: true` to disable this requirement)" + expected_message = if TESTING_EXEC_NEXT + "Resolving Subscription.directTootWasTooted: #{plain_expected_message}" + else + plain_expected_message + end assert_equal expected_message, err.message assert_equal 0, in_memory_subscription_count @@ -628,7 +634,7 @@ def in_memory_subscription_count err = assert_raises GraphQL::Subscriptions::SubscriptionScopeMissingError do SubscriptionFieldSchema.subscriptions.trigger(:direct_toot_was_tooted, {}, obj) end - assert_equal expected_message, err.message + assert_equal plain_expected_message, err.message end it "doesn't require subscription scope if `optional: true`" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b5704c2e78..6dda2e97da 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -217,7 +217,7 @@ module Assertions def assert_graphql_equal(data1, data2, message = "GraphQL Result was equal") case data1 when Hash - assert_equal(data1, data2, message) + assert_equal(data1, data2.to_h, message) assert_equal(data1.keys, data2.keys, "Order of keys matched (#{message})") when Array data1.each_with_index do |item1, idx| From 65b36d36f43641ecdbfd752c0f652443b977ee58 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 11:51:55 -0400 Subject: [PATCH 03/10] Fix has_authorization_spec --- lib/graphql/schema/field.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/graphql/schema/field.rb b/lib/graphql/schema/field.rb index 1fecacdd08..81ecca0faa 100644 --- a/lib/graphql/schema/field.rb +++ b/lib/graphql/schema/field.rb @@ -667,7 +667,7 @@ def visible?(context) def authorizes?(context) method(:authorized?).owner != GraphQL::Schema::Field || ((args = context.types.arguments(self)) && (args.any? { |a| a.authorizes?(context) })) || - (@resolver_class&.authorizes?(context)) + (@resolver_class&.authorizes?(context)) || false end def authorized?(object, args, context) From b2c2159ec2efa337f2ccd9d5154355e59f4ed34e Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 13:09:54 -0400 Subject: [PATCH 04/10] Start fixing for subscriptions_spec.rb --- .../execution/next/field_resolve_step.rb | 58 +++++---- .../default_subscription_resolve_extension.rb | 21 ++++ spec/graphql/subscriptions_spec.rb | 114 ++++++++++-------- 3 files changed, 124 insertions(+), 69 deletions(-) diff --git a/lib/graphql/execution/next/field_resolve_step.rb b/lib/graphql/execution/next/field_resolve_step.rb index 36aa5fce28..11672e0c97 100644 --- a/lib/graphql/execution/next/field_resolve_step.rb +++ b/lib/graphql/execution/next/field_resolve_step.rb @@ -54,24 +54,38 @@ def coerce_arguments(argument_owner, ast_arguments_or_hash, run_loads = true) return EmptyObjects::EMPTY_HASH end args_hash = {} + # TODO somehow DRY these loops? if ast_arguments_or_hash.is_a?(Hash) - ast_arguments_or_hash.each do |key, value| - key_s = nil - arg_defn = arg_defns.each_value.find { |a| - a.keyword == key || a.graphql_name == (key_s ||= String(key)) - } - coerce_argument_value(args_hash, arg_defn, value, run_loads) + arg_defns.each do |arg_graphql_name, arg_defn| + given_value = nil + was_found = false + ast_arguments_or_hash.each do |key, value| + if key == arg_defn.keyword || key.to_s == arg_defn.graphql_name + given_value = value + was_found = true + break + end + end + if !was_found && arg_defn.default_value? + given_value = arg_defn.default_value + end + coerce_argument_value(args_hash, arg_defn, given_value, run_loads) end else - ast_arguments_or_hash.each { |arg_node| - arg_defn = arg_defns[arg_node.name] - coerce_argument_value(args_hash, arg_defn, arg_node.value, run_loads) - } - end - # TODO refactor the loop above into this one - arg_defns.each do |arg_graphql_name, arg_defn| - if arg_defn.default_value? && !args_hash.key?(arg_defn.keyword) - coerce_argument_value(args_hash, arg_defn, arg_defn.default_value, run_loads) + arg_defns.each do |arg_graphql_name, arg_defn| + given_value = nil + was_found = false + ast_arguments_or_hash.each do |arg_node| + if arg_node.name == arg_defn.graphql_name + given_value = arg_node.value + was_found = true + break + end + end + if !was_found && arg_defn.default_value? + given_value = arg_defn.default_value + end + coerce_argument_value(args_hash, arg_defn, given_value, run_loads) end end @@ -84,21 +98,21 @@ def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyw arg_t = arg_t.of_type end - arg_value = if arg_value.is_a?(Language::Nodes::VariableIdentifier) + if arg_value.is_a?(Language::Nodes::VariableIdentifier) vars = @selections_step.query.variables - if vars.key?(arg_value.name) + arg_value = if vars.key?(arg_value.name) vars[arg_value.name] elsif vars.key?(arg_value.name.to_sym) vars[arg_value.name.to_sym] else return # not present end - elsif arg_value.is_a?(Language::Nodes::NullValue) - nil + end + + if arg_value.is_a?(Language::Nodes::NullValue) + arg_value = nil elsif arg_value.is_a?(Language::Nodes::Enum) - arg_value.name - else - arg_value + arg_value = arg_value.name end ctx = @selections_step.query.context diff --git a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb index 84e3987b96..d0908f3392 100644 --- a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb +++ b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb @@ -17,10 +17,30 @@ def resolve(context:, object:, arguments:) end end + def resolve_next(context:, objects:, arguments:) + has_override_implementation = @field.execution_next_mode != :direct_send + + if !has_override_implementation + if context.query.subscription_update? + objects.map(&:object) + else + objects.map { |o| context.skip } + end + else + yield(objects, arguments) + end + end + def after_resolve(value:, context:, object:, arguments:, **rest) self.class.write_subscription(@field, value, arguments, context) end + def after_resolve_next(values:, context:, objects:, arguments:, **rest) + values.map do |value| + self.class.write_subscription(@field, value, arguments, context) + end + end + def self.write_subscription(field, value, arguments, context) if value.is_a?(GraphQL::ExecutionError) value @@ -28,6 +48,7 @@ def self.write_subscription(field, value, arguments, context) (subscription_namespace = context.namespace(:subscriptions)) && (subscriptions_by_path = subscription_namespace[:subscriptions]) (subscription_instance = subscriptions_by_path[context.current_path]) + # If it was already written, don't append this event to be written later if !subscription_instance.subscription_written? events = context.namespace(:subscriptions)[:events] diff --git a/spec/graphql/subscriptions_spec.rb b/spec/graphql/subscriptions_spec.rb index 98a24434e6..dda0e08f43 100644 --- a/spec/graphql/subscriptions_spec.rb +++ b/spec/graphql/subscriptions_spec.rb @@ -228,6 +228,7 @@ class Schema < GraphQL::Schema max_complexity(InMemoryBackend::MAX_COMPLEXITY) complexity_cost_calculation_mode(:future) use GraphQL::Schema::Warden if ADD_WARDEN + use GraphQL::Execution::Next end end @@ -287,6 +288,7 @@ class FromDefinitionInMemoryBackend < InMemoryBackend } Schema = GraphQL::Schema.from_definition(SchemaDefinition, default_resolve: Resolvers, using: {InMemoryBackend::Subscriptions => { extra: 123 }}) Schema.max_complexity(MAX_COMPLEXITY) + Schema.use(GraphQL::Execution::Next) Schema.complexity_cost_calculation_mode(:future) # TODO don't hack this (no way to add metadata from IDL parser right now) Schema.get_field("Subscription", "myEvent").subscription_scope = :me @@ -303,6 +305,14 @@ def to_param end describe GraphQL::Subscriptions do + def exec_query(*args, schema: self.schema, **kwargs) + if TESTING_EXEC_NEXT + schema.execute_next(*args, **kwargs) + else + schema.execute(*args, **kwargs) + end + end + [ClassBasedInMemoryBackend, FromDefinitionInMemoryBackend].each do |in_memory_backend_class| describe "using #{in_memory_backend_class}" do before do @@ -311,7 +321,7 @@ def to_param let(:root_object) { OpenStruct.new( payload: in_memory_backend_class::SubscriptionPayload.new, - ) + ) } let(:schema) { in_memory_backend_class::Schema } @@ -332,8 +342,8 @@ def to_param GRAPHQL # Initial subscriptions - res_1 = schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) - res_2 = schema.execute(query_str, context: { socket: "2" }, variables: { "id" => "200" }, root_value: root_object) + res_1 = exec_query(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) + res_2 = exec_query(query_str, context: { socket: "2" }, variables: { "id" => "200" }, root_value: root_object) empty_response = {} @@ -359,7 +369,7 @@ def to_param end it "works with the introspection query" do - res = schema.execute("{ __schema { subscriptionType { name } } }") + res = exec_query("{ __schema { subscriptionType { name } } }") assert_equal "Subscription", res["data"]["__schema"]["subscriptionType"]["name"] end @@ -368,14 +378,14 @@ def to_param query_str = "subscription($channel: Int) { filteredStream(channel: $channel) { message } }" # Unfiltered: - schema.execute(query_str, context: { socket: "1", segment: "A" }, variables: {}) + exec_query(query_str, context: { socket: "1", segment: "A" }, variables: {}) # Filtered: - schema.execute(query_str, context: { socket: "2", segment: "A" }, variables: { channel: 1 }) - schema.execute(query_str, context: { socket: "3", segment: "A" }, variables: { channel: 2 }) + exec_query(query_str, context: { socket: "2", segment: "A" }, variables: { channel: 1 }) + exec_query(query_str, context: { socket: "3", segment: "A" }, variables: { channel: 2 }) # Another Subscription scope: - schema.execute(query_str, context: { socket: "4", segment: "B" }, variables: {}) - schema.execute(query_str, context: { socket: "5", segment: "B" }, variables: { channel: 1 }) + exec_query(query_str, context: { socket: "4", segment: "B" }, variables: {}) + exec_query(query_str, context: { socket: "5", segment: "B" }, variables: { channel: 1 }) schema.subscriptions.trigger(:filtered_stream, {}, OpenStruct.new(channel: 1, message: "Message 1"), scope: "A") schema.subscriptions.trigger(:filtered_stream, {}, OpenStruct.new(channel: 2, message: "Message 2"), scope: "A") @@ -404,7 +414,7 @@ def to_param it "runs visibility checks when calling .trigger" do query_str = "subscription { hiddenEvent { int } }" - res_1 = schema.execute(query_str, context: { socket: "1", hidden_event: true }, root_value: root_object) + res_1 = exec_query(query_str, context: { socket: "1", hidden_event: true }, root_value: root_object) assert_equal({}, res_1["data"]) schema.subscriptions.trigger(:hidden_event, {}, root_object.payload, context: { hidden_event: true }) @@ -428,7 +438,7 @@ def to_param document = GraphQL.parse(query_str) # Initial subscriptions - response = schema.execute(nil, document: document, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) + response = exec_query(nil, document: document, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) empty_response = {} @@ -457,7 +467,7 @@ def to_param } GRAPHQL - res = schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) + res = exec_query(query_str, context: { socket: "1" }, variables: { "id" => "100" }, root_value: root_object) assert_equal true, res.key?("errors") assert_equal 0, implementation.events.size assert_equal 0, implementation.queries.size @@ -484,7 +494,7 @@ def str } GRAPHQL - schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) schema.subscriptions.trigger("payload", { "id" => "8"}, root_object.payload) assert_equal ["1"], implementation.pushes end @@ -496,7 +506,7 @@ def str } GRAPHQL - schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) schema.subscriptions.trigger("payload", { "id" => "8"}, OpenStruct.new(str: nil, int: nil)) delivery = deliveries["1"].first assert_nil delivery.fetch("data") @@ -510,7 +520,7 @@ def str } GRAPHQL - schema.execute(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "id" => "8" }, root_value: root_object) assert_equal 1, implementation.events.size sub_id = implementation.queries.keys.first # Mess with the private storage so that `read_subscription` will be nil @@ -532,13 +542,13 @@ def str GRAPHQL # Subscribe with explicit `TYPE` - schema.execute(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) # Subscribe with default `TYPE` - schema.execute(query_str, context: { socket: "2" }, root_value: root_object) + exec_query(query_str, context: { socket: "2" }, root_value: root_object) # Subscribe with non-matching `TYPE` - schema.execute(query_str, context: { socket: "3" }, variables: { "type" => "TWO" }, root_value: root_object) + exec_query(query_str, context: { socket: "3" }, variables: { "type" => "TWO" }, root_value: root_object) # Subscribe with explicit null - schema.execute(query_str, context: { socket: "4" }, variables: { "type" => nil }, root_value: root_object) + exec_query(query_str, context: { socket: "4" }, variables: { "type" => nil }, root_value: root_object) # The class-based schema has a "prepare" behavior, so it expects these downcased values in `.trigger` if schema == ClassBasedInMemoryBackend::Schema @@ -579,10 +589,10 @@ def str GRAPHQL # Subscriptions for user 1 - schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) - schema.execute(query_str, context: { socket: "2", me: "1" }, variables: { "type" => "TWO" }, root_value: root_object) + exec_query(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "2", me: "1" }, variables: { "type" => "TWO" }, root_value: root_object) # Subscription for user 2 - schema.execute(query_str, context: { socket: "3", me: "2" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "3", me: "2" }, variables: { "type" => "ONE" }, root_value: root_object) schema.subscriptions.trigger("myEvent", { "payloadType" => "ONE" }, OpenStruct.new(str: "", int: 1), scope: "1") schema.subscriptions.trigger("myEvent", { "payloadType" => "TWO" }, OpenStruct.new(str: "", int: 2), scope: "1") @@ -604,12 +614,12 @@ def str GRAPHQL # Global ID Backed User - schema.execute(query_str, context: { socket: "1", me: GlobalIDUser.new(1) }, variables: { "type" => "ONE" }, root_value: root_object) - schema.execute(query_str, context: { socket: "2", me: GlobalIDUser.new(1) }, variables: { "type" => "TWO" }, root_value: root_object) + exec_query(query_str, context: { socket: "1", me: GlobalIDUser.new(1) }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "2", me: GlobalIDUser.new(1) }, variables: { "type" => "TWO" }, root_value: root_object) # ToParam Backed User - schema.execute(query_str, context: { socket: "3", me: ToParamUser.new(2) }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "3", me: ToParamUser.new(2) }, variables: { "type" => "ONE" }, root_value: root_object) # Array of Objects - schema.execute(query_str, context: { socket: "4", me: [GlobalIDUser.new(4), ToParamUser.new(5)] }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "4", me: [GlobalIDUser.new(4), ToParamUser.new(5)] }, variables: { "type" => "ONE" }, root_value: root_object) schema.subscriptions.trigger("myEvent", { "payloadType" => "ONE" }, OpenStruct.new(str: "", int: 1), scope: GlobalIDUser.new(1)) schema.subscriptions.trigger("myEvent", { "payloadType" => "TWO" }, OpenStruct.new(str: "", int: 2), scope: GlobalIDUser.new(1)) @@ -646,13 +656,13 @@ def str } GRAPHQL # Value from variable - schema.execute(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) # Default value for variable - schema.execute(query_str, context: { socket: "1" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, root_value: root_object) # Query string literal value - schema.execute(query_str_2, context: { socket: "1" }, root_value: root_object) + exec_query(query_str_2, context: { socket: "1" }, root_value: root_object) # Schema default value - schema.execute(query_str_3, context: { socket: "1" }, root_value: root_object) + exec_query(query_str_3, context: { socket: "1" }, root_value: root_object) # There's no way to add `prepare:` when using SDL, so only the Ruby-defined schema has it expected_sub_count = if schema == ClassBasedInMemoryBackend::Schema @@ -693,13 +703,13 @@ def str } GRAPHQL # Value from variable - schema.execute(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, variables: { "type" => "ONE" }, root_value: root_object) # Default value for variable - schema.execute(query_str, context: { socket: "1" }, root_value: root_object) + exec_query(query_str, context: { socket: "1" }, root_value: root_object) # Query string literal value - schema.execute(query_str_2, context: { socket: "1" }, root_value: root_object) + exec_query(query_str_2, context: { socket: "1" }, root_value: root_object) # Schema default value - schema.execute(query_str_3, context: { socket: "1" }, root_value: root_object) + exec_query(query_str_3, context: { socket: "1" }, root_value: root_object) # There's no way to add `prepare:` when using SDL, so only the Ruby-defined schema has it @@ -722,7 +732,7 @@ def str describe "errors" do it "avoid subscription on resolver error" do - res = schema.execute(<<-GRAPHQL, context: { socket: "1" }, variables: { "id" => "100" }) + res = exec_query(<<-GRAPHQL, context: { socket: "1" }, variables: { "id" => "100" }) subscription ($id: ID!){ failedEvent(id: $id) { str, int } } @@ -740,7 +750,7 @@ def str } GRAPHQL - schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) err = assert_raises(RuntimeError) { schema.subscriptions.trigger("myEvent", { "payloadType" => "ONE" }, error_payload_class.new, scope: "1") } @@ -755,7 +765,7 @@ def str } GRAPHQL - schema.execute(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) + exec_query(query_str, context: { socket: "1", me: "1" }, variables: { "type" => "ONE" }, root_value: root_object) schema.subscriptions.trigger("myEvent", { "payloadType" => "ONE" }, error_payload_class.new, scope: "1") res = deliveries["1"].first assert_equal "This is handled", res["errors"][0]["message"] @@ -817,7 +827,7 @@ def str } GRAPHQL - res = schema.execute(query_str, context: { socket: "1"}) + res = exec_query(query_str, context: { socket: "1"}) errs = ["Query has complexity of 7, which exceeds max complexity of 5"] assert_equal errs, res["errors"].map { |e| e["message"] } assert_equal 0, implementation.events.size @@ -830,8 +840,8 @@ def str it "can share topics" do schema = ClassBasedInMemoryBackend::Schema schema.subscriptions.reset - schema.execute("subscription { sharedEvent { ok } }", context: { shared_stream: "stream-1", socket: "1" } ) - schema.execute("subscription { otherSharedEvent { ok __typename } }", context: { shared_stream: "stream-1", socket: "2" } ) + exec_query("subscription { sharedEvent { ok } }", context: { shared_stream: "stream-1", socket: "1" }, schema: schema ) + exec_query("subscription { otherSharedEvent { ok __typename } }", context: { shared_stream: "stream-1", socket: "2" }, schema: schema ) schema.subscriptions.trigger(:shared_event, {}, OpenStruct.new(ok: true), scope: "stream-1") schema.subscriptions.trigger(:other_shared_event, {}, OpenStruct.new(ok: false), scope: "stream-1") @@ -891,10 +901,15 @@ class Query < GraphQL::Schema::Object subscription(Subscription) use InMemoryBackend::Subscriptions, extra: nil, broadcast: true, default_broadcastable: true + use GraphQL::Execution::Next end def exec_query(query_str, **options) - BroadcastTrueSchema.execute(query_str, **options) + if TESTING_EXEC_NEXT + BroadcastTrueSchema.execute_next(query_str, **options) + else + BroadcastTrueSchema.execute(query_str, **options) + end end it "broadcasts when possible" do @@ -955,6 +970,7 @@ def update(id:) subscription(Subscription) tracer(ValidationDetectionTracer) use InMemoryBackend::Subscriptions, extra: nil, validate_update: false + use GraphQL::Execution::Next end class SometimesSkipUpdateValidationSchema < GraphQL::Schema @@ -968,6 +984,7 @@ def validate_update?(context:, **_rest) subscription(SkipUpdateValidationSchema::Subscription) tracer(SkipUpdateValidationSchema::ValidationDetectionTracer) use(SometimesSkipSubscriptions, extra: nil) + use GraphQL::Execution::Next end describe "Skipping validation on updates" do @@ -976,8 +993,9 @@ def validate_update?(context:, **_rest) end let(:schema) { SkipUpdateValidationSchema } + it "Skips validation when configured" do - res = schema.execute("subscription { counter(id: \"1\") { value } }", context: { socket: "1" }) + res = exec_query("subscription { counter(id: \"1\") { value } }", context: { socket: "1" }) assert res.context[:was_validated] assert_equal({"validate_true" => 1}, schema::COUNTERS) schema.subscriptions.trigger(:counter, {id: "1"}, {}) @@ -987,8 +1005,8 @@ def validate_update?(context:, **_rest) describe "when the method is overridden" do let(:schema) { SometimesSkipUpdateValidationSchema } it "calls `validate_update?`" do - schema.execute("subscription { counter(id: \"3\") { value } }", context: { socket: "2" }) - schema.execute("subscription { counter(id: \"3\") { value } }", context: { socket: "3", validate_update: true }) + exec_query("subscription { counter(id: \"3\") { value } }", context: { socket: "2" }) + exec_query("subscription { counter(id: \"3\") { value } }", context: { socket: "3", validate_update: true }) assert_equal({"validate_true" => 2}, schema::COUNTERS) schema.subscriptions.trigger(:counter, {id: "3"}, {}) assert_equal({"validate_true" => 3, "validate_false" => 1, "counter_3" => 2}, schema::COUNTERS) @@ -1054,6 +1072,7 @@ class SubscriptionType < GraphQL::Schema::Object class Schema < GraphQL::Schema subscription SubscriptionType use InMemorySubscriptions + use GraphQL::Execution::Next end end @@ -1071,7 +1090,7 @@ class Schema < GraphQL::Schema } GRAPHQL - schema.execute(query_str, variables: { "myEnum" => "ONE" }) + exec_query(query_str, variables: { "myEnum" => "ONE" }) schema.subscriptions.trigger(:mySubscription, { "myEnum" => "ONE" }, nil) @@ -1125,6 +1144,7 @@ class SubscriptionType < GraphQL::Schema::Object class Schema < GraphQL::Schema subscription SubscriptionType use InMemoryBackend + use GraphQL::Execution::Next end end @@ -1147,7 +1167,7 @@ class Schema < GraphQL::Schema } GRAPHQL - schema.execute(query_str, variables: { 'input' => { 'innerInput' => nil } }) + exec_query(query_str, variables: { 'input' => { 'innerInput' => nil } }) schema.subscriptions.trigger(:mySubscription, { 'input' => { 'innerInput' => nil } }, nil) @@ -1164,7 +1184,7 @@ class Schema < GraphQL::Schema } GRAPHQL - schema.execute(query_str, variables: { 'input' => nil }) + exec_query(query_str, variables: { 'input' => nil }) schema.subscriptions.trigger(:mySubscription, { 'input' => nil }, nil) From 79695b233c19a3d32c8c2107b02b2c714a11ee09 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 16:03:36 -0400 Subject: [PATCH 05/10] Fix more subscription tests --- .../execution/next/field_resolve_step.rb | 102 +++++++++--------- lib/graphql/schema/resolver.rb | 3 +- .../default_subscription_resolve_extension.rb | 3 +- spec/graphql/schema/subscription_spec.rb | 1 - spec/graphql/subscriptions_spec.rb | 8 +- 5 files changed, 58 insertions(+), 59 deletions(-) diff --git a/lib/graphql/execution/next/field_resolve_step.rb b/lib/graphql/execution/next/field_resolve_step.rb index 11672e0c97..3d18f6d2c7 100644 --- a/lib/graphql/execution/next/field_resolve_step.rb +++ b/lib/graphql/execution/next/field_resolve_step.rb @@ -54,38 +54,59 @@ def coerce_arguments(argument_owner, ast_arguments_or_hash, run_loads = true) return EmptyObjects::EMPTY_HASH end args_hash = {} - # TODO somehow DRY these loops? - if ast_arguments_or_hash.is_a?(Hash) - arg_defns.each do |arg_graphql_name, arg_defn| - given_value = nil - was_found = false + if ast_arguments_or_hash.nil? # This can happen with `.trigger` + return args_hash + end + + arg_inputs_are_h = ast_arguments_or_hash.is_a?(Hash) + + arg_defns.each do |arg_graphql_name, arg_defn| + arg_value = nil + was_found = false + if arg_inputs_are_h ast_arguments_or_hash.each do |key, value| if key == arg_defn.keyword || key.to_s == arg_defn.graphql_name - given_value = value + arg_value = value was_found = true break end end - if !was_found && arg_defn.default_value? - given_value = arg_defn.default_value - end - coerce_argument_value(args_hash, arg_defn, given_value, run_loads) - end - else - arg_defns.each do |arg_graphql_name, arg_defn| - given_value = nil - was_found = false + else ast_arguments_or_hash.each do |arg_node| if arg_node.name == arg_defn.graphql_name - given_value = arg_node.value + arg_value = arg_node.value was_found = true break end end - if !was_found && arg_defn.default_value? - given_value = arg_defn.default_value + end + + if arg_value.is_a?(Language::Nodes::VariableIdentifier) + vars = @selections_step.query.variables + arg_value = if vars.key?(arg_value.name) + vars[arg_value.name] + elsif vars.key?(arg_value.name.to_sym) + vars[arg_value.name.to_sym] + else + was_found = false + nil end - coerce_argument_value(args_hash, arg_defn, given_value, run_loads) + end + + if arg_value.is_a?(Language::Nodes::NullValue) + arg_value = nil + elsif arg_value.is_a?(Language::Nodes::Enum) + arg_value = arg_value.name + end + + if !was_found && arg_defn.default_value? + was_found = true + arg_value = arg_defn.default_value + end + + + if was_found + coerce_argument_value(args_hash, arg_defn, arg_value, run_loads) end end @@ -98,23 +119,6 @@ def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyw arg_t = arg_t.of_type end - if arg_value.is_a?(Language::Nodes::VariableIdentifier) - vars = @selections_step.query.variables - arg_value = if vars.key?(arg_value.name) - vars[arg_value.name] - elsif vars.key?(arg_value.name.to_sym) - vars[arg_value.name.to_sym] - else - return # not present - end - end - - if arg_value.is_a?(Language::Nodes::NullValue) - arg_value = nil - elsif arg_value.is_a?(Language::Nodes::Enum) - arg_value = arg_value.name - end - ctx = @selections_step.query.context arg_value = if arg_t.list? if arg_value.nil? @@ -638,23 +642,19 @@ def resolve_batch(objects, context, args_hash) method_receiver = @field_definition.dynamic_introspection ? @field_definition.owner : @parent_type case @field_definition.execution_next_mode when :resolve_batch - if args_hash.empty? - method_receiver.public_send(@field_definition.execution_next_mode_key, objects, context) - else + begin method_receiver.public_send(@field_definition.execution_next_mode_key, objects, context, **args_hash) + rescue GraphQL::ExecutionError => exec_err + Array.new(objects.size, exec_err) end when :resolve_static - result = if args_hash.empty? - method_receiver.public_send(@field_definition.execution_next_mode_key, context) - else - method_receiver.public_send(@field_definition.execution_next_mode_key, context, **args_hash) - end + result = method_receiver.public_send(@field_definition.execution_next_mode_key, context, **args_hash) Array.new(objects.size, result) when :resolve_each - if args_hash.empty? - objects.map { |o| method_receiver.public_send(@field_definition.execution_next_mode_key, o, context) } - else - objects.map { |o| method_receiver.public_send(@field_definition.execution_next_mode_key, o, context, **args_hash) } + objects.map do |o| + method_receiver.public_send(@field_definition.execution_next_mode_key, o, context, **args_hash) + rescue GraphQL::ExecutionError => err + err end when :hash_key objects.map { |o| o[@field_definition.execution_next_mode_key] } @@ -716,11 +716,7 @@ def resolve_batch(objects, context, args_hash) if @field_definition.dynamic_introspection obj_inst = @owner.wrap(obj_inst, context) end - if args_hash.empty? - obj_inst.public_send(@field_definition.execution_next_mode_key) - else - obj_inst.public_send(@field_definition.execution_next_mode_key, **args_hash) - end + obj_inst.public_send(@field_definition.execution_next_mode_key, **args_hash) end else raise "Batching execution for #{path} not implemented (execution_next_mode: #{@execution_next_mode.inspect}); provide `resolve_static:`, `resolve_batch:`, `hash_key:`, `method:`, or use a compatibility plug-in" diff --git a/lib/graphql/schema/resolver.rb b/lib/graphql/schema/resolver.rb index 20e25cad13..a5f775d9c9 100644 --- a/lib/graphql/schema/resolver.rb +++ b/lib/graphql/schema/resolver.rb @@ -83,10 +83,11 @@ def call end q = context.query q.current_trace.end_execute_field(field, @prepared_arguments, trace_objs, q, [result]) - if q.subscription? && @field.owner == context.schema.subscription + if q.subscription? && @field.owner == context.schema.subscription && !@subscription_written # TODO unify this -- do it in a single pass @original_arguments = @field_resolve_step.coerce_arguments(@field, @field_resolve_step.ast_node.arguments, false) Subscriptions::DefaultSubscriptionResolveExtension.write_subscription(@field, result, @original_arguments, context) + @subscription_written = true end exec_result[exec_index] = result rescue RuntimeError => err diff --git a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb index d0908f3392..b57dad69cc 100644 --- a/lib/graphql/subscriptions/default_subscription_resolve_extension.rb +++ b/lib/graphql/subscriptions/default_subscription_resolve_extension.rb @@ -22,7 +22,7 @@ def resolve_next(context:, objects:, arguments:) if !has_override_implementation if context.query.subscription_update? - objects.map(&:object) + objects else objects.map { |o| context.skip } end @@ -48,7 +48,6 @@ def self.write_subscription(field, value, arguments, context) (subscription_namespace = context.namespace(:subscriptions)) && (subscriptions_by_path = subscription_namespace[:subscriptions]) (subscription_instance = subscriptions_by_path[context.current_path]) - # If it was already written, don't append this event to be written later if !subscription_instance.subscription_written? events = context.namespace(:subscriptions)[:events] diff --git a/spec/graphql/schema/subscription_spec.rb b/spec/graphql/schema/subscription_spec.rb index 3a31e5f316..d0ac47e79c 100644 --- a/spec/graphql/schema/subscription_spec.rb +++ b/spec/graphql/schema/subscription_spec.rb @@ -194,7 +194,6 @@ def self.resolve_type(type, obj, ctx) def self.unauthorized_field(err) path = err.context[:last_path] - p [:raising, err] raise GraphQL::ExecutionError, "Can't subscribe to private user (#{path || "EXEC_NEXT_NO_PATH"})" end diff --git a/spec/graphql/subscriptions_spec.rb b/spec/graphql/subscriptions_spec.rb index dda0e08f43..127a7e3ba8 100644 --- a/spec/graphql/subscriptions_spec.rb +++ b/spec/graphql/subscriptions_spec.rb @@ -197,14 +197,18 @@ class Subscription < GraphQL::Schema::Object argument :payload_type, PayloadType, required: false end - field :failed_event, Payload, null: false do + field :failed_event, Payload, null: false, resolve_each: true do argument :id, ID end - def failed_event(id:) + def self.failed_event(object, context, id:) raise GraphQL::ExecutionError.new("unauthorized") end + def failed_event(id:) + self.class.failed_event(object, context, id: id) + end + field :filtered_stream, subscription: FilteredStream field :hidden_event, Payload do From 681eb0ca7ed3fcd94b38ec467d82c8ff0f070c3c Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 16:11:22 -0400 Subject: [PATCH 06/10] Fix array of enum handling --- .../execution/next/field_resolve_step.rb | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/graphql/execution/next/field_resolve_step.rb b/lib/graphql/execution/next/field_resolve_step.rb index 3d18f6d2c7..0d484d4b8e 100644 --- a/lib/graphql/execution/next/field_resolve_step.rb +++ b/lib/graphql/execution/next/field_resolve_step.rb @@ -93,18 +93,11 @@ def coerce_arguments(argument_owner, ast_arguments_or_hash, run_loads = true) end end - if arg_value.is_a?(Language::Nodes::NullValue) - arg_value = nil - elsif arg_value.is_a?(Language::Nodes::Enum) - arg_value = arg_value.name - end - if !was_found && arg_defn.default_value? was_found = true arg_value = arg_defn.default_value end - if was_found coerce_argument_value(args_hash, arg_defn, arg_value, run_loads) end @@ -119,6 +112,23 @@ def coerce_argument_value(arguments, arg_defn, arg_value, run_loads, target_keyw arg_t = arg_t.of_type end + if arg_value.is_a?(Language::Nodes::VariableIdentifier) + vars = @selections_step.query.variables + arg_value = if vars.key?(arg_value.name) + vars[arg_value.name] + elsif vars.key?(arg_value.name.to_sym) + vars[arg_value.name.to_sym] + else + nil + end + end + + if arg_value.is_a?(Language::Nodes::NullValue) + arg_value = nil + elsif arg_value.is_a?(Language::Nodes::Enum) + arg_value = arg_value.name + end + ctx = @selections_step.query.context arg_value = if arg_t.list? if arg_value.nil? From dc6dd1089370d0c17c0707b1af6732d095d24ee3 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Fri, 27 Mar 2026 16:23:54 -0400 Subject: [PATCH 07/10] DRY subscription state init, fix error spec --- lib/graphql/execution/interpreter.rb | 4 +--- lib/graphql/execution/next/runner.rb | 21 ++++++++++++++------- lib/graphql/subscriptions.rb | 10 ++++++++++ lib/graphql/unauthorized_error.rb | 4 ++++ spec/graphql/schema/subscription_spec.rb | 8 ++------ 5 files changed, 31 insertions(+), 16 deletions(-) diff --git a/lib/graphql/execution/interpreter.rb b/lib/graphql/execution/interpreter.rb index 21d1c0a4f4..34cac43432 100644 --- a/lib/graphql/execution/interpreter.rb +++ b/lib/graphql/execution/interpreter.rb @@ -59,9 +59,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl results = [] queries.each_with_index do |query, idx| if query.subscription? && !query.subscription_update? - subs_namespace = query.context.namespace(:subscriptions) - subs_namespace[:events] = [] - subs_namespace[:subscriptions] = {} + schema.subscriptions.initialize_subscriptions(query) end multiplex.dataloader.append_job { operation = query.selected_operation diff --git a/lib/graphql/execution/next/runner.rb b/lib/graphql/execution/next/runner.rb index ef509884b8..12f2c879d2 100644 --- a/lib/graphql/execution/next/runner.rb +++ b/lib/graphql/execution/next/runner.rb @@ -112,16 +112,10 @@ def execute root_value = query.root_value end - if query.subscription? && !query.subscription_update? - subs_namespace = query.context.namespace(:subscriptions) - subs_namespace[:events] = [] - subs_namespace[:subscriptions] = {} - end - results << { "data" => data } case selected_operation.operation_type - when nil, "query", "subscription" + when nil, "query" isolated_steps[0] << SelectionsStep.new( parent_type: root_type, selections: selected_operation.selections, @@ -145,6 +139,19 @@ def execute query: query, )] end + when "subscription" + if !query.subscription_update? + schema.subscriptions.initialize_subscriptions(query) + end + isolated_steps[0] << SelectionsStep.new( + parent_type: root_type, + selections: selected_operation.selections, + objects: [root_value], + results: [data], + path: EmptyObjects::EMPTY_ARRAY, + runner: self, + query: query, + ) else raise ArgumentError, "Unhandled operation type: #{operation.operation_type.inspect}" end diff --git a/lib/graphql/subscriptions.rb b/lib/graphql/subscriptions.rb index d7f87dc4d8..301e25a129 100644 --- a/lib/graphql/subscriptions.rb +++ b/lib/graphql/subscriptions.rb @@ -239,6 +239,16 @@ def broadcastable?(query_str, **query_options) query.context.namespace(:subscriptions)[:subscription_broadcastable] end + # Called during execution when a new `subscription ...` operation is received + # @param query [GraphQL::Query] + # @return [void] + def initialize_subscriptions(query) + subs_namespace = query.context.namespace(:subscriptions) + subs_namespace[:events] = [] + subs_namespace[:subscriptions] = {} + nil + end + private # Recursively normalize `args` as belonging to `arg_owner`: diff --git a/lib/graphql/unauthorized_error.rb b/lib/graphql/unauthorized_error.rb index c191615548..71ee783328 100644 --- a/lib/graphql/unauthorized_error.rb +++ b/lib/graphql/unauthorized_error.rb @@ -29,5 +29,9 @@ def initialize(message = nil, object: nil, type: nil, context: nil) end attr_accessor :path, :ast_nodes + + def assign_graphql_result(query, result_data, key) + result_data[key] = nil + end end end diff --git a/spec/graphql/schema/subscription_spec.rb b/spec/graphql/schema/subscription_spec.rb index d0ac47e79c..c4a79b2d05 100644 --- a/spec/graphql/schema/subscription_spec.rb +++ b/spec/graphql/schema/subscription_spec.rb @@ -719,11 +719,7 @@ class PrivateSubscription < SubscriptionFieldSchema::BaseSubscription describe "writing during resolution" do class DirectWriteSchema < GraphQL::Schema - class WriteCheckSubscriptions - def use(schema) - schema.subscriptions = self - end - + class WriteCheckSubscriptions < GraphQL::Subscriptions def write_subscription(query, events) query.context[:write_subscription_count] ||= 0 query.context[:write_subscription_count] += 1 @@ -762,7 +758,7 @@ class Subscription < GraphQL::Schema::Object field :direct_twice, subscription: DirectWriteTwice end - use WriteCheckSubscriptions.new + use WriteCheckSubscriptions subscription(Subscription) end From 2072619433f5c852bc837b2bd0274be49d081722 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Sat, 28 Mar 2026 05:10:48 -0400 Subject: [PATCH 08/10] Only setup subscriptions on valid queries --- lib/graphql/execution/interpreter.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/graphql/execution/interpreter.rb b/lib/graphql/execution/interpreter.rb index 34cac43432..c8270e5af9 100644 --- a/lib/graphql/execution/interpreter.rb +++ b/lib/graphql/execution/interpreter.rb @@ -58,9 +58,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl # Do as much eager evaluation of the query as possible results = [] queries.each_with_index do |query, idx| - if query.subscription? && !query.subscription_update? - schema.subscriptions.initialize_subscriptions(query) - end multiplex.dataloader.append_job { operation = query.selected_operation result = if operation.nil? || !query.valid? || !query.context.errors.empty? @@ -72,7 +69,9 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl # in particular, assign it here: runtime = Runtime.new(query: query) query.context.namespace(:interpreter_runtime)[:runtime] = runtime - + if query.subscription? && !query.subscription_update? + schema.subscriptions.initialize_subscriptions(query) + end query.current_trace.execute_query(query: query) do runtime.run_eager end From b6ce8d6a214f3c374edba78925274a76a2ab44a7 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Sat, 28 Mar 2026 05:32:59 -0400 Subject: [PATCH 09/10] Add a finish_subscriptions method --- lib/graphql/execution/interpreter.rb | 4 ++-- lib/graphql/execution/next/runner.rb | 4 ++-- lib/graphql/schema/resolver.rb | 10 ++++------ lib/graphql/subscriptions.rb | 10 ++++++++++ 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/lib/graphql/execution/interpreter.rb b/lib/graphql/execution/interpreter.rb index c8270e5af9..36ec80b942 100644 --- a/lib/graphql/execution/interpreter.rb +++ b/lib/graphql/execution/interpreter.rb @@ -88,8 +88,8 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl # Then, find all errors and assign the result to the query object results.each_with_index do |data_result, idx| query = queries[idx] - if (events = query.context.namespace(:subscriptions)[:events]) && !events.empty? - schema.subscriptions.write_subscription(query, events) + if query.subscription? + schema.subscriptions.finish_subscriptions(query) end # Assign the result so that it can be accessed in instrumentation query.result_values = if data_result.equal?(NO_OPERATION) diff --git a/lib/graphql/execution/next/runner.rb b/lib/graphql/execution/next/runner.rb index 12f2c879d2..d3f96b67c1 100644 --- a/lib/graphql/execution/next/runner.rb +++ b/lib/graphql/execution/next/runner.rb @@ -176,8 +176,8 @@ def execute queries.each_with_index.map do |query, idx| result = results[idx] - if (events = query.context.namespace(:subscriptions)[:events]) && !events.empty? - @schema.subscriptions.write_subscription(query, events) + if query.subscription? + @schema.subscriptions.finish_subscriptions(query) end fin_result = if query.context.errors.empty? diff --git a/lib/graphql/schema/resolver.rb b/lib/graphql/schema/resolver.rb index a5f775d9c9..084290313e 100644 --- a/lib/graphql/schema/resolver.rb +++ b/lib/graphql/schema/resolver.rb @@ -74,6 +74,10 @@ def call result = if is_authed Schema::Validator.validate!(self.class.validators, object, context, @prepared_arguments, as: @field) + if q.subscription? && @field.owner == context.schema.subscription + # This needs to use arguments without `loads:` + @original_arguments = @field_resolve_step.coerce_arguments(@field, @field_resolve_step.ast_node.arguments, false) + end call_resolve(@prepared_arguments) elsif new_return_value.nil? err = UnauthorizedFieldError.new(object: object, type: @field_resolve_step.parent_type, context: context, field: @field) @@ -83,12 +87,6 @@ def call end q = context.query q.current_trace.end_execute_field(field, @prepared_arguments, trace_objs, q, [result]) - if q.subscription? && @field.owner == context.schema.subscription && !@subscription_written - # TODO unify this -- do it in a single pass - @original_arguments = @field_resolve_step.coerce_arguments(@field, @field_resolve_step.ast_node.arguments, false) - Subscriptions::DefaultSubscriptionResolveExtension.write_subscription(@field, result, @original_arguments, context) - @subscription_written = true - end exec_result[exec_index] = result rescue RuntimeError => err exec_result[exec_index] = err diff --git a/lib/graphql/subscriptions.rb b/lib/graphql/subscriptions.rb index 301e25a129..005bf176bd 100644 --- a/lib/graphql/subscriptions.rb +++ b/lib/graphql/subscriptions.rb @@ -249,6 +249,16 @@ def initialize_subscriptions(query) nil end + # Called during execution when a subscription operation has finished + # @param query [GraphQL::Query] + # @return [void] + def finish_subscriptions(query) + if (events = query.context.namespace(:subscriptions)[:events]) && !events.empty? + write_subscription(query, events) + end + nil + end + private # Recursively normalize `args` as belonging to `arg_owner`: From 65a49f17d7c8df7caf0f231ce62463ad8701a8a1 Mon Sep 17 00:00:00 2001 From: Robert Mosolgo Date: Sat, 28 Mar 2026 05:44:46 -0400 Subject: [PATCH 10/10] Only finish_subscriptions on valid queries --- lib/graphql/execution/interpreter.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/graphql/execution/interpreter.rb b/lib/graphql/execution/interpreter.rb index 36ec80b942..1688b01fdd 100644 --- a/lib/graphql/execution/interpreter.rb +++ b/lib/graphql/execution/interpreter.rb @@ -88,9 +88,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl # Then, find all errors and assign the result to the query object results.each_with_index do |data_result, idx| query = queries[idx] - if query.subscription? - schema.subscriptions.finish_subscriptions(query) - end # Assign the result so that it can be accessed in instrumentation query.result_values = if data_result.equal?(NO_OPERATION) if !query.valid? || !query.context.errors.empty? @@ -100,6 +97,9 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl data_result end else + if query.subscription? + schema.subscriptions.finish_subscriptions(query) + end result = {} if !query.context.errors.empty?