0.10.0 The Edge of Glory

Major Changes

  • A native scheduler with support for exactly-once, fault tolerant, timezone-aware scheduling. A new Dagster daemon process has been added to manage your schedules and sensors with a reconciliation loop, ensuring that all runs are executed exactly once, even if the Dagster daemon experiences occasional failure. See the Migration Guide for instructions on moving from SystemCronScheduler or K8sScheduler to the new scheduler.
  • First-class sensors, built on the new Dagster daemon, allow you to instigate runs based on changes in external state - for example, files on S3 or assets materialized by other Dagster pipelines. See the Sensors Overview for more information, and the minimal sketch after this list for an example.
  • Dagster now supports pipeline run queueing. You can apply instance-level run concurrency limits and prioritization rules by adding the QueuedRunCoordinator to your Dagster instance. See the Run Concurrency Overview for more information.
  • The IOManager abstraction provides a new, streamlined primitive for granular control over where and how solid outputs are stored and loaded. It is intended to replace the (deprecated) intermediate/system storage abstractions. See the IO Manager Overview for more information.
  • A new Partitions page in Dagit lets you view your pipeline runs organized by partition. You can also launch backfills from Dagit and monitor them from this page.
  • A new Instance Status page in Dagit lets you monitor the health of your Dagster instance, with repository location information, daemon statuses, instance-level schedule and sensor information, and linkable instance configuration.
  • Resources can now declare their dependencies on other resources via the required_resource_keys parameter on @resource.
  • Our support for deploying on Kubernetes is now mature and battle-tested. Our Helm chart is now easier to configure and deploy, and we’ve made big investments in observability and reliability. You can view Kubernetes interactions in the structured event log and use Dagit to help you understand what’s happening in your deployment. The defaults in the Helm chart will give you graceful degradation and failure recovery right out of the box.
  • Experimental support for dynamic orchestration with the new DynamicOutputDefinition API. Dagster can now map the downstream dependencies over a dynamic output at runtime.
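
As a rough sketch of the new sensor API (the pipeline name, solid name, and directory path below are illustrative placeholders, not examples from this release):

import os

from dagster import RunRequest, SkipReason, sensor

@sensor(pipeline_name="my_pipeline")  # hypothetical pipeline name
def my_directory_sensor(context):
    # Request one run per file; the run_key de-duplicates requests so each
    # file triggers at most one run, even across sensor ticks.
    filenames = os.listdir("/path/to/watched/dir")  # placeholder directory
    if not filenames:
        yield SkipReason("No files found.")
        return
    for filename in filenames:
        yield RunRequest(
            run_key=filename,
            run_config={
                "solids": {"process_file": {"config": {"filename": filename}}}
            },
        )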

Breaking Changes

Dropping Python 2 support

  • We’ve dropped support for Python 2.7, based on community usage and enthusiasm for Python 3-native public APIs.

Removal of deprecated APIs

These APIs were marked for deprecation with warnings in the 0.9.0 release, and have been removed in the 0.10.0 release.

  • The decorator input_hydration_config has been removed. Use the dagster_type_loader decorator instead.
  • The decorator output_materialization_config has been removed. Use dagster_type_materializer instead.
  • The system storage subsystem has been removed. This includes SystemStorageDefinition, @system_storage, and default_system_storage_defs . Use the new IOManagers API instead. See the IO Manager Overview for more information.
  • The config_field argument on decorators and definitions classes has been removed and replaced with config_schema. This is a drop-in rename.
  • The argument step_keys_to_execute to the functions reexecute_pipeline and reexecute_pipeline_iterator has been removed. Use the step_selection argument to select subsets for execution instead.
  • Repositories can no longer be loaded using the legacy repository key in your workspace.yaml; use load_from instead. See the Workspaces Overview for documentation about how to define a workspace.

Breaking API Changes

  • SolidExecutionResult.compute_output_event_dict has been renamed to SolidExecutionResult.compute_output_events_dict. A solid execution result is returned from methods such as result_for_solid. Any call sites will need to be updated; see the sketch after this list.
  • The .compute suffix is no longer applied to step keys. Step keys that were previously named my_solid.compute will now be named my_solid. If you are using any API method that takes a step_selection argument, you will need to update the step keys accordingly.
  • The pipeline_def property has been removed from the InitResourceContext passed to functions decorated with @resource.
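
For illustration, a minimal call site after both changes might look like the following (the solid and pipeline are placeholders):

from dagster import execute_pipeline, pipeline, solid

@solid
def my_solid(_):
    return 1

@pipeline
def my_pipeline():
    my_solid()

result = execute_pipeline(my_pipeline)
solid_result = result.result_for_solid("my_solid")

# Previously: solid_result.compute_output_event_dict
events_by_output = solid_result.compute_output_events_dict

# Step keys also drop the suffix: what was "my_solid.compute" is now "my_solid".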

Helm Chart

  • The schema for the scheduler values in the Helm chart has changed. Instead of a simple on/off toggle, we now require an explicit scheduler.type to specify use of the DagsterDaemonScheduler, the K8sScheduler, or another scheduler. If your specified scheduler.type has required config, these fields must be specified under scheduler.config.
  • snake_case fields have been changed to camelCase. Please update your values.yaml as follows:
    • pipeline_run → pipelineRun
    • dagster_home → dagsterHome
    • env_secrets → envSecrets
    • env_config_maps → envConfigMaps
  • The Helm values celery and k8sRunLauncher have now been consolidated under the Helm value runLauncher for simplicity. Use the field runLauncher.type to specify use of the K8sRunLauncher, the CeleryK8sRunLauncher, or another run launcher. By default, the K8sRunLauncher is enabled.
  • All Celery message brokers (i.e. RabbitMQ and Redis) are disabled by default. If you are using the CeleryK8sRunLauncher, you should explicitly enable your message broker of choice.
  • userDeployments are now enabled by default.

Core

  • Event log messages streamed to stdout and stderr have been streamlined to be a single line per event.

  • Experimental support for memoization and versioning lets you execute pipelines incrementally, selecting which solids need to be rerun based on runtime criteria and versioning their outputs with configurable identifiers that capture their upstream dependencies.

    To set up memoized step selection, users can provide a MemoizableIOManager, whose has_output method reports whether a given solid output already exists and therefore does not need to be recomputed. To execute a pipeline with memoized step selection, users can supply the dagster/is_memoized_run run tag to execute_pipeline. A minimal sketch appears after this list.

    To set the version on a solid or resource, users can supply the version field on the definition. To access the derived version for a step output, users can access the version field on the OutputContext passed to the handle_output and load_input methods of IOManager and the has_output method of MemoizableIOManager.

  • Schedules that are executed using the new DagsterDaemonScheduler can now execute in any timezone by adding an execution_timezone parameter to the schedule. Daylight Savings Time transitions are also supported. See the Schedules Overview for more information and examples.
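
As a rough sketch of what a memoizing IO manager might look like (illustrative only; the import path for the experimental MemoizableIOManager and the storage directory used here are assumptions):

import os
import pickle

from dagster import io_manager
# Experimental in this release; this import path is an assumption and may differ.
from dagster.core.storage.memoizable_io_manager import MemoizableIOManager

class PickleMemoizingIOManager(MemoizableIOManager):
    def _path(self, context):
        # The derived version encodes the solid's version plus its upstream dependencies.
        return os.path.join(
            "/tmp/dagster_memoized",  # placeholder directory
            "{}.{}.{}".format(context.step_key, context.name, context.version),
        )

    def handle_output(self, context, obj):
        os.makedirs("/tmp/dagster_memoized", exist_ok=True)
        with open(self._path(context), "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.load(f)

    def has_output(self, context):
        # True means the output already exists, so the step can be skipped
        # when the dagster/is_memoized_run tag is set on the run.
        return os.path.exists(self._path(context))

@io_manager
def memoizing_io_manager(_init_context):
    return PickleMemoizingIOManager()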

Dagit

  • Countdown and refresh buttons have been added for pages with regular polling queries (e.g. Runs, Schedules).
  • Confirmation and progress dialogs are now presented when performing run terminations and deletions. Additionally, hanging/orphaned runs can now be forced to terminate, by selecting "Force termination immediately" in the run termination dialog.
  • The Runs page now shows counts for "Queued" and "In progress" tabs, and individual run pages show timing, tags, and configuration metadata.
  • The backfill experience has been improved: you can now view progress and terminate an entire backfill from the partition set page. Additionally, errors related to backfills are now surfaced more clearly.
  • Shortcut hints are no longer displayed when attempting to use the screen capture command.
  • The asset page has been revamped to include a table of events and enable organizing events by partition. Asset key escaping issues in other views have been fixed as well.
  • Miscellaneous bug fixes, frontend performance tweaks, and other improvements are also included.

Kubernetes/Helm

Helm

  • We've added schema validation to our Helm chart. You can now check that your values YAML file is
    correct by running:

    helm lint helm/dagster -f helm/dagster/values.yaml
  • Added support for resource annotations throughout our Helm chart.

  • Added Helm deployment of the dagster daemon & daemon scheduler.

  • Added Helm support for configuring a compute log manager in your dagster instance.

  • User code deployments now include a user ConfigMap by default.

  • Changed the default liveness probe for Dagit to use httpGet "/dagit_info" instead of tcpSocket:80

Dagster-K8s [Kubernetes]

  • Added support for user code deployments on Kubernetes.
  • Added support for tagging pipeline executions.
  • Fixes to support version 12.0.0 of the Python Kubernetes client.
  • Improved implementation of Kubernetes+Dagster retries.
  • Many logging improvements to surface debugging information and failures in the structured event log.

Dagster-Celery-K8s

  • Improved interrupt/termination handling in Celery workers.

Integrations & Libraries

  • Added a new dagster-docker library with a DockerRunLauncher that launches each run in its own Docker container. (See Deploying with Docker docs for an example.)
  • Added support for AWS Athena. (Thanks @jmsanders!)
  • Added mocks for AWS S3, Athena, and Cloudwatch in tests. (Thanks @jmsanders!)
  • Allow setting of S3 endpoint through env variables. (Thanks @marksteve!)
  • Various bug fixes and new features for the Azure, Databricks, and Dask integrations.
  • Added a create_databricks_job_solid for creating solids that launch Databricks jobs.

Migrating to 0.10.0

Action Required: Run and event storage schema changes

# Run after migrating to 0.10.0

$ dagster instance migrate

This release includes several schema changes to the Dagster storages that improve performance and enable new features like sensors and run queueing. After upgrading to 0.10.0, run the dagster instance migrate command to migrate your instance storage to the latest schema. This will turn off any running schedules, so you will need to restart any previously running schedules after migrating the schema. Before turning them back on, you should follow the steps below to migrate to DagsterDaemonScheduler.

New scheduler: DagsterDaemonScheduler

This release includes a new DagsterDaemonScheduler with improved fault tolerance and full support for timezones. We highly recommend upgrading to the new scheduler during this release. The existing schedulers, SystemCronScheduler and K8sScheduler, are deprecated and will be removed in a future release.

Steps to migrate

Instead of relying on system cron or Kubernetes cron jobs, the DagsterDaemonScheduler uses the new dagster-daemon service to run schedules. This requires running the dagster-daemon service as a part of your deployment.

Refer to our deployment documentation for guides on how to set up and run the daemon process for local development, Docker, or Kubernetes deployments.

If you are currently using the SystemCronScheduler or K8sScheduler:

  1. Stop any currently running schedules, to prevent any dangling cron jobs from being left behind. You can do this through the Dagit UI, or using the following command:

    dagster schedule stop --location {repository_location_name} {schedule_name}

    If you do not stop running schedules before changing schedulers, Dagster will throw an exception on startup due to the misconfigured running schedules.

  2. In your dagster.yaml file, remove the scheduler: entry. If there is no scheduler: entry, the DagsterDaemonScheduler is automatically used as the default scheduler.

  3. Start the dagster-daemon process. Guides can be found in our deployment documentation.

See our schedules troubleshooting guide for help if you experience any problems with the new scheduler.

If you are not using a legacy scheduler:

No migration steps are needed, but make sure you run dagster instance migrate as a part of upgrading to 0.10.0.

Deprecation: Intermediate Storage

We have deprecated the intermediate storage machinery in favor of the new IO manager abstraction, which offers finer-grained control over how inputs and outputs are serialized and persisted. Check out the IO Managers Overview for more information.

Steps to Migrate

  • We have deprecated the top level "storage" and "intermediate_storage" fields on run_config. If you are currently executing pipelines as follows:

    @pipeline
    def my_pipeline():
        ...
    
    execute_pipeline(
        my_pipeline,
        run_config={
            "intermediate_storage": {
                "filesystem": {"base_dir": ...}
            }
        },
    )
    
    execute_pipeline(
        my_pipeline,
        run_config={
            "storage": {
                "filesystem": {"base_dir": ...}
            }
        },
    )

    You should instead use the built-in IO manager fs_io_manager, which can be attached to your pipeline as a resource:

    @pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={"io_manager": fs_io_manager}
            )
        ],
    )
    def my_pipeline():
        ...
    
    execute_pipeline(
        my_pipeline,
        run_config={
            "resources": {
                "io_manager": {"config": {"base_dir": ...}}
            }
        },
    )

    There are corresponding IO managers for other intermediate storages, such as the S3- and ADLS2-based storages.

  • We have deprecated IntermediateStorageDefinition and @intermediate_storage.

    If you have written custom intermediate storage, you should migrate to custom IO managers defined using the @io_manager API. We have provided a helper method, io_manager_from_intermediate_storage, to help migrate your existing custom intermediate storages to IO managers.

    my_io_manager_def = io_manager_from_intermediate_storage(
        my_intermediate_storage_def
    )
    
    @pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={
                    "io_manager": my_io_manager_def
                }
            ),
        ],
    )
    def my_pipeline():
        ...
  • We have deprecated the intermediate_storage_defs argument to ModeDefinition, in favor of the new IO managers, which should be attached using the resource_defs argument.

Removal: input_hydration_config and output_materialization_config

Use dagster_type_loader instead of input_hydration_config and dagster_type_materializer instead of output_materialization_config.

On DagsterType and the type constructors in dagster_pandas, use the loader argument instead of input_hydration_config and the materializer argument instead of output_materialization_config.
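
For illustration, a loader previously declared with input_hydration_config might now be declared as follows (the type name, config schema, and file format here are placeholders):

import csv

from dagster import DagsterType, dagster_type_loader

@dagster_type_loader(config_schema={"csv_path": str})  # previously @input_hydration_config
def simple_dataframe_loader(_context, config_value):
    # Build the input value from run config.
    with open(config_value["csv_path"]) as f:
        return list(csv.DictReader(f))

SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=lambda _context, value: isinstance(value, list),
    loader=simple_dataframe_loader,  # previously passed via input_hydration_config
)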

Removal: repository key in workspace YAML

We have removed the ability to specify a repository in your workspace using the repository: key. Use load_from: instead when specifying how to load the repositories in your workspace.

Deprecated: python_environment key in workspace YAML

The python_environment: key is now deprecated and will be removed in a future release.

Previously, when you wanted to load a repository location in your workspace using a different Python environment from Dagit’s Python environment, you needed to use a python_environment: key under load_from: instead of the python_file: or python_package: keys. Now, you can simply customize the executable_path in your workspace entries without needing the python_environment: key.

For example, the following workspace entry:

  - python_environment:
      executable_path: "/path/to/venvs/dagster-dev-3.7.6/bin/python"
      target:
        python_package:
          package_name: dagster_examples
          location_name: dagster_examples

should now be expressed as:

  - python_package:
      executable_path: "/path/to/venvs/dagster-dev-3.7.6/bin/python"
      package_name: dagster_examples
      location_name: dagster_examples

See our Workspaces Overview for more information and examples.

Removal: config_field property on definition classes

We have removed the property config_field on definition classes. Use config_schema instead.
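
Since this is a drop-in rename, a minimal sketch of the change (the solid and schema below are placeholders):

from dagster import solid

@solid(config_schema={"greeting": str})  # previously: config_field
def greet(context):
    context.log.info(context.solid_config["greeting"])

# Introspection uses the renamed property as well:
schema = greet.config_schema  # previously: greet.config_field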

Removal: System Storage

We have removed the system storage abstractions, i.e. SystemStorageDefinition and @system_storage (deprecated in 0.9.0).

Please note that the intermediate storage abstraction is also deprecated and will be removed in 0.11.0. Use IO managers instead.

  • We have removed the system_storage_defs argument (deprecated in 0.9.0) to ModeDefinition, in favor of intermediate_storage_defs.
  • We have removed the built-in system storages, e.g. default_system_storage_defs (deprecated in 0.9.0).

Removal: step_keys_to_execute

We have removed the step_keys_to_execute argument to reexecute_pipeline and reexecute_pipeline_iterator, in favor of step_selection. This argument accepts the Dagster selection syntax, so, for example, *solid_a+ represents solid_a, all of its upstream steps, and its immediate downstream steps.
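
A minimal sketch of re-execution with the new argument (the pipeline, solids, and selection string are placeholders; step_keys_to_execute previously took explicit step keys such as "solid_a.compute"):

from dagster import DagsterInstance, execute_pipeline, pipeline, reexecute_pipeline, solid

@solid
def solid_a(_):
    return 1

@solid
def solid_b(_, x):
    return x + 1

@pipeline
def my_pipeline():
    solid_b(solid_a())

instance = DagsterInstance.ephemeral()
result = execute_pipeline(my_pipeline, instance=instance)

# Previously: step_keys_to_execute=["solid_a.compute", "solid_b.compute"]
reexecution_result = reexecute_pipeline(
    my_pipeline,
    parent_run_id=result.run_id,
    step_selection=["*solid_a+"],  # solid_a, its upstream steps, and its immediate downstream steps
    instance=instance,
)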

Breaking Change: date_partition_range

Starting in 0.10.0, Dagster uses the pendulum library to ensure that schedules and partitions behave correctly with respect to timezones. As part of this change, the delta parameter to date_partition_range (which determined the time difference between partitions and was a datetime.timedelta) has been replaced by a delta_range parameter (which must be a string that's a valid argument to the pendulum.period function, such as "days", "hours", or "months").

For example, the following partition range for a monthly partition set:

date_partition_range(
    start=datetime.datetime(2018, 1, 1),
    end=datetime.datetime(2019, 1, 1),
    delta=datetime.timedelta(months=1)
)

should now be expressed as:

date_partition_range(
    start=datetime.datetime(2018, 1, 1),
    end=datetime.datetime(2019, 1, 1),
    delta_range="months"
)

Breaking Change: PartitionSetDefinition.create_schedule_definition

When you create a schedule from a partition set using PartitionSetDefinition.create_schedule_definition, you now must supply a partition_selector argument that tells the scheduler which partition to use for a given schedule time.

We have added two helper functions, create_offset_partition_selector and identity_partition_selector, that capture two common partition selectors (schedules that execute at a fixed offset from the partition times, e.g. a schedule that creates the previous day's partition each morning, and schedules that execute at the same time as the partition times).

The previous default partition selector was last_partition, which didn't always work as expected when using the default scheduler and has been removed in favor of the two helper partition selectors above.

For example, a schedule that fills in each partition of a daily partition set at 10AM the following day would be created as follows:

partition_set = PartitionSetDefinition(
    name='hello_world_partition_set',
    pipeline_name='hello_world_pipeline',
    partition_fn=date_partition_range(
        start=datetime.datetime(2021, 1, 1),
        delta_range="days",
        timezone="US/Central",
    ),
    run_config_fn_for_partition=my_run_config_fn,
)

schedule_definition = partition_set.create_schedule_definition(
    "daily_10am_schedule",
    "0 10 * * *",
    partition_selector=create_offset_partition_selector(lambda d: d.subtract(hours=10, days=1)),
    execution_timezone="US/Central",
)

Renamed: Helm values

Following convention in the Helm docs, we now camel case all of our Helm values. To migrate to 0.10.0, you'll need to update your values.yaml with the following renames:

  • pipeline_run → pipelineRun
  • dagster_home → dagsterHome
  • env_secrets → envSecrets
  • env_config_maps → envConfigMaps

Restructured: scheduler in Helm values

When specifying the Dagster instance scheduler, rather than using a boolean field to switch between the current options of K8sScheduler and DagsterDaemonScheduler, we now require the scheduler type to be explicitly defined under scheduler.type. If the user-specified scheduler.type has required config, additional fields will need to be specified under scheduler.config.

scheduler.type and corresponding scheduler.config values are enforced via JSON Schema.

For example, if your Helm values previously were set like this to enable the DagsterDaemonScheduler:

scheduler:
  k8sEnabled: false


You should instead have:

scheduler:
  type: DagsterDaemonScheduler

Restructured: celery and k8sRunLauncher in Helm values

celery and k8sRunLauncher now live under runLauncher.config.celeryK8sRunLauncher and runLauncher.config.k8sRunLauncher respectively. To enable Celery, runLauncher.type must equal CeleryK8sRunLauncher. To enable the vanilla K8s run launcher, runLauncher.type must equal K8sRunLauncher.

runLauncher.type and corresponding runLauncher.config values are enforced via JSON Schema.

For example, if your Helm values previously were set like this to enable the K8sRunLauncher:

celery:
  enabled: false

k8sRunLauncher:
  enabled: true
  jobNamespace: ~
  loadInclusterConfig: true
  kubeconfigFile: ~
  envConfigMaps: []
  envSecrets: []


You should instead have:

runLauncher:
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      jobNamespace: ~
      loadInclusterConfig: true
      kubeconfigFile: ~
      envConfigMaps: []
      envSecrets: []

New Helm defaults

By default, userDeployments is enabled and the runLauncher is set to the K8sRunLauncher. Along with the latter change, all message brokers (e.g. rabbitmq and redis) are now disabled by default.

If you were using the CeleryK8sRunLauncher, one of rabbitmq or redis must now be explicitly enabled in your Helm values.
