OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

Subprocess Task Is not teriminating in the `@celery_app.task`

  • Thread starter Thread starter DarkHorse_906
  • Start date Start date
D

DarkHorse_906

Guest
I would like to stop the celery task using the celery ID. I am executing the subprocess task to run the spider for scrapping. So, when we stop the celery task the sub-process task also terminates.

But actually the Subprocess task is not terminating. I use the os.kill(pid,signal) but it is not terminating and the celery tasks continues running in the docker celery- Container.

Celery_Worker​


Code:
@celery_app.task(bind=True, name='jina_fetch_urls_celery_task', base=AbortableTask)
def jina_fetch_urls_celery_task(self, run_id: PydanticObjectId, site: str):
    try:
        for i in range(5):
            if self.is_aborted():
                return 'Task stopped!'
            logger.info(f"Progress: {i}")
            self.update_state(state='PROGRESS', meta={'progress': i})
            sleep(1)
        
        process = subprocess.Popen(['scrapy', 'crawl', 'base_spider', '-a', f'start_url={site}', '-a', f'task_id={str(run_id)}'], cwd='/code/jina_scrapy')
        
        # Save the subprocess ID in task metadata
        subprocess_id = process.pid
        self.update_state(state='PROGRESS', meta={'subprocess_id': subprocess_id})
        logger.info(f"Subprocess ID set in Celery: {subprocess_id}")
        process.wait()  # Wait for the subprocess to finish
        return {'status': 'DONE', 'subprocess_id': subprocess_id

Run/Endpoint:​


Code:
@router.post("/{run_id}/stop", status_code=201)
async def stop_run(run_id: str = Path(..., title="The ID of the run to stop")) -> Any:
    run = await retrieve_run(run_id)
    logger.info(f"Run ID: {run_id}")
    logger.info(f"Task ID: {run.task_id}")

    if not run or not run.task_id:
        raise HTTPException(status_code=404, detail="Task not found")

    task_id = run.task_id
    celery_task = AsyncResult(task_id)
    task_state = celery_task.state
    logger.info(f"Task state: {task_state}")

    if task_state in ['SUCCESS', 'FAILURE', 'REVOKED']:
        raise HTTPException(status_code=400, detail=f"Task already {task_state.lower()}")
    elif task_state in ['PENDING', 'STARTED', 'PROGRESS']:
        logger.info(f"Task state is {task_state}")
        
        # Revoke the task
        celery_task.revoke(terminate=True, signal='SIGKILL')
        
        # Retrieve the subprocess_id from the task's info
        meta = celery_task.info
        subprocess_id = None
        
        if meta and 'subprocess_id' in meta:
            subprocess_id = meta['subprocess_id']
            logger.info(f"Subprocess ID found in task info: {subprocess_id}")
        else:
            logger.error("Subprocess ID not found in task info")

        if subprocess_id:
            os.kill(os.getpid(subprocess_id), signal.SIGUSR1)
            logger.info(f"Subprocess with ID {subprocess_id} terminated.")
        else:
            logger.error("Failed to terminate subprocess. ID not found.")

        update_data = {
            "status": "stopped",
            "status_description": "The task was stopped by the user",
            "finished_at": datetime.now()
        }
        await patch_run(run_id, update_data)
        return {"message": "Task stopped successfully"}
    else:
        return {"message": "Task NOT stopped successfully"}
<p>I would like to stop the celery task using the <code>celery ID</code>. I am executing the subprocess task to run the spider for scrapping. So, when we stop the celery task the sub-process task also terminates.</p>
<p>But actually the Subprocess task is not terminating. I use the <code>os.kill(pid,signal)</code> but it is not terminating and the celery tasks continues running in the docker <code>celery- Container</code>.</p>
<h2>Celery_Worker</h2>
<pre class="lang-py prettyprint-override"><code>@celery_app.task(bind=True, name='jina_fetch_urls_celery_task', base=AbortableTask)
def jina_fetch_urls_celery_task(self, run_id: PydanticObjectId, site: str):
try:
for i in range(5):
if self.is_aborted():
return 'Task stopped!'
logger.info(f"Progress: {i}")
self.update_state(state='PROGRESS', meta={'progress': i})
sleep(1)

process = subprocess.Popen(['scrapy', 'crawl', 'base_spider', '-a', f'start_url={site}', '-a', f'task_id={str(run_id)}'], cwd='/code/jina_scrapy')

# Save the subprocess ID in task metadata
subprocess_id = process.pid
self.update_state(state='PROGRESS', meta={'subprocess_id': subprocess_id})
logger.info(f"Subprocess ID set in Celery: {subprocess_id}")
process.wait() # Wait for the subprocess to finish
return {'status': 'DONE', 'subprocess_id': subprocess_id
</code></pre>
<h2>Run/Endpoint:</h2>
<pre class="lang-py prettyprint-override"><code>@router.post("/{run_id}/stop", status_code=201)
async def stop_run(run_id: str = Path(..., title="The ID of the run to stop")) -> Any:
run = await retrieve_run(run_id)
logger.info(f"Run ID: {run_id}")
logger.info(f"Task ID: {run.task_id}")

if not run or not run.task_id:
raise HTTPException(status_code=404, detail="Task not found")

task_id = run.task_id
celery_task = AsyncResult(task_id)
task_state = celery_task.state
logger.info(f"Task state: {task_state}")

if task_state in ['SUCCESS', 'FAILURE', 'REVOKED']:
raise HTTPException(status_code=400, detail=f"Task already {task_state.lower()}")
elif task_state in ['PENDING', 'STARTED', 'PROGRESS']:
logger.info(f"Task state is {task_state}")

# Revoke the task
celery_task.revoke(terminate=True, signal='SIGKILL')

# Retrieve the subprocess_id from the task's info
meta = celery_task.info
subprocess_id = None

if meta and 'subprocess_id' in meta:
subprocess_id = meta['subprocess_id']
logger.info(f"Subprocess ID found in task info: {subprocess_id}")
else:
logger.error("Subprocess ID not found in task info")

if subprocess_id:
os.kill(os.getpid(subprocess_id), signal.SIGUSR1)
logger.info(f"Subprocess with ID {subprocess_id} terminated.")
else:
logger.error("Failed to terminate subprocess. ID not found.")

update_data = {
"status": "stopped",
"status_description": "The task was stopped by the user",
"finished_at": datetime.now()
}
await patch_run(run_id, update_data)
return {"message": "Task stopped successfully"}
else:
return {"message": "Task NOT stopped successfully"}
</code></pre>
 

Latest posts

A
Replies
0
Views
1
Ashrik Ahamed
A
A
Replies
0
Views
1
Ashrik Ahamed
A

Online statistics

Members online
1
Guests online
3
Total visitors
4
Top