Subscriptions / WebSockets
info
Subscriptions and WebSockets are in beta and may change without a major version bump. Feel free to use them, and report any issues you find on GitHub.
Using Subscriptions
tip
- For a full-stack example have a look at /examples/next-prisma-starter-websockets.
- For a bare-minimum Node.js example, see /examples/standalone-server.
Adding a subscription procedure
server/router.ts
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { EventEmitter } from 'events';
import { z } from 'zod';
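// shape of the data emitted to subscribers (assumed here; it mirrors the `add` input below)
type Post = { id?: string; text: string };
// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();
const t = initTRPC()();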
export const appRouter = t.router({
  onAdd: t.procedure.subscription(() => {
    // this resolver runs once for each client that subscribes to `onAdd`
    // return an `observable`; its callback runs immediately on subscribe
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  add: t.procedure
    .input(
      z.object({
        id: z.string().uuid().optional(),
        text: z.string().min(1),
      }),
    )
    .mutation(async ({ input }) => {
      const post = { ...input }; /* [..] add to db */
      ee.emit('add', post);
      return post;
    }),
});
// export the router's type so the client can import it
export type AppRouter = typeof appRouter;
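To see the subscribe/teardown lifecycle in isolation, here is a minimal, dependency-free sketch. Listeners and miniObservable are hypothetical stand-ins written for this example; they are not Node's EventEmitter or tRPC's observable, and only mimic the pattern used above: run the setup callback on subscribe, and run the function it returns on unsubscribe.

```typescript
type Post = { id?: string; text: string };
type Teardown = () => void;
type Emit<T> = { next: (value: T) => void };

// Hypothetical stand-in for an event emitter with a single channel.
class Listeners<T> {
  private fns: Array<(value: T) => void> = [];
  on(fn: (value: T) => void): void {
    this.fns.push(fn);
  }
  off(fn: (value: T) => void): void {
    this.fns = this.fns.filter((candidate) => candidate !== fn);
  }
  emit(value: T): void {
    // iterate over a copy so listeners may unsubscribe while being called
    this.fns.slice().forEach((fn) => fn(value));
  }
  count(): number {
    return this.fns.length;
  }
}

// Hypothetical stand-in for `observable`: calls `setup` on subscribe and
// hands back the teardown function that `setup` returned.
function miniObservable<T>(setup: (emit: Emit<T>) => Teardown) {
  return {
    subscribe(next: (value: T) => void): { unsubscribe: Teardown } {
      const teardown = setup({ next });
      return { unsubscribe: teardown };
    },
  };
}

const added = new Listeners<Post>();
const onAdd = miniObservable<Post>((emit) => {
  const handler = (post: Post) => emit.next(post);
  added.on(handler);
  // teardown: remove the listener when the subscriber goes away
  return () => added.off(handler);
});

const received: Post[] = [];
const subscription = onAdd.subscribe((post) => {
  received.push(post);
});
added.emit({ text: 'first' }); // delivered to the subscriber
subscription.unsubscribe();
added.emit({ text: 'second' }); // not delivered: the listener was removed
```

Returning the teardown from the setup callback keeps registration and cleanup next to each other, which makes it harder to leak listeners when clients disconnect.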
Creating a WebSocket-server
yarn add ws
server/wsServer.ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
const wss = new ws.Server({
  port: 3001,
});
const handler = applyWSSHandler({ wss, router: appRouter, createContext });
wss.on('connection', (ws) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');
process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});
Setting TRPCClient to use WebSockets
tip
You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.
client.ts
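import {
  createTRPCClient,
  createTRPCClientProxy,
  createWSClient,
  wsLink,
} from '@trpc/client';
// type-only import; assumes the server module exports `type AppRouter = typeof appRouter`
// (adjust the path to wherever your router lives)
import type { AppRouter } from './server/router';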
// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});
// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
// create a proxy function for ease of use
const proxy = createTRPCClientProxy(client);
Using React
See /examples/next-prisma-starter-websockets.
WebSockets RPC Specification
You can read more details by drilling into the TypeScript definitions.
query / mutation
Request
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
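As a concrete instance of the request shape above, the sketch below builds a query envelope and serializes it for sending over a socket. The helper buildRequest, the path post.byId, and the input value are hypothetical. The input is passed through untouched, which assumes no transformer is configured.

```typescript
type RPCRequest = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: { path: string; input?: unknown };
};

let nextId = 0;

// Hypothetical helper: assigns an increasing numeric id to each request.
function buildRequest(
  method: RPCRequest['method'],
  path: string,
  input?: unknown,
): RPCRequest {
  nextId += 1;
  // leave `input` out entirely when the procedure takes none
  const params: RPCRequest['params'] =
    input === undefined ? { path } : { path, input };
  return { id: nextId, jsonrpc: '2.0', method, params };
}

const request = buildRequest('query', 'post.byId', { id: '1' });
// the string that would be written to the WebSocket
const wire = JSON.stringify(request);
```

The id is what lets the client match a later response to this request, so it must be unique among the requests in flight on the same connection.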
Response
The response has the shape below, or is an error.
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
subscription / subscription.stop
Start a subscription
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
To cancel a subscription, call subscription.stop
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
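The two messages are tied together only by their id. The sketch below, using the hypothetical helpers startSubscription and stopSubscription, derives the stop message from the start message so the ids cannot drift apart. The path onAdd matches the router shown earlier; the id value is arbitrary.

```typescript
type SubscriptionStart = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
};

type SubscriptionStop = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
};

// Hypothetical helper: builds the message that starts a subscription.
function startSubscription(
  id: number | string,
  path: string,
  input?: unknown,
): SubscriptionStart {
  const params = input === undefined ? { path } : { path, input };
  return { id, method: 'subscription', params };
}

// Hypothetical helper: builds the matching stop message by reusing the id.
function stopSubscription(start: SubscriptionStart): SubscriptionStop {
  return { id: start.id, method: 'subscription.stop' };
}

const start = startSubscription('sub-1', 'onAdd');
const stop = stopSubscription(start);
```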
Subscription response shape
Each message has the shape below, or is an error.
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
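Because result is a union discriminated by type, a client can handle the three cases with a single switch. The following is a minimal sketch; the State type and the applyMessage function are hypothetical names, and the sample messages are made up for illustration.

```typescript
type SubscriptionResult<TData> =
  | { type: 'data'; data: TData }
  | { type: 'started' }
  | { type: 'stopped' };

type SubscriptionResponse<TData> = {
  id: number | string;
  jsonrpc?: '2.0';
  result: SubscriptionResult<TData>;
};

// Hypothetical client-side state for one subscription.
type State<TData> = { active: boolean; items: TData[] };

// Returns the next state after one incoming message.
function applyMessage<TData>(
  state: State<TData>,
  message: SubscriptionResponse<TData>,
): State<TData> {
  const result = message.result;
  switch (result.type) {
    case 'started':
      return { active: true, items: state.items };
    case 'data':
      // `result` is narrowed here, so `data` is available
      return { active: state.active, items: state.items.concat([result.data]) };
    case 'stopped':
      return { active: false, items: state.items };
  }
}

const messages: Array<SubscriptionResponse<string>> = [
  { id: 1, result: { type: 'started' } },
  { id: 1, result: { type: 'data', data: 'a' } },
  { id: 1, result: { type: 'data', data: 'b' } },
  { id: 1, result: { type: 'stopped' } },
];

let state: State<string> = { active: false, items: [] };
for (const message of messages) {
  state = applyMessage(state, message);
}
```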
Errors
See https://www.jsonrpc.org/specification#error_object or Error Formatting.
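A client typically needs to tell a successful response from an error before reading the data. The sketch below assumes the error envelope follows the JSON-RPC error object linked above, with an error field (code, message, optional data) in place of result. The unwrap helper is a hypothetical name, and the exact contents of data depend on your error formatting.

```typescript
type RPCError = { code: number; message: string; data?: unknown };

type RPCResponse<TOutput> =
  | { id: number | string; jsonrpc?: '2.0'; result: { type: 'data'; data: TOutput } }
  | { id: number | string; jsonrpc?: '2.0'; error: RPCError };

// Hypothetical helper: returns the procedure output, or throws on an error envelope.
function unwrap<TOutput>(response: RPCResponse<TOutput>): TOutput {
  if ('error' in response) {
    throw new Error(`RPC error ${response.error.code}: ${response.error.message}`);
  }
  return response.result.data;
}

const ok: RPCResponse<number> = { id: 1, result: { type: 'data', data: 42 } };
const value = unwrap(ok);

// -32600 is the JSON-RPC 2.0 code for "Invalid Request"
const failed: RPCResponse<number> = {
  id: 2,
  error: { code: -32600, message: 'Invalid Request' },
};

let failure = '';
try {
  unwrap(failed);
} catch (cause) {
  failure = cause instanceof Error ? cause.message : String(cause);
}
```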
Notifications from Server to Client
{ id: null, type: 'reconnect' }
Tells clients to reconnect before the server shuts down. Sent by calling broadcastReconnectNotification() on the handler returned by applyWSSHandler (named handler in server/wsServer.ts above).
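For a hand-written client, detecting this notification could look like the sketch below. It assumes the message arrives exactly as written in the heading above; isReconnectNotification and shouldReconnect are hypothetical names, and the actual reconnect logic is left out.

```typescript
type ReconnectNotification = { id: null; type: 'reconnect' };

// Type guard for the server's reconnect notification.
function isReconnectNotification(
  message: unknown,
): message is ReconnectNotification {
  if (typeof message !== 'object' || message === null) {
    return false;
  }
  const candidate = message as { id?: unknown; type?: unknown };
  return candidate.id === null && candidate.type === 'reconnect';
}

// simulate a raw frame received from the socket
const incoming: unknown = JSON.parse('{"id":null,"type":"reconnect"}');

let shouldReconnect = false;
if (isReconnectNotification(incoming)) {
  // a real client would open a new connection here and resubscribe
  shouldReconnect = true;
}
```

Checking id === null first keeps the notification from being confused with a response to one of the client's own requests, since those always carry a number or string id.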