github sparckles/Robyn v0.71.0
v0.71.0 - add support for server sent events

latest releases: v0.72.2, v0.72.1, v0.72.0...
5 months ago

What's Changed

"""
Server-Sent Events (SSE) Example for Robyn

This example demonstrates how to implement Server-Sent Events using Robyn,
similar to FastAPI's implementation. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""

import asyncio
import time

from robyn import Robyn, SSE_Message, SSE_Response

# Application instance; the @app.get decorators in this file attach handlers to it.
app = Robyn(__file__)


@app.get("/events")
def stream_events(request):
    """Return an SSE response carrying ten timestamped messages and a closing event.

    The ``request`` argument is accepted but not used.
    """

    def timestamped_messages():
        """Yield ten numbered messages, pausing one second after each, then an "end" event."""
        for seq in range(10):
            clock = time.strftime('%H:%M:%S')
            text = f"Message {seq} - {clock}"
            yield SSE_Message(text, id=str(seq))
            # Pause before producing the next item (also before the closing event).
            time.sleep(1)

        # Closing message, tagged with the "end" event name.
        yield SSE_Message("Stream ended", event="end")

    source = timestamped_messages()
    return SSE_Response(source)


@app.get("/events/json")
def stream_json_events(request):
    """Return an SSE response whose messages carry JSON-encoded payloads.

    The ``request`` argument is accepted but not used.
    """
    import json

    def notifications():
        """Yield five "notification" events, each a JSON-serialised dict, two seconds apart."""
        for seq in range(5):
            payload = {
                "id": seq,
                "message": f"JSON message {seq}",
                "timestamp": time.time(),
                "type": "notification",
            }
            encoded = json.dumps(payload)
            yield SSE_Message(encoded, event="notification", id=str(seq))
            # Pause after every item, including the last one.
            time.sleep(2)

    source = notifications()
    return SSE_Response(source)


@app.get("/events/named")
def stream_named_events(request):
    """Return an SSE response that replays a fixed script of named events.

    The ``request`` argument is accepted but not used.
    """

    def scripted_events():
        """Yield each scripted (event name, text) pair as an SSE message, 1.5 s apart."""
        script = (
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        )

        for seq, entry in enumerate(script):
            kind, text = entry
            # The position in the script doubles as the message id.
            yield SSE_Message(text, event=kind, id=str(seq))
            time.sleep(1.5)

    source = scripted_events()
    return SSE_Response(source)


@app.get("/events/async")
async def stream_async_events(request):
    """Return an SSE response backed by an async generator.

    The ``request`` argument is accepted but not used.
    """

    async def delayed_messages():
        """Yield eight "async" events, awaiting half a second before each one."""
        for seq in range(8):
            # Stand-in for asynchronous work done before each message.
            await asyncio.sleep(0.5)
            clock = time.strftime('%H:%M:%S')
            text = f"Async message {seq} - {clock}"
            yield SSE_Message(text, event="async", id=str(seq))

    source = delayed_messages()
    return SSE_Response(source)


@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """Return an SSE response that emits a fixed run of heartbeat messages.

    The ``request`` argument is accepted but not used.
    """

    def pings():
        """Yield twenty "heartbeat" events half a second apart, then an "end" event."""
        for beat in range(20):
            yield SSE_Message(f"heartbeat {beat}", event="heartbeat", id=str(beat))
            # Pause after every ping, including the last one before the closing event.
            time.sleep(0.5)

        yield SSE_Message("heartbeat ended", event="end")

    source = pings()
    return SSE_Response(source)

Full Changelog: v0.70.0...v0.71.0

Don't miss a new Robyn release

NewReleases is sending notifications on new releases.