Improve Parallelising *Reading, Cropping and Patching* individual .RT-H5 files (using Python **H5py**)

Thread starter: DSeal6 (Guest)
I have a data processing pipeline that is a perfect candidate for parallelisation but I can't seem to get "good" speed ups.

The process that I need to complete is:

  • Read in a .RT-H5 file from a local directory
  • Crop the file with supplied Latitude and Longitude limits
  • Chop each cropped array into square n x n patches
  • Save the cropped, patched array back to a .nc file

Each of the files is independent and requires no data sharing.

I currently have concurrent.futures set up so that the open, crop, patch and save functionality is submitted to a ProcessPoolExecutor(). With concurrent.futures the runtime roughly halves compared with the non-parallel version. In terms of data, each .RT-H5 file is simply 1800x3600 pixels and I have 48 per day. Currently, running a single month (~ 30*48 = 1440 files) takes 2 minutes (4 minutes if I don't use concurrent.futures). I have read a few blogs online about parallelising H5 files with h5py but I can't get anything to give me further improvement. I have tried Dask and also the ThreadPoolExecutor, but they give me no speed up. I feel like it should be easy. The workflow I want is:

  • start N threads that each read in a .RT-H5 file
  • have multiple processes running the open, crop, patch and save functionality across those threads (a rough sketch of the idea is below)

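Something like this is what I'm picturing (just a sketch of the idea, not working code; it reuses open_crop_patch_save_wrapper from below and assumes a pre-built list of (file_path, save_file_path) pairs):

Code:
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def process_batch(batch, crop, patch_size, h5_data_attr, n_threads=4):
    # one batch per process; threads inside the process handle the file I/O.
    failed = []
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        futures = [
            pool.submit(
                open_crop_patch_save_wrapper,
                file_path, crop, patch_size, save_file_path, h5_data_attr,
            )
            for file_path, save_file_path in batch
        ]
        for future in futures:
            result = future.result()
            if result is not None:  # the wrapper returns the path on failure.
                failed.append(result)
    return failed


def run_hybrid(path_pairs, crop, patch_size, h5_data_attr):
    n_procs = os.cpu_count() or 1
    # round-robin the (file_path, save_file_path) pairs into one batch per process.
    batches = [path_pairs[i::n_procs] for i in range(n_procs)]
    with ProcessPoolExecutor(max_workers=n_procs) as executor:
        results = executor.map(
            process_batch,
            batches,
            [crop] * n_procs,
            [patch_size] * n_procs,
            [h5_data_attr] * n_procs,
        )
    # flatten the per-process lists of failed paths.
    return [path for failed in results for path in failed]
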
Here is my current implementation:

Code:
    import os
    from concurrent.futures import ProcessPoolExecutor

    # data processing: crop, patch and save.
    tasks = []
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        for _, v in file_mapping.items():
            # generate raw and save file paths.
            # TODO: think of a better way of handling all the paths.
            file_path = os.path.join(
                config["download_dir"], f"{v}{config['raw_file_type']}"
            )
            save_file_path = os.path.join(
                config["download_dir"],
                f"{v}{config['save_file_suffix']}{config['save_file_type']}",
            )

            tasks.append(
                executor.submit(
                    open_crop_patch_save_wrapper,
                    file_path,
                    config["crop"],
                    config["patch_size"],
                    save_file_path,
                    config["save_file_data_attr"],
                )
            )

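For completeness, the futures collected in tasks can then be checked for failed files, e.g.:

Code:
from concurrent.futures import as_completed

failed_files = []
for task in as_completed(tasks):
    result = task.result()
    if result is not None:  # the wrapper returns the failed path, None on success.
        failed_files.append(result)
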
The open_crop_patch_save_wrapper() function itself looks like this:

Code:
def open_crop_patch_save_wrapper(
    file_path: str,
    crop: Dict[str, Tuple[float, float]],
    patch_size: int,
    save_file_path: str,
    h5_data_attr: str,
):

    try:
        # 1. crop the file to desired lat/lon lims.
        cropped_arr = create_xarr_from_imerg_hdf5(
            path_to_hdf5=file_path,
            lat_lims=crop["latitude"],
            lon_lims=crop["longitude"]
        )
        # 2. patch the file to (patch_size x patch_size) patches.
        y_max, x_max = cropped_arr.shape
        x_pad = calculate_required_1d_padding(X=x_max, Y=patch_size, frac=0)
        y_pad = calculate_required_1d_padding(X=y_max, Y=patch_size, frac=0)
        patched_arr = create_square_patches_from_2d_arr_using_overlap(
                arr2d=cropped_arr,
                patch_size=patch_size,  # use the function argument, not a global constant.
                x_pad=x_pad,
                y_pad=y_pad,
                flip_pixels=True,
            )
        # 3. save the cropped, patched file.
        with h5py.File(save_file_path, "w") as f:
            f.create_dataset(
                name=h5_data_attr,
                data=patched_arr,
                compression="gzip",
                dtype=patched_arr.dtype,
            )
        return None
    except Exception as e:
        # return file_path if unable to execute (assume file corrupted).
        return file_path

and create_xarr_from_imerg_hdf5 and create_square_patches_from_2d_arr_using_overlap look like this:

Code:
def create_xarr_from_imerg_hdf5(
    path_to_hdf5: str,
    lat_lims: Optional[Tuple[float, float]],
    lon_lims: Optional[Tuple[float, float]],
    imerg_version: str = "7",
) -> xr.DataArray:
    # get precipitation key - changes per version.
    imerg_config = IMERGEarlyRunConfig()
    precipitation_key = imerg_config.raw_data_attr[str(imerg_version)]

    with h5py.File(path_to_hdf5, "r") as h5file:
        lat = h5file["Grid/lat"][:]
        lon = h5file["Grid/lon"][:]
        data = h5file[f"Grid/{precipitation_key}"][0, :, :].T

        xarr = xr.DataArray(
            data,
            dims=["latitude", "longitude"],
            coords={"latitude": lat, "longitude": lon},
        )

        # slice the array using latitude and longitude.
        slice_params = {}
        if lat_lims is not None:
            slice_params["latitude"] = slice(*lat_lims)
        if lon_lims is not None:
            slice_params["longitude"] = slice(*lon_lims)

        xarr_cropped = xarr.sel(**slice_params)

    return xarr_cropped

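For context, it gets called with crop limits along these lines (made-up bounds and file name; the real ones come from config["crop"]):

Code:
cropped = create_xarr_from_imerg_hdf5(
    path_to_hdf5="some_imerg_file.RT-H5",  # placeholder name
    lat_lims=(-10.0, 10.0),
    lon_lims=(30.0, 60.0),
)
print(cropped.shape)  # roughly (200, 300) on the 0.1-degree 1800x3600 grid
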
and

Code:
def create_square_patches_from_2d_arr_using_overlap(
    arr2d, patch_size, x_pad, y_pad, flip_pixels: bool = False
):
    # error handling.
    assert len(arr2d.shape) == 2  # 2d.
    y_max, x_max = arr2d.shape
    assert (y_max >= patch_size) and (x_max >= patch_size)  # sufficient input arr dims.

    if flip_pixels:
        # flip pixels so that (0, 0) is the 0th pixel top-left.
        arr2d = np.flipud(arr2d)

    max_rows = int(np.ceil(y_max / patch_size))
    max_cols = int(np.ceil(x_max / patch_size))
    patches = []
    for r in range(max_rows):
        for c in range(max_cols):
            if (r < (max_rows - 1)) and (c < (max_cols - 1)):
                # no overlap.
                patch = arr2d[
                    (r * patch_size) : (r + 1) * patch_size,
                    (c * patch_size) : (c + 1) * patch_size,
                ]
            elif (r < (max_rows - 1)) and (c < (max_cols)):
                # only longitude overlap.
                patch = arr2d[
                    (r * patch_size) : (r + 1) * patch_size,
                    (c * patch_size) - x_pad : (c + 1) * patch_size,
                ]
            elif (r < (max_rows)) and (c < (max_cols - 1)):
                # only latitude overlap.
                patch = arr2d[
                    (r * patch_size) - y_pad : (r + 1) * patch_size,
                    (c * patch_size) : (c + 1) * patch_size,
                ]
            else:
                # overlap in lat and lon.
                patch = arr2d[
                    (r * patch_size) - y_pad : (r + 1) * patch_size,
                    (c * patch_size) - x_pad : (c + 1) * patch_size,
                ]

            patches.append(patch)

    arr_patches = np.array(patches).reshape(
        (max_rows, max_cols, patch_size, patch_size)
    )

    return arr_patches

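To make the patching concrete, here is a small self-contained example (hypothetical sizes, with the pads computed by hand; in the pipeline they come from calculate_required_1d_padding):

Code:
import numpy as np

arr = np.arange(300 * 500, dtype=np.float32).reshape(300, 500)
patch_size = 128
y_pad = 3 * patch_size - 300  # 84: how far the last row of patches reaches back
x_pad = 4 * patch_size - 500  # 12: how far the last column of patches reaches back

patches = create_square_patches_from_2d_arr_using_overlap(
    arr2d=arr, patch_size=patch_size, x_pad=x_pad, y_pad=y_pad, flip_pixels=True
)
print(patches.shape)  # (3, 4, 128, 128): the edge patches overlap their neighbours
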
I expected a much bigger speed-up with threading, but I think the files could be locked.
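
If it helps, this is the kind of minimal check I have in mind for the locking theory: time just the read step with and without a thread pool (the glob path is a placeholder):

Code:
import glob
import time
from concurrent.futures import ThreadPoolExecutor


def read_only(path):
    # just the open/crop step of the pipeline, no patching or saving.
    return create_xarr_from_imerg_hdf5(path_to_hdf5=path, lat_lims=None, lon_lims=None).shape


paths = glob.glob("/path/to/downloads/*.RT-H5")[:48]  # placeholder: one day of files

t0 = time.perf_counter()
for p in paths:
    read_only(p)
serial = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(read_only, paths))
threaded = time.perf_counter() - t0

print(f"serial reads: {serial:.2f}s, threaded reads: {threaded:.2f}s")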
 
