Major Changes:
- Breaking Change: Unique task IDs have changed slightly. See more details below.
- `TaskTiger.purge_errored_tasks` has been added.
- Breaking Change: Internal refactoring moved a lot of code out of `__init__.py`. This should be transparent unless calling code uses internal variables/constants. If this affects you, it should be obvious, as only imports should break; the fix is just to change the import path.
Breaking Change: Unique Task IDs Changed
This breaking change only affects periodic tasks and scheduled unique tasks. See #146 for details.
Unique tasks rely on their ID to enforce uniqueness. The ID is a hash of `function_name`, `args`, and `kwargs`. There were some cases where creating unique scheduled tasks manually using `Task` objects, or manually `.delay()`-ing a periodic task, would inconsistently use `None` for `args`/`kwargs` instead of `[]` and `{}`. With this release, `args` and `kwargs` will always be normalized to `[]`/`{}` no matter how the `Task` was created. Existing scheduled unique tasks will have to be migrated to use a consistent ID format.
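To see why the normalization matters, recall that the ID is a hash over those three values, so `None` and `[]`/`{}` produce different IDs for the same logical task. Here's a minimal sketch using the same `gen_unique_id` helper that the migration script below imports (the function name in it is just a placeholder):

```python
from tasktiger._internal import gen_unique_id

# "path.to.func" is a placeholder serialized function name.
id_with_none = gen_unique_id("path.to.func", None, None)
id_normalized = gen_unique_id("path.to.func", [], {})

# The two forms hash differently, which is why a manually scheduled
# unique task could fail to deduplicate against its periodic twin.
assert id_with_none != id_normalized
```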
Here's a script that migrates task IDs:

```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Re-schedule tasks that have malformed ids
Be sure to ``pip install click tasktiger limitlion tqdm`` as well.
Recommended steps:
* stop tasktiger workers
* upgrade tasktiger version
* run this script (without --apply) and check the logs to make sure it's
doing what you'd expect
* run this script (with --apply), which corrects all unique scheduled task
ids
* start tasktiger workers
"""
from __future__ import absolute_import, print_function, unicode_literals
import datetime
import json
import click
import limitlion
import tqdm
from redis import Redis
from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches
# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)
# Initialize tasktiger
tiger = TaskTiger(redis_connection)
# Initialize limitlion (optional, see throttling comment below)
limitlion.throttle_configure(redis_connection)
class JSONLineLog(object):
"""Safe and convenient json logger
Usage::
with JSONLineLog("my_file.json") as logger:
logger.write({'key': 'this is serialized as json'})
"""
def __init__(self, filename):
self.filename = filename
def __enter__(self):
self.file = open(self.filename, 'a')
self.file.__enter__()
return self
    def write(self, log_entry):
        # default=str keeps datetime values serializable as json
        print(json.dumps(log_entry, default=str), file=self.file)
def __exit__(self, exc_type, exc_val, exc_tb):
self.file.__exit__(exc_type, exc_val, exc_tb)
@click.command()
@click.option(
"--apply",
is_flag=True,
help="Actually make these changes. This is not a drill!",
default=False,
)
@click.option("--limit", help="Limit to processing N tasks", default=None)
@click.option(
"--only-queues",
help="Only process these queues (comma delimited)",
default=None,
)
@click.option(
"--exclude-queues",
help="Exclude these queues from processing (comma delimited)",
default=None,
)
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
# warn if not applying change
if not apply:
print('*** NO CHANGES WILL BE MADE')
print('To apply this migration run with --apply.')
else:
print('*** CHANGES WILL BE APPLIED')
print()
    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate over all the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, you could do something simple like ``lambda: time.sleep(.1)``.
throttle = limitlion.throttle_wait('migrations', rps=10)
# actually do the migration
with JSONLineLog("task_id_migration.json") as migration_log:
print("Writing log to {}".format(migration_log.filename))
if limit:
limit = int(limit)
if only_queues:
only_queues = only_queues.split(",")
if exclude_queues:
exclude_queues = exclude_queues.split(",")
def unique_scheduled_tasks():
"""Yields all unique scheduled tasks"""
queues_with_scheduled_tasks = tiger.connection.smembers(
tiger._key(SCHEDULED)
)
for queue in queues_with_scheduled_tasks:
if not queue_matches(
queue,
only_queues=only_queues,
exclude_queues=exclude_queues,
):
continue
skip = 0
total_tasks = None
task_limit = 5000
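                # Page through the queue in chunks of task_limit;
                # tasks_from_queue also returns the queue's total size,
                # so we know when the whole set has been walked.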
while total_tasks is None or skip < total_tasks:
throttle()
total_tasks, tasks = Task.tasks_from_queue(
tiger, queue, SCHEDULED, skip=skip, limit=task_limit
)
for task in tasks:
if task.unique:
yield task
skip += task_limit
# note that tqdm is completely optional here, but shows a nice progress
# bar.
total_with_wrong_id = 0
total_processed = 0
for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
# generate the new correct id
correct_id = gen_unique_id(
task.serialized_func, task.args, task.kwargs
)
if task.id != correct_id:
total_with_wrong_id += 1
                # Scheduled times are stored as UTC unix timestamps, so
                # use utcfromtimestamp to match the utcnow() checks below.
                when = datetime.datetime.utcfromtimestamp(
tiger.connection.zscore(
tiger._key(SCHEDULED, task.queue), task.id
)
)
migration_log.write(
{
"incorrect_task_id": task.id,
"correct_task_id": correct_id,
"serialized_func": task.serialized_func,
"queue": task.queue,
"ts": datetime.datetime.utcnow(),
"apply": apply,
"scheduled_at": when,
}
)
                # Reschedule the task with the correct id. There's a
                # 10-second buffer here in case any tasktiger workers are
                # still running, so we're not racing with them.
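                # Delay the corrected clone before cancelling the
                # original, so the task never disappears from the queue.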
if apply and (
when - datetime.datetime.utcnow()
) > datetime.timedelta(seconds=10):
new_task = task.clone()
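                    # Task has no public id setter, so assign the
                    # corrected id through the private _data dict.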
new_task._data["id"] = correct_id
new_task.delay(when=when)
task.cancel()
throttle()
total_processed = idx + 1
if limit and total_processed >= limit:
break
print(
'Processed {} tasks, found {} with incorrect ids'.format(
total_processed, total_with_wrong_id
)
)
print("Done")
if __name__ == "__main__":
    main()
```
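If the script above is saved as, say, `migrate_task_ids.py` (the filename is just an example), a dry run scoped to a couple of queues would look like `python migrate_task_ids.py --only-queues periodic,reports`, with `periodic` and `reports` standing in for your own queue names; re-running the same command with `--apply` performs the actual migration.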
Minor Changes
- The codebase is now formatted with `black`.
- Some additional testing infrastructure has been added to make local development/testing easier.