Subscription

Subscriptionとは

クエリによるデータの取得、Mutationによるデータの変更に加えてGraphQL仕様ではsubscriptionと呼ばれる3つ目の操作Typeがサポートされています。GraphQLにおけるSubscriptionとは、サーバからのリアルタイムメッセージを受け取ることを選択したクライアントに対して、サーバからデータをプッシュするための方法です。
Subscriptionは、クライアントに配信するフィールドのセットを指定するという点ではクエリーと似ていますが、すぐに単一の回答を返すのではなく、サーバーで特定のイベントが発生するたびにチャンネルが開かれ、結果がクライアントに送信されるようになっています。
Subscriptionの一般的な使用例は、特定のイベント、例えば新しいオブジェクトの作成やフィールドの更新などをクライアント側に通知することです。(詳細はこちら)

ApolloでSubscriptionを可能にする方法

Subscriptionを可能にするためには、installSubscriptionHandlersプロパティをtrueに設定してください。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
installSubscriptionHandlers: true,
}),
注意
installSubscriptionHandlers設定オプションはApolloサーバーの最新版から削除され、このパッケージでもまもなく非推奨となります。デフォルトでは、installSubscriptionHandlerssubscriptions-transport-wsを使うようにフォールバックしますが、代わりにgraphql-wsライブラリを使うことを強く推奨します。
代わりにgraphql-wsパッケージを使うならば、以下の設定を使用します。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': true
},
}),
メモ
また、後方互換性を考慮して両方のパッケージ(subscription-transport-wsgraphql-ws)を同時に使用できます。

コードファースト

コードファーストのアプローチでサブスクリプションを作成するには、@Subscription()デコレーター (@nestjs/graphql パッケージからエクスポート) と、シンプルな publish/subscribe API を提供するgraphql-subscriptionsパッケージのPubSubクラスを使用します。
以下のサブスクリプションハンドラは、PubSub#asyncIteratorを呼び出すことで、イベントの購読を引き受けています。このメソッドは、イベントトピックの名前に対応するtriggerNameという単一の引数を取ります。
const pubSub = new PubSub();
@Resolver((of) => Author)
export class AuthorResolver {
// ...
@Subscription((returns) => Comment)
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
}
メモ
すべてのデコレータは@nestjs/graphqlパッケージから、PubSubクラスは graphql-subscriptionsパッケージからエクスポートされています。
注意
PubSubは、シンプルなPublish and Subscribe APIを公開するクラスです。詳しくはこちらをご覧ください。Apolloのドキュメントでは、デフォルトの実装は実運用に適さないと警告していることに注意してください。運用アプリケーションでは、外部ストアに支えられたPubSubの実装を使用する必要があります。
これにより、SDLでGraphQLスキーマの以下の部分が作成されます。
type Subscription {
commentAdded(): Comment!
}
Subscriptionは、定義上Subscriptionの名前をキーとする単一のトップレベルプロパティを持つオブジェクトを返すことに注意しましょう。この名前は、サブスクリプションハンドラメソッドの名前から継承されるか (すなわち、上記のcommentAdded) 、あるいは以下に示すように@Subscription()デコレーターの第二引数としてキー名を持つオプションを渡すことによって明示的に提供されます。
@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded() {
return pubSub.asyncIterator('commentAdded');
}
こちらの構文は、前のコードサンプルと同じSDLを作成しますが、メソッド名とSubscriptionを切り離すことができます。

イベントの発行

さて、イベントを発行するにはPubSub#publishメソッドを使用します。これは、オブジェクトグラフの一部が変更されたときにクライアント側の更新をトリガーするために、変異の中でよく使用されます。
posts/post.service.ts
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
pubSub.publish('commentAdded', { commentAdded: newComment });
return newComment;
}
PubSub#publishメソッドは、最初のパラメータとして triggerName(繰り返しますが、これはイベントトピック名と考えてください)、2番目のパラメータとしてイベントペイロードを受け取ります。前述のように、サブスクリプションは定義上、値を返し、その値は形状を持っています。commentAddedサブスクリプションの生成された SDL をもう一度見てみましょう。
type Subscription {
commentAdded(): Comment!
}
これは、サブスクリプションが commentAddedというトップレベルのプロパティ名 を持つオブジェクトを返さなければならず、その値がCommentオブジェクトである ことを教えています。注意すべき重要な点は、PubSub#publishメソッドによって発行されるイベントペイロードの形は、サブスクリプションから戻ると期待される値の形に対応していなければならないということです。
したがって、上記の例では、pubSub.publish('commentAdded', { commentAdded: newComment }) 文は、適切な形のペイロードを持つ commentAddedイベントを発行しています。これらの形状が一致しない場合、サブスクリプションはGraphQL検証フェーズで失敗します。

Subscriptionにフィルターをかける

