OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

dependency_injector Python for decoupled projects

  • Thread starter Thread starter RStyle
  • Start date Start date
R

RStyle

Guest
I have 3 main modules

  • application (knows about domain)
  • domain (no dependencies)
  • persistence (knows about application)
  • main.py (knows about everything)

I want to use mediatr and dependency_injector while keeping the inter module dependencies as above.

Im running into the following error during injection, AttributeError: 'Provide' object has no attribute 'get_person' In my domain layer, i have a simple Person model.

Code:
# ./domain/models.py

from dataclasses import dataclass

@dataclass
class Person:
    id :str
    name :str
    gender :str

In my application layer, I have 3 files,

Code:
# ./application/interfaces/database.py
from abc import ABC, abstractmethod
from domain.models import Person

class IDBConnection(ABC):
    
    # This has to be implemented by persistence layer

    @abstractmethod
    def get_person(self, id :str) -> Person:
        pass

Code:
# ./application/containers.py
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
    
    dbconnection = providers.Dependency()

Code:
# ./application/personcontext/queries/person.py

from domain.models import Person
from mediatr import Mediator
from application.interfaces.database import IDBConnection
from dependency_injector.wiring import inject, Provide
from application.containers import ApplicationContainer

class PersonQuery:

    def __init__(self, id :str) -> None:
        self.id = id

@inject
@Mediator.handler
class PersonQueryHandler:

    
    def __init__(self, db:IDBConnection=Provide[ApplicationContainer.dbconnection]) -> None:
        self.db:IDBConnection = db

    def handle(self, request :PersonQuery):
        return self.db.get_person(request.id)

In my persistence layer,

Code:
# ./persistence/containers.py

from dependency_injector import containers,providers
from application.interfaces.database import IDBConnection
from persistence.storage import DBConnection
from application.containers import ApplicationContainer

class PersistenceContainer(containers.DeclarativeContainer):
    database = providers.Singleton(IDBConnection, DBConnection())
    application_container = providers.Container(
        ApplicationContainer,
        dbconnection=database
    )

Code:
# ./persistence/storage.py
from application.interfaces.database import IDBConnection
from domain.models import Person

class DBConnection(IDBConnection):
    
    def get_person(self, id :int) -> Person:
        return Person(id=id, name="Razali", gender="Male")

finally in my main,

Code:
from mediatr import Mediator
from application.personcontext.queries.person import PersonQuery
from dependency_injector import containers, providers
from dependency_injector.wiring import inject, Provide
from application.interfaces.database import IDBConnection
from persistence.storage import DBConnection
from persistence.containers import PersistenceContainer
from application.containers import ApplicationContainer

class Container(containers.DeclarativeContainer):
    mediator = providers.Singleton(Mediator)
    application_container = providers.Container(ApplicationContainer)
    persistence_container = providers.Container(PersistenceContainer)


@inject
def main(mediator:Mediator=Provide[Container.mediator]):
    response = mediator.send(PersonQuery(1))
    print(response)

if __name__ == '__main__':
    container = Container()
    container.init_resources()
    container.wire(modules=[__name__])
    main()
<p>I have 3 main modules</p>
<ul>
<li>application (knows about domain)</li>
<li>domain (no dependencies)</li>
<li>persistence (knows about application)</li>
<li>main.py (knows about everything)</li>
</ul>
<p>I want to use mediatr and dependency_injector while keeping the inter module dependencies as above.</p>
<p>Im running into the following error during injection,
<code>AttributeError: 'Provide' object has no attribute 'get_person'</code>
In my domain layer, i have a simple Person model.</p>
<pre class="lang-py prettyprint-override"><code># ./domain/models.py

from dataclasses import dataclass

@dataclass
class Person:
id :str
name :str
gender :str
</code></pre>
<p>In my application layer, I have 3 files,</p>
<pre class="lang-py prettyprint-override"><code># ./application/interfaces/database.py
from abc import ABC, abstractmethod
from domain.models import Person

class IDBConnection(ABC):

# This has to be implemented by persistence layer

@abstractmethod
def get_person(self, id :str) -> Person:
pass


</code></pre>
<pre class="lang-py prettyprint-override"><code># ./application/containers.py
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):

dbconnection = providers.Dependency()
</code></pre>
<pre class="lang-py prettyprint-override"><code># ./application/personcontext/queries/person.py

from domain.models import Person
from mediatr import Mediator
from application.interfaces.database import IDBConnection
from dependency_injector.wiring import inject, Provide
from application.containers import ApplicationContainer

class PersonQuery:

def __init__(self, id :str) -> None:
self.id = id

@inject
@Mediator.handler
class PersonQueryHandler:


def __init__(self, db:IDBConnection=Provide[ApplicationContainer.dbconnection]) -> None:
self.db:IDBConnection = db

def handle(self, request :PersonQuery):
return self.db.get_person(request.id)
</code></pre>
<p>In my persistence layer,</p>
<pre class="lang-py prettyprint-override"><code># ./persistence/containers.py

from dependency_injector import containers,providers
from application.interfaces.database import IDBConnection
from persistence.storage import DBConnection
from application.containers import ApplicationContainer

class PersistenceContainer(containers.DeclarativeContainer):
database = providers.Singleton(IDBConnection, DBConnection())
application_container = providers.Container(
ApplicationContainer,
dbconnection=database
)


</code></pre>
<pre class="lang-py prettyprint-override"><code># ./persistence/storage.py
from application.interfaces.database import IDBConnection
from domain.models import Person

class DBConnection(IDBConnection):

def get_person(self, id :int) -> Person:
return Person(id=id, name="Razali", gender="Male")

</code></pre>
<p>finally in my main,</p>
<pre class="lang-py prettyprint-override"><code>from mediatr import Mediator
from application.personcontext.queries.person import PersonQuery
from dependency_injector import containers, providers
from dependency_injector.wiring import inject, Provide
from application.interfaces.database import IDBConnection
from persistence.storage import DBConnection
from persistence.containers import PersistenceContainer
from application.containers import ApplicationContainer

class Container(containers.DeclarativeContainer):
mediator = providers.Singleton(Mediator)
application_container = providers.Container(ApplicationContainer)
persistence_container = providers.Container(PersistenceContainer)


@inject
def main(mediator:Mediator=Provide[Container.mediator]):
response = mediator.send(PersonQuery(1))
print(response)

if __name__ == '__main__':
container = Container()
container.init_resources()
container.wire(modules=[__name__])
main()
</code></pre>
 
Top