OiO.lk Community platform!


How to parallelize a list of unrelated functions and get the results from all of them?

  • Thread starter: KamilCuk (Guest)

Link to mypy: https://mypy-play.net/?mypy=latest&python=3.12&gist=a4da5db5bfbdf1e6bddce442286cc843

More often than not, I find myself connecting to multiple APIs and then collecting the results. Requests to multiple APIs can be made in parallel.

Code:
from typing import Any, Callable, Dict, List, Tuple, TypeVar, overload
from concurrent.futures import ThreadPoolExecutor

def get_airflow() -> str:
    return "a"
def get_prometheus() -> Dict[str, str]:
    return {"b": "c"}
def get_zabbix() -> List[str]:
    return ["d", "e"]

adata = get_airflow()
pdata = get_prometheus()
zdata = get_zabbix()

I can parallelize it using ThreadPoolExecutor:

Code:
exe = ThreadPoolExecutor()
arr = [
     exe.submit(get_airflow),
     exe.submit(get_prometheus),
     exe.submit(get_zabbix),
]
adata = arr[0].result()
pdata = arr[1].result()
zdata = arr[2].result()

However, this design requires me to keep a list of submit calls and a separate list of variable assignments, and to keep the two in sync so that the results do not get mixed up. It also loses all typing information, because the futures are stored in a single list. Is there a better way to write it? Something along these lines:

Code:
adata, pdata, zdata = parallelize(get_airflow, get_prometheus, get_zabbix)

How to write such a function to preserve typing information? I tried the following, but I do not understand how to preserve types or construct a tuple with a dynamic number of elements:

Code:
T1 = TypeVar('T1')
T2 = TypeVar('T2')
T3 = TypeVar('T3')
@overload
def parallelize(a: Callable[[], T1]) -> Tuple[T1]: ...
@overload
def parallelize(a: Callable[[], T1], b: Callable[[], T2]) -> Tuple[T1, T2]: ...
@overload
def parallelize(a: Callable[[], T1], b: Callable[[], T2], c: Callable[[], T3]) -> Tuple[T1, T2, T3]: ...
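def parallelize(*cbs: Callable[[], Any]) -> Tuple[Any, ...]:
    with ThreadPoolExecutor() as exe:
        # Submit every callback first so they run concurrently, then
        # collect the results in submission order as a real tuple.
        return tuple(x.result() for x in [exe.submit(cb) for cb in cbs])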
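On the "dynamic number of elements" part, one possible direction is sketched below. It assumes that a final catch-all overload is acceptable, so calls with more arguments than the typed overloads cover still type-check, but only as `Tuple[Any, ...]`. The name `run_all` is hypothetical and only two typed overloads are shown to keep it short. The parameters are positional-only (`/`) so that the `*cbs` implementation accepts every call the overloads allow. How a given type checker reports the overlap between the overloads may vary.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Tuple, TypeVar, overload

T1 = TypeVar("T1")
T2 = TypeVar("T2")


# Typed overloads for the common small arities (hypothetical helper name).
@overload
def run_all(a: Callable[[], T1], /) -> Tuple[T1]: ...
@overload
def run_all(a: Callable[[], T1], b: Callable[[], T2], /) -> Tuple[T1, T2]: ...
# Catch-all: any other number of callbacks, at the cost of precise types.
@overload
def run_all(*cbs: Callable[[], Any]) -> Tuple[Any, ...]: ...
def run_all(*cbs: Callable[[], Any]) -> Tuple[Any, ...]:
    with ThreadPoolExecutor() as exe:
        # Submit everything before waiting so the calls overlap.
        futures = [exe.submit(cb) for cb in cbs]
        # Collect in submission order; tuple() materializes the results.
        return tuple(f.result() for f in futures)


number, text = run_all(lambda: 1, lambda: "x")
many = run_all(lambda: 1, lambda: 2, lambda: 3)
```

With this shape, `number` and `text` should be inferred as `int` and `str`, while `many` falls back to a tuple of `Any`.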

Is it possible using standard functions, like concurrent.futures.wait(..)[0]? Is there a better way to do it?
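As a rough illustration of the `wait` idea: the function lives in `concurrent.futures` and returns a named tuple of two sets, `done` and `not_done`. Because sets are unordered, indexing the result with `[0]` yields the set of finished futures without any link back to which function produced which value. The sketch below therefore uses `wait` only to block, and reads each result from its own named future. It assumes the three `get_*` functions from the question. The variable names `fa`, `fp` and `fz` are made up for the example.

```python
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from typing import Dict, List


def get_airflow() -> str:
    return "a"


def get_prometheus() -> Dict[str, str]:
    return {"b": "c"}


def get_zabbix() -> List[str]:
    return ["d", "e"]


with ThreadPoolExecutor() as exe:
    # Each future is bound to its own name, so a type checker can see
    # Future[str], Future[Dict[str, str]] and Future[List[str]] separately.
    fa = exe.submit(get_airflow)
    fp = exe.submit(get_prometheus)
    fz = exe.submit(get_zabbix)
    # wait() returns (done, not_done) as sets; it is used here only to block.
    done, not_done = wait([fa, fp, fz], return_when=ALL_COMPLETED)

# Results come from the named futures, not from the unordered `done` set.
adata = fa.result()
pdata = fp.result()
zdata = fz.result()
```

Calling `result()` already blocks until the future finishes, so the explicit `wait` is optional here. It is mainly useful when a timeout or `FIRST_COMPLETED` behaviour is wanted.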