luigi Workflows with multiple async calls?

  • Thread starter: Harper Marchman-Jones (Guest)

I'm attempting to implement a luigi workflow that interfaces with OneDrive using msgraph in three distinct steps:

  • Retrieve drive items
  • Extract content from drive items
  • Delete drive items

Code:
# simplified for the sake of clarity/brevity
import asyncio

import luigi

class RegisterTask(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # custom class that manages targets across tasks
        self.register = TargetRegister(self)

class OneDriveItemsExtractTask(RegisterTask):
    def output(self):
        return self.register.generate_output(PickleTarget)

    def run(self):
        data = OneDriveData(
            "my_connection", 
            "my_parent_directory", 
            ["item1.csv", "item2.csv"],
            "csv"
            )
        items = asyncio.run(get_onedrive_items(data))
        item_ids = [item.id for item in items]
        # output is a method on luigi tasks, so it must be called
        self.output().write(item_ids)


class OneDriveContentExtractTask(RegisterTask):
    def requires(self):
        return OneDriveItemsExtractTask(**self.param_kwargs)

    def output(self):
        return self.register.generate_output(ParquetTarget)

    def run(self):
        item_ids = self.input().read()
        content = asyncio.run(get_onedrive_content("my_connection", item_ids))
        # write the target so luigi can mark the task complete
        self.output().write(content)


class DeleteOneDriveItemsTask(RegisterTask):
    def requires(self):
        return {
            "item_ids": OneDriveItemsExtractTask(**self.param_kwargs),
            "extract_complete": OneDriveContentExtractTask(**self.param_kwargs),
        }

    def output(self):
        return self.register.generate_output(ParquetTarget)

    def run(self):
        item_ids = self.input()["item_ids"].read()
        asyncio.run(delete_onedrive_items("my_connection", item_ids))

The tasks and workflow pass unit tests when isolated from the msgraph dependency with testing fixtures, both when submitted to the luigi scheduler, and when calling the run() method after instantiation.

Running the tasks outside of the luigi scheduler without testing fixtures also works as expected.

However, running the workflow in integration testing against a real graph endpoint results in RuntimeError: Event loop is closed.
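One way this error can arise, offered as an assumption rather than a confirmed diagnosis of the msgraph client: each asyncio.run() call creates a fresh event loop and closes it on exit, so any object that remembers the loop from the first call fails when a later task uses it in the same process. The sketch below reproduces that with a hypothetical LoopBoundClient stand-in; it is not the msgraph API, and run_two_steps is an illustrative helper only.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for an async client that keeps using its first event loop."""

    def __init__(self):
        self._loop = None

    async def fetch(self, value):
        if self._loop is None:
            # Remember whichever loop was running on first use.
            self._loop = asyncio.get_running_loop()
        future = self._loop.create_future()
        # Scheduling on a loop that asyncio.run() already closed raises RuntimeError.
        self._loop.call_soon(future.set_result, value)
        return await future


def run_two_steps(client_factory):
    """Make two separate asyncio.run() calls, mimicking two tasks in one process."""
    results = []
    for step in ("items", "content"):
        try:
            results.append(asyncio.run(client_factory().fetch(step)))
        except RuntimeError as exc:
            results.append(f"RuntimeError: {exc}")
    return results


shared = LoopBoundClient()
shared_results = run_two_steps(lambda: shared)  # one client reused across loops
fresh_results = run_two_steps(LoopBoundClient)  # new client for every call

print(shared_results)
print(fresh_results)
```

With the shared client, the first call succeeds and the second reports "Event loop is closed"; with a fresh client per call, both succeed. If the real client behaves similarly, that would also be consistent with the mocked unit tests passing, since the fixtures never hold loop-bound resources.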

Here's the workaround I came up with, which is suboptimal:

Code:
class OneDriveContentExtractTask(RegisterTask):
    def output(self):
        return self.register.generate_output(ParquetTarget)

    async def _async_operations(self, data: OneDriveData):
        items = await get_onedrive_items(data)
        content = await get_onedrive_content(data, items)

        # assumes content is a pandas DataFrame, where empty is a property
        if self.delete_extracted_items and not content.empty:
            await delete_onedrive_items(data.connection, items)

        return content

    def run(self):
        data = OneDriveData(
            "my_connection", 
            "my_parent_directory", 
            ["item1.csv", "item2.csv"],
            "csv"
            )
        df = asyncio.run(self._async_operations(data))

        self.output().write(df)

This works, but it collapses three tasks into one. What seems to break luigi is having more than one asyncio.run() call, and therefore more than one event loop, within a single workflow. I'd appreciate any insight into how to properly manage multiple async calls in this context. I'm using luigi 3.4.0.
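A possible direction, sketched under the assumption that the failure comes from loop-bound resources outliving the loop that asyncio.run() closes: keep a single long-lived event loop in a background thread and submit every coroutine to it, so separate tasks can stay separate. BackgroundLoop and fake_graph_call are hypothetical names used for illustration; fake_graph_call stands in for get_onedrive_items, get_onedrive_content, and delete_onedrive_items.

```python
import asyncio
import threading


class BackgroundLoop:
    """Owns one event loop in a daemon thread so every caller reuses the same loop."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        """Submit a coroutine from synchronous code and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        """Stop the loop, wait for the thread to exit, then release the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_graph_call(name):
    # Placeholder for a real async Graph operation.
    await asyncio.sleep(0)
    return name, id(asyncio.get_running_loop())


runner = BackgroundLoop()
calls = [runner.run(fake_graph_call(step)) for step in ("items", "content", "delete")]
loop_ids = {loop_id for _, loop_id in calls}
runner.close()

print([name for name, _ in calls], len(loop_ids))
```

In a task's run() method, runner.run(get_onedrive_items(data)) would take the place of asyncio.run(get_onedrive_items(data)), with runner created once at module level. This only helps when the tasks execute in the same process; if luigi is configured to run tasks in separate worker processes, each process would need its own runner, and creating the Graph client inside each coroutine may be the simpler option.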