特定のイベントをフィルタリングするには、filterプロパティにフィルタリング関数を設定します。この関数は、配列filterに渡される関数と同じように動作します。この関数は 2 つの引数を取ります: payloadはイベントのペイロード (イベントパブリッシャーから送られたもの) を含み、variablesはサブスクリプション要求の間に渡された任意の引数を取ります。それは、このイベントがクライアントのリスナーに公開されるべきかどうかを決定する booleanを返します。
@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string) {
return pubSub.asyncIterator('commentAdded');
}

Subscriptionのペイロード(payload)を変異させる

公開されたイベントペイロードを変更するには、resolveプロパティを関数に設定します。この関数はイベントペイロードを受け取り(イベントパブリッシャーから送信される)、適切な値を返します。
@Subscription(returns => Comment, {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
メモ
resolveオプションを使用する場合は、ラップされていないペイロードを返す必要があります (たとえば、この例では{ commentAdded: newComment }オブジェクトではなく、newCommentオブジェクトを直接返します)。
注入されたプロバイダにアクセスする必要がある場合(例えば、データを検証するために外部サービスを使用する)、以下の構造を使用します。
@Subscription(returns => Comment, {
resolve(this: AuthorResolver, value) {
// "this" refers to an instance of "AuthorResolver"
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
フィルターも同じ構造で実装できます。
@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}

スキーマファースト

NestJSで同等のSubscriptionを作成するには、@Subscription()デコレータを活用します。
const pubSub = new PubSub();
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
}
コンテキストと引数に基づいて特定のイベントをフィルタリングするときには、filterプロパティを設定します。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
発行されたペイロードを変異させるには、resolve関数が使えます。
@Subscription('commentAdded', {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
注入されたプロバイダにアクセスする必要がある場合(例えば、データを検証するために外部サービスを使用する)、以下の構造を使用します。
@Subscription('commentAdded', {
resolve(this: AuthorResolver, value) {
// "this" refers to an instance of "AuthorResolver"
return value;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
フィルターも同じような構造で実装します。
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterator('commentAdded');
}
最後に、型定義ファイルの更新を行います。(GraphQLファイル側で)
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}
これで、commentAdded(title: String!):CommentのSubscriptionを実装しました。完全なサンプル実装はこちらでご覧いただけます。

PubSub

上記では、ローカルのPubSubインスタンスをインスタンス化しました。望ましい方法は、PubSubをプロバイダとして定義し、コンストラクタを通して (@Inject()デコレータを使用して) それを注入することです。これにより、アプリケーション全体でインスタンスを再利用することができます。例えば、以下のようにプロバイダを定義し、必要な場所に'PUB_SUB'をインジェクトします。
{
provide: 'PUB_SUB',
useValue: new PubSub(),
}

Subscriptionサーバのカスタマイズ

購読サーバーをカスタマイズする(パスを変更するなど)には、subscriptionオプションのプロパティを使用します。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
path: '/graphql'
},
}
}),
Subscriptionにgraphql-wsパッケージを活用している場合、以下のようにsubscriptions-transport-wsキーをgraphql-wsに置き換えてください。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
path: '/graphql'
},
}
}),

WebSocketを経由した認証を実装する

ユーザが認証されているかどうかのチェックは、Subscriptionのオプションで指定できるonConnectコールバック関数の内部で実装できます。
onConnectは、SubscriptionClientに渡されたconnectionParamsを第一引数として受け取れます。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
onConnect: (connectionParams) => {
const authToken = connectionParams.authToken;
if (!isValid(authToken)) {
throw new Error('Token is not valid');
}
// extract user information from token
const user = parseToken(authToken);
// return user info to add them to the context later
return { user };
},
}
},
context: ({ connection }) => {
// connection.context will be equal to what was returned by the "onConnect" callback
},
}),
上記のプログラムにおけるauthTokenは、接続が最初に確立されたときにクライアントから一度だけ送信されます。この接続で作成されたすべてのサブスクリプションは、同じauthTokenを持ち、したがって同じユーザー情報を持つことになります。
注意
subscriptions-transport-wsには、コネクションがonConnectフェーズをスキップできるバグがあります。ユーザーがサブスクリプションを開始したときに onConnectが呼び出されたと仮定せず、常にコンテキストが投入されていることをチェックする必要があります.
graphql-wsパッケージを使用している場合、onConnectコールバックのシグネチャが若干異なります。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
onConnect: (context: Context<any>) => {
const { connectionParams, extra } = context;
// user validation will remain the same as in the example above
// when using with graphql-ws, additional context value should be stored in the extra field
extra.user = { user: {} };
},
},
},
context: ({ extra }) => {
// you can now access your additional context value through the extra field
},
});

Mercurius driverでSubscriptionを有効化する手順

Subscriptionを有効化するには、subscriptionプロパティをtrueに設定します。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: true,
}),
メモ
また、設定のオブジェクトを渡すことで、カスタムエミッターの設定や、着信接続の検証などを行うことができます。

