
Initializing Different Celery Workers With Different Values

I am using celery to run long-running tasks on Hadoop. Each task executes a Pig script on Hadoop which runs for about 30 minutes to 2 hours. My current Hadoop setup has 4 queues a, b, c, …

Solution 1:

What I usually do is start the workers first (so no tasks are executed yet), and then, in another script (say manage.py), add commands with parameters that start specific tasks or tasks with different arguments.

In manage.py:

import click

from tasks import some_task

@click.command()
@click.argument('params')
def run_task(params):
    # Queue the task; apply_async expects the positional args as a sequence
    some_task.apply_async(args=[params])

And this will start the tasks as needed.
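For completeness, a minimal sketch of how such a command might be exposed and invoked; the entry-point wiring and the example argument are assumptions for illustration:

if __name__ == '__main__':
    run_task()

which can then be run from the shell, e.g.:

python manage.py <params>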


Solution 2:

Hope this helps someone.

Several problems had to be solved to make this work.

The first step involved adding support in celery for the custom parameter. If this is not done, celery will complain that it doesn't understand the parameter.

Since I am running celery with Flask, I initialize celery like so.

from celery import Celery

# `app` is the Flask application instance, created elsewhere.
def configure_celery():
    app.config.update(
        CELERY_BROKER_URL='amqp://:@localhost:5672',
        RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'
    )
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

I call this function to initialize celery and store it in a variable called celery.

celery = configure_celery()
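With this in place, any task defined against this celery instance runs inside the Flask application context, so it can use current_app and Flask extensions directly. A minimal sketch; the task name and the config key it reads are just for illustration:

from flask import current_app

@celery.task
def report_broker():
    # Works because ContextTask pushed an application context for us
    return current_app.config['CELERY_BROKER_URL']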

To add the custom parameter, you need to do the following.

def add_hadoop_queue_argument_to_worker(parser):
    # `parser` is the worker's argument parser; this registers the new CLI option
    parser.add_argument(
        '--hadoop-queue', help='Hadoop queue to be used by the worker'
    )

The celery instance used below is the one obtained from the steps above.

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)

The next step is to make this argument accessible inside the worker. A custom bootstep takes care of that.

from celery import bootsteps

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

    def __init__(self, worker, **kwargs):
        # The --hadoop-queue value arrives here as a keyword argument
        worker.app.hadoop_queue = kwargs['hadoop_queue']

Register this bootstep so celery runs it when the worker starts up.

celery.steps['worker'].add(HadoopCustomWorkerStep)

The tasks should now be able to access the variable.

@celery.task(bind=True)
def print_hadoop_queue_from_config(self):
    print(self.app.hadoop_queue)

Verify it by running the worker on the command-line.

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
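Once each worker carries its own hadoop_queue value, the actual Pig tasks can pick it up the same way. A hypothetical sketch; the task name, script argument, and the exact Hadoop property passed to pig are assumptions:

import subprocess

@celery.task(bind=True)
def run_pig_script(self, script_path):
    # Submit the Pig job to whichever Hadoop queue this worker was started with
    queue = self.app.hadoop_queue or 'default'
    subprocess.check_call(
        ['pig', '-Dmapreduce.job.queuename={}'.format(queue), script_path]
    )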
