October 22, 2024
Chicago 12, Melborne City, USA
python

Stop and resume process by semaphore


I wish to implement an API that utilize asyncio events in order to stop/resume a asyncio task. The API will support the following methods:

  • /import (start)
  • /stop
  • /resume

Main problem is happening when invoking the method /stop:

File "/src/srv.py", line 131, in __stop
    loop.run_until_complete(self.__proc.stop())
RuntimeError: This event loop is already running
/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py:-1: RuntimeWarning: coroutine 'Application.stop' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I’ve tried use await in run_until_complete, but have output:

start
# here i call /stop
end
start_stop
end_stop

i expecting this output

start
# here i call /stop
start_stop
# here i call /resume
end

I’ve used the following code:

srv.py

import asyncio
from fastapi import FastAPI, APIRouter
from app import Application

class Service:
    def __init__(self):
        self.__app = FastAPI(title="api")
        self.__router = APIRouter()
        self.__router.add_api_route('/import', self.__import, methods=['POST'])
        self.__router.add_api_route('/stop', self.__stop, methods=['POST', 'GET'])
        self.__router.add_api_route('/resume', self.__resume, methods=['POST', 'GET'])
        self.__app.include_router(self.__router)
        self.__proc = Application()

    async def __import(self):
        loop = asyncio.get_running_loop()
        loop.create_task(self.__proc.import_data())

    async def __stop(self) -> None:
        loop = asyncio.get_running_loop()
        loop.run_until_complete(self.__proc.stop())

    async def __resume(self) -> None:
        self.__proc.resume()

    def run(self) -> None:
        run(app=self.__app, host="0.0.0.0", port=8000)


if __name__ == '__main__':
    Service().run()

app.py

import asyncio


class Application:
    def __init__(self):
        self.__state = True
        self.__semaphore = asyncio.Semaphore(1)

    async def import_data(self):
        while True:
            async with self.__semaphore:
                print('start')
                await asyncio.sleep(5)
                print('end')

    def resume(self):
        self.__state = True

    async def stop(self):
        self.__state = False
        while not self.__state:
            async with self.__semaphore:
                print('start_stop')
                await asyncio.sleep(5)
                print('end_stop')



You need to sign in to view this answers

Leave feedback about this

  • Quality
  • Price
  • Service

PROS

+
Add Field

CONS

+
Add Field
Choose Image
Choose Video