コードファースト

コードファーストのアプローチでSubscriptionを作成するには、@Subscription()デコレータ(@nestjs/graphqlパッケージからインポート)と、シンプルなAPIを提供するmercuriusパッケージのPubSubクラスを使います。
以下のSubscriptionハンドラは、PubSub#asyncIteratorを呼び出すことで、イベントへのSubscriptionの処理を行います。このメソッドは、イベントトピックの名前に対応するtriggerNameという単一の引数を取ります。
@Resolver((of) => Author)
export class AuthorResolver {
// ...
@Subscription((returns) => Comment)
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}
メモ
上記の例で使われているデコレータはすべて@nestjs/graphqlパッケージから、PubSubクラスはmercuriusパッケージからエクスポートされています。
注意
PubSubは、シンプルな発行と登録を実装するAPIを公開するクラスです。
これによって、SDLでGraphQLスキーマの以下の部分が作成されます。
type Subscription {
commentAdded(): Comment!
}
Subscriptionは定義上、Subscriptionの名前をキーとする単一のトップレベルプロパティを持つオブジェクトを返すことに注意しましょう。この名前は、サブスクリプションハンドラメソッドの名前から継承されるか (すなわち、上記のcommentAdded) 、あるいは以下に示すように@Subscription()デコレーターの第二引数としてキー名を持つオプションを渡すことによって明示的に提供されます。
@Subscription(returns => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
上記の構文は、前のコードサンプルと同じSDLを作成しますが、メソッド名とSubscriptionを分離できます。

発行

さて、イベントを発行するためにはPubSub#publishメソッドを使用します。これは、オブジェクトグラフの一部が変更されたときにクライアント側の更新をトリガーするために、変異の中でよく使用されます。
posts/post.resolver.ts
@Mutation(returns => Post)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
@Context('pubsub') pubSub: PubSub,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
await pubSub.publish({
topic: 'commentAdded',
payload: {
commentAdded: newComment
}
});
return newComment;
}
前述のように、Subscriptionは定義上値を返し、その値は型を持っています。commentAddedSubscriptionが生成したSDLを再度確認してみましょう。
type Subscription {
commentAdded(): Comment!
}
これは、サブスクリプションがcommentAddedというトップレベルのプロパティ名 を持つオブジェクトを返さなければならず、その値がCommentオブジェクトである ことを教えています。注意すべき重要な点は、PubSub#publishメソッドによって発行されるイベントペイロードの形は、サブスクリプションから戻ると期待される値の形に対応していなければならないということです。
したがって、上記の例では、pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment }) ステートメントは、適切な形のペイロードでcommentAddedイベントをパブリッシュします。これらの形状が一致しない場合、サブスクリプションは GraphQL 検証フェーズの間に失敗します。

Subscriptionにフィルターをかける

特定のイベントをフィルタリングするには、filterプロパティにフィルタリング関数を設定します。この関数は、配列フィルターに渡される関数と同じように動作します。この関数は 2 つの引数を取ります。言い換えれば、payloadはイベントのペイロード (イベントパブリッシャーから送られたもの) を含み、variablesはサブスクリプション要求の間に渡された任意の引数を取ります。それは、このイベントがクライアントのリスナーに公開されるべきかどうかを決定する真偽値を返します。
@Subscription(returns => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
注入されたプロバイダにアクセスする必要がある場合(例えば、データを検証するために外部サービスを活用するなど)、以下の構造を使用します。
@Subscription(returns => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}

スキーマファースト

NestJSで同等のSubscriptionを作成するために、@Subscription()デコレータを使います。
const pubSub = new PubSub();
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}
コンテキストと引数に基づいて特定のイベントをフィルタリングするには、filterプロパティを設定します。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
注入されたプロバイダにアクセスする必要がある場合(例えば、データを検証するために外部サービスを活用するなど)、以下の構造を使用します。
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" refers to an instance of "AuthorResolver"
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
最後に、GraphQLの型定義ファイルの更新を行います。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}
これでcommentAdded(title: String!): CommentSubscriptionの実装は完了です。

PubSub

上記の例では、デフォルトのPubSubエミッターを使いました。あるいは、カスタムのPubSub実装を提供できます。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
emitter: require('mqemitter-redis')({
port: 6579,
host: '127.0.0.1',
}),
},
});

WebSocketを経由した認証を実装する

ユーザーが認証されているかどうかのチェックは、サブスクリプションのオプションで指定できるverifyClientコールバック関数の内部で行うことができます。
verifyClientは第一引数としてinfoオブジェクトを受け取り、それを使ってリクエストのヘッダを取得します。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
verifyClient: (info, next) => {
const authorization = info.req.headers?.authorization as string;
if (!authorization?.startsWith('Bearer ')) {
return next(false);
}
next(true);
},
}
}),
Last modified 10mo ago