github tastyware/streaq v3.0.0
tastyware/streaq:v3.0.0

latest releases: v5.2.2, v5.2.1, v5.2.0...
2 months ago

What's Changed

Description

Context is no longer part of task functions' signatures. This means less code, less warnings, and cleaner code everywhere!
API changes:

  • ctx parameter of type WrappedContext as part of task and cron job functions' signatures is now gone. Tasks are expected to define only the parameters they'll work with, nothing more.
  • WrappedContext has been renamed to TaskContext. Rather than wrapping the worker dependencies with additional information, it now just contains the task-specific information (retry count, task ID, etc.) This is no longer accessible from the function signature, but rather as Worker.task_context()
  • WrappedContext.deps of type WD (user-defined dependencies) is also moved, since the ctx parameter is no longer part of task signatures. This is now accessible from the worker: Worker.context.
  • Middleware registered to the worker no longer take a ctx parameter. Middleware are also expected to use Worker.task_context() for task-specific context and Worker.context for user-defined dependencies.

Here's an example demonstrating migration from streaQ v2 to v3.

Old API

@worker.task(timeout=5)
async def fetch(ctx: WrappedContext[Context], url: str) -> int:
    res = await ctx.deps.http_client.get(url)
    return len(res.text)

@worker.cron("* * * * mon-fri")
async def cronjob(ctx: WrappedContext[Context]) -> None:
    print("It's a bird... It's a plane... It's CRON!")

@worker.middleware
def double(ctx: WrappedContext[Context], task: Callable[..., Coroutine]):
    async def wrapper(*args, **kwargs):
        result = await task(*args, **kwargs)
        return result * 2

New API

@worker.task(timeout=5)
async def fetch(url: str) -> int:
    res = await worker.context.http_client.get(url)
    return len(res.text)

@worker.cron("* * * * mon-fri")
async def cronjob() -> None:
    print("It's a bird... It's a plane... It's CRON!")

@worker.middleware
def double(task: ReturnCoroutine) -> ReturnCoroutine:
    async def wrapper(*args, **kwargs) -> Any:
        result = await task(*args, **kwargs)
        return result * 2

Full Changelog: v2.2.2...v3.0.0

Don't miss a new streaq release

NewReleases is sending notifications on new releases.