
blosc2.compress hangs when used in a multiprocessing setting (processing pool) #43

Open
JanSellner opened this issue Jul 28, 2022 · 1 comment


@JanSellner
Contributor

I am currently in the process of upgrading from blosc v1 to blosc v2 and am facing some issues. I use blosc to compress many binary files (numpy arrays), and I am really impressed by the compression ratio :-)

I don't know what the difference between blosc2.compress and blosc2.compress2 is (apart from the different API), so I tried both. The compressed file size is the same even though the files themselves are not identical. However, compress is much faster than compress2. Unfortunately, compress hangs when used in a multiprocessing setting. Here is a reproducible example:

import multiprocessing
import pickle
import tempfile
from pathlib import Path
from timeit import default_timer

import blosc
import blosc2
import numpy as np


# Just for time measurements, not relevant for the reproducible
class MeasureTime:
    def __init__(self, name: str = "", silent: bool = False):
        """
        Easily measure the time of a Python code block.

        >>> import time
        >>> with MeasureTime() as m:
        ...     time.sleep(1)
        Elapsed time: 0 m and 1.00 s
        >>> round(m.elapsed_seconds)
        1

        Args:
            name: Name which is included in the time info message.
            silent: Whether to print the time info message.
        """
        self.name = name
        self.silent = silent
        self.elapsed_seconds = 0

    def __enter__(self):
        self.start = default_timer()

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        end = default_timer()
        seconds = end - self.start

        if self.name:
            tag = "[" + self.name + "] "
        else:
            tag = ""

        self.elapsed_seconds = seconds

        if not self.silent:
            print("%sElapsed time: %d m and %.2f s" % (tag, seconds // 60, seconds % 60))


def compress_file_v1(path: Path, array: np.ndarray) -> None:
    """
    Compresses the numpy array using blosc (https://github.com/Blosc/c-blosc).

    Args:
        path: The path where the compressed file should be stored.
        array: The array data to store.
    """
    # Based on https://stackoverflow.com/a/56761075
    array = np.ascontiguousarray(array)  # Does nothing if already contiguous (https://stackoverflow.com/a/51457275)

    # A bit ugly, but very fast (http://python-blosc.blosc.org/tutorial.html#compressing-from-a-data-pointer)
    compressed_data = blosc.compress_ptr(
        array.__array_interface__["data"][0],
        array.size,
        array.dtype.itemsize,
        clevel=9,
        cname="zstd",
        shuffle=blosc.SHUFFLE,
    )

    with open(path, "wb") as f:
        pickle.dump((array.shape, array.dtype), f)
        f.write(compressed_data)


def compress_file(path: Path, array: np.ndarray) -> None:
    """
    Compresses the numpy array using blosc2 (https://github.com/Blosc/c-blosc2).

    Args:
        path: The path where the compressed file should be stored.
        array: The array data to store.
    """
    # Based on https://stackoverflow.com/a/56761075
    array = np.ascontiguousarray(array)  # Does nothing if already contiguous (https://stackoverflow.com/a/51457275)

    compressed_data = blosc2.compress(
        array,
        typesize=array.dtype.itemsize,
        clevel=9,
        cname="zstd",
    )

    with open(path, "wb") as f:
        pickle.dump((array.shape, array.dtype), f)
        f.write(compressed_data)


def compress_file2(path: Path, array: np.ndarray) -> None:
    """
    Compresses the numpy array using blosc2 (https://github.com/Blosc/c-blosc2).

    Args:
        path: The path where the compressed file should be stored.
        array: The array data to store.
    """
    # Based on https://stackoverflow.com/a/56761075
    array = np.ascontiguousarray(array)  # Does nothing if already contiguous (https://stackoverflow.com/a/51457275)

    compressed_data = blosc2.compress2(
        array,
        typesize=array.dtype.itemsize,
        clevel=9,
        compcode=blosc2.Codec.ZSTD,
    )

    with open(path, "wb") as f:
        pickle.dump((array.shape, array.dtype), f)
        f.write(compressed_data)

def compress_multi_v1(i):
    # tmp_dir is defined in the __main__ block below; the pool workers
    # inherit it because multiprocessing forks the parent on Linux
    np.random.seed(0)
    N = int(1e6)
    arr = np.random.randint(0, 10_000, N)
    compress_file_v1(tmp_dir / "compress.blosc", arr)


def compress_multi(i):
    np.random.seed(0)
    N = int(1e6)
    arr = np.random.randint(0, 10_000, N)
    compress_file(tmp_dir / "compress.blosc2", arr)


def compress_multi2(i):
    np.random.seed(0)
    N = int(1e6)
    arr = np.random.randint(0, 10_000, N)
    compress_file2(tmp_dir / "compress2.blosc2", arr)

if __name__ == "__main__":
    np.random.seed(0)
    N = int(1e6)
    arr = np.random.randint(0, 10_000, N)
    tmp_dir_handle = tempfile.TemporaryDirectory()
    tmp_dir = Path(tmp_dir_handle.name)

    with MeasureTime("compress_v1"):
        compress_file_v1(tmp_dir / f"compress.blosc", arr)
    with MeasureTime("compress"):
        compress_file(tmp_dir / f"compress.blosc2", arr)
    with MeasureTime("compress2"):
        compress_file2(tmp_dir / f"compress2.blosc2", arr)

    for f in sorted(tmp_dir.iterdir()):
        print(f"{f.name}: {f.stat().st_size} Bytes")
    
    pool = multiprocessing.Pool()
    pool.map(compress_multi_v1, [0, 1])
    pool.close()
    pool.join()
    print("Finished with compress_multi_v1 (using compress_file_v1)")

    pool = multiprocessing.Pool()
    pool.map(compress_multi2, [0, 1])
    pool.close()
    pool.join()
    print("Finished with compress_multi2 (using compress_file2)")

    # This code block hangs
    pool = multiprocessing.Pool()
    pool.map(compress_multi, [0, 1])
    pool.close()
    pool.join()
    print("Finished with compress_multi (using compress_file)")

    tmp_dir_handle.cleanup()

Running this produces the following output on my machine (Ubuntu 20.04):

[compress_v1] Elapsed time: 0 m and 0.09 s
[compress] Elapsed time: 0 m and 0.05 s
[compress2] Elapsed time: 0 m and 0.11 s
compress.blosc: 1685799 Bytes
compress.blosc2: 1676904 Bytes
compress2.blosc2: 1676904 Bytes
Finished with compress_multi_v1 (using compress_file_v1)
Finished with compress_multi2 (using compress_file2)

The line Finished with compress_multi (using compress_file) never shows up. Compression with blosc v1 works fine, and blosc2.compress2 also works, but blosc2.compress simply hangs when used with the processing pool.

I'll stick to blosc v1 for now, but it would of course be cool to upgrade to the new library :-) Do you have any idea what the problem is here?
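In the meantime, one workaround that may be worth trying (an untested sketch, assuming the hang is a fork-after-threads issue with blosc2's internal compression threads; compress_to is a hypothetical helper, not part of the reproducer above): start the workers with the "spawn" method so they do not inherit the parent's thread state.

import multiprocessing
import tempfile
from pathlib import Path

import blosc2
import numpy as np


def compress_to(path_str: str) -> None:
    # Self-contained worker: it takes the target path as an argument so it
    # also works with the "spawn" start method (no inherited globals);
    # the compress call uses the same keywords as the reproducer above
    arr = np.ascontiguousarray(np.random.randint(0, 10_000, int(1e6)))
    compressed = blosc2.compress(arr, typesize=arr.dtype.itemsize, clevel=9, cname="zstd")
    Path(path_str).write_bytes(compressed)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # "spawn" starts each worker as a fresh interpreter instead of
        # fork()ing a parent that may already have running threads
        ctx = multiprocessing.get_context("spawn")
        with ctx.Pool() as pool:
            pool.map(compress_to, [str(Path(tmp) / "a.blosc2"), str(Path(tmp) / "b.blosc2")])
    print("Finished with spawn-based pool")

If blosc2.compress still hangs with a spawn-based pool, the fork-after-threads guess is probably wrong and the problem lies elsewhere.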

@FrancescAlted
Member

This is a good point. I think we are not doing the correct thing with blosc2.compress, but blosc2.compress2 should be fine. I still need to think a bit more about this, but at first glance I don't see a reason why we should not replace blosc2.compress with blosc2.compress2 (and deprecate the blosc2.compress2 name).
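For reference, a minimal round-trip sketch for the compress2 path from the reproducer (assuming blosc2.decompress2 is the counterpart of blosc2.compress2 and returns the raw bytes when no destination is given; the helper name decompress_file2 is not part of the reproducer):

import pickle
from pathlib import Path

import blosc2
import numpy as np


def decompress_file2(path: Path) -> np.ndarray:
    # Reads back a file written by compress_file2 above: a pickled
    # (shape, dtype) header followed by the compressed payload
    with open(path, "rb") as f:
        shape, dtype = pickle.load(f)
        compressed_data = f.read()

    decompressed = blosc2.decompress2(compressed_data)
    return np.frombuffer(decompressed, dtype=dtype).reshape(shape)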
