Django - Celery chain monitoring: the easy way


Using django-celery 3.0.17, celery 3.0.21 and Django 1.5.1, I'm trying to monitor the execution of a chain. I found a solution, but it seems a little bit weird to me, so I'm searching for an easier one if possible. Here's my code:

views.py

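    from json import dumps

    from celery.result import AsyncResult
    from django.http import HttpResponse
    from django.shortcuts import render_to_response
    from django.template import RequestContext

    def runcod(request):
        # shpid, codid, stepvalues and bboxtuple are not shown in this snippet
        runfuntask = runfunctions.delay(shpid, codid, stepvalues, bboxtuple)
        getrunfunstatus.delay(runfuntask)
        return render_to_response('tss/append_runcod.html',
                                  {'runfuntask': runfuntask},
                                  context_instance=RequestContext(request))

    def getprogresscod(request):
        taskid = request.GET['taskid']  # sent by the AJAX call in cods.js
        task = AsyncResult(taskid)
        currstep = task.result['step']
        totsteps = task.result['total']

        if task.status == 'SUCCESS':
            task.revoke()  # manually terminate the runfunctions task

        response = dumps({'status': task.status,
                          'currstep': currstep,
                          'totsteps': totsteps})
        return HttpResponse(response, mimetype='application/json')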

tasks.py

    from time import sleep

    from celery import chain, current_task, task

    @task()
    def runfunctions(shpid, codid, stepvalues, bboxtuple):
        # ... code to define the functions to launch ...

        stepstolaunch = [fun1, fun2, fun3, fun4, fun5]
        chainid = chain(stepstolaunch).apply_async()
        # nodes() is a helper (not shown here) that walks the chain's results
        chainasyncobjects = [node for node in reversed(list(nodes(chainid)))]

        current_task.update_state(state="PROGRESS", meta={'step': 1, 'total': numsteps})

        for t in range(10800):  # same as the max duration of a celery task
            for i, step in enumerate(chainasyncobjects):
                currstep = i + 1
                if step.state == 'PENDING':
                    current_task.update_state(state="PROGRESS", meta={'step': currstep, 'total': numsteps})
                    break
                if step.state == 'SUCCESS':
                    if currstep == numsteps:
                        current_task.update_state(state="SUCCESS", meta={'step': currstep, 'total': numsteps})
                        # the task will be terminated (revoked) by getprogresscod()
                if step.state == 'FAILURE':
                    return
            sleep(1)  # wait one second between polls of the chain

cods.js

    function getprogresscod(taskid){
        var aoistatus, allstatus, currstep, totsteps;
        var interval = 2000; // perform an ajax call every `interval` milliseconds

        var refreshid = setInterval(function(){
            $.ajax({
                type: 'GET',
                url: 'getprogresscod/',
                data: {'taskid': taskid},
                success: function(response){}
            });
        }, interval);
    }

This is what's happening:

  1. runcod() launches the async task runfunctions().
  2. runfunctions() creates and launches a chain of subtasks.
  3. In its final loop, runfunctions() updates its own 'PROGRESS' status every second by looking at the status of each chain subtask (references 1 and 2).
  4. To let the user know what's happening, the getprogresscod() JavaScript function makes an AJAX request every 2 seconds to the getprogresscod() Python function.
  5. The getprogresscod() Python function looks at the status of runfunctions() and, when it is 'SUCCESS', revokes (terminates) the runfunctions() execution.
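The loop in step 3 boils down to walking the chain's results from the first step to the last and reporting the first one that has not finished. Here is a minimal sketch of that logic that runs without Celery. `FakeResult` is a hypothetical stand-in for an `AsyncResult` (only `state` and `parent` are used), `chain_progress` is a made-up name, and `nodes()` is only my assumption of what the helper from the referenced answers looks like, since it is not shown in this post.

```python
class FakeResult(object):
    """Hypothetical stand-in for an AsyncResult: only state and parent."""

    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent


def nodes(result):
    """Yield the last result of a chain, then each parent back to the first."""
    while result is not None:
        yield result
        result = result.parent


def chain_progress(last_result):
    """Return (status, current_step, total_steps) for a chain of results."""
    steps = list(reversed(list(nodes(last_result))))  # first step first
    total = len(steps)
    for i, step in enumerate(steps):
        if step.state == 'FAILURE':
            return ('FAILURE', i + 1, total)
        if step.state != 'SUCCESS':
            # First unfinished step: this is the one currently in progress.
            return ('PROGRESS', i + 1, total)
    return ('SUCCESS', total, total)


# Example: a three-step chain whose second step has not finished yet.
first = FakeResult('SUCCESS')
second = FakeResult('PENDING', parent=first)
third = FakeResult('PENDING', parent=second)
print(chain_progress(third))  # ('PROGRESS', 2, 3)
```

With real results, the object returned by `apply_async()` on the chain would take the place of `third`, assuming each result exposes its predecessor through `parent`.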

I did not find another way, because if I return from runfunctions() inside the final loop when every subtask of the chain is done, I cannot notify the user of the 'SUCCESS' status: getprogresscod() gets a None object when performing task.status.
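The failure described here looks like what happens when the view indexes a result that is `None` (or anything other than the progress dict). A minimal sketch of building the JSON payload defensively follows. `progress_payload` is a hypothetical helper, not part of Django or Celery, and it takes the status and result as plain values so it can be tried without a worker.

```python
import json


def progress_payload(status, result):
    """Build the JSON progress response, tolerating a missing result."""
    # Only trust the result when it is the progress dict stored via meta.
    meta = result if isinstance(result, dict) else {}
    return json.dumps({
        'status': status,
        'currstep': meta.get('step'),    # None when no progress is stored
        'totsteps': meta.get('total'),
    }, sort_keys=True)


print(progress_payload('PROGRESS', {'step': 2, 'total': 5}))
print(progress_payload('SUCCESS', None))
```

In the view this would be called as `progress_payload(task.status, task.result)`, so a finished task with no stored meta still produces a valid response.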

Any advice or suggestion is really appreciated, thanks!

I've solved it by deleting the getprogresscod() Python function and inserting an if statement inside runcod(). In this way I can monitor runfunctions() using runcod() and, when it has terminated successfully, wait 5 seconds so the result can be read, then close the task with a return. My remaining doubt is whether this waiting approach is correct or not... Here's the modified code:
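On the doubt about waiting: when a Celery task returns and a result backend is configured, the return value is stored as the task's result and its state becomes SUCCESS. Returning the progress dict from the task may therefore make the 5-second wait unnecessary. A sketch of the polling loop written that way, with no Celery dependency: `watch_steps`, `get_states` and `report` are hypothetical names, and `report` stands in for `current_task.update_state`.

```python
import time


def watch_steps(get_states, report, max_polls=10800, delay=1.0):
    """Poll step states until the chain ends; return the final progress meta.

    get_states: callable returning the list of step states, first step first.
    report: callable(step, total), a stand-in for current_task.update_state.
    """
    for _ in range(max_polls):
        states = get_states()
        total = len(states)
        if 'FAILURE' in states:
            failed_step = states.index('FAILURE') + 1
            return {'step': failed_step, 'total': total, 'failed': True}
        done = states.count('SUCCESS')
        if done == total:
            # Returning here (instead of update_state + sleep) lets the
            # caller use this dict as the task's final result.
            return {'step': total, 'total': total, 'failed': False}
        report(done + 1, total)  # steps of a chain finish in order
        time.sleep(delay)
    raise RuntimeError('chain did not finish within max_polls polls')


# Example with simulated snapshots of a two-step chain.
snapshots = iter([['SUCCESS', 'PENDING'], ['SUCCESS', 'SUCCESS']])
seen = []
final = watch_steps(lambda: next(snapshots),
                    lambda step, total: seen.append((step, total)),
                    delay=0)
print(seen, final)
```

Inside the task, the idea would be to end with `return watch_steps(...)`, so the polling view can read `task.result` after the task has finished rather than racing against a sleep.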

views.py

    def runcod(request):
        taskid = request.GET['taskid']
        if taskid != '':  # if the task is already running
            task = AsyncResult(taskid)
            currstep = task.result['step']
            totsteps = task.result['total']
            response = dumps({'status': task.status,
                              'currstep': currstep,
                              'totsteps': totsteps})
            return HttpResponse(response, mimetype='application/json')
        else:  # if the task must be started
            runfuntask = runfunctions.delay(shpid, codid, stepvalues, bboxtuple)
            getrunfunstatus.delay(runfuntask)
            return render_to_response('tss/append_runcod.html',
                                      {'runfuntask': runfuntask},
                                      context_instance=RequestContext(request))

tasks.py

    @task()
    def runfunctions(shpid, codid, stepvalues, bboxtuple):
        # ... code to define the functions to launch ...
        stepstolaunch = [fun1, fun2, fun3, fun4, fun5]
        chainid = chain(stepstolaunch).apply_async()
        chainasyncobjects = [node for node in reversed(list(nodes(chainid)))]

        current_task.update_state(state="PROGRESS", meta={'step': 1, 'total': numsteps})

        for t in range(10800):  # same as the max duration of a celery task
            for i, step in enumerate(chainasyncobjects):
                currstep = i + 1
                if step.state == 'PENDING':
                    current_task.update_state(state="PROGRESS", meta={'step': currstep, 'total': numsteps})
                    break
                if step.state == 'SUCCESS':
                    if currstep == numsteps:
                        current_task.update_state(state="SUCCESS", meta={'step': currstep, 'total': numsteps})
                        sleep(5)  # wait before stopping the task, so the JavaScript can get the result!
                        return
                if step.state == 'FAILURE':
                    return
            sleep(1)

cods.js

    function getprogresscod(taskid){
        var interval = 2000; // perform an ajax call every `interval` milliseconds

        var refreshid = setInterval(function(){
            $.ajax({
                type: 'GET',
                url: 'runcod/',
                data: {'taskid': taskid},
                success: function(response){}
            });
        }, interval);
    }
