
#### Aenaon

##### Guest

`compute function` vs `compute method` and `Client`

I understand that `dask.compute()` operates on multiple Dask collections at once (see [here](https://docs.dask.org/en/stable/scheduler-overview.html)), but what I want to ask is whether, in that case, the operations happen in a pool of threads. I imagine so, since intermediate results are shared. I have some code that runs fine on my local PC. I don't see any new Python processes getting spawned; I do, however, see a spike in one of my CPU cores, which reaches 100%, and sometimes the load is split across more than one CPU.

Is there a way to use multiple Python processes with `dask.compute`?

I used a very simple example (taken from [here](https://docs.dask.org/en/stable/user-interfaces.html); it is the function `fun_1` in the code below) to experiment with `Client`, so that I can manually create processes and threads. For that I wrote `fun_2`, but I must be doing something really wrong, because execution is a lot slower now. When the input to these functions is a pair of arrays of length 100, then, on my PC, the execution time jumps from 0.85 s for `fun_1` to 14.71 s for `fun_2`.

Also, when the input array A has length 100 and B has length 1000, I get `UserWarning: Sending large graph of size ....` Could someone help me understand what I am doing wrong with `fun_2` and how I should change it, please?

Finally, it is really strange that the vanilla Python function (without Dask) named `fun_3` is always the fastest, by a very big margin! Is this because the case here is so simple that Dask doesn't really help?

I have a more complicated and heavier case, and I would like to use Dask to run it both on my local PC and on a multi-node compute cluster facility, so I am trying to understand how I should structure my code to make the best use of Dask. Any comments will be greatly appreciated.

Code:

```
import dask
import dask.array as da
from dask.distributed import Client, progress
import numpy as np
import time


def f(x, y):
    return min(x, y)


def g(x, y):
    return x + y


def fun_1(A, B):
    lazy_results = []
    for a in A:
        for b in B:
            if a < b:
                c = dask.delayed(f)(a, b)  # add lazy task
            else:
                c = dask.delayed(g)(a, b)  # add lazy task
            lazy_results.append(c)
    results = dask.compute(*lazy_results)
    print(sum(results))


def fun_2(A, B):
    lazy_results = []
    for a in A:
        for b in B:
            if a < b:
                c = dask.delayed(f)(a, b)  # add lazy task
            else:
                c = dask.delayed(g)(a, b)  # add lazy task
            c = da.from_delayed(c, shape=(), dtype=np.float64)
            lazy_results.append(c)
    client = Client(threads_per_worker=8, n_workers=8)
    results = da.block(lazy_results).compute()
    print(sum(results))


def fun_3(A, B):
    """
    Simple sequential function. No dask involved here.
    """
    results = []
    for a in A:
        for b in B:
            if a < b:
                c = f(a, b)
            else:
                c = g(a, b)
            results.append(c)
    print(sum(results))


if __name__ == "__main__":
    np.random.seed(0)
    A = np.random.random(100)
    B = np.random.random(100)
    functions = [fun_1, fun_2, fun_3]
    for fun in functions:
        tic = time.time()
        fun(A, B)
        toc = time.time()
        print(f"{fun.__name__}: Computation time: {toc - tic:.2f} seconds\n")
```
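One more thing I wondered about while writing this: `fun_2` builds 100 × 100 = 10,000 tasks that each do a single comparison, so maybe the per-task overhead dominates. I sketched a batched variant (the names `batch` and `fun_batched` are mine) where each task does the whole inner loop as one NumPy expression; note that when `a < b`, `min(a, b)` is just `a`:

```python
import dask
import numpy as np


def batch(a, B):
    # For one element a, evaluate the entire inner loop vectorised:
    # min(a, b) when a < b (which is simply a), else a + b.
    return np.where(a < B, np.minimum(a, B), a + B).sum()


def fun_batched(A, B):
    # One task per element of A instead of one per (a, b) pair,
    # so the graph has len(A) nodes rather than len(A) * len(B).
    tasks = [dask.delayed(batch)(a, B) for a in A]
    return sum(dask.compute(*tasks))


np.random.seed(0)
A = np.random.random(100)
B = np.random.random(100)
print(fun_batched(A, B))
```

Is this kind of batching the recommended way to keep the graph small, or is there a more idiomatic Dask pattern for it?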
