Experimental: Non-blocking submission of flow runs to the `Runner` web server
You can now submit runs of served flows without blocking the main thread, from inside or outside a flow run. If submitting flows from inside a parent flow, these submitted runs will be tracked as subflows of the parent flow run.
In order to use this feature, you must:
- enable the experimental `Runner` webserver endpoints via `prefect config set PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS=True`
- ensure the `Runner` web server is enabled, either by:
  - passing `webserver=True` to your `serve` call
  - enabling the webserver via `prefect config set PREFECT_RUNNER_SERVER_ENABLE=True`
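As an alternative to `prefect config set`, the two settings can also be supplied as environment variables. The following is a minimal sketch, assuming Prefect picks up same-named environment variables and that they are set before `prefect` is imported in the process; `enable_runner_submission` and `REQUIRED_SETTINGS` are hypothetical names used only for illustration.

```python
import os

# Assumption: Prefect reads settings from environment variables that share
# the setting's name, so set these before importing `prefect`.
REQUIRED_SETTINGS = {
    "PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS": "True",
    "PREFECT_RUNNER_SERVER_ENABLE": "True",
}


def enable_runner_submission(env=os.environ):
    """Write both flags into `env` and return the values that were set."""
    for name, value in REQUIRED_SETTINGS.items():
        env[name] = value
    return {name: env[name] for name in REQUIRED_SETTINGS}


if __name__ == "__main__":
    # Hypothetical helper: call this at the top of the script that serves the flow.
    print(enable_runner_submission())
```

Passing a plain dictionary as `env` makes it possible to check the helper without touching the real process environment.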
You can then submit any flow available in the import space of the served flow, and you can submit multiple runs at once. If submitting flows from a parent flow, you can optionally call `wait_for_submitted_runs()` to keep the parent flow run from completing until all submitted runs are complete.
Example:

    import time

    from pydantic import BaseModel

    from prefect import flow, serve, task
    from prefect.runner import submit_to_runner, wait_for_submitted_runs


    class Foo(BaseModel):
        bar: str
        baz: int


    class ParentFoo(BaseModel):
        foo: Foo
        x: int = 42


    @task
    def noop():
        pass


    @flow(log_prints=True)
    async def child(foo: Foo = Foo(bar="hello", baz=42)):
        print(f"received {foo.bar} and {foo.baz}")
        print("going to sleep")
        noop()
        time.sleep(20)


    @task
    def foo():
        time.sleep(2)


    @flow(log_prints=True)
    def parent(parent_foo: ParentFoo = ParentFoo(foo=Foo(bar="hello", baz=42))):
        print(f"I'm a parent and I received {parent_foo=}")

        submit_to_runner(
            child, [{"foo": Foo(bar="hello", baz=i)} for i in range(9)]
        )

        foo.submit()

        wait_for_submitted_runs()  # optionally block until all submitted runs are complete


    if __name__ == "__main__":
        # either enable the webserver via `webserver=True` or via
        # `prefect config set PREFECT_RUNNER_SERVER_ENABLE=True`
        serve(parent.to_deployment(__file__), limit=10, webserver=True)

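In the example above, the second argument to `submit_to_runner` is a list with one parameter dictionary per run. The sketch below shows one way such a list might be built with plain Python. It assumes each dictionary's keys match the flow's parameter names; `build_run_parameters` is a hypothetical helper, not part of Prefect, and nested dictionaries stand in for the `Foo` model so the snippet runs without Prefect or pydantic installed.

```python
def build_run_parameters(bar, count):
    """Return one parameter dict per run, keyed by the flow's parameter name."""
    return [{"foo": {"bar": bar, "baz": i}} for i in range(count)]


if __name__ == "__main__":
    params = build_run_parameters("hello", 9)
    print(len(params))  # one entry per run to submit
    # Inside a served flow, a list like this (with Foo instances in place of
    # the nested dicts) is what the example passes to submit_to_runner.
```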
This feature is experimental and subject to change. Please try it out and let us know what you think!
See the PR for implementation details.
Enhancements
- Add `url` to `prefect.runtime.flow_run` (#11686)
- Add ability to subpath the `/ui-settings` endpoint (#11701)
Fixes
- Handle `pydantic` v2 types in schema generation for flow parameters (#11656)
- Increase flow run resiliency by gracefully handling `PENDING` to `PENDING` state transitions (#11695)
Documentation
- Add documentation for `cache_result_in_memory` argument for `flow` decorator (#11669)
- Add runnable example of `flow.from_source()` (#11690)
- Improve discoverability of creating interactive workflows guide (#11704)
- Fix typo in automations guide (#11716)
- Remove events and incidents from concepts index page (#11708)
- Remove subflow task tag concurrency warning (#11725)
- Remove misleading line on pausing a flow run from the UI (#11730)
- Improve readability of Jinja templating guide in automations concept doc (#11729)
- Resolve links to relocated interactive workflows guide (#11692)
- Fix typo in flows concept documentation (#11693)
Contributors
All changes: 2.14.16...2.14.17