Dask - How to optimize the computation of the first row of each partition in a dask dataframe?

  • Thread starter: barbarab (Guest)
My overall goal is to read several csv files, do some computation, and save them as a parquet database using the partition_on option of the to_parquet function.

I cannot reindex and repartition before saving because of limited memory. When saving, each csv file will end up as a different partition, and thus a different parquet file. I cannot use the default file name part.0.parquet, because I might need to add files to the same directory in the future and those might also be named part.0.parquet.

I thus want to assign to each parquet file the name of the original csv file it comes from.

To do this, when I first read each csv file I add a column with its file name (so all rows in each partition have the same file name). Then I read the first row of each partition (specifically, the column with the original csv file name) and build a list of file names. Finally, I use the name_function option of to_parquet.
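
For reference, the reading step looks something like this (the data/*.csv pattern is a placeholder for my actual paths, and include_path_column is just one way to attach the source file to each row; blocksize=None keeps one partition per file):

Code:
import dask.dataframe as dd

# blocksize=None: one partition per input csv file.
# include_path_column: adds a column holding each row's source file path.
ddf = dd.read_csv('data/*.csv',
                  blocksize=None,
                  include_path_column='orig_file_name')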

I achieve what I wanted, but this way I have to call .compute(), and that takes very long.

Do you have any idea how I can limit the computation to the first row of each partition?

This is my current code:

Code:
def get_first_element(partition):
    # Every row in a partition carries the same original csv file name,
    # so the first row is enough.
    return partition['orig_file_name'].iloc[0]

# One file name per partition -- but this .compute() runs the whole graph.
first_elements = ddf.map_partitions(get_first_element).compute()

def name_function(part_idx):
    # Map the partition index to the original csv file name.
    return f"{first_elements[part_idx]}.parquet"

ddf.to_parquet(path=target_directory,
               engine='pyarrow',
               partition_on=['date', 'hour'],
               name_function=name_function,
               write_index=True)
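
For completeness, a tiny self-contained version of the same pattern on toy data (the values, the 'file_a' name and the toy_parquet_db directory are made up), in case it helps to run it end to end:

Code:
import pandas as pd
import dask.dataframe as dd

# Toy stand-in for ddf: a single partition whose rows all come from 'file_a'.
pdf = pd.DataFrame({'date': ['2024-01-01'] * 4,
                    'hour': [0, 0, 1, 1],
                    'value': [1.0, 2.0, 3.0, 4.0],
                    'orig_file_name': ['file_a'] * 4})
ddf = dd.from_pandas(pdf, npartitions=1)

# Same approach as above: the .compute() runs the whole graph just to
# recover one file name per partition.
first_elements = ddf.map_partitions(lambda part: part['orig_file_name'].iloc[0]).compute()

ddf.to_parquet(path='toy_parquet_db',
               engine='pyarrow',
               partition_on=['date', 'hour'],
               name_function=lambda i: f"{first_elements[i]}.parquet",
               write_index=True)

With partition_on, the files end up inside nested date=.../hour=... subdirectories; name_function only controls the file name within them.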

Thank you very much in advance for any suggestion!

Edit: This code replicates my problem after @mdurant's reply:

Code:
import dask
import pandas as pd

@dask.delayed
def process(file_path):
    # this is read_csv in my real code
    df = pd.DataFrame({'col1': [0, 1, 2, 3], 'col2': [4, 5, 6, 7], 'col3': [88, 88, 99, 99]})
    file_name = 'aaa'

    df.to_parquet(f'{file_name}.parquet',
                  partition_cols=['col3'])

dask.compute(*[process(f) for f in [1]])