API reference
The Pulse API reference documentation on this page is based on the following schema:
model User {
id Int @id @default(autoincrement())
name String?
email String @unique
}
stream()
stream() returns an async iterable that receives all database change events related to the table you call this method on.
const stream = await prisma.user.stream();
Because an async iterable is returned, you can use a for await...of loop to wait for and receive events:
for await (let event of stream) {
  console.log(event);
}
Notes
- Usage of stream() requires event persistence to be enabled in your Pulse project.
- stream() guarantees that all events will be delivered at least once and in the right order.
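Because delivery is at least once, a consumer may see the same event more than once. The following is a minimal sketch of an idempotent handler, assuming that a redelivered event carries the same id as the original. makeIdempotent is a hypothetical helper, not part of the Pulse API, and the in-memory Set stands in for whatever durable store you would use in production.

```typescript
// Minimal event shape for this sketch; real Pulse events carry more fields.
type EventWithId = { id: string };

// Hypothetical helper (not part of the Pulse API): wraps a handler so that
// events whose `id` was already processed are skipped.
function makeIdempotent<E extends EventWithId>(
  handler: (event: E) => void,
  seen: Set<string> = new Set<string>(),
): (event: E) => boolean {
  return (event: E): boolean => {
    if (seen.has(event.id)) {
      return false; // duplicate delivery, ignored
    }
    handler(event);
    // Record the id only after the handler succeeded, so an event whose
    // handler threw is processed again if it is redelivered.
    seen.add(event.id);
    return true;
  };
}

// Usage with hand-written events standing in for a real stream.
const processed: string[] = [];
const handle = makeIdempotent<EventWithId>((event) => {
  processed.push(event.id);
});
handle({ id: "evt-1" });
handle({ id: "evt-1" }); // redelivered, skipped
handle({ id: "evt-2" });
```

Inside a for await...of loop you would call the wrapped handler for each event instead of the raw one.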
Options
You can pass an object with configuration options to stream(). The object has the following fields:
Name | Description |
---|---|
name | The name of the stream. Providing this option enables "resumability": events that happened while your stream was not active (e.g. because your server was down) are delivered later. Note that if name is provided, only one client can be connected to a stream with that particular name at any given time. |
create | An object to specify filters for the create events to be received. If you leave the object empty with create: {}, you will receive all create events. You can filter on any scalar field of your model. |
update | An object with an after field to specify filters for the update events to be received. If you leave the object empty with update: {}, you will receive all update events. The filter is applied to the values of the record after an update has been performed. You can filter on any scalar field of your model. |
delete | An object to specify filters for the delete events to be received. You can filter on any scalar field of your model. |
Return type
When called with no filter arguments, the stream() method returns the following type:
const stream: PulseSubscription<
  | PulseCreateEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
  | PulseUpdateEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
  | PulseDeleteEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
> = await prisma.user.stream();
Depending on the arguments you provide, the return type may change. For example, if you filter for only create events, the type is narrowed accordingly:
const stream: PulseSubscription<
  PulseCreateEvent<{
    id: number;
    email: string;
    name: string | null;
  }>
> = await prisma.user.stream({
  create: {},
});
Examples
Use a name to be able to "resume" the stream
const stream = await prisma.user.stream({
  name: "all-user-events",
});
Learn more about resuming streams here.
Filter for new User records with a non-null value for name
const stream = await prisma.user.stream({
  create: {
    name: { not: null },
  },
});
Filter for updated User records where the email ends with @prisma.io after the update
const stream = await prisma.user.stream({
  update: {
    after: {
      email: { endsWith: "@prisma.io" },
    },
  },
});
Filter for deleted User records where the email contains hello
const stream = await prisma.user.stream({
  delete: {
    email: { contains: "hello" },
  },
});
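To make the three filters above concrete, here is a local sketch of the conditions they express, written as plain predicates over the User shape from the schema. It only illustrates which records each filter is meant to match. It is not how Pulse evaluates filters, and case-sensitive string matching is an assumption.

```typescript
type User = { id: number; name: string | null; email: string };

// create: { name: { not: null } }
const hasName = (user: User): boolean => user.name !== null;

// update: { after: { email: { endsWith: "@prisma.io" } } }
// The predicate receives the record as it looks after the update.
const hasPrismaEmail = (after: User): boolean =>
  after.email.endsWith("@prisma.io");

// delete: { email: { contains: "hello" } }
const emailContainsHello = (user: User): boolean =>
  user.email.includes("hello");

// Hand-written sample records for illustration.
const jane: User = { id: 3, name: "Jane Doe", email: "jane@prisma.io" };
const anon: User = { id: 4, name: null, email: "hello@example.com" };

console.log(hasName(jane), hasName(anon)); // true false
console.log(hasPrismaEmail(jane), hasPrismaEmail(anon)); // true false
console.log(emailContainsHello(jane), emailContainsHello(anon)); // false true
```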
subscribe()
subscribe() returns an async iterable that receives all database change events related to the table you call this method on.
const subscription = await prisma.user.subscribe();
Because an async iterable is returned, you can use a for await...of loop to wait for and receive events:
for await (let event of subscription) {
  console.log(event);
}
Notes
- subscribe() guarantees that all events will be delivered at most once. There are no guarantees about the order in which the events arrive.
- Events delivered with subscribe() are ephemeral, meaning they won't be delivered if your subscription isn't active during the time when the event happens in the database (e.g. because your server is down).
Options
You can pass an object with configuration options to subscribe(). The object has the following fields:
Name | Description |
---|---|
create | An object to specify filters for the create events to be received. If you leave the object empty with create: {}, you will receive all create events. You can filter on any scalar field of your model. |
update | An object with an after field to specify filters for the update events to be received. If you leave the object empty with update: {}, you will receive all update events. The filter is applied to the values of the record after an update has been performed. You can filter on any scalar field of your model. |
delete | An object to specify filters for the delete events to be received. You can filter on any scalar field of your model. |
Return type
When called with no filter arguments, the subscribe() method returns the following type:
const subscription: PulseSubscription<
  | PulseCreateEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
  | PulseUpdateEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
  | PulseDeleteEvent<{
      id: number;
      name: string | null;
      email: string;
    }>
> = await prisma.user.subscribe();
Depending on the arguments you provide, the return type may change. For example, if you filter for only create events, the type is narrowed accordingly:
const subscription: PulseSubscription<
  PulseCreateEvent<{
    id: number;
    email: string;
    name: string | null;
  }>
> = await prisma.user.subscribe({
  create: {},
});
Examples
Filter for new User records with a non-null value for name
const subscription = await prisma.user.subscribe({
  create: {
    name: { not: null },
  },
});
Filter for updated User records where the email ends with @prisma.io after the update
const subscription = await prisma.user.subscribe({
  update: {
    after: {
      email: { endsWith: "@prisma.io" },
    },
  },
});
Filter for deleted User records where the email contains hello
const subscription = await prisma.user.subscribe({
  delete: {
    email: { contains: "hello" },
  },
});
stream() vs subscribe()
For the majority of use cases, stream() is the recommended option because it guarantees that events reach the consumer (at least once and in the right order). Note, though, that stream() requires event persistence to be enabled, which has implications for event storage and costs.
See a more detailed comparison.
stop()
Allows you to explicitly stop streams and subscriptions and close the connection. This is needed to ensure that the limited number of subscriptions allowed per table is not exhausted.
For stream()
// Create the stream
const stream = await prisma.user.stream();
// ... Use the stream
// Stop the stream
stream.stop();
For subscribe()
// Create the subscription
const subscription = await prisma.user.subscribe();
// ... Use the subscription
// Stop the subscription
subscription.stop();
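Because open streams and subscriptions count against a per-table limit, it can help to make sure stop() runs even when event handling throws. The sketch below uses try/finally for that. Stoppable and consume are hypothetical names, and fakeSubscription is an in-memory stand-in for the object returned by stream() or subscribe() so that the snippet runs without a database. Treating stop() as synchronous is an assumption based on the calls shown above.

```typescript
// Structural stand-in for a Pulse stream or subscription: an async iterable
// with a stop() method.
interface Stoppable<E> extends AsyncIterable<E> {
  stop(): void;
}

// Hypothetical helper: handles up to `limit` events, then always calls stop().
async function consume<E>(
  source: Stoppable<E>,
  handler: (event: E) => void,
  limit: number,
): Promise<number> {
  let handled = 0;
  try {
    for await (const event of source) {
      handler(event);
      handled += 1;
      if (handled >= limit) break;
    }
  } finally {
    // Runs on normal completion, on break, and when the handler throws.
    source.stop();
  }
  return handled;
}

// In-memory fake used only for this sketch.
function fakeSubscription<E>(events: E[]): Stoppable<E> & { stopped: boolean } {
  return {
    stopped: false,
    stop() {
      this.stopped = true;
    },
    async *[Symbol.asyncIterator]() {
      for (const event of events) {
        yield event;
      }
    },
  };
}
```

With a real client you would pass the result of prisma.user.stream() or prisma.user.subscribe() as the source.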
PulseCreateEvent<User>
An object of type PulseCreateEvent is returned by any create event that happens in the database.
Type
A PulseCreateEvent has the following fields:
Name | Type | Example value | Description |
---|---|---|---|
id | string | 01HYBEER1JPSBVPG2NQADNQTA6 | A unique identifier / idempotency key following the ULID specification. |
action | string | create | The kind of write-operation performed in the database: create |
created | User | See created in the example below. | An object with the values of the record that was just created. |
The type of the event is generic to the fields of your model. In the case of the User model above, it looks as follows:
PulseCreateEvent<{
  id: number;
  email: string;
  name: string | null;
}>;
Example
Here is an example:
{
  action: 'create',
  created: { id: 3, email: 'jane@prisma.io', name: 'Jane Doe' },
  id: '0/2A5A590'
}
PulseUpdateEvent<User>
An object of type PulseUpdateEvent is returned by any update event that happens in the database.
Type
A PulseUpdateEvent has the following fields:
Name | Type | Example value | Description |
---|---|---|---|
id | string | 01HYBEER1JPSBVPG2NQADNQTA6 | A unique identifier / idempotency key following the ULID specification. |
action | string | update | The kind of write-operation performed in the database: update |
before | User | null | An object with the old values of the record that was just updated. This only works when the REPLICA IDENTITY in your database is set to FULL. Otherwise the value will always be null. |
after | User | See after in the example below. | An object with the new values of the record that was just updated. |
The type of the event is generic to the fields of your model. In the case of the User model above, it looks as follows:
PulseUpdateEvent<{
  id: number;
  email: string;
  name: string | null;
}>;
Example
Without REPLICA IDENTITY set to FULL:
{
  action: 'update',
  after: { id: 2, email: 'doe@prisma.io', name: 'Jane Doe' },
  before: null,
  id: '0/2A5A248'
}
With REPLICA IDENTITY set to FULL:
{
  action: 'update',
  after: { id: 2, email: 'support@prisma.io', name: 'Jane Doe' },
  before: { id: 2, email: 'support@prisma.io', name: null },
  id: '0/2A5A248'
}
To enable REPLICA IDENTITY with the FULL setting, you must configure it on a per-table basis, as it is not enabled by default. This requires executing an SQL query against your database. Learn more here.
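When before is available, comparing it with after shows which fields an update touched. Here is a minimal sketch, assuming the User shape from the schema above. changedFields is a hypothetical helper, and UpdateEventLike is a local stand-in modelled on the fields documented for PulseUpdateEvent, not the type exported by the library.

```typescript
type User = { id: number; name: string | null; email: string };

// Local stand-in for PulseUpdateEvent<User>, based on the fields listed above.
type UpdateEventLike = {
  id: string;
  action: "update";
  before: User | null; // null unless REPLICA IDENTITY is set to FULL
  after: User;
};

// Hypothetical helper: returns the names of the fields whose values differ,
// or null when `before` is unavailable and no comparison is possible.
function changedFields(event: UpdateEventLike): (keyof User)[] | null {
  const { before, after } = event;
  if (before === null) return null;
  const keys = Object.keys(after) as (keyof User)[];
  return keys.filter((key) => before[key] !== after[key]);
}

// Event copied from the REPLICA IDENTITY FULL example above.
const changed = changedFields({
  action: "update",
  after: { id: 2, email: "support@prisma.io", name: "Jane Doe" },
  before: { id: 2, email: "support@prisma.io", name: null },
  id: "0/2A5A248",
});
console.log(changed); // only "name" differs between before and after
```

Returning null instead of an empty array keeps "nothing changed" distinct from "cannot tell because before is missing".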
PulseDeleteEvent<User>
An object of type PulseDeleteEvent is returned by any delete event that happens in the database.
Type
A PulseDeleteEvent has the following fields:
Name | Type | Example value | Description |
---|---|---|---|
id | string | 01HYBEER1JPSBVPG2NQADNQTA6 | A unique identifier / idempotency key following the ULID specification. |
action | string | delete | The kind of write-operation performed in the database: delete |
deleted | User | { id: 3 } | An object with the values of the record that was just deleted. This only works when the REPLICA IDENTITY in your database is set to FULL. Otherwise the object will only carry an id field. |
The type of the event is generic to the fields of your model. In the case of the User model above, it looks as follows:
PulseDeleteEvent<{
  id: number;
  email: string;
  name: string | null;
}>;
Example
Without REPLICA IDENTITY set to FULL:
{
  action: 'delete',
  deleted: { id: 1 },
  id: '0/2A5A398'
}
With REPLICA IDENTITY set to FULL:
{
  action: 'delete',
  deleted: { id: 42, email: 'doe@prisma.io', name: 'Jane Doe' },
  id: '0/2A5A398'
}
To enable REPLICA IDENTITY with the FULL setting, you must configure it on a per-table basis, as it is not enabled by default. This requires executing an SQL query against your database. Learn more here.
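Since every event carries an action field, a handler can branch on it to reach the matching payload. The sketch below uses local stand-in types that mirror the three event shapes documented on this page. They are assumptions for illustration (in particular the literal action types), not the types exported by the library, and describeEvent is a hypothetical helper.

```typescript
type User = { id: number; name: string | null; email: string };

type CreateEventLike = { id: string; action: "create"; created: User };
type UpdateEventLike = {
  id: string;
  action: "update";
  before: User | null;
  after: User;
};
// Without REPLICA IDENTITY FULL, `deleted` only carries the id field.
type DeleteEventLike = {
  id: string;
  action: "delete";
  deleted: Partial<User> & { id: number };
};
type UserEventLike = CreateEventLike | UpdateEventLike | DeleteEventLike;

// Hypothetical helper: narrows on `action` and reads the matching payload.
function describeEvent(event: UserEventLike): string {
  switch (event.action) {
    case "create":
      return `created user ${event.created.id} (${event.created.email})`;
    case "update":
      return `updated user ${event.after.id}`;
    case "delete":
      return `deleted user ${event.deleted.id}`;
  }
}

// Event copied from the delete example without REPLICA IDENTITY FULL.
console.log(
  describeEvent({ action: "delete", deleted: { id: 1 }, id: "0/2A5A398" }),
); // deleted user 1
```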