github tastyware/streaq v6.0.0b2

Pre-release, 9 hours ago

What's Changed

  • Ability to set custom formatters for task results and exceptions in Web UI by @espdev in #95

    • Show task result/exception info in the adaptive scroll area
    • Add dependency functions get_result_formatter and get_exception_formatter to customize result/exception output
  • Bump astral-sh/setup-uv from 6 to 7 by @dependabot[bot] in #96

  • Update docs for SAQ comparison: abort support by @vikigenius in #100

  • Custom worker id feature by @espdev in #101
    The PR adds the ability to set a custom worker id.

  • Place sleep after the first healthcheck, not before it. by @espdev in #104

  • Add the info about exception traceback serialization with tblib to the docs by @espdev in #105

  • Improve the user experience when running tests by @espdev in #107

  • Bump actions/checkout from 5 to 6 by @dependabot[bot] in #110

  • dynamic cron jobs by @Graeme22 in #117
    Moves cron registry to Redis. This allows for dynamically adding/removing cron jobs. RegisteredCron is collapsed into RegisteredTask.

  • Bump python-multipart from 0.0.20 to 0.0.22 by @dependabot[bot] in #120

  • Bump urllib3 from 2.5.0 to 2.6.3 by @dependabot[bot] in #121

  • Bump starlette from 0.48.0 to 0.49.1 by @dependabot[bot] in #122

  • Bump filelock from 3.19.1 to 3.20.3 by @dependabot[bot] in #123

  • Modular changes by @Graeme22 in #125
    Previously, each worker maintained a list of "coworkers" whose context managers also had to be entered, which led to many duplicate Redis clients and some messy code with AsyncExitStack. Now, Worker.include() simply modifies the included worker's tasks to point to the including worker instead. On its face this sounds like it could lead to problems, but since a) only one worker runs per process at a time, and b) workers that are included in other workers are not aware of that at import time, in practice this is a clean and easy solution. To make this a bit more concrete:

    # child_module.py
    from streaq import Worker
    
    child = Worker()
    
    @child.task()
    async def foobar() -> None: ...

    # parent_module.py
    from streaq import Worker
    
    from child_module import child
    
    parent = Worker()
    parent.include(child)

    Here, we can actually run either worker independently without issues: streaq run child_module:child and streaq run parent_module:parent should both work fine, because when running just the child, the parent.include(child) line will never run.
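
    The re-pointing idea described above can be illustrated with a toy model. This is a minimal sketch only: ToyWorker and ToyTask are hypothetical stand-ins written for this example and are not streaq's classes, and the real Worker.include() may differ in its details.

```python
from typing import Callable, Dict


class ToyTask:
    """Hypothetical task wrapper that remembers which worker owns it."""

    def __init__(self, fn: Callable[..., object], owner: "ToyWorker") -> None:
        self.fn = fn
        self.owner = owner


class ToyWorker:
    """Hypothetical stand-in for a worker; NOT streaq's Worker class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: Dict[str, ToyTask] = {}

    def task(self) -> Callable[[Callable[..., object]], ToyTask]:
        def decorator(fn: Callable[..., object]) -> ToyTask:
            registered = ToyTask(fn, owner=self)
            self.tasks[fn.__name__] = registered
            return registered

        return decorator

    def include(self, other: "ToyWorker") -> None:
        # Re-point each task of the included worker at this worker,
        # rather than keeping a list of "coworkers" to manage.
        for name, task in other.tasks.items():
            task.owner = self
            self.tasks[name] = task


child = ToyWorker("child")


@child.task()
def foobar() -> None: ...


owner_before = foobar.owner.name  # the child owns the task until it is included

parent = ToyWorker("parent")
parent.include(child)

owner_after = foobar.owner.name  # the task now points at the parent
```

    If the include() call never runs, as when only the child module is imported, the task keeps pointing at the child, which is why either worker can be run on its own in this model.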

    The other crucial element of this PR is detaching access to task and worker context from the worker instance itself. Under the hood this is done with a couple of global ContextVars, but in practice it uses a clean, FastAPI-like dependency injection pattern:

    from typing import NamedTuple

    from streaq import TaskContext, TaskDepends, Worker, WorkerDepends

    worker = Worker()

    class WorkerContext(NamedTuple):
        db_connection: MyDBDriver  # MyDBDriver is a placeholder for your own driver type

    @worker.task()
    async def has_dependencies(
        task_context: TaskContext = TaskDepends(),
        worker_context: WorkerContext = WorkerDepends(),
    ) -> None:
        print(task_context.task_id)
        print(worker_context.db_connection)

    What this enables that was not previously possible is a total separation of concerns. Task definitions can now access dependencies without having to know anything about the top level Worker they will eventually be included in. This replaces the previous Worker.context and Worker.task_context() functions altogether. Since coredis 6.0.0 is around the corner, this will likely be the last breaking change for the foreseeable future.
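
    To show how a dependency injection pattern can sit on top of ContextVars, here is a minimal standalone sketch. It is not streaq's implementation: ToyTaskDepends, ToyWorkerDepends, inject, and the two context classes are hypothetical names invented for this example, and only the standard library is used.

```python
import asyncio
import functools
import inspect
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

# Module-level context variables, set by whatever is running the task.
_task_ctx: ContextVar[Any] = ContextVar("task_ctx")
_worker_ctx: ContextVar[Any] = ContextVar("worker_ctx")


class _Marker:
    """Default-value marker naming the ContextVar to read at call time."""

    def __init__(self, var: ContextVar) -> None:
        self.var = var


def ToyTaskDepends() -> Any:  # hypothetical, mirrors the shape of TaskDepends()
    return _Marker(_task_ctx)


def ToyWorkerDepends() -> Any:  # hypothetical, mirrors the shape of WorkerDepends()
    return _Marker(_worker_ctx)


def inject(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Fill any parameter whose default is a marker from its ContextVar."""
    sig = inspect.signature(fn)

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        bound = sig.bind_partial(*args, **kwargs)
        for name, param in sig.parameters.items():
            if name not in bound.arguments and isinstance(param.default, _Marker):
                kwargs[name] = param.default.var.get()
        return await fn(*args, **kwargs)

    return wrapper


@dataclass
class ToyTaskContext:
    task_id: str


@dataclass
class ToyWorkerContext:
    db_connection: str


@inject
async def has_dependencies(
    task_context: ToyTaskContext = ToyTaskDepends(),
    worker_context: ToyWorkerContext = ToyWorkerDepends(),
) -> str:
    return f"{task_context.task_id}:{worker_context.db_connection}"


async def run_once() -> str:
    # A real worker would set these before invoking the task.
    _worker_ctx.set(ToyWorkerContext(db_connection="fake-db"))
    _task_ctx.set(ToyTaskContext(task_id="abc123"))
    return await has_dependencies()


result = asyncio.run(run_once())
```

    The task function never references a worker object in this sketch; it only declares what it needs, which is the separation of concerns described above.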

    Finally, this PR adds a few other things in preparation for the v6.0.0 release:

    • CLI makeover: the streaq command has been split into two subcommands, streaq run and streaq web, for running a worker and the web UI respectively. As a bonus, this made it possible to cover code that was previously marked with # pragma: no cover.
    • Typing for pipelined tasks is drastically better for both Task.then() and Task.__or__(), with some fancy overloads and a TypeVarTuple that allow for nearly perfect typing of the complex scenario where results from one task are fed into the next.
    • StreaqRetry, StreaqError, and StreaqCancelled have been moved from utils to the streaq.types module.
    • Worker can now be initialized from a coredis.ConnectionPool instead of a connection URI.
    • The Redis health check cron job is now registered at worker startup instead of in the constructor.
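
    As a rough illustration of the pipeline typing item above, the sketch below shows how generics let a type checker follow the result type through a chain of steps. It is deliberately simplified: ToyPipeline is a hypothetical class written for this example, it uses a plain TypeVar rather than the overloads and TypeVarTuple the PR mentions, and it does not reflect streaq's actual Task API.

```python
import asyncio
from typing import Awaitable, Callable, Generic, TypeVar

R = TypeVar("R")
S = TypeVar("S")


class ToyPipeline(Generic[R]):
    """Hypothetical chain of async steps; NOT streaq's Task class."""

    def __init__(self, start: Callable[[], Awaitable[R]]) -> None:
        self._start = start

    def then(self, step: Callable[[R], Awaitable[S]]) -> "ToyPipeline[S]":
        # The step must accept the previous result type R; the new
        # pipeline is typed by the step's own result type S.
        async def chained() -> S:
            return await step(await self._start())

        return ToyPipeline(chained)

    def __or__(self, step: Callable[[R], Awaitable[S]]) -> "ToyPipeline[S]":
        # `pipeline | step` is shorthand for `pipeline.then(step)`.
        return self.then(step)

    async def run(self) -> R:
        return await self._start()


async def fetch() -> int:
    return 20


async def double(x: int) -> int:
    return x * 2


async def describe(x: int) -> str:
    return f"value={x}"


# A type checker can infer ToyPipeline[str] for this expression.
pipeline = ToyPipeline(fetch) | double | describe
pipeline_result = asyncio.run(pipeline.run())
```

    Swapping the order to `ToyPipeline(fetch) | describe | double` still runs into an error only at runtime in plain Python, but a static type checker would flag it because `double` expects an int and would receive a str.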

New Contributors

Full Changelog: v6.0.0b1...v6.0.0b2
