OiO.lk Community platform!

Do processes triggered by ProcessPoolExecutor.submit interfere with each other?

Thread starter: xinjie-h
Background:

Code:
.
├── app.py # API source code file
├── worker.py # CPU-bound task

I am using ProcessPoolExecutor().submit() in app.py to execute an entry-point function main() defined in worker.py. As soon as the worker process starts, its process id is sent back to the app process via a Queue created from a Manager.

My original source code is like this:

app.py:

Code:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

from fastapi import FastAPI
from loguru import logger

from worker import main

app = FastAPI()


executor = ProcessPoolExecutor()
queue = Manager().Queue() # A queue created via Manager is used for message exchange.


@app.get("/whatever-route")
def whatever_route():  # hyphens are not valid in Python function names
    logger.info("Before submitting")
    executor.submit(main, queue)  # submit the CPU-bound task
    sub_pid = queue.get()
    logger.info("Got sub pid from queue.")
    return sub_pid

worker.py:

Code:
... # Three dots here means there are many packages to be imported.

logger.info("Finished importing packages.")


def init():
    pass


def main(q: Queue):
    sub_pid = os.getpid()
    q.put(sub_pid)
    logger.info("Put sub pid into queue.")
    ... # Three dots here means additional CPU-bound task to be executed.
    return 1

When debugging the API, I noticed that it takes a surprisingly long time to import the packages in worker.py:

2024-06-25 17:54:23.661 | INFO | app:whatever_route:26 - Before submitting

2024-06-25 17:54:25.146 | INFO | worker::33 - Finished importing packages.

2024-06-25 17:54:25.151 | INFO | worker:main:44 - Put sub pid into queue.

2024-06-25 17:54:25.151 | INFO | app:whatever_route:29 - Got sub pid from queue.



One feasible solution:

After realising the problem, my idea was to execute the import statements in worker.py as soon as app.py starts, instead of deferring them until the task is actually executed. Therefore, I added one line in app.py, right after creating the executor. I tried it, and it works.

app.py (modified):


Code:
... # Remain unchanged.


executor = ProcessPoolExecutor()
executor.submit(init) # NEWLY ADDED LINE. init is a function in worker.py which does nothing.
queue = Manager().Queue() # A queue created via Manager is used for message exchange.


@app.get("/whatever-route")
... # Remain unchanged

2024-06-25 18:14:59.382 | INFO | worker::33 - Finished importing packages.

------------------------Triggered API-----------------------

2024-06-25 18:19:42.704 | INFO | app:whatever_route:29 - Before submitting

2024-06-25 18:19:42.721 | INFO | worker:main:44 - Put sub pid into queue.

2024-06-25 18:19:42.721 | INFO | app:whatever_route:32 - Got sub pid from queue.
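As a side note, on Python 3.7+ ProcessPoolExecutor also accepts an initializer callable that each worker process runs once at startup, which may be a tidier way to express this warm-up than submitting a do-nothing task. A minimal, self-contained sketch (separate from the app above; the warm_up function here is hypothetical, standing in for worker.py's heavy imports):

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor


def warm_up():
    # In the real app this is where worker.py's heavy imports would run,
    # e.g. "import worker"; here we only mark that the hook fired once,
    # in the worker process, before it picked up any submitted task.
    print(f"worker {os.getpid()} warmed up")


# The fork start method lets this sketch run at module level on Unix;
# with spawn, the executor calls would need an `if __name__ == "__main__":` guard.
executor = ProcessPoolExecutor(
    max_workers=1,
    mp_context=multiprocessing.get_context("fork"),
    initializer=warm_up,
)

worker_pid = executor.submit(os.getpid).result()
executor.shutdown()
print(worker_pid != os.getpid())  # True: the task ran in a separate worker process
```

With the initializer, the import cost is paid when the worker is spawned rather than on the first real task, which is effectively what the extra executor.submit(init) line achieves by hand.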



My Question:

Though the time overhead issue has been solved, I am quite confused about why my newly added line makes such an improvement. Given that executor.submit(init) is dispatched to some process A, and executor.submit(main, queue) should be dispatched to a separate process B, process B should not be affected by A and should execute the import statements again, shouldn't it? What is the underlying principle behind this improvement?

Apart from your guidance, any links to the official Python docs are highly welcome!
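For what it's worth, the reuse behaviour can be probed directly. A minimal, self-contained sketch (separate from the app above; report_pid is a stand-in for both init and main): with a single-worker pool, two consecutive submits report the same PID, i.e. the second task runs in the very process the first one warmed up, so module-level imports happen only once per worker.

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor


def report_pid():
    # Stand-in for both init and main: just reveal which process ran the task.
    return os.getpid()


# fork start method so the sketch runs at module level on Unix.
executor = ProcessPoolExecutor(
    max_workers=1,
    mp_context=multiprocessing.get_context("fork"),
)

first = executor.submit(report_pid).result()
second = executor.submit(report_pid).result()
executor.shutdown()

print(first == second)  # True: the same worker process handled both tasks
```

With more workers the two submits might land in different processes, but the pool still reuses whatever workers it has rather than starting a fresh process per task.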