V1 SDK Improvements
This release bundles SDK improvements for Go, Typescript, and Python.
Python SDK Highlights
The Python SDK has a number of notable highlights to showcase for V1. Many of them have been highlighted elsewhere, such as in the migration guide, on the Pydantic page, and in various examples. Here, we'll list out each of them, along with their motivations and benefits.
First and foremost: Many of the changes in the V1 Python SDK are motivated by improved support for type checking and validation across large codebases and in production use-cases. With that in mind, the main highlights in the V1 Python SDK are:
- Workflows are now declared with `hatchet.workflow`, which returns a `Workflow` object, or `hatchet.task` (for simple cases), which returns a `Standalone` object. Workflows then have their corresponding tasks registered with `Workflow.task`. The `Workflow` object (and the `Standalone` object) can be reused easily across the codebase, and has wrapper methods like `run` and `schedule` that make it easy to run workflows. In these wrapper methods, inputs to the workflow are type checked, and you no longer need to specify the name of the workflow to run as a magic string.
- Tasks have their inputs type checked, and inputs are now Pydantic models. The `input` field is either the model you provide to the workflow as the `input_validator`, or is an `EmptyModel`, which is a helper Pydantic model Hatchet provides and uses as a default.
- In the new SDK, we define the `parents` of a task as a list of `Task` objects as opposed to as a list of strings. This also allows us to use `ctx.task_output(my_task)` to access the output of the `my_task` task in a downstream task, while allowing that output to be type checked correctly.
- In the new SDK, inputs are injected directly into the task as the first positional argument, so the signature of a task now will be `Callable[[YourWorkflowInputType, Context]]`. This replaces the old method of accessing workflow inputs via `context.workflow_input()`.
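To make the pattern in the list above concrete, here is a minimal, standard-library-only sketch. It is not the Hatchet SDK: `ToyWorkflow`, `ToyTask`, and `ToyContext` are hypothetical stand-ins, and a dataclass plays the role of the Pydantic input model. It only illustrates the ideas described above: a reusable workflow object, tasks registered on it, parents passed as task objects rather than strings, and the input injected as the first positional argument.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToyTask:
    """Hypothetical stand-in for a registered task."""
    name: str
    fn: Callable[..., Dict[str, Any]]
    parents: List["ToyTask"] = field(default_factory=list)


class ToyContext:
    """Hypothetical stand-in for the task context."""

    def __init__(self) -> None:
        self.outputs: Dict[str, Dict[str, Any]] = {}

    def task_output(self, task: ToyTask) -> Dict[str, Any]:
        # Parent outputs are looked up by task object, not by a string name.
        return self.outputs[task.name]


class ToyWorkflow:
    """Hypothetical stand-in for a reusable workflow object."""

    def __init__(self, name: str, input_type: type) -> None:
        self.name = name
        self.input_type = input_type
        self.tasks: List[ToyTask] = []

    def task(self, parents: Optional[List[ToyTask]] = None):
        def decorator(fn: Callable[..., Dict[str, Any]]) -> ToyTask:
            registered = ToyTask(fn.__name__, fn, list(parents or []))
            self.tasks.append(registered)
            return registered
        return decorator

    def run(self, input: Any) -> Dict[str, Dict[str, Any]]:
        # The input is checked against the declared type before anything runs.
        if not isinstance(input, self.input_type):
            raise TypeError(f"expected {self.input_type.__name__}")
        ctx = ToyContext()
        # Registration order is a valid execution order in this sketch,
        # because a parent must exist before a child can reference it.
        for task in self.tasks:
            ctx.outputs[task.name] = task.fn(input, ctx)
        return ctx.outputs


@dataclass
class GreetInput:
    name: str


greet = ToyWorkflow("greet", GreetInput)


@greet.task()
def build_greeting(input: GreetInput, ctx: ToyContext) -> Dict[str, Any]:
    return {"greeting": f"Hello, {input.name}"}


@greet.task(parents=[build_greeting])
def shout(input: GreetInput, ctx: ToyContext) -> Dict[str, Any]:
    return {"shouted": ctx.task_output(build_greeting)["greeting"].upper()}


result = greet.run(GreetInput(name="Ada"))
```

Because `greet` is an ordinary object, it can be imported and run from anywhere in a codebase without repeating the workflow name as a string, which is the property the highlights above emphasize.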
Other Breaking Changes
There have been a number of other breaking changes throughout the SDK in V1.
Typing improvements:
- External-facing protobuf objects, such as `StickyStrategy` and `ConcurrencyLimitStrategy`, have been replaced by native Python enums to make working with them easier.
- All external-facing types that are used for triggering workflows, scheduling workflows, etc. are now Pydantic objects, as opposed to being `TypedDict`s.
- The return type of each `Task` is restricted to a `JSONSerializableMapping` or a Pydantic model, to better align with what the Hatchet Engine expects.
- The `ClientConfig` now uses Pydantic Settings, and we've removed the static methods on the Client for `from_environment` and `from_config` in favor of passing configuration in correctly. See the configuration example for more details.
- The REST API wrappers, which previously were under `hatchet.rest`, have been completely overhauled.
Naming changes:
- We no longer have nested `aio` clients for async methods. Instead, async methods throughout the entire SDK are prefixed by `aio_`, similar to Langchain's use of the `a` prefix to indicate async. For example, to run a workflow, you may now either use `workflow.run()` or `workflow.aio_run()`.
- All functions on Hatchet clients are now verbs. For instance, if something was named `hatchet.nounVerb` before, it now will be something more like `hatchet.verb_noun`. For example, `hatchet.runs.get_result` gets the result of a workflow run.
- `timeout`, the execution timeout of a task, has been renamed to `execution_timeout` for clarity.
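The `aio_` naming convention described above can be illustrated with a small standard-library sketch. `ToyRunner` is a hypothetical class, not part of the SDK, and the way the blocking method wraps the coroutine here is an assumption made for illustration; the source does not describe how the SDK implements its sync variants.

```python
import asyncio


class ToyRunner:
    """Hypothetical object exposing a blocking method and its aio_ twin."""

    async def aio_run(self, value: int) -> int:
        # Stands in for awaiting a network call.
        await asyncio.sleep(0)
        return value * 2

    def run(self, value: int) -> int:
        # Blocking variant: drives the coroutine to completion on a fresh
        # event loop. This must not be called from inside a running loop.
        return asyncio.run(self.aio_run(value))


runner = ToyRunner()

# From synchronous code, call the plain method.
sync_result = runner.run(21)


async def main() -> int:
    # From async code, await the aio_-prefixed method instead.
    return await runner.aio_run(21)


async_result = asyncio.run(main())
```

Keeping both variants on the same object, distinguished only by the prefix, means callers never need to reach into a nested client to find the async version of a method.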
Removals:
- `sync_to_async` has been removed. We recommend reading our asyncio documentation for our recommendations on handling blocking work in otherwise async tasks.
- The `AdminClient` has been removed, and refactored into individual clients. For example, if you absolutely need to create a workflow run manually without using `Workflow.run` or `Standalone.run`, you can use `hatchet.runs.create`. This replaces the old `hatchet.admin.run_workflow`.
Other miscellaneous changes:
- As shown in the Pydantic example above, there is no longer a `spawn_workflow(s)` method on the `Context`. `run` is now the preferred method for spawning workflows, which will automatically propagate the parent's metadata to the child workflow.
- All times and durations, such as `execution_timeout` and `schedule_timeout`, now allow `datetime.timedelta` objects instead of only allowing strings (e.g. `"10s"` can be `timedelta(seconds=10)`).
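As a rough illustration of accepting both duration forms, the sketch below normalizes either a string or a `timedelta` into a `timedelta`. The helper `to_timedelta` is hypothetical and not part of the SDK, and the set of supported suffixes (`s`, `m`, `h`) is an assumption; the text above only confirms that `"10s"` corresponds to `timedelta(seconds=10)`.

```python
import re
from datetime import timedelta
from typing import Union

# Assumed suffixes for this sketch; only "s" appears in the text above.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}
_PATTERN = re.compile(r"^(\d+)([smh])$")


def to_timedelta(value: Union[str, timedelta]) -> timedelta:
    """Return a timedelta for either a timedelta or a string such as "10s"."""
    if isinstance(value, timedelta):
        return value
    match = _PATTERN.match(value)
    if match is None:
        raise ValueError(f"unsupported duration string: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


from_string = to_timedelta("10s")
from_object = to_timedelta(timedelta(seconds=10))
```

Passing `timedelta` objects directly avoids string parsing altogether and lets a type checker catch a malformed duration before the code runs.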
Other New Features
There are a handful of other new features that will make interfacing with the SDK easier, which are listed below.
- Concurrency keys using the `input` to a workflow are now checked for validity at runtime. If the workflow's `input_validator` does not contain a field that's used in a key, Hatchet will reject the workflow when it's created. For example, if the key is `input.user_id`, the `input_validator` Pydantic model must contain a `user_id` field.
- There is now an `on_success_task` on the `Workflow` object, which works just like an on-failure task, but it runs after all upstream tasks in the workflow have succeeded.
- We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.
For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.
from datetime import datetime, timedelta

# Hatchet, BulkCancelReplayOpts, RunFilter, and V1TaskStatus are provided by the Hatchet Python SDK.
hatchet = Hatchet()

# Pick the first registered workflow and collect the IDs of its runs
workflows = hatchet.workflows.list()
assert workflows.rows
workflow = workflows.rows[0]
workflow_runs = hatchet.runs.list(workflow_ids=[workflow.metadata.id])
workflow_run_ids = [workflow_run.metadata.id for workflow_run in workflow_runs.rows]

# Cancel by explicit run IDs
bulk_cancel_by_ids = BulkCancelReplayOpts(ids=workflow_run_ids)
hatchet.runs.bulk_cancel(bulk_cancel_by_ids)

# Or cancel every run that matches a set of filters
bulk_cancel_by_filters = BulkCancelReplayOpts(
    filters=RunFilter(
        since=datetime.today() - timedelta(days=1),
        until=datetime.now(),
        statuses=[V1TaskStatus.RUNNING],
        workflow_ids=[workflow.metadata.id],
        additional_metadata={"key": "value"},
    )
)
hatchet.runs.bulk_cancel(bulk_cancel_by_filters)
The `hatchet` client also has clients for `workflows` (declarations), `schedules`, `crons`, `metrics` (i.e. queue depth), `events`, and `workers`.
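The concurrency-key check mentioned among the new features in this section can be sketched as follows. This is a simplified illustration only: `validate_concurrency_key` is a hypothetical helper, a dataclass stands in for the Pydantic `input_validator` model, and the sketch assumes keys of the plain form `input.<field>`; real keys may be richer expressions.

```python
from dataclasses import dataclass, fields


def validate_concurrency_key(key: str, input_type: type) -> None:
    """Raise ValueError if an input-based key names a field the model lacks."""
    prefix = "input."
    if not key.startswith(prefix):
        # Keys that do not reference the input are out of scope for this sketch.
        return
    field_name = key[len(prefix):].split(".")[0]
    known_fields = {f.name for f in fields(input_type)}
    if field_name not in known_fields:
        raise ValueError(
            f"key {key!r} references unknown field {field_name!r} "
            f"on {input_type.__name__}"
        )


@dataclass
class OrderInput:
    user_id: str
    amount: int


# Accepted: the model declares user_id.
validate_concurrency_key("input.user_id", OrderInput)

# Rejected: the model has no account_id field.
try:
    validate_concurrency_key("input.account_id", OrderInput)
    rejected = False
except ValueError:
    rejected = True
```

Failing when the workflow is created, rather than when the first run is scheduled, surfaces a mistyped key immediately instead of at an unpredictable point in production.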
Typescript SDK Highlights
The Typescript SDK has a number of notable highlights to showcase for V1. Many of them have been highlighted elsewhere, such as in the migration guide, and in various examples. Here, we'll list out each of them, along with their motivations and benefits.
First and foremost: Many of the changes in the V1 Typescript SDK are motivated by improved support for type checking and inference across large codebases and in production use-cases. With that in mind, here are the main highlights:
- We've moved away from a pure object-based pattern to a factory pattern for creating your workflows and tasks. This allows for much more flexibility and type safety.
The simplest way to declare a workflow is with `hatchet.task`.
export const simple = hatchet.task({
  name: "simple",
  fn: (input: SimpleInput) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});
This returns an object that you can use to run the task with fully inferred types!
const input = { Message: "Hello, World!" };
// run now
const result = await simple.run(input);
const runReference = await simple.runNoWait(input);
// or in the future
const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000);
const scheduled = await simple.schedule(runAt, input);
const cron = await simple.cron("simple-daily", "0 0 * * *", input);
- DAGs got a similar treatment and can be run the same way. DAGs are now a collection of tasks that are composed by calling `.task` on the `Workflow` object.
You can declare your types for DAGs. Output types are checked if there is a corresponding task name as a key in the output type.
type DagInput = {
  Message: string;
};

type DagOutput = {
  reverse: {
    Original: string;
    Transformed: string;
  };
};

export const dag = hatchet.workflow<DagInput, DagOutput>({
  name: "simple",
});

// Next, we declare the tasks bound to the workflow
const toLower = dag.task({
  name: "to-lower",
  fn: (input) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});

// Then, we declare a downstream task that reads its parent's output
dag.task({
  name: "reverse",
  parents: [toLower],
  fn: async (input, ctx) => {
    const lower = await ctx.parentOutput(toLower);
    return {
      Original: input.Message,
      Transformed: lower.TransformedMessage.split("").reverse().join(""),
    };
  },
});
- Logical organization of SDK features to make it easier to understand and use.
We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.
For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.
const hatchet = HatchetClient.init();
const { runs } = hatchet;

const allFailedRuns = await runs.list({
  statuses: [WorkflowRunStatus.FAILED],
});

// replay by ids
await runs.replay({ ids: allFailedRuns.rows?.map((r) => r.metadata.id) });

// or you can run bulk operations with filters directly
await runs.cancel({
  filters: {
    since: new Date("2025-03-27"),
    additionalMetadata: { user: "123" },
  },
});
The `hatchet` client also has clients for `workflows` (declarations), `schedules`, `crons`, `metrics` (i.e. queue depth), `events`, and `workers`.
Go SDK Highlights
The Go SDK has a number of notable highlights to showcase for V1. Many of them have been highlighted elsewhere, such as in the migration guide, and in various examples. Here, we'll list out each of them, along with their motivations and benefits.
- Workflows and tasks are now instantiated via a factory pattern which makes it easier to define and run workflows. For example:
type SimpleInput struct {
	Message string
}

type SimpleResult struct {
	TransformedMessage string
}

simple := factory.NewTask(
	create.StandaloneTask{
		Name: "simple-task",
	}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
		return &SimpleResult{
			TransformedMessage: strings.ToLower(input.Message),
		}, nil
	},
	hatchet, // a Hatchet client instance
)

// somewhere else in your code
result, err := simple.Run(ctx, SimpleInput{
	Message: "Hello, World!",
})
// result is fully typed!
- Instead of passing parent references via `[]string`, you can simply pass task references directly to other tasks in a workflow, reducing the fragility of your code. For example:
simple := factory.NewWorkflow[DagInput, DagResult](
	create.WorkflowCreateOpts[DagInput]{
		Name: "simple-dag",
	},
	hatchet,
)

step1 := simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step1",
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		// ...
	},
)

simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step2",
		Parents: []create.NamedTask{
			step1,
		},
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		// getting parent output also uses the task reference, for example:
		var step1Output SimpleOutput
		ctx.ParentOutput(step1, &step1Output)
		// ...
	},
)
- Configuring workflows and tasks is much easier, with all configuration options flattened into a single struct.
- We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.
For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.
hatchet, err := v1.NewHatchetClient()
if err != nil {
	panic(err)
}

ctx := context.Background()

runs, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{
	Statuses: &[]rest.V1TaskStatus{rest.V1TaskStatusFAILED},
})
if err != nil {
	panic(err)
}

replayIds := []types.UUID{}
for _, run := range runs.JSON200.Rows {
	replayIds = append(replayIds, uuid.MustParse(run.Metadata.Id))
}

// Replay the runs
hatchet.Runs().Replay(ctx, rest.V1ReplayTaskRequest{
	ExternalIds: &replayIds,
})

// Or run bulk operations with filters directly
hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{
	Filter: &rest.V1TaskFilter{
		Since:              time.Now().Add(-time.Hour * 24),
		AdditionalMetadata: &[]string{"user:123"},
	},
})
The `hatchet` client also has clients for `workflows` (declarations), `schedules`, `crons`, `metrics` (i.e. queue depth), `events`, and `workers`.