OiO.lk Community platform!

How to execute a DAG of tasks using asyncio?

  • Thread starter: chunchunmaru (Guest)

I have written the following code:

Code:
    async def execute_task(self, task_id):
        await self.tasks[task_id]()
        self.task_status[task_id] = "done"

        async with self.lock:
            for successor in self.dag.successors(task_id):
                if all(self.task_status[predecessor] == "done" for predecessor in self.dag.predecessors(successor)):
                    self.ready_queue.append(successor)

    async def execute_dag(self):
        for task in self.dag.nodes:
            if self.dag.in_degree(task) == 0:
                self.ready_queue.append(task)

        while self.ready_queue:
            current_tasks = []
            async with self.lock:
                while self.ready_queue:
                    task_id = self.ready_queue.popleft()
                    self.task_status[task_id] = "running"
                    current_tasks.append(self.execute_task(task_id))
            
            await asyncio.gather(*current_tasks)

For the following test case (each line is an edge, predecessor => successor):

Code:
1 => 2
1 => 5
2 => 3
3 => 4
5 => 4

I have the output:

Code:
Executing Task 1
Task 1 Done
Executing Task 2
Executing Task 5
Task 2 Done
Task 5 Done
Executing Task 3
Task 3 Done
Executing Task 4
Task 4 Done

Here task 5 takes significantly longer than the other tasks. Why does task 3 wait for task 5 to complete, even though it only depends on task 2? asyncio.gather seems to run the tasks in batches. Is there a workaround for this?

I want minimal code that executes the tasks in parallel, but a few test cases like this one bottleneck in my implementation.
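
For reference, the batching seems to come from `await asyncio.gather(*current_tasks)` in `execute_dag`: it returns only once every coroutine in the current batch has finished, so task 3 sits in `ready_queue` until task 5 is done, even though task 2 queued it much earlier. One possible workaround is to start each task with `asyncio.create_task` and wake up on `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`, so successors are started as soon as their last predecessor finishes. The following is only a minimal sketch under assumptions: `run_dag`, `make_task`, `demo`, the plain edge list (used here instead of the graph object from the code above), and the sleep durations are all hypothetical names and values chosen for illustration.

```python
import asyncio


async def run_dag(tasks, edges):
    """Start every task as soon as all of its predecessors have finished.

    tasks: dict of task id -> zero-argument coroutine function (assumed shape)
    edges: iterable of (predecessor, successor) pairs (assumed shape)
    """
    successors = {task_id: [] for task_id in tasks}
    remaining_preds = {task_id: 0 for task_id in tasks}
    for pred, succ in edges:
        successors[pred].append(succ)
        remaining_preds[succ] += 1

    running = {}  # asyncio.Task -> task id

    def start(task_id):
        running[asyncio.create_task(tasks[task_id]())] = task_id

    # Tasks without predecessors can start immediately.
    for task_id, count in remaining_preds.items():
        if count == 0:
            start(task_id)

    while running:
        # Return as soon as ANY running task finishes, not the whole batch.
        done, _ = await asyncio.wait(
            set(running), return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            task_id = running.pop(finished)
            finished.result()  # re-raise an exception from the task, if any
            for succ in successors[task_id]:
                remaining_preds[succ] -= 1
                if remaining_preds[succ] == 0:
                    start(succ)


async def demo():
    """Rebuild the test case from the question with made-up durations."""
    log = []
    durations = {1: 0.01, 2: 0.01, 3: 0.01, 4: 0.01, 5: 0.2}  # task 5 is slow

    def make_task(task_id):
        async def task():
            log.append(f"Executing Task {task_id}")
            await asyncio.sleep(durations[task_id])
            log.append(f"Task {task_id} Done")
        return task

    tasks = {task_id: make_task(task_id) for task_id in durations}
    edges = [(1, 2), (1, 5), (2, 3), (3, 4), (5, 4)]
    await run_dag(tasks, edges)
    return log


if __name__ == "__main__":
    print("\n".join(asyncio.run(demo())))
```

With these durations, task 3 should start and finish while task 5 is still sleeping, and task 4 should start only after both task 3 and task 5 are done. Because everything runs on a single event loop and the bookkeeping between awaits is synchronous, this sketch does not need the lock used in the original code.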