github hatchet-dev/hatchet v0.57.0
v0.57.0 - v1 SDKs

5 months ago

V1 SDK Improvements

This release bundles SDK improvements for Go, TypeScript, and Python.

Python SDK Highlights

The Python SDK has a number of notable highlights to showcase for V1. Many of them have been covered elsewhere, such as in the migration guide, on the Pydantic page, and in various examples. Here, we'll list each of them, along with its motivation and benefits.

First and foremost: Many of the changes in the V1 Python SDK are motivated by improved support for type checking and validation across large codebases and in production use-cases. With that in mind, the main highlights in the V1 Python SDK are:

  1. Workflows are now declared with hatchet.workflow, which returns a Workflow object, or hatchet.task (for simple cases) which returns a Standalone object. Workflows then have their corresponding tasks registered with Workflow.task. The Workflow object (and the Standalone object) can be reused easily across the codebase, and has wrapper methods like run and schedule that make it easy to run workflows. In these wrapper methods, inputs to the workflow are type checked, and you no longer need to specify the name of the workflow to run as a magic string.
  2. Tasks have their inputs type checked, and inputs are now Pydantic models. The input field is either the model you provide to the workflow as the input_validator, or is an EmptyModel, which is a helper Pydantic model Hatchet provides and uses as a default.
  3. In the new SDK, we define the parents of a task as a list of Task objects as opposed to a list of strings. This also allows us to use ctx.task_output(my_task) to access the output of the my_task task in a downstream task, while allowing that output to be type checked correctly.
  4. In the new SDK, inputs are injected directly into the task as the first positional argument, so the signature of a task is now Callable[[YourWorkflowInputType, Context], R], where R stands for the task's return type. This replaces the old method of accessing workflow inputs via context.workflow_input().
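
To make points 3 and 4 concrete, here is a toy model of why passing Task objects (rather than strings) as parents lets a type checker follow outputs downstream. This is a sketch using only the standard library: the Task and Context classes, run_in_order, and GreetInput below are hypothetical stand-ins, not the SDK's actual classes, and the real SDK uses Pydantic models rather than dataclasses.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Generic, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


class Context:
    """Toy stand-in for a task context; stores outputs keyed by task name."""

    def __init__(self) -> None:
        self._outputs: dict[str, Any] = {}

    def task_output(self, task: "Task[Any, Out]") -> Out:
        # The return type follows from the Task object passed in, which is
        # what allows a type checker to verify the downstream code.
        return self._outputs[task.name]


@dataclass
class Task(Generic[In, Out]):
    name: str
    fn: Callable[[In, Context], Out]  # input first, context second
    parents: list["Task[Any, Any]"] = field(default_factory=list)


@dataclass
class GreetInput:
    message: str


def to_lower(input: GreetInput, ctx: Context) -> dict[str, str]:
    return {"lowered": input.message.lower()}


def reverse(input: GreetInput, ctx: Context) -> dict[str, str]:
    # Parent output is looked up by task reference, not by a magic string.
    lowered = ctx.task_output(lower_task)["lowered"]
    return {"reversed": lowered[::-1]}


lower_task = Task(name="to_lower", fn=to_lower)
reverse_task = Task(name="reverse", fn=reverse, parents=[lower_task])


def run_in_order(tasks: list[Task[Any, Any]], input: Any) -> dict[str, Any]:
    """Run tasks sequentially (the list is assumed to be topologically sorted)."""
    ctx = Context()
    for task in tasks:
        ctx._outputs[task.name] = task.fn(input, ctx)
    return ctx._outputs


results = run_in_order([lower_task, reverse_task], GreetInput(message="Hello"))
print(results["reverse"])
```

Renaming to_lower here would surface every broken reference through the lower_task variable, whereas a string-based parent list would only fail at runtime.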

Other Breaking Changes

There have been a number of other breaking changes throughout the SDK in V1.

Typing improvements:

  1. External-facing protobuf objects, such as StickyStrategy and ConcurrencyLimitStrategy, have been replaced by native Python enums to make working with them easier.
  2. All external-facing types that are used for triggering workflows, scheduling workflows, etc. are now Pydantic objects, as opposed to being TypedDicts.
  3. The return type of each Task is restricted to a JSONSerializableMapping or a Pydantic model, to better align with what the Hatchet Engine expects.
  4. The ClientConfig now uses Pydantic Settings, and we've removed the static methods on the Client for from_environment and from_config in favor of passing configuration in correctly. See the configuration example for more details.
  5. The REST API wrappers, which previously were under hatchet.rest, have been completely overhauled.

Naming changes:

  1. We no longer have nested aio clients for async methods. Instead, async methods throughout the entire SDK are prefixed by aio_, similar to LangChain's use of the a prefix to indicate async. For example, to run a workflow, you may now either use workflow.run() or workflow.aio_run().
  2. All functions on Hatchet clients are now verbs. For instance, if something was named hatchet.nounVerb before, it now will be something more like hatchet.verb_noun. For example, hatchet.runs.get_result gets the result of a workflow run.
  3. timeout, the execution timeout of a task, has been renamed to execution_timeout for clarity.
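
The aio_ naming convention from point 1 can be illustrated with a small standalone sketch. ToyWorkflow is a hypothetical class, not the SDK's Workflow, and wrapping the async method with asyncio.run is only an assumption made for the illustration; we are not claiming the SDK implements its sync methods this way.

```python
import asyncio


class ToyWorkflow:
    """Hypothetical stand-in that only demonstrates the run / aio_run pairing."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def aio_run(self, payload: dict) -> dict:
        # Async variant: awaitable from inside an event loop.
        await asyncio.sleep(0)  # placeholder for real async I/O
        return {"workflow": self.name, "echo": payload}

    def run(self, payload: dict) -> dict:
        # Sync variant: blocks until the async variant finishes.
        # asyncio.run() raises RuntimeError if a loop is already running,
        # so this toy version must only be called from synchronous code.
        return asyncio.run(self.aio_run(payload))


wf = ToyWorkflow("greet")

# From synchronous code:
sync_result = wf.run({"user_id": 1})


# From asynchronous code:
async def main() -> dict:
    return await wf.aio_run({"user_id": 1})


async_result = asyncio.run(main())
```

Both calls live on the same object, so there is no separate nested client to discover for the async variant.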

Removals:

  1. sync_to_async has been removed. See our asyncio documentation for recommendations on handling blocking work in otherwise async tasks.
  2. The AdminClient has been removed, and refactored into individual clients. For example, if you absolutely need to create a workflow run manually without using Workflow.run or Standalone.run, you can use hatchet.runs.create. This replaces the old hatchet.admin.run_workflow.
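
As a rough illustration of handling blocking work inside async code without sync_to_async, the sketch below uses asyncio.to_thread from the standard library (Python 3.9+). This is one common general-purpose approach and is shown here as an assumption; the asyncio documentation remains the reference for what is recommended with Hatchet specifically. blocking_work and heartbeat are hypothetical names.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    # Stands in for a blocking call (e.g. a sync HTTP client or CPU-light I/O).
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list[int]) -> None:
    # Keeps running while the blocking call is in progress, which shows
    # that the event loop is not being blocked.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def main() -> tuple[str, list[int]]:
    ticks: list[int] = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_work, 0.05),  # runs in a worker thread
        heartbeat(ticks),
    )
    return result, ticks


result, ticks = asyncio.run(main())
```

Calling blocking_work directly inside main would stall heartbeat until the sleep finished; moving it to a thread lets both make progress.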

Other miscellaneous changes:

  1. As shown in the Pydantic example, there is no longer a spawn_workflow(s) method on the Context. run is now the preferred method for spawning workflows, and it automatically propagates the parent's metadata to the child workflow.
  2. All times and durations, such as execution_timeout and schedule_timeout, now allow datetime.timedelta objects instead of only allowing strings (e.g. "10s" can be timedelta(seconds=10)).
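
To show what accepting both forms can look like in practice, here is a minimal sketch of normalizing a duration to a timedelta. The to_timedelta helper is hypothetical and not part of the SDK, and the set of string formats it accepts (a whole number followed by s, m, or h) is an assumption made for the example.

```python
import re
from datetime import timedelta
from typing import Union

# Assumed unit suffixes for this sketch only.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}


def to_timedelta(value: Union[str, timedelta]) -> timedelta:
    """Accept either a timedelta or a string such as "10s" and return a timedelta."""
    if isinstance(value, timedelta):
        return value
    match = re.fullmatch(r"(\d+)([smh])", value)
    if match is None:
        raise ValueError(f"unsupported duration string: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


same = to_timedelta("10s") == to_timedelta(timedelta(seconds=10))
```

Using timedelta objects directly avoids this kind of string parsing and lets a type checker catch malformed durations before the code runs.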

Other New Features

There are a handful of other new features that will make interfacing with the SDK easier, which are listed below.

  1. Concurrency keys using the input to a workflow are now checked for validity at runtime. If the workflow's input_validator does not contain a field that's used in a key, Hatchet will reject the workflow when it's created. For example, if the key is input.user_id, the input_validator Pydantic model must contain a user_id field.
  2. There is now an on_success_task on the Workflow object, which works just like an on-failure task, but it runs after all upstream tasks in the workflow have succeeded.
  3. We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.

For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.

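# Imports assumed to be available from the top-level hatchet_sdk package
from datetime import datetime, timedelta

from hatchet_sdk import BulkCancelReplayOpts, Hatchet, RunFilter, V1TaskStatus

hatchet = Hatchet()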

workflows = hatchet.workflows.list()

assert workflows.rows

workflow = workflows.rows[0]

workflow_runs = hatchet.runs.list(workflow_ids=[workflow.metadata.id])

workflow_run_ids = [workflow_run.metadata.id for workflow_run in workflow_runs.rows]

bulk_cancel_by_ids = BulkCancelReplayOpts(ids=workflow_run_ids)

hatchet.runs.bulk_cancel(bulk_cancel_by_ids)

bulk_cancel_by_filters = BulkCancelReplayOpts(
    filters=RunFilter(
        since=datetime.today() - timedelta(days=1),
        until=datetime.now(),
        statuses=[V1TaskStatus.RUNNING],
        workflow_ids=[workflow.metadata.id],
        additional_metadata={"key": "value"},
    )
)

hatchet.runs.bulk_cancel(bulk_cancel_by_filters)

The Hatchet client also has clients for workflows (declarations), schedules, crons, metrics (e.g. queue depth), events, and workers.
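
Returning to the concurrency-key check in item 1 of the list above, the idea can be sketched as follows. This is a simplified illustration and not Hatchet's implementation: it uses a dataclass instead of a Pydantic model, a plain regular expression instead of a real expression parser, and the names OrderInput and missing_input_fields are hypothetical.

```python
import re
from dataclasses import dataclass, fields


@dataclass
class OrderInput:
    user_id: str
    amount: int


def missing_input_fields(expression: str, input_type: type) -> set[str]:
    """Return the names referenced as input.<name> that the input type lacks."""
    referenced = set(re.findall(r"\binput\.([A-Za-z_]\w*)", expression))
    declared = {f.name for f in fields(input_type)}
    return referenced - declared


ok = missing_input_fields("input.user_id", OrderInput)
bad = missing_input_fields("input.tenant_id", OrderInput)
print(ok, bad)
```

An empty result means every field used in the key exists on the input type; a non-empty result is the case in which the workflow would be rejected.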

TypeScript SDK Highlights

The TypeScript SDK has a number of notable highlights to showcase for V1. Many of them have been covered elsewhere, such as in the migration guide and in various examples. Here, we'll list each of them, along with its motivation and benefits.

First and foremost: Many of the changes in the V1 TypeScript SDK are motivated by improved support for type checking and inference across large codebases and in production use-cases. With that in mind, here are the main highlights:

  1. We've moved away from a pure object-based pattern to a factory pattern for creating your workflows and tasks. This allows for much more flexibility and type safety.

The simplest way to declare a workflow is with hatchet.task.

export const simple = hatchet.task({
  name: "simple",
  fn: (input: SimpleInput) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});

This returns an object that you can use to run the task with fully inferred types!

const input = { Message: "Hello, World!" };
// run now
const result = await simple.run(input);
const runReference = await simple.runNoWait(input);

// or in the future
const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000);
const scheduled = await simple.schedule(runAt, input);
const cron = await simple.cron("simple-daily", "0 0 * * *", input);

  2. DAGs got a similar treatment and can be run the same way. DAGs are now a collection of tasks that are composed by calling .task on the Workflow object.

You can declare your types for DAGs. Output types are checked if there is a corresponding task name as a key in the output type.

type DagInput = {
  Message: string;
};

type DagOutput = {
  reverse: {
    Original: string;
    Transformed: string;
  };
};

export const dag = hatchet.workflow<DagInput, DagOutput>({
  name: "simple",
});

// Next, we declare the tasks bound to the workflow
const toLower = dag.task({
  name: "to-lower",
  fn: (input) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});

// Then, we declare a task that depends on toLower and reads its output
dag.task({
  name: "reverse",
  parents: [toLower],
  fn: async (input, ctx) => {
    const lower = await ctx.parentOutput(toLower);
    return {
      Original: input.Message,
      Transformed: lower.TransformedMessage.split("").reverse().join(""),
    };
  },
});

  3. Logical organization of SDK features to make it easier to understand and use.

We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.

For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.

const hatchet = HatchetClient.init();
const { runs } = hatchet;

const allFailedRuns = await runs.list({
  statuses: [WorkflowRunStatus.FAILED],
});

// replay by ids
await runs.replay({ ids: allFailedRuns.rows?.map((r) => r.metadata.id) });

// or you can run bulk operations with filters directly
await runs.cancel({
  filters: {
    since: new Date("2025-03-27"),
    additionalMetadata: { user: "123" },
  },
});

The Hatchet client also has clients for workflows (declarations), schedules, crons, metrics (e.g. queue depth), events, and workers.

Go SDK Highlights

The Go SDK has a number of notable highlights to showcase for V1. Many of them have been covered elsewhere, such as in the migration guide and in various examples. Here, we'll list each of them, along with its motivation and benefits.

  1. Workflows and tasks are now instantiated via a factory pattern which makes it easier to define and run workflows. For example:

type SimpleInput struct {
    Message string
}

type SimpleResult struct {
    TransformedMessage string
}

simple := factory.NewTask(
    create.StandaloneTask{
        Name: "simple-task",
    }, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
        return &SimpleResult{
            TransformedMessage: strings.ToLower(input.Message),
        }, nil
    },
    hatchet, // a Hatchet client instance
)

// somewhere else in your code
result, err := simple.Run(ctx, SimpleInput{
    Message: "Hello, World!",
})

// result is fully typed!

  2. Instead of passing parent references via []string, you can simply pass task references directly to other tasks in a workflow, reducing the fragility of your code. For example:

simple := factory.NewWorkflow[DagInput, DagResult](
    create.WorkflowCreateOpts[DagInput]{
        Name: "simple-dag",
    },
    hatchet,
)

step1 := simple.Task(
    create.WorkflowTask[DagInput, DagResult]{
        Name: "Step1",
    }, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
        // ...
    },
)

simple.Task(
    create.WorkflowTask[DagInput, DagResult]{
        Name: "Step2",
        Parents: []create.NamedTask{
            step1,
        },
    }, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
        // getting the parent's output also uses the task reference, for example:
        var step1Output SimpleOutput
        ctx.ParentOutput(step1, &step1Output)

        // ...
    },
)

  3. Configuring workflows and tasks is much easier, with all configuration options flattened into a single struct.

  4. We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.

For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them.

hatchet, err := v1.NewHatchetClient()

if err != nil {
    panic(err)
}

ctx := context.Background()
runs, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{
    Statuses: &[]rest.V1TaskStatus{rest.V1TaskStatusFAILED},
})

if err != nil {
    panic(err)
}

replayIds := []types.UUID{}
for _, run := range runs.JSON200.Rows {
    replayIds = append(replayIds, uuid.MustParse(run.Metadata.Id))
}

// Replay the runs
hatchet.Runs().Replay(ctx, rest.V1ReplayTaskRequest{
    ExternalIds: &replayIds,
})

// Or run bulk operations with filters directly
hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{
    Filter: &rest.V1TaskFilter{
        Since:              time.Now().Add(-time.Hour * 24),
        AdditionalMetadata: &[]string{"user:123"},
    },
})

The Hatchet client also has clients for workflows (declarations), schedules, crons, metrics (e.g. queue depth), events, and workers.
