github closeio/tasktiger v0.10.1

latest releases: v0.21.0, v0.20.0, v0.19.5...
4 years ago

Major Changes:

  • Breaking Change: Unique tasks ids have changed slightly. See more details below.
  • TaskTiger.purge_errored_tasks has been added
  • Breaking Change: Internal refactoring moved a lot of code out of This should be transparent unless calling code uses internal variables/constants. If this affects you, it should be obvious as only imports should break. The fix is just to change the import path.

Breaking Change: Unique Task Ids Changed

This breaking change only affects periodic tasks, and scheduled unique tasks. See #146 for details.

Unique tasks rely on their ID to enforce uniqueness. The id is a hash of function_name, args, and kwargs. There were some cases where creating unique scheduled tasks manually using Task objects or manually .delay()-ing a periodic task would inconsistently use None for args/kwargs instead of [] and {}. With this release, args and kwargs will always be normalized to []/{} no matter how the Task was created. Existing scheduled unique tasks will have to be migrated to use a consistent id format.

Here's a script that migrates task ids:

# -*- coding: utf-8 -*-

"""Re-schedule tasks that have malformed ids

Be sure to ``pip install click tasktiger limitlion tqdm`` as well.

Recommended steps:
    * stop tasktiger workers
    * upgrade tasktiger version
    * run this script (without --apply) and check the logs to make sure it's
      doing what you'd expect
    * run this script (with --apply), which corrects all unique scheduled task
    * start tasktiger workers

from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json

import click
import limitlion
import tqdm
from redis import Redis
from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches

# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)

# Initialize tasktiger
tiger = TaskTiger(redis_connection)

# Initialize limitlion (optional, see throttling comment below)

class JSONLineLog(object):
    """Safe and convenient json logger


        with JSONLineLog("my_file.json") as logger:
            logger.write({'key': 'this is serialized as json'})

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        self.file = open(self.filename, 'a')
        return self

    def write(self, log_entry):
        print(json.dumps(log_entry), file=self.file)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.__exit__(exc_type, exc_val, exc_tb)

    help="Actually make these changes. This is not a drill!",
@click.option("--limit", help="Limit to processing N tasks", default=None)
    help="Only process these queues (comma delimited)",
    help="Exclude these queues from processing (comma delimited)",
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
    # warn if not applying change
    if not apply:
        print('*** NO CHANGES WILL BE MADE')
        print('To apply this migration run with --apply.')
        print('*** CHANGES WILL BE APPLIED')

    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate overall the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, maybe do something simple like ``lambda: time.sleep(.1)``
    throttle = limitlion.throttle_wait('migrations', rps=10)

    # actually do the migration
    with JSONLineLog("task_id_migration.json") as migration_log:
        print("Writing log to {}".format(migration_log.filename))

        if limit:
            limit = int(limit)
        if only_queues:
            only_queues = only_queues.split(",")
        if exclude_queues:
            exclude_queues = exclude_queues.split(",")

        def unique_scheduled_tasks():
            """Yields all unique scheduled tasks"""
            queues_with_scheduled_tasks = tiger.connection.smembers(
            for queue in queues_with_scheduled_tasks:
                if not queue_matches(

                skip = 0
                total_tasks = None
                task_limit = 5000
                while total_tasks is None or skip < total_tasks:
                    total_tasks, tasks = Task.tasks_from_queue(
                        tiger, queue, SCHEDULED, skip=skip, limit=task_limit
                    for task in tasks:
                        if task.unique:
                            yield task
                    skip += task_limit

        # note that tqdm is completely optional here, but shows a nice progress
        # bar.
        total_with_wrong_id = 0
        total_processed = 0
        for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
            # generate the new correct id
            correct_id = gen_unique_id(
                task.serialized_func, task.args, task.kwargs
            if != correct_id:
                total_with_wrong_id += 1
                when = datetime.datetime.fromtimestamp(
                        tiger._key(SCHEDULED, task.queue),
                        "correct_task_id": correct_id,
                        "serialized_func": task.serialized_func,
                        "queue": task.queue,
                        "ts": datetime.datetime.utcnow(),
                        "apply": apply,
                        "scheduled_at": when,
                # Reschedule the task with the correct id. There's a 10 second
                # buffer here in case any tasktigers are still running so we're
                # not racing with them.
                if apply and (
                    when - datetime.datetime.utcnow()
                ) > datetime.timedelta(seconds=10):
                    new_task = task.clone()
                    new_task._data["id"] = correct_id

            total_processed = idx + 1
            if limit and total_processed >= limit:

            'Processed {} tasks, found {} with incorrect ids'.format(
                total_processed, total_with_wrong_id


if __name__ == "__main__":

Minor Changes

  • The codebase is now formatted with black
  • Some additional testing infrastructure has been added to make local development/testing easier

Don't miss a new tasktiger release

NewReleases is sending notifications on new releases.