Python asyncio: Concurrent write tasks frequently canceled, how to ensure alternating execution?

  • Thread starter: user25772377 (Guest)
I'm developing an asynchronous cache system in Python using asyncio. I have two concurrent writers whose writes I expect to alternate, but currently only one side's deletion tasks are reported as cancelled.

Here is my current implementation, running on Python 3.10:

Code:
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime


class LocalCacheUserToken:
    _g: dict[str, dict] = {}
    _tasks: dict[str, asyncio.Task] = {}
    _locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
    _write_lock = asyncio.Lock()

    @classmethod
    @asynccontextmanager
    async def acquire_lock(cls, key: str):
        async with cls._locks[key]:
            yield

    @classmethod
    async def get(cls, key: str):
        async with cls.acquire_lock(key):
            value = deepcopy(cls._g.get(key, None))
        return value

    @classmethod
    async def setex(cls, key: str, time: int, value: dict):
        async with cls._write_lock:
            async with cls.acquire_lock(key):
                print(f"[{datetime.now()}] {value=} get lock")
                if key in cls._tasks:
                    cls._tasks[key].cancel()

                cls._g[key] = value

            task = asyncio.create_task(cls._delay_delete(key, time, value))
            cls._tasks[key] = task

    @classmethod
    async def _delay_delete(cls, key, time, value):
        try:
            await asyncio.sleep(time)
            async with cls.acquire_lock(key):
                del cls._g[key]
            cls._tasks.pop(key, None)
        except asyncio.CancelledError:
            print(f"[{datetime.now()}] Deletion task for {key=}, {value=} was cancelled")


async def p(a: LocalCacheUserToken):
    while True:
        print(f"[{datetime.now()}] {await a.get('test')=}")
        await asyncio.sleep(1)


async def write(a: LocalCacheUserToken, k: str):
    for i in range(20):
        await a.setex("test", 5, {'value': f'{k}->{i}'})
        await asyncio.sleep(0.01)


# Example usage
async def main():
    asyncio.create_task(p(LocalCacheUserToken))
    asyncio.create_task(write(LocalCacheUserToken, k='test1'))
    asyncio.create_task(write(LocalCacheUserToken, k='test2'))

    await asyncio.sleep(10)


asyncio.run(main())

Output:

Code:
[2024-06-28 01:42:35.054703] await a.get('test')=None
[2024-06-28 01:42:35.054703] value={'value': 'test1->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test1->1'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->1'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->0'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->2'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->2'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->1'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->3'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->3'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->2'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->4'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->4'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->3'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->5'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->5'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->4'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->6'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->6'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->5'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->7'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->7'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->6'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->8'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->8'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->7'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->9'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->9'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->8'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->10'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->10'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->9'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->11'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->11'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->10'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->12'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->12'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->11'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->13'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->13'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->12'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->14'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->14'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->13'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->15'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->15'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->14'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->16'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->16'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->15'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->17'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->17'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->16'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->18'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->18'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->17'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->19'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->19'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->18'} was cancelled
[2024-06-28 01:42:36.056942] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:37.063445] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:38.064713] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:39.069732] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:40.077492] await a.get('test')=None
[2024-06-28 01:42:41.087926] await a.get('test')=None
[2024-06-28 01:42:42.103530] await a.get('test')=None
[2024-06-28 01:42:43.107391] await a.get('test')=None
[2024-06-28 01:42:44.108523] await a.get('test')=None

What is the reason for this problem, and how can I fix it?
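
One likely explanation, offered as a sketch and not as a confirmed diagnosis: both writers probably cancel each other's deletion tasks equally often, but the `test1` tasks are cancelled before they have run their first step. When a task is cancelled before it starts, `CancelledError` is raised at the very top of the coroutine, so the `try`/`except` inside `_delay_delete` is never entered and nothing is printed. The `test2` tasks have already reached `await asyncio.sleep(time)` when they are cancelled, so their handler runs. The names `watcher` and `demo` below are hypothetical and only isolate that behaviour:

```python
import asyncio


async def watcher(name: str, log: list[str]) -> None:
    # Mirrors the shape of _delay_delete: sleep, and report cancellation.
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        log.append(f"{name} saw CancelledError")
        raise


async def demo() -> list[str]:
    log: list[str] = []

    # Cancelled before the event loop has run its first step:
    # the coroutine body, including the try block, is never entered.
    never_started = asyncio.create_task(watcher("never_started", log))
    never_started.cancel()

    # Given one loop iteration to reach `await asyncio.sleep(5)`,
    # so the cancellation is delivered inside the try block.
    started = asyncio.create_task(watcher("started", log))
    await asyncio.sleep(0)
    started.cancel()

    await asyncio.gather(never_started, started, return_exceptions=True)
    both = never_started.cancelled() and started.cancelled()
    log.append(f"both cancelled: {both}")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If this is what happens in the cache, the cancellations are not actually one-sided and only the logging is. Printing the message in `setex` right where `.cancel()` is called, instead of inside the task, should then report both writers.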
