Passing dynamically generated classes' instances as parameters for celery tasks

Thread starter: GregoirePelegrin (Guest)
Goal


The objective is to pass an instance of a dynamically generated class (inheriting from a statically defined one) as a parameter to a task to be executed on a Celery worker.

Issue


I am encountering an error at runtime when trying to pass my instance as a parameter: kombu.exceptions.EncodeError: Can't pickle local object 'ChildClassGenerator.generate.<locals>.ChildClass'.
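For context, here is a minimal stdlib-only illustration of why this error occurs (my own sketch, not from the original post): pickle serializes classes by reference, i.e. by module path plus `__qualname__`, and a class defined inside a function carries `<locals>` in its qualified name, so it cannot be looked up by name at load time.

```python
import pickle


def make_class():
    # The class only exists inside this function's scope, so its
    # __qualname__ is "make_class.<locals>.Local" and pickle cannot
    # resolve it by name when serializing instances.
    class Local:
        pass
    return Local


obj = make_class()()
print(make_class().__qualname__)  # make_class.<locals>.Local
try:
    pickle.dumps(obj)
except (pickle.PicklingError, AttributeError) as exc:
    print(f"serialization failed: {exc}")
```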

Minimal code to reproduce (see EDIT 1)


classes.py

Code:
from abc import ABC, abstractmethod


class ParentClass(ABC):
    def __init__(self):
        self.attribute1: str = "attribute1"

    @abstractmethod
    def some_method(self) -> str:
        pass


class ChildClassGenerator:
    @staticmethod
    def generate(value: int) -> type[ParentClass]:
        class ChildClass(ParentClass):
            def __init__(self):
                super().__init__()
                self.value: int = value

            def some_method(self) -> str:
                return f"{self.value} - {vars(self)}"
        return ChildClass

worker.py

Code:
from celery import Celery

from classes import ParentClass


def attach_test_task(celery_worker: Celery) -> Celery:
    @celery_worker.task(name="some_task")
    def some_task(some_instance: ParentClass) -> str:
        print(f"{some_instance.some_method()}")
        return some_instance.some_method()

    return celery_worker


def build_worker() -> Celery:
    celery_worker: Celery = Celery(
        "worker",
        broker="amqp://guest:guest@localhost:5672",
        backend="rpc://",
        result_expires=300
    )

    celery_worker.conf.task_default_queue = "worker"
    celery_worker.conf.event_serializer = "pickle"
    celery_worker.conf.task_serializer = "pickle"
    celery_worker.conf.result_serializer = "pickle"
    celery_worker.conf.accept_content = [
        "json", "application/json", "application/x-python-serialize"
    ]
    celery_worker.conf.broker_connection_retry_on_startup = True

    celery_worker = attach_test_task(celery_worker=celery_worker)

    return celery_worker


worker: Celery = build_worker()

main.py

Code:
from celery.canvas import Signature

from worker import worker
from classes import ChildClassGenerator, ParentClass


def main():
    some_class: type[ParentClass] = ChildClassGenerator.generate(value=1)
    some_instance: ParentClass = some_class()
    task_signature: Signature = worker.signature("some_task")
    task = task_signature.delay(
        some_instance=some_instance
    )


if __name__ == "__main__":
    main()

To test, run celery -A worker.worker worker -Q worker --loglevel DEBUG --concurrency=2 and python main.py with a local RabbitMQ.
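As an aside, here is a sketch of a pickle-friendly alternative (my own, not part of the original question; the helper name rebuild_child is illustrative): when the dynamic class can be regenerated from its inputs, giving it a `__reduce__` that points at a module-level rebuild function lets stdlib pickle reconstruct instances without ever looking up the `<locals>` class by name.

```python
from abc import ABC, abstractmethod
import pickle


class ParentClass(ABC):
    def __init__(self):
        self.attribute1: str = "attribute1"

    @abstractmethod
    def some_method(self) -> str:
        pass


def rebuild_child(value: int, state: dict) -> "ParentClass":
    # Module-level, hence importable by name on the worker side.
    instance = ChildClassGenerator.generate(value)()
    instance.__dict__.update(state)
    return instance


class ChildClassGenerator:
    @staticmethod
    def generate(value: int) -> type[ParentClass]:
        class ChildClass(ParentClass):
            def __init__(self):
                super().__init__()
                self.value: int = value

            def some_method(self) -> str:
                return f"{self.value} - {vars(self)}"

            def __reduce__(self):
                # Pickle stores a reference to rebuild_child plus the
                # arguments needed to recreate this instance.
                return (rebuild_child, (self.value, self.__dict__))
        return ChildClass


some_instance = ChildClassGenerator.generate(value=1)()
clone = pickle.loads(pickle.dumps(some_instance))
print(clone.some_method())  # 1 - {'attribute1': 'attribute1', 'value': 1}
```

With this approach the original pickle-based Celery configuration stays untouched; the trade-off is that generate must be able to deterministically rebuild the class from value.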

Modifications


The raised error led me to this GitHub issue from 2016-2017 (https://github.com/celery/celery/issues/3404). It recommends using dill as a serializer instead of pickle when trying to serialize locally defined objects. This led to the following modifications in worker.py:

Code:
from celery import Celery
# ADDED IMPORTS
import dill
from kombu.serialization import registry

from classes import ParentClass


def attach_test_task(celery_worker: Celery) -> Celery:
    @celery_worker.task(name="some_task")
    def some_task(some_instance: ParentClass) -> str:
        print(f"{some_instance.some_method()}")
        return some_instance.some_method()

    return celery_worker


# ADDED METHOD
def register_dill():
    def encode(obj):
        return dill.dumps(obj=obj)

    def decode(s):
        deserialized_object = dill.loads(str=s)
        return deserialized_object

    registry.register(
        name="dill",
        encoder=encode,
        decoder=decode,
        content_type="application/x-my-content",
        content_encoding="binary"
    )


def build_worker() -> Celery:
    celery_worker: Celery = Celery(
        "worker",
        broker="amqp://guest:guest@localhost:5672",
        backend="rpc://",
        result_expires=300
    )

    # ADDED CALL
    register_dill()

    celery_worker.conf.task_default_queue = "worker"
    celery_worker.conf.event_serializer = "dill"        # CHANGED SERIALIZER
    celery_worker.conf.task_serializer = "dill"         # CHANGED SERIALIZER
    celery_worker.conf.result_serializer = "dill"       # CHANGED SERIALIZER
    celery_worker.conf.accept_content = [
        "dill", "application/x-my-content",             # ADDED ACCEPTED CONTENT
        "json", "application/json", "application/x-python-serialize",
    ]
    celery_worker.conf.broker_connection_retry_on_startup = True

    celery_worker = attach_test_task(celery_worker=celery_worker)

    return celery_worker


worker: Celery = build_worker()

Modification results


While this has been somewhat successful, in that the error no longer occurs when running main.py, the worker itself now crashes with Unrecoverable error: AttributeError("Can't pickle local object 'ChildClassGenerator.generate.<locals>.ChildClass'"). (See EDIT 2)

My questions

  • Is there a way to fix this so that the worker runs correctly?
  • If not, is there another way to achieve what I want here?

Thanks a lot!

EDIT 1


After some tests, it seems another way to trigger the error is to define a lambda and pass it as a parameter. The minimal code is then shorter:
worker.py

Code:
from typing import Callable

from celery import Celery


def attach_test_task(celery_worker: Celery) -> Celery:
    @celery_worker.task(name="some_task")
    def some_task(function: Callable = lambda x: print(x)):
        function("Hello")

    return celery_worker


def build_worker() -> Celery:
    celery_worker: Celery = Celery(
        "worker",
        broker="amqp://guest:guest@localhost:5672",
        backend="rpc://",
        result_expires=300
    )

    celery_worker.conf.task_default_queue = "worker"
    celery_worker.conf.event_serializer = "pickle"
    celery_worker.conf.task_serializer = "pickle"
    celery_worker.conf.result_serializer = "pickle"
    celery_worker.conf.accept_content = [
        "json", "application/json", "application/x-python-serialize"
    ]
    celery_worker.conf.broker_connection_retry_on_startup = True

    celery_worker = attach_test_task(celery_worker=celery_worker)

    return celery_worker


worker: Celery = build_worker()

main.py

Code:
from celery.canvas import Signature

from worker import worker


def main():
    task_signature: Signature = worker.signature("some_task")
    task = task_signature.delay(function=lambda x: print(f"{x}{x}"))


if __name__ == "__main__":
    main()

This triggers the same errors, and the modification to include dill seems to have the same effects.
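A related observation (a stdlib-only sketch of my own, not from the original post): module-level callables are pickled by reference, so a named function at module scope survives the round trip where the lambda fails.

```python
import pickle


def shout(x: str) -> str:
    # Defined at module level, so pickle stores only a reference
    # ("module.shout") and the receiving side re-imports it by name.
    return f"{x}{x}"


restored = pickle.loads(pickle.dumps(shout))
print(restored("Hello"))  # HelloHello
```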

EDIT 2


I first verified that dill was indeed able to dump and load objects defined in <locals>.
lambda_sender.py

Code:
from typing import Callable

import dill


def main():
    function: Callable = lambda x: 2*x
    with open("testing_dill", "wb") as file:
        dill.dump(function, file)


if __name__ == "__main__":
    main()

lambda_receiver.py

Code:
import dill


def main():
    with open("testing_dill", "rb") as file:
        file_content = file.read()
    deserialized_object = dill.loads(file_content)
    print(deserialized_object(5))


if __name__ == "__main__":
    main()

This results in 10 being printed out, as desired.
Then I looked further into the dill decoder and found something surprising. When adding a breakpoint before the decoder's return statement to inspect deserialized_object, the object appears to be decoded correctly (deserialized_object[1]["some_instance"].some_method() returns "1 - {'attribute1': 'attribute1', 'value': 1}", which is the expected/desired behavior). So the object is decoded correctly, but a later step blocks the code execution.
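This behavior can be reproduced in miniature (my own sketch, requiring `pip install dill`; names are illustrative): dill round-trips an instance of a `<locals>` class, but handing the decoded object back to stdlib pickle fails again, which matches a worker that dill-decodes the message successfully and then crashes when the payload is re-pickled internally.

```python
import pickle

import dill


def make_class():
    class Local:
        value = 41
    return Local


obj = make_class()()
# dill serializes the <locals> class by value, so the round trip works:
decoded = dill.loads(dill.dumps(obj))
print(type(decoded).value)

# ...but the recreated class still carries "<locals>" in its
# __qualname__, so stdlib pickle cannot resolve it by name:
try:
    pickle.dumps(decoded)
except (pickle.PicklingError, AttributeError) as exc:
    print(f"pickle still fails: {exc}")
```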

EDIT 3


After exploring the stack trace, I've found the following. (Every method or class here is defined in celery.concurrency.asynpool unless stated otherwise; ellipses have been added to shorten the definitions and focus on the important parts.)
To trigger a task, Celery registers the async pool with the current event loop (through AsynPool.register_with_event_loop(self, hub: ...)).
This in turn calls AsynPool._create_write_handlers(self, hub: ..., ..., dumps: ... = _pickle.dumps, ...) as self._create_write_handlers(hub). Notice that this forces the dumps parameter to its default value of pickle.dumps.
We already know this serializer doesn't work for us, and since register_with_event_loop offers no option to change the serializer, we cannot pass dill.dumps instead of pickle.dumps.
Even if this method did accept it, the parameter would need to be threaded through an entire chain of calls, wouldn't it?
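The mechanism is easy to reproduce in miniature (a simplified stand-in of my own, not Celery's actual code): because the call site passes no dumps argument, the default bound at definition time always wins, regardless of what the application configured.

```python
import pickle


def create_write_handlers(payload, dumps=pickle.dumps):
    # Simplified stand-in for AsynPool._create_write_handlers:
    # the serializer is a keyword argument with a pickle default.
    return dumps(payload)


# The real call site is the equivalent of this: no `dumps` keyword is
# passed, so the pickle default is used no matter what was configured.
data = create_write_handlers({"x": 1})
print(pickle.loads(data))  # {'x': 1}
```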

EDIT 4


For completeness, I have also opened a GitHub discussion on the Celery repository (https://github.com/celery/celery/discussions/9081).

EDIT 5


Changing the default serializer (and deserializer) has worked; I opened two PRs, one on Celery (https://github.com/celery/celery/pull/9100) and one on Billiard (https://github.com/celery/billiard/pull/406). It doesn't seem to have any adverse effects, though I have not been able to run the test suite.