@@ -46,25 +46,42 @@ func subscribe(
46
46
)
47
47
48
48
if let sourceStream = sourceResult. stream {
49
- let subscriptionStream = sourceStream. map { eventPayload -> GraphQLResult in
50
- // For each payload yielded from a subscription, map it over the normal
51
- // GraphQL `execute` function, with `payload` as the rootValue.
52
- // This implements the "MapSourceToResponseEvent" algorithm described in
53
- // the GraphQL specification. The `execute` function provides the
54
- // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
55
- // "ExecuteQuery" algorithm, for which `execute` is also used.
56
- try await execute (
57
- queryStrategy: queryStrategy,
58
- mutationStrategy: mutationStrategy,
59
- subscriptionStrategy: subscriptionStrategy,
60
- instrumentation: instrumentation,
61
- schema: schema,
62
- documentAST: documentAST,
63
- rootValue: eventPayload,
64
- context: context,
65
- variableValues: variableValues,
66
- operationName: operationName
67
- )
49
+ // We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
50
+ // (which we cannot know),
51
+ // and we need the result to be a concrete type.
52
+ let subscriptionStream = AsyncThrowingStream < GraphQLResult , Error > { continuation in
53
+ let task = Task {
54
+ do {
55
+ for try await eventPayload in sourceStream {
56
+ // For each payload yielded from a subscription, map it over the normal
57
+ // GraphQL `execute` function, with `payload` as the rootValue.
58
+ // This implements the "MapSourceToResponseEvent" algorithm described in
59
+ // the GraphQL specification. The `execute` function provides the
60
+ // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
61
+ // "ExecuteQuery" algorithm, for which `execute` is also used.
62
+ let newEvent = try await execute (
63
+ queryStrategy: queryStrategy,
64
+ mutationStrategy: mutationStrategy,
65
+ subscriptionStrategy: subscriptionStrategy,
66
+ instrumentation: instrumentation,
67
+ schema: schema,
68
+ documentAST: documentAST,
69
+ rootValue: eventPayload,
70
+ context: context,
71
+ variableValues: variableValues,
72
+ operationName: operationName
73
+ )
74
+ continuation. yield ( newEvent)
75
+ }
76
+ continuation. finish ( )
77
+ } catch {
78
+ continuation. finish ( throwing: error)
79
+ }
80
+ }
81
+
82
+ continuation. onTermination = { @Sendable reason in
83
+ task. cancel ( )
84
+ }
68
85
}
69
86
return SubscriptionResult ( stream: subscriptionStream, errors: sourceResult. errors)
70
87
} else {
@@ -151,7 +168,7 @@ func createSourceEventStream(
151
168
}
152
169
153
170
func executeSubscription(
154
- context: ExecutionContext ,
171
+ context: ExecutionContext
155
172
) async throws -> SourceEventStreamResult {
156
173
// Get the first node
157
174
let type = try getOperationRootType ( schema: context. schema, operation: context. operation)
@@ -245,7 +262,7 @@ func executeSubscription(
245
262
return SourceEventStreamResult ( errors: context. errors)
246
263
} else if let error = resolved as? GraphQLError {
247
264
return SourceEventStreamResult ( errors: [ error] )
248
- } else if let stream = resolved as? EventStream < Any > {
265
+ } else if let stream = resolved as? any AsyncSequence {
249
266
return SourceEventStreamResult ( stream: stream)
250
267
} else if resolved == nil {
251
268
return SourceEventStreamResult ( errors: [
@@ -255,7 +272,7 @@ func executeSubscription(
255
272
let resolvedObj = resolved as AnyObject
256
273
return SourceEventStreamResult ( errors: [
257
274
GraphQLError (
258
- message: " Subscription field resolver must return EventStream<Any> . Received: ' \( resolvedObj) ' "
275
+ message: " Subscription field resolver must return an AsyncSequence . Received: ' \( resolvedObj) ' "
259
276
) ,
260
277
] )
261
278
}
@@ -266,10 +283,10 @@ func executeSubscription(
266
283
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers
267
284
// for query fields.
268
285
struct SourceEventStreamResult {
269
- public let stream : EventStream < Any > ?
286
+ public let stream : ( any AsyncSequence ) ?
270
287
public let errors : [ GraphQLError ]
271
288
272
- public init ( stream: EventStream < Any > ? = nil , errors: [ GraphQLError ] = [ ] ) {
289
+ public init ( stream: ( any AsyncSequence ) ? = nil , errors: [ GraphQLError ] = [ ] ) {
273
290
self . stream = stream
274
291
self . errors = errors
275
292
}
0 commit comments