Publisher helper docs
The Publisher is a helper for publishing events and subscribing to them. Combined with the Event Iterator, it lets you build streaming responses, real-time updates, and server-sent events with minimal requirements.
const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>()

const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      // Handle payload here or yield directly to client
      yield payload
    }
  })

const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    await publisher.publish('something-updated', { id: input.id })
  })
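To make the subscribe-and-iterate pattern above more concrete, here is a minimal sketch of how pushed events can be bridged into an async iterator that stops when an AbortSignal fires. The `createChannel` helper and its `push` and `subscribe` methods are hypothetical names invented for this illustration. They are not part of oRPC and do not describe how MemoryPublisher is implemented.

```typescript
// Hypothetical helper, for illustration only: a single-consumer push-to-pull bridge.
function createChannel<T>(signal?: AbortSignal) {
  const queue: T[] = []
  let closed = signal?.aborted ?? false
  let wake: (() => void) | undefined

  // Aborting closes the channel and wakes a waiting consumer so it can exit.
  signal?.addEventListener('abort', () => {
    closed = true
    wake?.()
  }, { once: true })

  return {
    // Producer side: enqueue a value and wake the consumer if it is waiting.
    push(value: T): void {
      if (closed) return
      queue.push(value)
      wake?.()
    },
    // Consumer side: yield queued values, then wait for more until closed.
    async* subscribe(): AsyncGenerator<T> {
      while (true) {
        while (queue.length > 0) yield queue.shift() as T
        if (closed) return
        await new Promise<void>((resolve) => { wake = resolve })
        wake = undefined
      }
    },
  }
}
```

A consumer would loop with `for await (const value of channel.subscribe())`, much like the `live` handler above loops over `publisher.subscribe(...)`. Aborting the signal ends the loop after any already-queued values have been delivered.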
Available Adapters
Name | Resume Support | Description |
---|---|---|
MemoryPublisher | ✅ | A simple in-memory publisher |
IORedisPublisher | ✅ | Adapter for ioredis |
UpstashRedisPublisher | ✅ | Adapter for Upstash Redis |
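As a rough mental model for the Resume Support column, the sketch below shows one way a publisher could replay events that a client missed, based on the last event id the client saw. `ReplayBuffer` and its methods are hypothetical and exist only for this illustration. The numeric ids and the fixed retention size are assumptions, and the real adapters may work quite differently.

```typescript
// Hypothetical, simplified model of resume support. Not oRPC's implementation.
type StoredEvent<T> = { id: number, payload: T }

class ReplayBuffer<T> {
  private events: StoredEvent<T>[] = []
  private nextId = 1
  private maxSize: number

  constructor(maxSize = 100) {
    this.maxSize = maxSize
  }

  // Record a payload and return the id a client would remember as its last event id.
  add(payload: T): number {
    const id = this.nextId++
    this.events.push({ id, payload })
    // Drop the oldest event once the retention limit is exceeded.
    if (this.events.length > this.maxSize) this.events.shift()
    return id
  }

  // Return every retained event recorded after `lastEventId`.
  // Without an id nothing is replayed: a fresh subscriber only sees new events.
  since(lastEventId?: number): StoredEvent<T>[] {
    if (lastEventId === undefined) return []
    return this.events.filter(event => event.id > lastEventId)
  }
}

const buffer = new ReplayBuffer<{ id: string }>()
buffer.add({ id: 'a' })
const lastSeen = buffer.add({ id: 'b' }) // the client disconnects after this event
buffer.add({ id: 'c' })
buffer.add({ id: 'd' })

// On reconnect, the client sends `lastSeen` and receives only what it missed.
const missed = buffer.since(lastSeen).map(event => event.payload.id)
console.log(missed) // logs the ids "c" and "d"
```

In the handler shown earlier, `lastEventId` is passed to `publisher.subscribe`. This sketch only illustrates the general idea behind that parameter.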
eventIteratorToUnproxiedDataStream util
Prefer using eventIteratorToUnproxiedDataStream over eventIteratorToStream when integrating oRPC with the AI SDK. The AI SDK uses structuredClone internally, which doesn't support proxied data. Since oRPC may proxy events to attach metadata, you should unproxy them before passing to the AI SDK.
const { messages, sendMessage, status } = useChat({
  transport: {
    async sendMessages(options) {
      return eventIteratorToUnproxiedDataStream(await client.chat({
        chatId: options.chatId,
        messages: options.messages,
      }, { signal: options.abortSignal }))
    },
    reconnectToStream(options) {
      throw new Error('Unsupported')
    },
  },
})
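The structuredClone limitation described above can be reproduced without oRPC or the AI SDK. The sketch below wraps a plain object in a Proxy with an empty handler, shows that structuredClone rejects it, and then clones a shallow plain copy instead. `canClone` and `shallowUnproxy` are hypothetical helpers written for this illustration. They are not how eventIteratorToUnproxiedDataStream is implemented.

```typescript
// A plain event payload and a proxied view of it (empty handler, for illustration only).
const payload = { id: 'abc', text: 'hello' }
const proxied = new Proxy(payload, {})

// Report whether structuredClone accepts a value instead of throwing.
function canClone(value: unknown): boolean {
  try {
    structuredClone(value)
    return true
  } catch {
    // Proxy objects are rejected with a DataCloneError.
    return false
  }
}

// Hypothetical unproxy step: copy own enumerable properties into a plain object.
// This is a shallow copy, so nested proxies would need the same treatment.
function shallowUnproxy<T extends object>(value: T): T {
  return { ...value }
}

console.log(canClone(proxied)) // false
console.log(canClone(shallowUnproxy(proxied))) // true
```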
🚀 Features
- client, server: Add eventIteratorToUnproxiedDataStream util - by @unnoq in #1110 (e91d2)
- publisher: Memory, ioredis, upstash redis publishers - by @unnoq and Joonseo Lee in #1094 (22ef1)
🏎 Performance