Asyncio Task was destroyed but it was pending

Thread starter: Coldchain9 (Guest)
I am making requests to an external API, getting the response back, and writing it to a file. Everything works and runs fine; however, I receive the "Task was destroyed but it is pending!" warning, which I'd like to clean up.

I have emulated the process below. I receive items from a source (e.g. a list) and, as I receive them, put them into a queue, signaling the end of the source by putting in a sentinel None value.

The write_task() consumer keeps pulling items from this queue until it receives the sentinel, then breaks out.

Below is code that demonstrates my write_task() being cancelled before it has completed. What is the proper design to handle this?

Code:
import asyncio
import aiofiles
import aiocsv
import json

async def task(l: list, write_q: asyncio.Queue) -> None:

    # Read tasks from source of data
    for i in l:
        # Put a request task into the queue
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": i
        }
        await write_q.put(req)

    # Sentinel value to signal we are done receiving from source
    await write_q.put(None)

async def write_task(write_q: asyncio.Queue) -> None:

    headers: bool = True
    while True:
        async with aiofiles.open("file.csv", mode="a+", newline='') as f:
            w = aiocsv.AsyncWriter(f)
            # Get data out of the queue to write it
            data = await write_q.get()
            if data is None:
                write_q.task_done()
                await f.flush()
                break

            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            write_q.task_done()

async def main() -> None:

    # Create fake data to POST
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 5

    # Queues for orchestrating 
    write_q = asyncio.Queue()
    producer = asyncio.create_task(
        task(items, write_q)
    )
    consumer = asyncio.create_task(
        write_task(write_q)
    )

    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")

    # Wait for queue to empty and cancel the consumer
    await write_q.join()
    consumer.cancel()
    print("INFO: write consumer has completed! ")
    print("INFO: Complete!")

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
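
The warning comes from the shutdown sequence in main(): consumer.cancel() only requests cancellation, and run_until_complete() returns before the consumer task ever gets a chance to process that request, so the event loop is torn down while the task is still pending. Below is a minimal sketch of one way to restructure this (task and write_task are renamed producer and consumer for clarity): because the consumer already breaks out of its loop on the None sentinel, it can simply be awaited alongside the producer instead of being cancelled.

Code:
import asyncio
import json

import aiofiles
import aiocsv

async def producer(items: list, write_q: asyncio.Queue) -> None:
    for i in items:
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": i,
        }
        await write_q.put(req)
    # Sentinel: tell the consumer no more items are coming
    await write_q.put(None)

async def consumer(write_q: asyncio.Queue) -> None:
    # Open the file once, outside the loop, instead of once per item
    async with aiofiles.open("file.csv", mode="a+", newline="") as f:
        w = aiocsv.AsyncWriter(f)
        await w.writerow(["status", "data"])
        while True:
            data = await write_q.get()
            if data is None:  # sentinel received; exit cleanly
                write_q.task_done()
                break
            await w.writerow(["200", json.dumps(data)])
            write_q.task_done()

async def main() -> None:
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 5
    write_q: asyncio.Queue = asyncio.Queue()

    prod = asyncio.create_task(producer(items, write_q))
    cons = asyncio.create_task(consumer(write_q))

    # Await BOTH tasks. The consumer exits on its own when it sees the
    # sentinel, so nothing is left pending when the loop shuts down, and
    # write_q.join() / consumer.cancel() are no longer needed.
    await asyncio.gather(prod, cons)
    print("INFO: Complete!")

if __name__ == "__main__":
    asyncio.run(main())

If a task genuinely does need to be cancelled, the cancellation itself should be awaited before the loop closes, e.g. consumer.cancel() followed by await consumer inside a try/except asyncio.CancelledError block; otherwise the loop is destroyed while the task is still pending, which is exactly what the warning reports.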