Communicating between Queues in Asyncio not Working as Expected

  • Thread starter: Coldchain9 (Guest)

I'm working on a pattern where I communicate among multiple queues to process items along a pipeline. I use sentinel values to signal between the queues when to stop working; however, in the following code I'm seeing results that confuse me.

When reading from the write_q in write_task(), I see the first value come in as the sentinel None instead of the items in the order they were placed in response_task(). If I understand correctly, write_task() should receive the items in the order the tasks were created and process them in that order.
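
Here is a minimal sketch of just the sentinel handoff (slow_fetch stands in for the aiohttp call; all names here are made up for illustration). With more consumers than in-flight items, the idle consumer grabs the sentinel and forwards it ahead of the still-running work, which looks like the same reordering:

Code:
import asyncio

async def slow_fetch(i: int) -> int:
    # Stand-in for the aiohttp request; the real call takes nonzero time
    await asyncio.sleep(0.1)
    return i

async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    while True:
        item = await in_q.get()
        if item is None:
            # The idle worker that pulls the sentinel forwards it right away,
            # while its siblings are still awaiting their slow_fetch tasks
            await out_q.put(None)
            in_q.task_done()
            break
        await out_q.put(await item)
        in_q.task_done()

async def main() -> None:
    in_q: asyncio.Queue = asyncio.Queue()
    out_q: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(in_q, out_q)) for _ in range(3)]
    for i in range(2):
        await in_q.put(asyncio.create_task(slow_fetch(i)))
    await in_q.put(None)  # single shared sentinel, as in my pipeline
    await in_q.join()
    while not out_q.empty():
        print(out_q.get_nowait())  # None comes out first, then 0 and 1

asyncio.run(main())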

Also, when I print qsize() in write_task() after finding the sentinel, it reports 0 items; however, when printing back in main(), qsize() on write_q reports 2 items. I've read somewhere that aiofiles uses run_in_executor(), which made me wonder whether there's a divergence in where the queue is handled.
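
As a sanity check on the qsize() discrepancy, this sketch (no aiofiles involved; names made up) shows that two qsize() readings on the same queue object can legitimately differ when puts land after the reader has broken out of its loop:

Code:
import asyncio

async def late_producer(q: asyncio.Queue) -> None:
    await q.put(None)         # the sentinel lands first
    await asyncio.sleep(0.1)  # two results land only after the reader quit
    await q.put("late-1")
    await q.put("late-2")

async def reader(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        q.task_done()
        if item is None:
            # Snapshot taken before the late puts: reports 0
            print(f"qsize at sentinel: {q.qsize()}")
            break

async def main() -> None:
    q: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(late_producer(q))
    await reader(q)
    await producer
    # Same queue object, later snapshot: reports 2
    print(f"qsize after producer finished: {q.qsize()}")

asyncio.run(main())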

Most of the code below is boilerplate to reproduce the actual scenario in which my code blocks indefinitely.

Code:
import asyncio
import aiohttp
import aiofiles
import aiocsv
import json

async def fetch(req: dict) -> dict:
    # Make the request
    async with aiohttp.ClientSession() as session:
        try:
            async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                response.raise_for_status()
                payload = await response.json()
                print(f"INFO: response status was: {response.status}")

                # Return the payload; the awaiting request_task forwards it to the response queue
                return payload
        except Exception as err:
            print(f"ERROR: error making request: {err}")

async def task(l: list[list[str]], request_q: asyncio.Queue) -> None:

    # Read tasks from source of data
    for i in l:
        # Put a request task into the queue
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": i
        }
        await request_q.put(
            asyncio.create_task(fetch(req))
        )

    # Sentinel value to signal we are done receiving from source
    await request_q.put(None)

async def request_task(request_q: asyncio.Queue, response_q: asyncio.Queue) -> None:
    while True:

        # Retrieve necessary data to make request 
        req = await request_q.get()
        print(f"INFO: request from request_q: {req}")

        # If we received the sentinel, pass it along to the next queue
        if req is None:
            print("INFO: received sentinel from request_q")
            request_q.task_done()
            await response_q.put(None)
            break
        
        # Make the request which will put data into the response queue
        resp = await req
        print(f"INFO: response in request_task: {resp}")
        await response_q.put(resp)
        request_q.task_done()

async def response_task(response_q: asyncio.Queue, write_q: asyncio.Queue) -> None:
    while True:

        # Retrieve response
        resp = await response_q.get()

        # If we received the sentinel, pass it along to the next queue
        if resp is None:
            print("INFO: received sentinel from response_q")
            response_q.task_done()
            await write_q.put(None)
            break

        await write_q.put(resp)
        response_q.task_done()

async def write_task(write_q: asyncio.Queue) -> None:

    headers: bool = True
    # Open the file once instead of reopening it for every queue item
    async with aiofiles.open("file.csv", mode="a+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            print(f"INFO: write_q address in write_task: {id(write_q)}")

            # Get data out of the queue to write it
            data = await write_q.get()
            print(f"INFO: data in write_task: {data}")

            if data is None:
                print(f"INFO: found sentinel in write_task, queue size was: {write_q.qsize()}")
                write_q.task_done()
                await f.flush()
                break

            # Write the header row once
            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            await f.flush()

            print("INFO: finished write task")
            write_q.task_done()

async def main() -> None:

    # Create fake data to POST
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 1

    # Queues for orchestrating 
    request_q = asyncio.Queue()
    response_q = asyncio.Queue()
    write_q = asyncio.Queue()

    # one producer
    producer = asyncio.create_task(
        task(items, request_q)
    )

    # 5 request consumers
    request_consumers = [
        asyncio.create_task(
            request_task(request_q, response_q)
        )
        for _ in range(5)
    ]

    # 5 response consumers
    response_consumers = [
        asyncio.create_task(
            response_task(response_q, write_q)
        )
        for _ in range(5)
    ]

    # one write consumer
    write_consumer = asyncio.create_task(
        write_task(write_q)
    )

    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")

    await request_q.join()
    for c in request_consumers:
        c.cancel()
    print("INFO: request consumer has completed! ")

    await response_q.join()
    for c in response_consumers:
        c.cancel()
    print("INFO: response consumer has completed! ")
    print(f"INFO: write_q address in main: {id(write_q)}")
    print(f"INFO: write_q in main qsize: {write_q.qsize()}")
    
    await write_q.join()
    await write_consumer
    print("INFO: write consumer has completed! ")
    print("INFO: Complete!")

if __name__ == "__main__":
    asyncio.run(main())
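
For comparison, the shutdown shape I've seen in other asyncio pipeline examples uses one sentinel per consumer, so every worker receives its own stop signal instead of a single worker swallowing the only sentinel. A simplified sketch (not my actual code):

Code:
import asyncio

N_WORKERS = 5

async def stage(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    while True:
        item = await in_q.get()
        if item is None:
            in_q.task_done()
            break  # each worker consumes exactly one sentinel
        await out_q.put(item)
        in_q.task_done()

async def main() -> None:
    in_q: asyncio.Queue = asyncio.Queue()
    out_q: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(stage(in_q, out_q)) for _ in range(N_WORKERS)]
    for item in ("a", "b", "c"):
        await in_q.put(item)
    # One sentinel per worker, so no stage task is left blocked on get()
    for _ in range(N_WORKERS):
        await in_q.put(None)
    await asyncio.gather(*workers)
    await in_q.join()
    print(f"forwarded {out_q.qsize()} items")

asyncio.run(main())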