# Copyright (c) 2016-present, Gregory Szorc
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license. See the LICENSE file for details.

"""Python interface to the Zstandard (zstd) compression library."""

from __future__ import absolute_import, unicode_literals

# Names exported by ``from <module> import *``.
# This should match what the C extension exports.
__all__ = [
    "BufferSegment",
    "BufferSegments",
    "BufferWithSegments",
    "BufferWithSegmentsCollection",
    "ZstdCompressionChunker",
    "ZstdCompressionDict",
    "ZstdCompressionObj",
    "ZstdCompressionParameters",
    "ZstdCompressionReader",
    "ZstdCompressionWriter",
    "ZstdCompressor",
    "ZstdDecompressionObj",
    "ZstdDecompressionReader",
    "ZstdDecompressionWriter",
    "ZstdDecompressor",
    "ZstdError",
    "FrameParameters",
    "backend_features",
    "estimate_decompression_context_size",
    "frame_content_size",
    "frame_header_size",
    "get_frame_parameters",
    "train_dictionary",
    # Constants.
    "FLUSH_BLOCK",
    "FLUSH_FRAME",
    "COMPRESSOBJ_FLUSH_FINISH",
    "COMPRESSOBJ_FLUSH_BLOCK",
    "ZSTD_VERSION",
    "FRAME_HEADER",
    "CONTENTSIZE_UNKNOWN",
    "CONTENTSIZE_ERROR",
    "MAX_COMPRESSION_LEVEL",
    "COMPRESSION_RECOMMENDED_INPUT_SIZE",
    "COMPRESSION_RECOMMENDED_OUTPUT_SIZE",
    "DECOMPRESSION_RECOMMENDED_INPUT_SIZE",
    "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE",
    "MAGIC_NUMBER",
    "BLOCKSIZELOG_MAX",
    "BLOCKSIZE_MAX",
    "WINDOWLOG_MIN",
    "WINDOWLOG_MAX",
    "CHAINLOG_MIN",
    "CHAINLOG_MAX",
    "HASHLOG_MIN",
    "HASHLOG_MAX",
    "MINMATCH_MIN",
    "MINMATCH_MAX",
    "SEARCHLOG_MIN",
    "SEARCHLOG_MAX",
    "SEARCHLENGTH_MIN",
    "SEARCHLENGTH_MAX",
    "TARGETLENGTH_MIN",
    "TARGETLENGTH_MAX",
    "LDM_MINMATCH_MIN",
    "LDM_MINMATCH_MAX",
    "LDM_BUCKETSIZELOG_MAX",
    "STRATEGY_FAST",
    "STRATEGY_DFAST",
    "STRATEGY_GREEDY",
    "STRATEGY_LAZY",
    "STRATEGY_LAZY2",
    "STRATEGY_BTLAZY2",
    "STRATEGY_BTOPT",
    "STRATEGY_BTULTRA",
    "STRATEGY_BTULTRA2",
    "DICT_TYPE_AUTO",
    "DICT_TYPE_RAWCONTENT",
    "DICT_TYPE_FULLDICT",
    "FORMAT_ZSTD1",
    "FORMAT_ZSTD1_MAGICLESS",
]

import io
import os

from ._cffi import (  # type: ignore
    ffi,
    lib,
)


# No backend features are advertised by this module (the set is empty).
backend_features = set()  # type: ignore

# Streaming buffer sizes as reported by the zstd library at import time.
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()

# cffi allocator created with ``should_clear_after_alloc=False``, i.e. the
# memory it hands out is not zero-filled first.
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)


# Values read from the zstd library at import time.
MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
# NOTE(review): presumably the little-endian byte encoding of MAGIC_NUMBER —
# confirm against zstd.h.
FRAME_HEADER = b"\x28\xb5\x2f\xfd"
CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
# (major, minor, release) tuple of the zstd library this module is bound to.
ZSTD_VERSION = (
    lib.ZSTD_VERSION_MAJOR,
    lib.ZSTD_VERSION_MINOR,
    lib.ZSTD_VERSION_RELEASE,
)

# Bounds for the low-level compression parameters, re-exported from the
# library's ``ZSTD_*_MIN`` / ``ZSTD_*_MAX`` constants.
BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN
MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
# SEARCHLENGTH_* are bound to the same library values as MINMATCH_* above.
SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX

# Compression strategy identifiers (values of the library's strategy enum).
STRATEGY_FAST = lib.ZSTD_fast
STRATEGY_DFAST = lib.ZSTD_dfast
STRATEGY_GREEDY = lib.ZSTD_greedy
STRATEGY_LAZY = lib.ZSTD_lazy
STRATEGY_LAZY2 = lib.ZSTD_lazy2
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
STRATEGY_BTOPT = lib.ZSTD_btopt
STRATEGY_BTULTRA = lib.ZSTD_btultra
STRATEGY_BTULTRA2 = lib.ZSTD_btultra2

# Dictionary content type identifiers (``ZSTD_dct_*``).
DICT_TYPE_AUTO = lib.ZSTD_dct_auto
DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict

# Frame format identifiers (``ZSTD_f_*``).
FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless

# Flush modes accepted by ``ZstdCompressionWriter.flush()``.
FLUSH_BLOCK = 0
FLUSH_FRAME = 1

# Flush modes used with ``ZstdCompressionObj.flush()``.
COMPRESSOBJ_FLUSH_FINISH = 0
COMPRESSOBJ_FLUSH_BLOCK = 1


def _cpu_count():
    """Return the number of CPUs reported by the OS, or ``0`` if unknown."""
    # os.cpu_count() was introduced in Python 3.4; older interpreters lack it.
    native_count = getattr(os, "cpu_count", None)
    if native_count is not None:
        # os.cpu_count() may return None; normalize that to 0.
        return native_count() or 0

    # Linux.
    try:
        return os.sysconf("SC_NPROCESSORS_ONLN")
    except (AttributeError, ValueError):
        # TODO implement on other platforms.
        return 0


class BufferSegment:
    """Represents a segment within a ``BufferWithSegments``.

    This type is essentially a reference to N bytes within a
    ``BufferWithSegments``.

    The object conforms to the buffer protocol.

    Every member defined on this class raises ``NotImplementedError``; the
    class only describes the interface.
    """

    @property
    def offset(self):
        """The byte offset of this segment within its parent buffer."""
        raise NotImplementedError()

    def __len__(self):
        """Obtain the length of the segment, in bytes."""
        raise NotImplementedError()

    def tobytes(self):
        """Obtain a ``bytes`` copy of this segment."""
        raise NotImplementedError()

class BufferSegments:
    """Represents an array of ``(offset, length)`` integers.

    This type is effectively an index used by :py:class:`BufferWithSegments`.

    The array members are 64-bit unsigned integers using host/native byte
    order.

    Instances conform to the buffer protocol.
    """


class BufferWithSegments:
    """A memory buffer containing N discrete items of known lengths.

    This type is essentially a fixed size memory address and an array
    of 2-tuples of ``(offset, length)`` 64-bit unsigned native-endian
    integers defining the byte offset and length of each segment within
    the buffer.

    Instances behave like containers.

    Instances also conform to the buffer protocol. So a reference to the
    backing bytes can be obtained via ``memoryview(o)``. A *copy* of the
    backing bytes can be obtained via ``.tobytes()``.

    This type exists to facilitate operations against N>1 items without
    the overhead of Python object creation and management. Used with
    APIs like :py:meth:`ZstdDecompressor.multi_decompress_to_buffer`, it
    is possible to decompress many objects in parallel without the GIL
    held, leading to even better performance.

    Every member defined on this class raises ``NotImplementedError``; the
    class only describes the interface.
    """

    @property
    def size(self):
        """Total size in bytes of the backing buffer."""
        raise NotImplementedError()

    def __len__(self):
        # NOTE(review): presumably the number of segments in the buffer (as
        # for ``BufferWithSegmentsCollection.__len__``) — confirm against the
        # C extension.
        raise NotImplementedError()

    def __getitem__(self, i):
        """Obtains a segment within the buffer.

        The returned object references memory within this buffer.

        :param i:
           Integer index of segment to retrieve.
        :return:
           :py:class:`BufferSegment`
        """
        raise NotImplementedError()

    def segments(self):
        """Obtain the array of ``(offset, length)`` segments in the buffer.

        :return:
           :py:class:`BufferSegments`
        """
        raise NotImplementedError()

    def tobytes(self):
        """Obtain a ``bytes`` copy of this instance."""
        raise NotImplementedError()


class BufferWithSegmentsCollection:
    """A virtual spanning view over multiple BufferWithSegments.

    Instances are constructed from 1 or more :py:class:`BufferWithSegments`
    instances. The resulting object behaves like an ordered sequence whose
    members are the segments within each ``BufferWithSegments``.

    If the object is composed of 2 ``BufferWithSegments`` instances with the
    first having 2 segments and the second having 3 segments, then ``b[0]``
    and ``b[1]`` access segments in the first object and ``b[2]``, ``b[3]``,
    and ``b[4]`` access segments from the second.

    Every member defined on this class raises ``NotImplementedError``; the
    class only describes the interface.
    """

    def __len__(self):
        """The number of segments within all ``BufferWithSegments``."""
        raise NotImplementedError()

    def __getitem__(self, i):
        """Obtain the ``BufferSegment`` at an offset."""
        raise NotImplementedError()


class ZstdError(Exception):
    """Exception raised by this module when a zstd operation fails."""


def _zstd_error(zresult):
    """Return the zstd error name for ``zresult`` as a text string.

    :param zresult:
       Result code returned by a zstd library call.
    :return:
       The library's error name, decoded from UTF-8.
    """
    raw_name = ffi.string(lib.ZSTD_getErrorName(zresult))

    # ``raw_name`` is bytes on Python 2 and 3. Callers format it into error
    # messages that are literal unicode, so hand back unicode.
    return raw_name.decode("utf-8")


def _make_cctx_params(params):
    """Allocate a new ``ZSTD_CCtx_params`` populated from ``params``.

    :param params:
       Object exposing the attributes read below (for example a
       ``ZstdCompressionParameters`` instance).
    :return:
       A garbage-collected cffi pointer to the new parameters object.
    :raises MemoryError:
       If the library fails to allocate the parameters object.
    :raises ZstdError:
       If the library rejects one of the values.
    """
    cctx_params = lib.ZSTD_createCCtxParams()
    if cctx_params == ffi.NULL:
        raise MemoryError()

    # Free the native object when the returned cdata is collected.
    cctx_params = ffi.gc(cctx_params, lib.ZSTD_freeCCtxParams)

    # (``lib`` constant name, ``params`` attribute name), in application
    # order. nbWorkers stays ahead of jobSize and overlapLog.
    name_pairs = (
        ("ZSTD_c_format", "format"),
        ("ZSTD_c_compressionLevel", "compression_level"),
        ("ZSTD_c_windowLog", "window_log"),
        ("ZSTD_c_hashLog", "hash_log"),
        ("ZSTD_c_chainLog", "chain_log"),
        ("ZSTD_c_searchLog", "search_log"),
        ("ZSTD_c_minMatch", "min_match"),
        ("ZSTD_c_targetLength", "target_length"),
        ("ZSTD_c_strategy", "strategy"),
        ("ZSTD_c_contentSizeFlag", "write_content_size"),
        ("ZSTD_c_checksumFlag", "write_checksum"),
        ("ZSTD_c_dictIDFlag", "write_dict_id"),
        ("ZSTD_c_nbWorkers", "threads"),
        ("ZSTD_c_jobSize", "job_size"),
        ("ZSTD_c_overlapLog", "overlap_log"),
        ("ZSTD_c_forceMaxWindow", "force_max_window"),
        ("ZSTD_c_enableLongDistanceMatching", "enable_ldm"),
        ("ZSTD_c_ldmHashLog", "ldm_hash_log"),
        ("ZSTD_c_ldmMinMatch", "ldm_min_match"),
        ("ZSTD_c_ldmBucketSizeLog", "ldm_bucket_size_log"),
        ("ZSTD_c_ldmHashRateLog", "ldm_hash_rate_log"),
    )

    # Read every value from ``params`` first, then apply them.
    settings = [
        (getattr(lib, c_name), getattr(params, attr_name))
        for c_name, attr_name in name_pairs
    ]

    for key, value in settings:
        _set_compression_parameter(cctx_params, key, value)

    return cctx_params


class ZstdCompressionParameters(object):
    """Low-level zstd compression parameters.

    This type represents a collection of parameters to control how zstd
    compression is performed.

    Instances can be constructed from raw parameters or derived from a
    base set of defaults specified from a compression level (recommended)
    via :py:meth:`ZstdCompressionParameters.from_level`.

    >>> # Derive compression settings for compression level 7.
    >>> params = zstandard.ZstdCompressionParameters.from_level(7)

    >>> # With an input size of 1MB
    >>> params = zstandard.ZstdCompressionParameters.from_level(7, source_size=1048576)

    Using ``from_level()``, it is also possible to override individual compression
    parameters or to define additional settings that aren't automatically derived.
    e.g.:

    >>> params = zstandard.ZstdCompressionParameters.from_level(4, window_log=10)
    >>> params = zstandard.ZstdCompressionParameters.from_level(5, threads=4)

    Or you can define low-level compression settings directly:

    >>> params = zstandard.ZstdCompressionParameters(window_log=12, enable_ldm=True)

    Once a ``ZstdCompressionParameters`` instance is obtained, it can be used to
    configure a compressor:

    >>> cctx = zstandard.ZstdCompressor(compression_params=params)

    Some of these are very low-level settings. It may help to consult the official
    zstandard documentation for their behavior. Look for the ``ZSTD_c_*`` constants
    in ``zstd.h`` (https://github.com/facebook/zstd/blob/dev/lib/zstd.h).
    """

    @staticmethod
    def from_level(level, source_size=0, dict_size=0, **kwargs):
        """Create compression parameters from a compression level.

        Values the library derives for the level are used for any of
        ``window_log``, ``chain_log``, ``hash_log``, ``search_log``,
        ``min_match``, ``target_length`` and ``strategy`` that the caller
        did not pass explicitly. All keyword arguments are forwarded to the
        constructor.

        :param level:
           Integer compression level.
        :param source_size:
           Integer size in bytes of source to be compressed.
        :param dict_size:
           Integer size in bytes of compression dictionary to use.
        :return:
           :py:class:`ZstdCompressionParameters`
        """
        cparams = lib.ZSTD_getCParams(level, source_size, dict_size)

        # (constructor keyword, field of the struct returned above).
        derived = (
            ("window_log", "windowLog"),
            ("chain_log", "chainLog"),
            ("hash_log", "hashLog"),
            ("search_log", "searchLog"),
            ("min_match", "minMatch"),
            ("target_length", "targetLength"),
            ("strategy", "strategy"),
        )

        for keyword, field in derived:
            # An explicit value from the caller wins over the derived one.
            if keyword in kwargs:
                continue
            kwargs[keyword] = getattr(cparams, field)

        return ZstdCompressionParameters(**kwargs)

    def __init__(
        self,
        format=0,
        compression_level=0,
        window_log=0,
        hash_log=0,
        chain_log=0,
        search_log=0,
        min_match=0,
        target_length=0,
        strategy=-1,
        write_content_size=1,
        write_checksum=0,
        write_dict_id=0,
        job_size=0,
        overlap_log=-1,
        force_max_window=0,
        enable_ldm=0,
        ldm_hash_log=0,
        ldm_min_match=0,
        ldm_bucket_size_log=0,
        ldm_hash_rate_log=-1,
        threads=0,
    ):
        """Create parameters from explicit values.

        Each argument is applied to the ``ZSTD_c_*`` parameter it is paired
        with in the table below. A negative ``threads`` is replaced by the
        result of ``_cpu_count()``. A value of ``-1`` for ``strategy``,
        ``overlap_log`` or ``ldm_hash_rate_log`` is replaced by ``0`` before
        being handed to the library.

        :raises MemoryError:
           If the library fails to allocate the parameters object.
        :raises ZstdError:
           If the library rejects one of the values.
        """
        cctx_params = lib.ZSTD_createCCtxParams()
        if cctx_params == ffi.NULL:
            raise MemoryError()

        # Free the native object when the cdata wrapper is collected.
        cctx_params = ffi.gc(cctx_params, lib.ZSTD_freeCCtxParams)
        self._params = cctx_params

        if threads < 0:
            threads = _cpu_count()

        # -1 is the default for these arguments; the library is given 0.
        if strategy == -1:
            strategy = 0
        if overlap_log == -1:
            overlap_log = 0
        if ldm_hash_rate_log == -1:
            ldm_hash_rate_log = 0

        settings = (
            # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and
            # ZSTD_c_overlapLog because setting ZSTD_c_nbWorkers resets the
            # other parameters.
            (lib.ZSTD_c_nbWorkers, threads),
            (lib.ZSTD_c_format, format),
            (lib.ZSTD_c_compressionLevel, compression_level),
            (lib.ZSTD_c_windowLog, window_log),
            (lib.ZSTD_c_hashLog, hash_log),
            (lib.ZSTD_c_chainLog, chain_log),
            (lib.ZSTD_c_searchLog, search_log),
            (lib.ZSTD_c_minMatch, min_match),
            (lib.ZSTD_c_targetLength, target_length),
            (lib.ZSTD_c_strategy, strategy),
            (lib.ZSTD_c_contentSizeFlag, write_content_size),
            (lib.ZSTD_c_checksumFlag, write_checksum),
            (lib.ZSTD_c_dictIDFlag, write_dict_id),
            (lib.ZSTD_c_jobSize, job_size),
            (lib.ZSTD_c_overlapLog, overlap_log),
            (lib.ZSTD_c_forceMaxWindow, force_max_window),
            (lib.ZSTD_c_enableLongDistanceMatching, enable_ldm),
            (lib.ZSTD_c_ldmHashLog, ldm_hash_log),
            (lib.ZSTD_c_ldmMinMatch, ldm_min_match),
            (lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log),
            (lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log),
        )

        for key, value in settings:
            _set_compression_parameter(cctx_params, key, value)

    @property
    def format(self):
        """Current value of the ``ZSTD_c_format`` parameter."""
        key = lib.ZSTD_c_format
        return _get_compression_parameter(self._params, key)

    @property
    def compression_level(self):
        """Current value of the ``ZSTD_c_compressionLevel`` parameter."""
        key = lib.ZSTD_c_compressionLevel
        return _get_compression_parameter(self._params, key)

    @property
    def window_log(self):
        """Current value of the ``ZSTD_c_windowLog`` parameter."""
        key = lib.ZSTD_c_windowLog
        return _get_compression_parameter(self._params, key)

    @property
    def hash_log(self):
        """Current value of the ``ZSTD_c_hashLog`` parameter."""
        key = lib.ZSTD_c_hashLog
        return _get_compression_parameter(self._params, key)

    @property
    def chain_log(self):
        """Current value of the ``ZSTD_c_chainLog`` parameter."""
        key = lib.ZSTD_c_chainLog
        return _get_compression_parameter(self._params, key)

    @property
    def search_log(self):
        """Current value of the ``ZSTD_c_searchLog`` parameter."""
        key = lib.ZSTD_c_searchLog
        return _get_compression_parameter(self._params, key)

    @property
    def min_match(self):
        """Current value of the ``ZSTD_c_minMatch`` parameter."""
        key = lib.ZSTD_c_minMatch
        return _get_compression_parameter(self._params, key)

    @property
    def target_length(self):
        """Current value of the ``ZSTD_c_targetLength`` parameter."""
        key = lib.ZSTD_c_targetLength
        return _get_compression_parameter(self._params, key)

    @property
    def strategy(self):
        """Current value of the ``ZSTD_c_strategy`` parameter."""
        key = lib.ZSTD_c_strategy
        return _get_compression_parameter(self._params, key)

    @property
    def write_content_size(self):
        """Current value of the ``ZSTD_c_contentSizeFlag`` parameter."""
        key = lib.ZSTD_c_contentSizeFlag
        return _get_compression_parameter(self._params, key)

    @property
    def write_checksum(self):
        """Current value of the ``ZSTD_c_checksumFlag`` parameter."""
        key = lib.ZSTD_c_checksumFlag
        return _get_compression_parameter(self._params, key)

    @property
    def write_dict_id(self):
        """Current value of the ``ZSTD_c_dictIDFlag`` parameter."""
        key = lib.ZSTD_c_dictIDFlag
        return _get_compression_parameter(self._params, key)

    @property
    def job_size(self):
        """Current value of the ``ZSTD_c_jobSize`` parameter."""
        key = lib.ZSTD_c_jobSize
        return _get_compression_parameter(self._params, key)

    @property
    def overlap_log(self):
        """Current value of the ``ZSTD_c_overlapLog`` parameter."""
        key = lib.ZSTD_c_overlapLog
        return _get_compression_parameter(self._params, key)

    @property
    def force_max_window(self):
        """Current value of the ``ZSTD_c_forceMaxWindow`` parameter."""
        key = lib.ZSTD_c_forceMaxWindow
        return _get_compression_parameter(self._params, key)

    @property
    def enable_ldm(self):
        """Current value of ``ZSTD_c_enableLongDistanceMatching``."""
        key = lib.ZSTD_c_enableLongDistanceMatching
        return _get_compression_parameter(self._params, key)

    @property
    def ldm_hash_log(self):
        """Current value of the ``ZSTD_c_ldmHashLog`` parameter."""
        key = lib.ZSTD_c_ldmHashLog
        return _get_compression_parameter(self._params, key)

    @property
    def ldm_min_match(self):
        """Current value of the ``ZSTD_c_ldmMinMatch`` parameter."""
        key = lib.ZSTD_c_ldmMinMatch
        return _get_compression_parameter(self._params, key)

    @property
    def ldm_bucket_size_log(self):
        """Current value of the ``ZSTD_c_ldmBucketSizeLog`` parameter."""
        key = lib.ZSTD_c_ldmBucketSizeLog
        return _get_compression_parameter(self._params, key)

    @property
    def ldm_hash_rate_log(self):
        """Current value of the ``ZSTD_c_ldmHashRateLog`` parameter."""
        key = lib.ZSTD_c_ldmHashRateLog
        return _get_compression_parameter(self._params, key)

    @property
    def threads(self):
        """Current value of the ``ZSTD_c_nbWorkers`` parameter."""
        key = lib.ZSTD_c_nbWorkers
        return _get_compression_parameter(self._params, key)

    def estimated_compression_context_size(self):
        """Estimated size in bytes needed to compress with these parameters."""
        return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params)


def estimate_decompression_context_size():
    """Estimate the memory size requirements for a decompressor instance.

    The figure is whatever the zstd library's ``ZSTD_estimateDCtxSize()``
    reports.

    :return:
       Integer number of bytes.
    """
    estimated_bytes = lib.ZSTD_estimateDCtxSize()
    return estimated_bytes


def _set_compression_parameter(params, param, value):
    """Set a single parameter on a ``ZSTD_CCtx_params`` object.

    :param params:
       cffi pointer to the parameters object to modify.
    :param param:
       ``ZSTD_c_*`` parameter identifier.
    :param value:
       Integer value to assign.
    :raises ZstdError:
       If the library reports an error for the assignment.
    """
    status = lib.ZSTD_CCtxParams_setParameter(params, param, value)
    if not lib.ZSTD_isError(status):
        return

    reason = _zstd_error(status)
    raise ZstdError(
        "unable to set compression context parameter: %s" % reason
    )


def _get_compression_parameter(params, param):
    """Read a single parameter from a ``ZSTD_CCtx_params`` object.

    :param params:
       cffi pointer to the parameters object to query.
    :param param:
       ``ZSTD_c_*`` parameter identifier.
    :return:
       The integer value the library stored for ``param``.
    :raises ZstdError:
       If the library reports an error for the lookup.
    """
    # The library writes the value through this ``int *`` out-parameter.
    out_value = ffi.new("int *")

    status = lib.ZSTD_CCtxParams_getParameter(params, param, out_value)
    if not lib.ZSTD_isError(status):
        return out_value[0]

    reason = _zstd_error(status)
    raise ZstdError(
        "unable to get compression context parameter: %s" % reason
    )


class ZstdCompressionWriter(object):
    """Writable compressing stream wrapper.

    ``ZstdCompressionWriter`` is a write-only stream interface for writing
    compressed data to another stream.

    This type conforms to the ``io.RawIOBase`` interface and should be usable
    by any type that operates against a *file-object* (``typing.BinaryIO``
    in Python type hinting speak). Only methods that involve writing will do
    useful things.

    As data is written to this stream (e.g. via ``write()``), that data
    is sent to the compressor. As compressed data becomes available from
    the compressor, it is sent to the underlying stream by calling its
    ``write()`` method.

    Both ``write()`` and ``flush()`` return the number of bytes written to the
    object's ``write()``. In many cases, small inputs do not accumulate enough
    data to cause a write and ``write()`` will return ``0``.

    Calling ``close()`` will mark the stream as closed and subsequent I/O
    operations will raise ``ValueError`` (per the documented behavior of
    ``io.RawIOBase``). ``close()`` will also call ``close()`` on the underlying
    stream if such a method exists and the instance was constructed with
    ``closefd=True``. The underlying stream is closed even if the final flush
    performed by ``close()`` raises.

    Instances are obtained by calling :py:meth:`ZstdCompressor.stream_writer`.

    Typical usage is as follows:

    >>> cctx = zstandard.ZstdCompressor(level=10)
    >>> compressor = cctx.stream_writer(fh)
    >>> compressor.write(b"chunk 0\\n")
    >>> compressor.write(b"chunk 1\\n")
    >>> compressor.flush()
    >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\n`` at this point.
    >>> # Receiver is also expecting more data in the zstd *frame*.
    >>>
    >>> compressor.write(b"chunk 2\\n")
    >>> compressor.flush(zstandard.FLUSH_FRAME)
    >>> # Receiver will be able to decode ``chunk 0\\nchunk 1\\nchunk 2``.
    >>> # Receiver is expecting no more data, as the zstd frame is closed.
    >>> # Any future calls to ``write()`` at this point will construct a new
    >>> # zstd frame.

    Instances can be used as context managers. Exiting the context manager is
    the equivalent of calling ``close()``, which is equivalent to calling
    ``flush(zstandard.FLUSH_FRAME)``:

    >>> cctx = zstandard.ZstdCompressor(level=10)
    >>> with cctx.stream_writer(fh) as compressor:
    ...     compressor.write(b'chunk 0')
    ...     compressor.write(b'chunk 1')
    ...     ...

    .. important::

       If ``flush(FLUSH_FRAME)`` is not called, emitted data doesn't
       constitute a full zstd *frame* and consumers of this data may complain
       about malformed input. It is recommended to use instances as a context
       manager to ensure *frames* are properly finished.

    If the size of the data being fed to this streaming compressor is known,
    you can declare it before compression begins:

    >>> cctx = zstandard.ZstdCompressor()
    >>> with cctx.stream_writer(fh, size=data_len) as compressor:
    ...     compressor.write(chunk0)
    ...     compressor.write(chunk1)
    ...     ...

    Declaring the size of the source data allows compression parameters to
    be tuned. And if ``write_content_size`` is used, it also results in the
    content size being written into the frame header of the output data.

    The size of chunks being ``write()`` to the destination can be specified:

    >>> cctx = zstandard.ZstdCompressor()
    >>> with cctx.stream_writer(fh, write_size=32768) as compressor:
    ...     ...

    To see how much memory is being used by the streaming compressor:

    >>> cctx = zstandard.ZstdCompressor()
    >>> with cctx.stream_writer(fh) as compressor:
    ...     ...
    ...     byte_size = compressor.memory_size()

    The total number of bytes written so far are exposed via ``tell()``:

    >>> cctx = zstandard.ZstdCompressor()
    >>> with cctx.stream_writer(fh) as compressor:
    ...     ...
    ...     total_written = compressor.tell()

    ``stream_writer()`` accepts a ``write_return_read`` boolean argument to
    control the return value of ``write()``. When ``False`` (the default),
    ``write()`` returns the number of bytes that were ``write()``'en to the
    underlying object. When ``True``, ``write()`` returns the number of bytes
    read from the input that were subsequently written to the compressor.
    ``True`` is the *proper* behavior for ``write()`` as specified by the
    ``io.RawIOBase`` interface and will become the default value in a future
    release.
    """

    def __init__(
        self,
        compressor,
        writer,
        source_size,
        write_size,
        write_return_read,
        closefd=True,
    ):
        """Bind a compressor to a destination stream.

        :param compressor:
           Object whose ``_cctx`` attribute is the zstd compression context
           used for every streaming call made by this writer.
        :param writer:
           Destination object; its ``write()`` receives compressed chunks.
        :param source_size:
           Value handed to ``ZSTD_CCtx_setPledgedSrcSize()``.
        :param write_size:
           Size in bytes of the output buffer, and therefore the largest
           chunk passed to ``writer.write()`` in one call.
        :param write_return_read:
           When true, ``write()`` returns the number of input bytes consumed
           instead of the number of bytes written to ``writer``.
        :param closefd:
           When true, ``close()`` also closes ``writer`` if it has a
           ``close()`` method.
        :raises ZstdError:
           If the pledged source size cannot be set on the context.
        """
        self._compressor = compressor
        self._writer = writer
        self._write_size = write_size
        self._write_return_read = bool(write_return_read)
        self._closefd = bool(closefd)
        self._entered = False
        self._closing = False
        self._closed = False
        self._bytes_compressed = 0

        # One output buffer is allocated up front and reused by every
        # write()/flush() call.
        self._dst_buffer = ffi.new("char[]", write_size)
        self._out_buffer = ffi.new("ZSTD_outBuffer *")
        self._out_buffer.dst = self._dst_buffer
        self._out_buffer.size = len(self._dst_buffer)
        self._out_buffer.pos = 0

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(zresult)
            )

    def __enter__(self):
        """Enter the context manager; re-entry and use after close fail."""
        if self._closed:
            raise ValueError("stream is closed")

        if self._entered:
            raise ZstdError("cannot __enter__ multiple times")

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the stream and drop the compressor reference.

        Returns ``False`` so an exception raised inside the ``with`` body
        propagates.
        """
        self._entered = False
        self.close()
        self._compressor = None

        return False

    def __iter__(self):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def __next__(self):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def memory_size(self):
        """Return ``ZSTD_sizeof_CCtx()`` for the compressor's context."""
        return lib.ZSTD_sizeof_CCtx(self._compressor._cctx)

    def fileno(self):
        """Return ``fileno()`` of the underlying writer.

        :raises OSError:
           If the underlying writer has no usable ``fileno`` attribute.
        """
        f = getattr(self._writer, "fileno", None)
        if f:
            return f()
        else:
            raise OSError("fileno not available on underlying writer")

    def close(self):
        """Finish the zstd frame and mark the stream closed.

        Calls ``flush(FLUSH_FRAME)``, then closes the underlying stream when
        the instance was constructed with ``closefd=True`` and the stream has
        a ``close()`` method. Calling ``close()`` on an already-closed
        instance is a no-op.

        The underlying stream is closed even when the final flush raises, so
        a failed flush does not leak the destination; the flush error still
        propagates to the caller.
        """
        if self._closed:
            return

        try:
            # ``_closing`` tells flush() not to flush the inner stream.
            self._closing = True
            self.flush(FLUSH_FRAME)
        finally:
            self._closing = False
            self._closed = True

            # Call close() on underlying stream as well. This runs inside
            # ``finally`` because this writer is now closed for good and
            # nothing else would release the destination after a failure.
            f = getattr(self._writer, "close", None)
            if self._closefd and f:
                f()

    @property
    def closed(self):
        """Whether ``close()`` has been called on this stream."""
        return self._closed

    def isatty(self):
        """Always ``False``."""
        return False

    def readable(self):
        """Always ``False``: this stream is write-only."""
        return False

    def readline(self, size=-1):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def seek(self, offset, whence=None):
        """Unsupported: this stream is not seekable."""
        raise io.UnsupportedOperation()

    def seekable(self):
        """Always ``False``."""
        return False

    def truncate(self, size=None):
        """Unsupported."""
        raise io.UnsupportedOperation()

    def writable(self):
        """Always ``True``."""
        return True

    def writelines(self, lines):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError("writelines() is not yet implemented")

    def read(self, size=-1):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def readall(self):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def readinto(self, b):
        """Unsupported: this stream is write-only."""
        raise io.UnsupportedOperation()

    def write(self, data):
        """Send data to the compressor and possibly to the inner stream.

        :param data:
           Object supporting the buffer protocol holding the bytes to
           compress.
        :return:
           Number of input bytes consumed when the instance was created with
           ``write_return_read=True``; otherwise the number of compressed
           bytes written to the inner stream during this call.
        :raises ValueError:
           If the stream is closed.
        :raises ZstdError:
           If the compressor reports an error.
        """
        if self._closed:
            raise ValueError("stream is closed")

        total_write = 0

        data_buffer = ffi.from_buffer(data)

        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        out_buffer = self._out_buffer
        out_buffer.pos = 0

        # Feed the compressor until all input is consumed, draining the
        # shared output buffer to the inner stream after every call.
        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx,
                out_buffer,
                in_buffer,
                lib.ZSTD_e_continue,
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(zresult)
                )

            if out_buffer.pos:
                # ``[:]`` copies the bytes out of the reusable buffer.
                self._writer.write(
                    ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                )
                total_write += out_buffer.pos
                self._bytes_compressed += out_buffer.pos
                out_buffer.pos = 0

        if self._write_return_read:
            return in_buffer.pos
        else:
            return total_write

    def flush(self, flush_mode=FLUSH_BLOCK):
        """Evict data from compressor's internal state and write it to inner stream.

        Calling this method may result in 0 or more ``write()`` calls to the
        inner stream.

        This method will also call ``flush()`` on the inner stream, if such a
        method exists. The inner ``flush()`` is skipped when this call is
        made from ``close()``.

        :param flush_mode:
           How to flush the zstd compressor.

           ``zstandard.FLUSH_BLOCK`` will flush data already sent to the
           compressor but not emitted to the inner stream. The stream is still
           writable after calling this. This is the default behavior.

           See documentation for other ``zstandard.FLUSH_*`` constants for more
           flushing options.
        :return:
           Integer number of bytes written to the inner stream.
        :raises ValueError:
           If ``flush_mode`` is not a known constant or the stream is closed.
        :raises ZstdError:
           If the compressor reports an error.
        """

        if flush_mode == FLUSH_BLOCK:
            flush = lib.ZSTD_e_flush
        elif flush_mode == FLUSH_FRAME:
            flush = lib.ZSTD_e_end
        else:
            raise ValueError("unknown flush_mode: %r" % flush_mode)

        if self._closed:
            raise ValueError("stream is closed")

        total_write = 0

        out_buffer = self._out_buffer
        out_buffer.pos = 0

        # No new input: only the compressor's buffered state is drained.
        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        while True:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, out_buffer, in_buffer, flush
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(zresult)
                )

            if out_buffer.pos:
                self._writer.write(
                    ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                )
                total_write += out_buffer.pos
                self._bytes_compressed += out_buffer.pos
                out_buffer.pos = 0

            # A zero result ends the drain loop.
            if not zresult:
                break

        f = getattr(self._writer, "flush", None)
        if f and not self._closing:
            f()

        return total_write

    def tell(self):
        """Return the total number of compressed bytes written so far."""
        return self._bytes_compressed


class ZstdCompressionObj(object):
    """A compressor conforming to the API in Python's standard library.

    This type implements an API similar to compression types in Python's
    standard library such as ``zlib.compressobj`` and ``bz2.BZ2Compressor``.
    This enables existing code targeting the standard library API to swap
    in this type to achieve zstd compression.

    .. important::

       The design of this API is not ideal for optimal performance.

       The reason performance is not optimal is because the API is limited to
       returning a single buffer holding compressed data. When compressing
       data, we don't know how much data will be emitted. So in order to
       capture all this data in a single buffer, we need to perform buffer
       reallocations and/or extra memory copies. This can add significant
       overhead depending on the size or nature of the compressed data and
       how much your application calls this type.

       If performance is critical, consider an API like
       :py:meth:`ZstdCompressor.stream_reader`,
       :py:meth:`ZstdCompressor.stream_writer`,
       :py:meth:`ZstdCompressor.chunker`, or
       :py:meth:`ZstdCompressor.read_to_iter`, which result in less overhead
       managing buffers.

    Instances are obtained by calling :py:meth:`ZstdCompressor.compressobj`.

    Here is how this API should be used:

    >>> cctx = zstandard.ZstdCompressor()
    >>> cobj = cctx.compressobj()
    >>> data = cobj.compress(b"raw input 0")
    >>> data = cobj.compress(b"raw input 1")
    >>> data = cobj.flush()

    Or to flush blocks:

    >>> cctx = zstandard.ZstdCompressor()
    >>> cobj = cctx.compressobj()
    >>> data = cobj.compress(b"chunk in first block")
    >>> data = cobj.flush(zstandard.COMPRESSOBJ_FLUSH_BLOCK)
    >>> data = cobj.compress(b"chunk in second block")
    >>> data = cobj.flush()

    For best performance results, keep input chunks under 256KB. This avoids
    extra allocations for a large output object.

    It is possible to declare the input size of the data that will be fed
    into the compressor:

    >>> cctx = zstandard.ZstdCompressor()
    >>> cobj = cctx.compressobj(size=6)
    >>> data = cobj.compress(b"foobar")
    >>> data = cobj.flush()
    """

    def compress(self, data):
        """Send data to the compressor.

        This method receives bytes to feed to the compressor and returns
        bytes constituting zstd compressed data.

        The zstd compressor accumulates bytes and the returned bytes may be
        substantially smaller or larger than the size of the input data on
        any given call. The returned value may be the empty byte string
        (``b""``).

        :param data:
           Data to write to the compressor.
        :return:
           Compressed data.
        :raises ZstdError:
           If a finishing ``flush()`` was already performed or if zstd
           reports a compression error.
        """
        if self._finished:
            raise ZstdError("cannot call compress() after compressor finished")

        data_buffer = ffi.from_buffer(data)
        source = ffi.new("ZSTD_inBuffer *")
        source.src = data_buffer
        source.size = len(data_buffer)
        source.pos = 0

        chunks = []

        # Loop on the byte length recorded in ``source.size``. ``len(data)``
        # counts items, which is smaller than the byte length for buffer
        # objects with multi-byte items and could end the loop before all
        # input has been consumed.
        while source.pos < source.size:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, source, lib.ZSTD_e_continue
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(zresult)
                )

            # Copy out whatever was produced and rewind the shared output
            # buffer so the next call starts writing at its beginning.
            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

        return b"".join(chunks)

    def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
        """Emit data accumulated in the compressor that hasn't been outputted yet.

        The ``flush_mode`` argument controls how to end the stream.

        ``zstandard.COMPRESSOBJ_FLUSH_FINISH`` (the default) ends the
        compression stream and finishes a zstd frame. Once this type of flush
        is performed, ``compress()`` and ``flush()`` can no longer be called.
        This type of flush **must** be called to end the compression context. If
        not called, the emitted data may be incomplete and may not be readable
        by a decompressor.

        ``zstandard.COMPRESSOBJ_FLUSH_BLOCK`` will flush a zstd block. This
        ensures that all data fed to this instance will have been emitted and
        can be decoded by a decompressor. Flushes of this type can be performed
        multiple times. The next call to ``compress()`` will begin a new zstd
        block.

        :param flush_mode:
           How to flush the zstd compressor.
        :return:
           Compressed data.
        :raises ValueError:
           If ``flush_mode`` is not one of the ``COMPRESSOBJ_FLUSH_*``
           constants.
        :raises ZstdError:
           If the compressor was already finished or if zstd reports an error.
        """
        if flush_mode not in (
            COMPRESSOBJ_FLUSH_FINISH,
            COMPRESSOBJ_FLUSH_BLOCK,
        ):
            raise ValueError("flush mode not recognized")

        if self._finished:
            raise ZstdError("compressor object already finished")

        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
            z_flush_mode = lib.ZSTD_e_flush
        else:
            # The validation above leaves COMPRESSOBJ_FLUSH_FINISH as the
            # only other possibility. The object is marked finished before
            # the frame is ended, so it stays unusable even if ending fails.
            z_flush_mode = lib.ZSTD_e_end
            self._finished = True

        # compress() always rewinds the output buffer before returning.
        assert self._out.pos == 0

        # Flushing feeds no new input; an empty input buffer is passed.
        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        chunks = []

        while True:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, in_buffer, z_flush_mode
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(zresult)
                )

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

            # A zero result means nothing is left to flush.
            if not zresult:
                break

        return b"".join(chunks)


class ZstdCompressionChunker(object):
    """Compressor that hands back output in uniformly sized chunks.

    Input is fed iteratively and compressed output is produced as ``bytes``
    objects of a fixed, configured size.

    ``compress()``, ``flush()``, and ``finish()`` each return an iterator of
    ``bytes`` holding compressed data. The iterator may yield nothing.
    Callers MUST exhaust the returned iterator before invoking another
    operation on the object. Failing to do so may confuse the compressor's
    internal state, which can surface as an exception or as malformed output.

    Every chunk produced by ``compress()`` has exactly the configured chunk
    size.

    The last chunk produced by ``flush()`` or ``finish()`` may be smaller
    than the configured chunk size.

    Instances are obtained by calling :py:meth:`ZstdCompressor.chunker`.

    Typical usage:

    >>> cctx = zstandard.ZstdCompressor()
    >>> chunker = cctx.chunker(chunk_size=32768)
    >>>
    >>> with open(path, 'rb') as fh:
    ...     while True:
    ...         in_chunk = fh.read(32768)
    ...         if not in_chunk:
    ...             break
    ...
    ...         for out_chunk in chunker.compress(in_chunk):
    ...             # Do something with output chunk of size 32768.
    ...
    ...     for out_chunk in chunker.finish():
    ...         # Do something with output chunks that finalize the zstd frame.

    This type is often a better choice than
    :py:class:`ZstdCompressor.compressobj` because of its performance
    characteristics.

    ``compressobj()`` emits output whenever it is available, giving a
    *stream* of chunks whose sizes vary. The consistent chunk size of
    ``chunker()`` suits many uses better, such as sending compressed data
    over a socket.

    ``compressobj()`` may also reallocate memory to adjust the size of its
    output dynamically. Because ``chunker()`` chunks all share one size
    (flushed and final chunks excepted), less allocation and copying is
    needed.
    """

    def __init__(self, compressor, chunk_size):
        self._compressor = compressor
        self._finished = False

        # Output side: one reusable buffer of ``chunk_size`` bytes. The
        # backing allocation is stored on the instance so it stays alive.
        self._out = ffi.new("ZSTD_outBuffer *")
        self._dst_buffer = ffi.new("char[]", chunk_size)
        self._out.dst = self._dst_buffer
        self._out.size = chunk_size
        self._out.pos = 0

        # Input side: a NULL ``src`` means no operation is in progress.
        self._in = ffi.new("ZSTD_inBuffer *")
        self._in.src = ffi.NULL
        self._in.size = 0
        self._in.pos = 0

    def compress(self, data):
        """Feed new input data into the compressor.

        :param data:
           Data to feed to compressor.
        :return:
           Iterator of ``bytes`` representing chunks of compressed data.
        """
        if self._finished:
            raise ZstdError("cannot call compress() after compression finished")

        in_state = self._in
        out_state = self._out

        if in_state.src != ffi.NULL:
            raise ZstdError(
                "cannot perform operation before consuming output "
                "from previous operation"
            )

        # ``payload`` stays referenced by this generator's frame, which keeps
        # the memory behind ``in_state.src`` alive while iteration continues.
        payload = ffi.from_buffer(data)
        payload_size = len(payload)

        if payload_size == 0:
            return

        in_state.src = payload
        in_state.size = payload_size
        in_state.pos = 0

        while in_state.pos < in_state.size:
            status = lib.ZSTD_compressStream2(
                self._compressor._cctx,
                out_state,
                in_state,
                lib.ZSTD_e_continue,
            )

            # Once all input is consumed, return the input buffer to its
            # idle (NULL) state so later operations are permitted.
            input_drained = in_state.pos == in_state.size
            if input_drained:
                in_state.src = ffi.NULL
                in_state.size = 0
                in_state.pos = 0

            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(status)
                )

            # Only completely filled output buffers are emitted here.
            if out_state.pos != out_state.size:
                continue

            yield ffi.buffer(out_state.dst, out_state.pos)[:]
            out_state.pos = 0

    def flush(self):
        """Flushes all data currently in the compressor.

        :return:
           Iterator of ``bytes`` of compressed data.
        """
        if self._finished:
            raise ZstdError("cannot call flush() after compression finished")

        if self._in.src != ffi.NULL:
            raise ZstdError(
                "cannot call flush() before consuming output from "
                "previous operation"
            )

        more_to_flush = True
        while more_to_flush:
            status = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush
            )
            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(status)
                )

            # Unlike compress(), partially filled buffers are emitted too.
            if self._out.pos:
                yield ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0

            # A zero status ends the loop.
            more_to_flush = status != 0

    def finish(self):
        """Signals the end of input data.

        No new data can be compressed after this method is called.

        This method will flush buffered data and finish the zstd frame.

        :return:
           Iterator of ``bytes`` of compressed data.
        """
        if self._finished:
            raise ZstdError("cannot call finish() after compression finished")

        if self._in.src != ffi.NULL:
            raise ZstdError(
                "cannot call finish() before consuming output from "
                "previous operation"
            )

        frame_pending = True
        while frame_pending:
            status = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end
            )
            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "zstd compress error: %s" % _zstd_error(status)
                )

            if self._out.pos:
                yield ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0

            frame_pending = status != 0

        # Reached only after a zero status; the object is now unusable for
        # further compress()/flush()/finish() calls.
        self._finished = True


class ZstdCompressionReader(object):
    """Readable compressing stream wrapper.

    ``ZstdCompressionReader`` is a read-only stream interface for obtaining
    compressed data from a source.

    This type conforms to the ``io.RawIOBase`` interface and should be usable
    by any type that operates against a *file-object* (``typing.BinaryIO``
    in Python type hinting speak).

    Instances are neither writable nor seekable (even if the underlying
    source is seekable). ``readline()`` and ``readlines()`` are not implemented
    because they don't make sense for compressed data. ``tell()`` returns the
    number of compressed bytes emitted so far.

    Instances are obtained by calling :py:meth:`ZstdCompressor.stream_reader`.

    In this example, we open a file for reading and then wrap that file
    handle with a stream from which compressed data can be ``read()``.

    >>> with open(path, 'rb') as fh:
    ...     cctx = zstandard.ZstdCompressor()
    ...     reader = cctx.stream_reader(fh)
    ...     while True:
    ...         chunk = reader.read(16384)
    ...         if not chunk:
    ...             break
    ...
    ...         # Do something with compressed chunk.

    Instances can also be used as context managers:

    >>> with open(path, 'rb') as fh:
    ...     cctx = zstandard.ZstdCompressor()
    ...     with cctx.stream_reader(fh) as reader:
    ...         while True:
    ...             chunk = reader.read(16384)
    ...             if not chunk:
    ...                 break
    ...
    ...             # Do something with compressed chunk.

    When the context manager exits or ``close()`` is called, the stream is
    closed, underlying resources are released, and future operations against
    the compression stream will fail.

    ``stream_reader()`` accepts a ``size`` argument specifying how large the
    input stream is. This is used to adjust compression parameters so they are
    tailored to the source size. e.g.

    >>> with open(path, 'rb') as fh:
    ...     cctx = zstandard.ZstdCompressor()
    ...     with cctx.stream_reader(fh, size=os.stat(path).st_size) as reader:
    ...         ...

    If the ``source`` is a stream, you can specify how large ``read()``
    requests to that stream should be via the ``read_size`` argument.
    It defaults to ``zstandard.COMPRESSION_RECOMMENDED_INPUT_SIZE``. e.g.

    >>> with open(path, 'rb') as fh:
    ...     cctx = zstandard.ZstdCompressor()
    ...     # Will perform fh.read(8192) when obtaining data to feed into the
    ...     # compressor.
    ...     with cctx.stream_reader(fh, read_size=8192) as reader:
    ...         ...
    """

    def __init__(self, compressor, source, read_size, closefd=True):
        self._compressor = compressor
        self._source = source
        self._read_size = read_size
        self._closefd = closefd
        self._entered = False
        self._closed = False
        self._bytes_compressed = 0
        # Set once the source has no more data to offer.
        self._finished_input = False
        # Set once the zstd frame has been completely written out.
        self._finished_output = False

        self._in_buffer = ffi.new("ZSTD_inBuffer *")
        # Holds a ref so backing bytes in self._in_buffer stay alive.
        self._source_buffer = None

    def __enter__(self):
        if self._entered:
            raise ValueError("cannot __enter__ multiple times")

        if self._closed:
            raise ValueError("stream is closed")

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False
        self._compressor = None
        self.close()
        self._source = None

        # Never suppress an exception raised inside the ``with`` body.
        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def readline(self):
        raise io.UnsupportedOperation()

    def readlines(self):
        raise io.UnsupportedOperation()

    def write(self, data):
        raise OSError("stream is not writable")

    def writelines(self, ignored):
        raise OSError("stream is not writable")

    def isatty(self):
        return False

    def flush(self):
        return None

    def close(self):
        """Mark the stream closed and, if ``closefd`` is set, close the source.

        Calling this on an already closed stream is a no-op.
        """
        if self._closed:
            return

        self._closed = True

        f = getattr(self._source, "close", None)
        if self._closefd and f:
            f()

    @property
    def closed(self):
        return self._closed

    def tell(self):
        """Return the number of compressed bytes produced so far."""
        return self._bytes_compressed

    def readall(self):
        """Read until the stream is exhausted and return all compressed data."""
        chunks = []

        while True:
            chunk = self.read(1048576)
            if not chunk:
                break

            chunks.append(chunk)

        return b"".join(chunks)

    def __iter__(self):
        raise io.UnsupportedOperation()

    def __next__(self):
        raise io.UnsupportedOperation()

    next = __next__

    def _read_input(self):
        """Point the input buffer at the next piece of source data.

        For stream sources, performs one ``read(read_size)`` call. For buffer
        sources, exposes the whole buffer. ``_finished_input`` is set when
        the source has nothing (more) to offer.
        """
        if self._finished_input:
            return

        if hasattr(self._source, "read"):
            data = self._source.read(self._read_size)

            if not data:
                self._finished_input = True
                return

            self._source_buffer = ffi.from_buffer(data)
            self._in_buffer.src = self._source_buffer
            self._in_buffer.size = len(self._source_buffer)
            self._in_buffer.pos = 0
        else:
            source_buffer = ffi.from_buffer(self._source)

            # _compress_into_buffer() returns early when there is no pending
            # input, so it would never flag an empty buffer as consumed and
            # the read loops would spin forever. Flag it here instead.
            if not len(source_buffer):
                self._finished_input = True
                return

            self._source_buffer = source_buffer
            self._in_buffer.src = self._source_buffer
            self._in_buffer.size = len(self._source_buffer)
            self._in_buffer.pos = 0

    def _compress_into_buffer(self, out_buffer):
        """Compress pending input into ``out_buffer``.

        :return:
           A truthy value if ``out_buffer`` is completely full, a falsy value
           otherwise (including when there was no pending input).
        :raises ZstdError:
           If zstd reports a compression error.
        """
        if self._in_buffer.pos >= self._in_buffer.size:
            return

        old_pos = out_buffer.pos

        zresult = lib.ZSTD_compressStream2(
            self._compressor._cctx,
            out_buffer,
            self._in_buffer,
            lib.ZSTD_e_continue,
        )

        self._bytes_compressed += out_buffer.pos - old_pos

        # Input fully consumed: reset the input buffer and drop the reference
        # that kept its backing memory alive.
        if self._in_buffer.pos == self._in_buffer.size:
            self._in_buffer.src = ffi.NULL
            self._in_buffer.pos = 0
            self._in_buffer.size = 0
            self._source_buffer = None

            # A buffer source is exposed in one piece, so consuming it means
            # there is no more input.
            if not hasattr(self._source, "read"):
                self._finished_input = True

        if lib.ZSTD_isError(zresult):
            raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

        return out_buffer.pos and out_buffer.pos == out_buffer.size

    def _finish_frame(self, out_buffer):
        """End the zstd frame, writing what fits into ``out_buffer``.

        Updates the compressed byte counter and sets ``_finished_output``
        once zstd reports that nothing is left to write.

        :raises ZstdError:
           If zstd reports an error while ending the stream.
        """
        old_pos = out_buffer.pos

        zresult = lib.ZSTD_compressStream2(
            self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
        )

        self._bytes_compressed += out_buffer.pos - old_pos

        if lib.ZSTD_isError(zresult):
            raise ZstdError(
                "error ending compression stream: %s" % _zstd_error(zresult)
            )

        if zresult == 0:
            self._finished_output = True

    def read(self, size=-1):
        """Read up to ``size`` bytes of compressed data.

        Input is pulled from the source until the output buffer is full or
        the source is exhausted, at which point the frame is ended.

        :param size:
           Maximum number of bytes to return. ``-1`` reads everything via
           ``readall()``.
        :return:
           Compressed data; ``b""`` once all output has been produced or when
           ``size`` is 0.
        :raises ValueError:
           If the stream is closed or ``size`` is less than -1.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if size < -1:
            raise ValueError("cannot read negative amounts less than -1")

        if size == -1:
            return self.readall()

        if self._finished_output or size == 0:
            return b""

        # Need a dedicated ref to dest buffer otherwise it gets collected.
        dst_buffer = ffi.new("char[]", size)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        # First drain input left over from a previous call.
        if self._compress_into_buffer(out_buffer):
            return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        while not self._finished_input:
            self._read_input()

            if self._compress_into_buffer(out_buffer):
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        # EOF
        self._finish_frame(out_buffer)

        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def read1(self, size=-1):
        """Read compressed data, returning as soon as any output is available.

        :param size:
           Maximum number of bytes to return. ``-1`` uses
           ``COMPRESSION_RECOMMENDED_OUTPUT_SIZE``.
        :return:
           Compressed data; ``b""`` once all output has been produced or when
           ``size`` is 0.
        :raises ValueError:
           If the stream is closed or ``size`` is less than -1.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if size < -1:
            raise ValueError("cannot read negative amounts less than -1")

        if self._finished_output or size == 0:
            return b""

        # -1 returns arbitrary number of bytes.
        if size == -1:
            size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE

        dst_buffer = ffi.new("char[]", size)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        # read1() dictates that we can perform at most 1 call to the
        # underlying stream to get input. However, we can't satisfy this
        # restriction with compression because not all input generates output.
        # It is possible to perform a block flush in order to ensure output.
        # But this may not be desirable behavior. So we allow multiple read()
        # to the underlying stream. But unlike read(), we stop once we have
        # any output.

        self._compress_into_buffer(out_buffer)
        if out_buffer.pos:
            return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        while not self._finished_input:
            self._read_input()

            # If we've filled the output buffer, return immediately.
            if self._compress_into_buffer(out_buffer):
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

            # If we've populated the output buffer and we're not at EOF,
            # also return, as we've satisfied the read1() limits.
            if out_buffer.pos and not self._finished_input:
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

            # Else if we're at EOS and we have room left in the buffer,
            # fall through to below and try to add more data to the output.

        # EOF.
        self._finish_frame(out_buffer)

        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def readinto(self, b):
        """Read compressed data directly into the writable buffer ``b``.

        :param b:
           Destination buffer; at most ``len`` of its byte view is written.
        :return:
           Number of bytes written into ``b`` (0 once all output has been
           produced).
        :raises ValueError:
           If the stream is closed.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if self._finished_output:
            return 0

        # TODO use writable=True once we require CFFI >= 1.12.
        dest_buffer = ffi.from_buffer(b)
        # Zero-length copy into ``b``; presumably a stand-in writability
        # check until the TODO above can be addressed.
        ffi.memmove(b, b"", 0)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dest_buffer
        out_buffer.size = len(dest_buffer)
        out_buffer.pos = 0

        if self._compress_into_buffer(out_buffer):
            return out_buffer.pos

        while not self._finished_input:
            self._read_input()
            if self._compress_into_buffer(out_buffer):
                return out_buffer.pos

        # EOF.
        self._finish_frame(out_buffer)

        return out_buffer.pos

    def readinto1(self, b):
        """Like ``readinto()``, but return as soon as any output is available.

        :param b:
           Destination buffer.
        :return:
           Number of bytes written into ``b`` (0 once all output has been
           produced).
        :raises ValueError:
           If the stream is closed.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if self._finished_output:
            return 0

        # TODO use writable=True once we require CFFI >= 1.12.
        dest_buffer = ffi.from_buffer(b)
        # Zero-length copy into ``b``; presumably a stand-in writability
        # check until the TODO above can be addressed.
        ffi.memmove(b, b"", 0)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dest_buffer
        out_buffer.size = len(dest_buffer)
        out_buffer.pos = 0

        self._compress_into_buffer(out_buffer)
        if out_buffer.pos:
            return out_buffer.pos

        while not self._finished_input:
            self._read_input()

            # Output buffer is full.
            if self._compress_into_buffer(out_buffer):
                return out_buffer.pos

            # Some output and more input to come: enough for this call.
            if out_buffer.pos and not self._finished_input:
                return out_buffer.pos

        # EOF.
        self._finish_frame(out_buffer)

        return out_buffer.pos


class ZstdCompressor(object):
    """
    Create an object used to perform Zstandard compression.

    Each instance is essentially a wrapper around a ``ZSTD_CCtx`` from
    zstd's C API.

    An instance can compress data various ways. Instances can be used
    multiple times. Each compression operation will use the compression
    parameters defined at construction time.

    .. note::

       When using a compression dictionary and multiple compression
       operations are performed, the ``ZstdCompressionParameters`` derived
       from an integer compression ``level`` and the first compressed data's
       size will be reused for all subsequent operations. This may not be
       desirable if source data sizes vary significantly.

    ``compression_params`` is mutually exclusive with ``level``,
    ``write_checksum``, ``write_content_size``, ``write_dict_id``, and
    ``threads``.

    Assume that each ``ZstdCompressor`` instance can only handle a single
    logical compression operation at the same time. i.e. if you call a method
    like ``stream_reader()`` to obtain multiple objects derived from the same
    ``ZstdCompressor`` instance and attempt to use them simultaneously, errors
    will likely occur.

    If you need to perform multiple logical compression operations and you
    can't guarantee those operations are temporally non-overlapping, you need
    to obtain multiple ``ZstdCompressor`` instances.

    Unless specified otherwise, assume that no two methods of
    ``ZstdCompressor`` instances can be called from multiple Python
    threads simultaneously. In other words, assume instances are not thread safe
    unless stated otherwise.

    :param level:
       Integer compression level. Valid values are all negative integers
       through 22. Lower values generally yield faster operations with lower
       compression ratios. Higher values are generally slower but compress
       better. The default is 3, which is what the ``zstd`` CLI uses. Negative
       levels effectively engage ``--fast`` mode from the ``zstd`` CLI.
    :param dict_data:
       A ``ZstdCompressionDict`` to be used to compress with dictionary
       data.
    :param compression_params:
       A ``ZstdCompressionParameters`` instance defining low-level compression
       parameters. If defined, this will overwrite the ``level`` argument.
    :param write_checksum:
       If True, a 4 byte content checksum will be written with the compressed
       data, allowing the decompressor to perform content verification.
    :param write_content_size:
       If True (the default), the decompressed content size will be included
       in the header of the compressed data. This data will only be written if
       the compressor knows the size of the input data.
    :param write_dict_id:
       Determines whether the dictionary ID will be written into the compressed
       data. Defaults to True. Only adds content to the compressed data if
       a dictionary is being used.
    :param threads:
       Number of threads to use to compress data concurrently. When set,
       compression operations are performed on multiple threads. The default
       value (0) disables multi-threaded compression. A value of ``-1`` means
       to set the number of threads to the number of detected logical CPUs.
    """

    def __init__(
        self,
        level=3,
        dict_data=None,
        compression_params=None,
        write_checksum=None,
        write_content_size=None,
        write_dict_id=None,
        threads=0,
    ):
        """Allocate a compression context and apply the requested settings.

        Parameter meanings are described in the class docstring.

        :raises ValueError:
           If ``level`` exceeds the maximum level reported by zstd, or if
           ``compression_params`` is combined with ``write_checksum``,
           ``write_content_size``, ``write_dict_id`` or ``threads``.
        :raises MemoryError:
           If zstd fails to allocate the parameters object or the context.
        :raises ZstdError:
           If the parameters or dictionary cannot be applied to the context.
        """
        max_level = lib.ZSTD_maxCLevel()
        if level > max_level:
            # ``max_level`` itself is accepted, so the exclusive bound quoted
            # in the message has to be one past it.
            raise ValueError("level must be less than %d" % (max_level + 1))

        # Any negative thread count is replaced by the detected CPU count.
        if threads < 0:
            threads = _cpu_count()

        if compression_params:
            # ``compression_params`` is mutually exclusive with the individual
            # frame/threading options; reject the first conflicting one.
            for name, value in (
                ("write_checksum", write_checksum),
                ("write_content_size", write_content_size),
                ("write_dict_id", write_dict_id),
            ):
                if value is not None:
                    raise ValueError(
                        "cannot define compression_params and %s" % name
                    )

            if threads:
                raise ValueError(
                    "cannot define compression_params and threads"
                )

            self._params = _make_cctx_params(compression_params)
        else:
            if write_dict_id is None:
                write_dict_id = True

            params = lib.ZSTD_createCCtxParams()
            if params == ffi.NULL:
                raise MemoryError()

            # Tie the native parameters object's lifetime to this attribute.
            self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams)

            _set_compression_parameter(
                self._params, lib.ZSTD_c_compressionLevel, level
            )

            # The content size flag defaults to enabled when unspecified.
            _set_compression_parameter(
                self._params,
                lib.ZSTD_c_contentSizeFlag,
                write_content_size if write_content_size is not None else 1,
            )

            _set_compression_parameter(
                self._params,
                lib.ZSTD_c_checksumFlag,
                1 if write_checksum else 0,
            )

            _set_compression_parameter(
                self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0
            )

            if threads:
                _set_compression_parameter(
                    self._params, lib.ZSTD_c_nbWorkers, threads
                )

        cctx = lib.ZSTD_createCCtx()
        if cctx == ffi.NULL:
            raise MemoryError()

        self._cctx = cctx
        self._dict_data = dict_data

        # We defer setting up garbage collection until after calling
        # _setup_cctx() to ensure the memory size estimate is more accurate.
        # The ``finally`` guarantees the context is still registered for
        # release when _setup_cctx() raises.
        try:
            self._setup_cctx()
        finally:
            self._cctx = ffi.gc(
                cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx)
            )

    def _setup_cctx(self):
        """Push the stored parameters and optional dictionary into the context.

        :raises ZstdError:
           If zstd rejects the parameters or the dictionary.
        """
        status = lib.ZSTD_CCtx_setParametersUsingCCtxParams(
            self._cctx, self._params
        )
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "could not set compression parameters: %s"
                % _zstd_error(status)
            )

        dictionary = self._dict_data
        if not dictionary:
            # Nothing more to configure without a dictionary.
            return

        if dictionary._cdict:
            # A ``_cdict`` is already available on the dictionary; reference it.
            status = lib.ZSTD_CCtx_refCDict(self._cctx, dictionary._cdict)
        else:
            # Otherwise load the raw dictionary bytes by reference.
            status = lib.ZSTD_CCtx_loadDictionary_advanced(
                self._cctx,
                dictionary.as_bytes(),
                len(dictionary),
                lib.ZSTD_dlm_byRef,
                dictionary._dict_type,
            )

        if lib.ZSTD_isError(status):
            raise ZstdError(
                "could not load compression dictionary: %s"
                % _zstd_error(status)
            )

    def memory_size(self):
        """Report how many bytes the underlying compression context uses.

        >>> cctx = zstandard.ZstdCompressor()
        >>> memory = cctx.memory_size()

        :return:
           Size in bytes, as reported by ``ZSTD_sizeof_CCtx()``.
        """
        context = self._cctx
        return lib.ZSTD_sizeof_CCtx(context)

    def compress(self, data):
        """
        Compress data in a single operation.

        This is the simplest way to compress: pass a value in and get the
        compressed value back. It is also the API most prone to abuse.

        Both the input and the output must fit in memory, so very large
        values can lead to excessive memory usage. Prefer one of the
        streaming APIs for larger values.

        :param data:
           Source data to compress
        :return:
           Compressed data
        :raises ZstdError:
           If the source size cannot be set, compression fails, or the frame
           is not fully flushed by the single call.

        >>> cctx = zstandard.ZstdCompressor()
        >>> compressed = cctx.compress(b"data to compress")
        """
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        source = ffi.from_buffer(data)
        source_len = len(source)

        # Size the destination from zstd's bound for this input length.
        capacity = lib.ZSTD_compressBound(source_len)
        dest = new_nonzero("char[]", capacity)

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, source_len)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        dst = ffi.new("ZSTD_outBuffer *")
        src = ffi.new("ZSTD_inBuffer *")

        dst.dst = dest
        dst.size = capacity
        dst.pos = 0

        src.src = source
        src.size = source_len
        src.pos = 0

        status = lib.ZSTD_compressStream2(
            self._cctx, dst, src, lib.ZSTD_e_end
        )

        if lib.ZSTD_isError(status):
            raise ZstdError("cannot compress: %s" % _zstd_error(status))

        # A non-zero, non-error result means the frame was not fully flushed.
        if status:
            raise ZstdError("unexpected partial frame flush")

        return ffi.buffer(dest, dst.pos)[:]

    def compressobj(self, size=-1):
        """
        Create a compressor object exposing the standard library compression API.

        See :py:class:`ZstdCompressionObj` for the full documentation.

        :param size:
           Size in bytes of data that will be compressed. Negative values
           mean the size is unknown.
        :return:
           :py:class:`ZstdCompressionObj`
        :raises ZstdError:
           If the source size cannot be set on the context.
        """
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        pledged = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        obj = ZstdCompressionObj()

        # The object owns both the output descriptor and its backing storage.
        obj._out = ffi.new("ZSTD_outBuffer *")
        obj._dst_buffer = ffi.new(
            "char[]", COMPRESSION_RECOMMENDED_OUTPUT_SIZE
        )
        obj._out.dst = obj._dst_buffer
        obj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
        obj._out.pos = 0

        obj._compressor = self
        obj._finished = False

        return obj

    def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """
        Create an object that iteratively compresses into same-sized chunks.

        This API is similar to :py:meth:`ZstdCompressor.compressobj` but has
        better performance properties.

        :param size:
           Size in bytes of data that will be compressed. Negative values
           mean the size is unknown.
        :param chunk_size:
           Size of compressed chunks.
        :return:
           :py:class:`ZstdCompressionChunker`
        :raises ZstdError:
           If the source size cannot be set on the context.
        """
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        pledged = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        return ZstdCompressionChunker(self, chunk_size=chunk_size)

    def copy_stream(
        self,
        ifh,
        ofh,
        size=-1,
        read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """
        Compress everything read from one stream into another stream.

        Data is read from ``ifh``, compressed, and written to ``ofh``.
        ``ifh`` must have a ``read(size)`` method and ``ofh`` must have a
        ``write(data)`` method.

        >>> cctx = zstandard.ZstdCompressor()
        >>> with open(input_path, "rb") as ifh, open(output_path, "wb") as ofh:
        ...     cctx.copy_stream(ifh, ofh)

        The size of the source stream can be declared up front:

        >>> cctx = zstandard.ZstdCompressor()
        >>> cctx.copy_stream(ifh, ofh, size=len_of_input)

        The sizes of the chunks passed to ``read()`` and ``write()`` can be
        controlled as well:

        >>> cctx = zstandard.ZstdCompressor()
        >>> cctx.copy_stream(ifh, ofh, read_size=32768, write_size=16384)

        The return value is a 2-tuple of bytes read and bytes written:

        >>> cctx = zstandard.ZstdCompressor()
        >>> read_count, write_count = cctx.copy_stream(ifh, ofh)

        :param ifh:
           Source stream to read from
        :param ofh:
           Destination stream to write to
        :param size:
           Size in bytes of the source stream. If defined, compression
           parameters will be tuned for this size.
        :param read_size:
           Chunk sizes that source stream should be ``read()`` from.
        :param write_size:
           Chunk sizes that destination stream should be ``write()`` to.
        :return:
           2-tuple of ints of bytes read and written, respectively.
        :raises ValueError:
           If ``ifh`` lacks ``read()`` or ``ofh`` lacks ``write()``.
        :raises ZstdError:
           If zstd reports an error at any stage.
        """
        if not hasattr(ifh, "read"):
            raise ValueError("first argument must have a read() method")
        if not hasattr(ofh, "write"):
            raise ValueError("second argument must have a write() method")

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        pledged = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        source = ffi.new("ZSTD_inBuffer *")
        sink = ffi.new("ZSTD_outBuffer *")

        sink_storage = ffi.new("char[]", write_size)
        sink.dst = sink_storage
        sink.size = write_size
        sink.pos = 0

        def drain():
            # Hand any pending output to ``ofh``, rewind the output buffer and
            # report how many bytes were written.
            pending = sink.pos
            if pending:
                ofh.write(ffi.buffer(sink.dst, pending))
                sink.pos = 0
            return pending

        bytes_read = 0
        bytes_written = 0

        chunk = ifh.read(read_size)
        while chunk:
            chunk_view = ffi.from_buffer(chunk)
            bytes_read += len(chunk_view)
            source.src = chunk_view
            source.size = len(chunk_view)
            source.pos = 0

            # Keep compressing until this chunk is fully consumed.
            while source.pos < source.size:
                status = lib.ZSTD_compressStream2(
                    self._cctx, sink, source, lib.ZSTD_e_continue
                )
                if lib.ZSTD_isError(status):
                    raise ZstdError(
                        "zstd compress error: %s" % _zstd_error(status)
                    )

                bytes_written += drain()

            chunk = ifh.read(read_size)

        # We've finished reading. Flush the compressor until zstd reports
        # that nothing remains.
        unflushed = True
        while unflushed:
            status = lib.ZSTD_compressStream2(
                self._cctx, sink, source, lib.ZSTD_e_end
            )
            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(status)
                )

            bytes_written += drain()
            unflushed = status != 0

        return bytes_read, bytes_written

    def stream_reader(
        self,
        source,
        size=-1,
        read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
        closefd=True,
    ):
        """
        Wrap a readable source with a stream that yields compressed data.

        The returned object conforms to the ``io.RawIOBase`` interface and
        can be ``read()`` from to retrieve the compressed form of the source.

        The source can be any object with a ``read(size)`` method or an
        object that conforms to the buffer protocol.

        See :py:class:`ZstdCompressionReader` for type documentation and usage
        examples.

        :param source:
           Object to read source data from
        :param size:
           Size in bytes of source object. When ``len(source)`` works, its
           result replaces this value.
        :param read_size:
           How many bytes to request when ``read()``'ing from the source.
        :param closefd:
           Whether to close the source stream when the returned stream is
           closed.
        :return:
           :py:class:`ZstdCompressionReader`
        :raises ZstdError:
           If the source size cannot be set on the context.
        """
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        # Best effort: sources that support len() supply their own size;
        # anything else keeps the caller's value.
        try:
            size = len(source)
        except Exception:
            pass

        pledged = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        return ZstdCompressionReader(self, source, read_size, closefd=closefd)

    def stream_writer(
        self,
        writer,
        size=-1,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        write_return_read=True,
        closefd=True,
    ):
        """
        Create a stream that writes compressed data into another stream.

        ``writer`` must have a ``write(data)`` method. As compressed data
        becomes available, ``write()`` is called with it as the argument.
        Many common Python types implement ``write()``, including open file
        handles and ``io.BytesIO``.

        See :py:class:`ZstdCompressionWriter` for more documentation, including
        usage examples.

        :param writer:
           Stream to write compressed data to.
        :param size:
           Size in bytes of data to be compressed. If set, it will be used
           to influence compression parameter tuning and could result in the
           size being written into the header of the compressed data.
        :param write_size:
           How much data to ``write()`` to ``writer`` at a time.
        :param write_return_read:
           Whether ``write()`` should return the number of bytes that were
           consumed from the input.
        :param closefd:
           Whether to ``close`` the ``writer`` when this stream is closed.
        :return:
           :py:class:`ZstdCompressionWriter`
        :raises ValueError:
           If ``writer`` has no ``write()`` method.
        """
        if not hasattr(writer, "write"):
            raise ValueError("must pass an object with a write() method")

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        # The size is handed to the writer rather than set on the context here.
        source_size = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        return ZstdCompressionWriter(
            self,
            writer,
            source_size,
            write_size,
            write_return_read,
            closefd=closefd,
        )

    def read_to_iter(
        self,
        reader,
        size=-1,
        read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """
        Read uncompressed data from a reader and return an iterator

        Returns an iterator of compressed data produced from reading from
        ``reader``.

        This method provides a mechanism to stream compressed data out of a
        source as an iterator of data chunks.

        Uncompressed data is obtained from ``reader`` by calling its
        ``read(size)`` method or, if ``reader`` conforms to the *buffer
        protocol*, by slicing it. The source data is streamed into a
        compressor and compressed data is exposed to the iterator as it
        becomes available.

        Data is read from the source in chunks of ``read_size``. Compressed
        chunks are at most ``write_size`` bytes. Both values default to the
        zstd input and output defaults, respectively.

        When reading via ``read()``, ``read()`` is called until it raises or
        returns an empty bytes (``b""``). It is perfectly valid for the
        source to deliver fewer bytes than what was requested by
        ``read(size)``.

        The caller is partially in control of how fast data is fed into the
        compressor by how it consumes the returned iterator. The compressor
        will not consume from the reader unless the caller consumes from the
        iterator.

        >>> cctx = zstandard.ZstdCompressor()
        >>> for chunk in cctx.read_to_iter(fh):
        ...     # Do something with emitted data.

        ``read_to_iter()`` accepts a ``size`` argument declaring the size of
        the input stream:

        >>> cctx = zstandard.ZstdCompressor()
        >>> for chunk in cctx.read_to_iter(fh, size=some_int):
        ...     pass

        You can also control the size that data is ``read()`` from the source
        and the ideal size of output chunks:

        >>> cctx = zstandard.ZstdCompressor()
        >>> for chunk in cctx.read_to_iter(fh, read_size=16384, write_size=8192):
        ...     pass

        ``read_to_iter()`` does not give direct control over the sizes of chunks
        fed into the compressor. Instead, chunk sizes will be whatever the object
        being read from delivers. These will often be of a uniform size.

        :param reader:
           Stream providing data to be compressed.
        :param size:
           Size in bytes of input data. Replaced by ``len(reader)`` when
           ``reader`` is sliced rather than ``read()`` from.
        :param read_size:
           Controls how many bytes are ``read()`` from the source.
        :param write_size:
           Controls the output size of emitted chunks.
        :return:
           Iterator of ``bytes``.
        """

        reads_stream = hasattr(reader, "read")
        if not reads_stream:
            if not hasattr(reader, "__getitem__"):
                raise ValueError(
                    "must pass an object with a read() method or "
                    "conforms to buffer protocol"
                )

            # Sliceable source: walk it with a cursor and use its own length.
            cursor = 0
            size = len(reader)

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        pledged = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

        status = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged)
        if lib.ZSTD_isError(status):
            raise ZstdError(
                "error setting source size: %s" % _zstd_error(status)
            )

        src = ffi.new("ZSTD_inBuffer *")
        dst = ffi.new("ZSTD_outBuffer *")

        src.src = ffi.NULL
        src.size = 0
        src.pos = 0

        dst_storage = ffi.new("char[]", write_size)
        dst.dst = dst_storage
        dst.size = write_size
        dst.pos = 0

        while True:
            # Output is always emitted in full before another read, so nothing
            # should be pending here.
            assert dst.pos == 0

            # Collect input data.
            if reads_stream:
                chunk = reader.read(read_size)
            else:
                stop = cursor + min(len(reader) - cursor, read_size)
                chunk = reader[cursor:stop]
                cursor = stop

            # No new input data. Leave the read loop.
            if not chunk:
                break

            # Feed the whole chunk to the compressor, emitting output as it
            # appears.
            chunk_view = ffi.from_buffer(chunk)
            src.src = chunk_view
            src.size = len(chunk_view)
            src.pos = 0

            while src.pos < src.size:
                status = lib.ZSTD_compressStream2(
                    self._cctx, dst, src, lib.ZSTD_e_continue
                )
                if lib.ZSTD_isError(status):
                    raise ZstdError(
                        "zstd compress error: %s" % _zstd_error(status)
                    )

                if dst.pos:
                    produced = ffi.buffer(dst.dst, dst.pos)[:]
                    dst.pos = 0
                    yield produced

            assert dst.pos == 0

        # Input is exhausted. End the stream and emit what remains until zstd
        # reports that nothing is left to flush.
        finished = False
        while not finished:
            assert dst.pos == 0
            status = lib.ZSTD_compressStream2(
                self._cctx, dst, src, lib.ZSTD_e_end
            )
            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(status)
                )

            if dst.pos:
                produced = ffi.buffer(dst.dst, dst.pos)[:]
                dst.pos = 0
                yield produced

            finished = status == 0

    def multi_compress_to_buffer(self, data, threads=-1):
        """
        Compress multiple pieces of data as a single function call.

        (Experimental. Not yet supported by CFFI backend.)

        This function is optimized to perform multiple compression operations
        as quickly as possible with as little overhead as possible.

        Data to be compressed can be passed as a ``BufferWithSegmentsCollection``,
        a ``BufferWithSegments``, or a list containing byte like objects. Each
        element of the container will be compressed individually using the
        configured parameters on the ``ZstdCompressor`` instance.

        The ``threads`` argument controls how many threads to use for
        compression. ``0`` means to use a single thread. Negative values use
        the number of logical CPUs in the machine.

        NOTE(review): this text previously described ``0`` as the default,
        but this signature defaults ``threads`` to ``-1`` -- confirm which is
        intended.

        The function returns a ``BufferWithSegmentsCollection``. This type
        represents N discrete memory allocations, each holding 1 or more
        compressed frames.

        Output data is written to shared memory buffers. This means that unlike
        regular Python objects, a reference to *any* object within the collection
        keeps the shared buffer and therefore memory backing it alive. This can
        have undesirable effects on process memory usage.

        The API and behavior of this function is experimental and will likely
        change. Known deficiencies include:

        * If asked to use multiple threads, it will always spawn that many
          threads, even if the input is too small to use them. It should
          automatically lower the thread count when the extra threads would
          just add overhead.
        * The buffer allocation strategy is fixed. There is room to make it
          dynamic, perhaps even to allow one output buffer per input,
          facilitating a variation of the API to return a list without the
          adverse effects of shared memory buffers.

        :param data:
           Source to read discrete pieces of data to compress.

           Can be a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``,
           or a ``list[bytes]``.
        :param threads:
           Number of threads to use for compression (see above).
        :return:
           BufferWithSegmentsCollection holding compressed data.
        :raises NotImplementedError:
           Always, in this implementation.
        """
        raise NotImplementedError()

    def frame_progression(self):
        """
        Report how much work the compressor has done so far.

        >>> cctx = zstandard.ZstdCompressor()
        >>> (ingested, consumed, produced) = cctx.frame_progression()

        :return:
           3-tuple of ``(ingested, consumed, produced)``.
        """
        state = lib.ZSTD_getFrameProgression(self._cctx)
        ingested = state.ingested
        consumed = state.consumed
        produced = state.produced

        return (ingested, consumed, produced)


class FrameParameters(object):
    """Describes the header of a zstd frame.

    Instances expose these attributes:

    ``content_size``
       Integer size of original, uncompressed content. This will be ``0`` if the
       original content size isn't written to the frame (controlled with the
       ``write_content_size`` argument to ``ZstdCompressor``) or if the input
       content size was ``0``.

    ``window_size``
       Integer size of maximum back-reference distance in compressed data.

    ``dict_id``
       Integer of dictionary ID used for compression. ``0`` if no dictionary
       ID was used or if the dictionary ID was ``0``.

    ``has_checksum``
       Bool indicating whether a 4 byte content checksum is stored at the end
       of the frame.
    """

    def __init__(self, fparams):
        """Copy the relevant fields out of a frame header structure.

        :param fparams:
           Object exposing ``frameContentSize``, ``windowSize``, ``dictID``
           and ``checksumFlag`` attributes.
        """
        # (attribute on this object, field on the header structure)
        copied_fields = (
            ("content_size", "frameContentSize"),
            ("window_size", "windowSize"),
            ("dict_id", "dictID"),
        )
        for attribute, field in copied_fields:
            setattr(self, attribute, getattr(fparams, field))

        # The flag is normalized to a real bool.
        self.has_checksum = True if fparams.checksumFlag else False


def frame_content_size(data):
    """Return the decompressed size recorded in a frame.

    The returned value is usually accurate. But strictly speaking it should
    not be trusted.

    :param data:
       Buffer holding the start of a zstd frame.
    :return:
       ``-1`` if size unknown and a non-negative integer otherwise.
    :raises ZstdError:
       If zstd reports an error determining the size.
    """
    view = ffi.from_buffer(data)
    reported = lib.ZSTD_getFrameContentSize(view, len(view))

    if reported == lib.ZSTD_CONTENTSIZE_ERROR:
        raise ZstdError("error when determining content size")

    # Translate zstd's "unknown" sentinel into -1 for callers.
    if reported == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        return -1

    return reported


def frame_header_size(data):
    """Return the size of a frame's header.

    :param data:
       Buffer holding the start of a zstd frame.
    :return:
       Integer size in bytes.
    :raises ZstdError:
       If zstd cannot determine the header size.
    """
    view = ffi.from_buffer(data)
    header_size = lib.ZSTD_frameHeaderSize(view, len(view))

    if not lib.ZSTD_isError(header_size):
        return header_size

    raise ZstdError(
        "could not determine frame header size: %s" % _zstd_error(header_size)
    )


def get_frame_parameters(data):
    """
    Parse a zstd frame header into frame parameters.

    The length of the frame parameters varies with which fields are present
    in the frame and their values. ``ZstdError`` is raised when too few
    bytes are passed in to fully parse them. To ensure frame parameters can
    be parsed, pass in at least 18 bytes.

    :param data:
       Data from which to read frame parameters.
    :return:
       :py:class:`FrameParameters`
    :raises ZstdError:
       If the header cannot be parsed or ``data`` is too short.
    """
    header = ffi.new("ZSTD_frameHeader *")
    view = ffi.from_buffer(data)

    status = lib.ZSTD_getFrameHeader(header, view, len(view))

    if lib.ZSTD_isError(status):
        raise ZstdError(
            "cannot get frame parameters: %s" % _zstd_error(status)
        )

    # A non-zero, non-error result is reported as the number of bytes needed.
    if status:
        raise ZstdError(
            "not enough data for frame parameters; need %d bytes" % status
        )

    return FrameParameters(header[0])


class ZstdCompressionDict(object):
    """Represents a computed compression dictionary.

    Instances are obtained by calling :py:func:`train_dictionary` or by
    passing bytes obtained from another source into the constructor.

    Instances can be constructed from bytes:

    >>> dict_data = zstandard.ZstdCompressionDict(data)

    It is possible to construct a dictionary from *any* data. If the data
    doesn't begin with a magic header, it will be treated as a *prefix*
    dictionary. *Prefix* dictionaries allow compression operations to
    reference raw data within the dictionary.

    It is possible to force the use of *prefix* dictionaries or to require
    a dictionary header:

    >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_RAWCONTENT)
    >>> dict_data = zstandard.ZstdCompressionDict(data, dict_type=zstandard.DICT_TYPE_FULLDICT)

    You can see how many bytes are in the dictionary by calling ``len()``:

    >>> dict_data = zstandard.train_dictionary(size, samples)
    >>> dict_size = len(dict_data)  # will not be larger than ``size``

    Once you have a dictionary, you can pass it to the objects performing
    compression and decompression:

    >>> dict_data = zstandard.train_dictionary(131072, samples)
    >>> cctx = zstandard.ZstdCompressor(dict_data=dict_data)
    >>> for source_data in input_data:
    ...     compressed = cctx.compress(source_data)
    ...     # Do something with compressed data.
    ...
    >>> dctx = zstandard.ZstdDecompressor(dict_data=dict_data)
    >>> for compressed_data in input_data:
    ...     buffer = io.BytesIO()
    ...     with dctx.stream_writer(buffer) as decompressor:
    ...         decompressor.write(compressed_data)
    ...         # Do something with raw data in ``buffer``.

    Dictionaries have unique integer IDs. You can retrieve this ID via:

    >>> dict_id = dict_data.dict_id()

    You can obtain the raw data in the dict (useful for persisting and constructing
    a ``ZstdCompressionDict`` later) via ``as_bytes()``:

    >>> dict_data = zstandard.train_dictionary(size, samples)
    >>> raw_data = dict_data.as_bytes()

    By default, when a ``ZstdCompressionDict`` is *attached* to a
    ``ZstdCompressor``, each ``ZstdCompressor`` performs work to prepare the
    dictionary for use. This is fine if only 1 compression operation is being
    performed or if the ``ZstdCompressor`` is being reused for multiple operations.
    But if multiple ``ZstdCompressor`` instances are being used with the dictionary,
    this can add overhead.

    It is possible to *precompute* the dictionary so it can readily be consumed
    by multiple ``ZstdCompressor`` instances:

    >>> d = zstandard.ZstdCompressionDict(data)
    >>> # Precompute for compression level 3.
    >>> d.precompute_compress(level=3)
    >>> # Precompute with specific compression parameters.
    >>> params = zstandard.ZstdCompressionParameters(...)
    >>> d.precompute_compress(compression_params=params)

    .. note::

       When a dictionary is precomputed, the compression parameters used to
       precompute the dictionary overwrite some of the compression parameters
       specified to ``ZstdCompressor``.

    :param data:
       Dictionary data. Must be a ``bytes`` instance.
    :param dict_type:
       Type of dictionary. One of the ``DICT_TYPE_*`` constants.
    :param k:
       Segment size recorded on the instance as the ``k`` attribute.
    :param d:
       dmer size recorded on the instance as the ``d`` attribute.
    """

    def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0):
        # Explicit check instead of ``assert`` so that the validation is not
        # stripped when Python runs with ``-O``.
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes")

        self._data = data
        self.k = k
        self.d = d

        if dict_type not in (
            DICT_TYPE_AUTO,
            DICT_TYPE_RAWCONTENT,
            DICT_TYPE_FULLDICT,
        ):
            # ``%r`` so that a non-integer value is reported instead of
            # triggering a secondary formatting error.
            raise ValueError(
                "invalid dictionary load mode: %r; must use "
                "DICT_TYPE_* constants" % (dict_type,)
            )

        self._dict_type = dict_type
        # Populated by precompute_compress().
        self._cdict = None

    def __len__(self):
        """Return the size of the raw dictionary data in bytes."""
        return len(self._data)

    def dict_id(self):
        """Obtain the integer ID of the dictionary."""
        return int(lib.ZDICT_getDictID(self._data, len(self._data)))

    def as_bytes(self):
        """Obtain the ``bytes`` representation of the dictionary."""
        return self._data

    def precompute_compress(self, level=0, compression_params=None):
        """Precompute a dictionary so it can be used by multiple compressors.

        Calling this method on an instance that will be used by multiple
        :py:class:`ZstdCompressor` instances will improve performance.

        Exactly one of ``level`` or ``compression_params`` must be given.

        :param level:
           Integer compression level to precompute the dictionary for.
        :param compression_params:
           Object exposing ``chain_log``, ``hash_log``, ``min_match``,
           ``search_log``, ``strategy``, ``target_length`` and ``window_log``
           attributes describing the compression parameters to use.
        :raises ValueError:
           If both or neither of ``level`` and ``compression_params`` are
           specified.
        :raises ZstdError:
           If the library fails to create the precomputed dictionary.
        """
        if level and compression_params:
            raise ValueError(
                "must only specify one of level or " "compression_params"
            )

        if not level and not compression_params:
            raise ValueError("must specify one of level or compression_params")

        if level:
            cparams = lib.ZSTD_getCParams(level, 0, len(self._data))
        else:
            # ffi.new() only accepts pointer or array ctypes, so allocate via
            # a pointer and dereference it to obtain the struct that is
            # passed by value below. The local pointer keeps the memory alive.
            cparams_ptr = ffi.new("ZSTD_compressionParameters *")
            cparams = cparams_ptr[0]
            cparams.chainLog = compression_params.chain_log
            cparams.hashLog = compression_params.hash_log
            cparams.minMatch = compression_params.min_match
            cparams.searchLog = compression_params.search_log
            cparams.strategy = compression_params.strategy
            cparams.targetLength = compression_params.target_length
            cparams.windowLog = compression_params.window_log

        # The dictionary content is referenced (not copied); ``self._data``
        # is held by this instance for as long as the CDict exists.
        cdict = lib.ZSTD_createCDict_advanced(
            self._data,
            len(self._data),
            lib.ZSTD_dlm_byRef,
            self._dict_type,
            cparams,
            lib.ZSTD_defaultCMem,
        )
        if cdict == ffi.NULL:
            raise ZstdError("unable to precompute dictionary")

        self._cdict = ffi.gc(
            cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict)
        )

    @property
    def _ddict(self):
        """Decompression dictionary handle, created on first access.

        :raises ZstdError:
           If the library fails to create the decompression dictionary.
        """
        # A property is a data descriptor, so attribute lookup never consults
        # the instance ``__dict__`` on its own. Read the cached handle
        # explicitly; otherwise a new DDict would be built on every access.
        cached = self.__dict__.get("_ddict")
        if cached is not None:
            return cached

        ddict = lib.ZSTD_createDDict_advanced(
            self._data,
            len(self._data),
            lib.ZSTD_dlm_byRef,
            self._dict_type,
            lib.ZSTD_defaultCMem,
        )

        if ddict == ffi.NULL:
            raise ZstdError("could not create decompression dict")

        ddict = ffi.gc(
            ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict)
        )
        self.__dict__["_ddict"] = ddict

        return ddict


def train_dictionary(
    dict_size,
    samples,
    k=0,
    d=0,
    f=0,
    split_point=0.0,
    accel=0,
    notifications=0,
    dict_id=0,
    level=0,
    steps=0,
    threads=0,
):
    """Train a dictionary from sample data using the COVER algorithm.

    A compression dictionary of at most ``dict_size`` bytes is built from the
    list of ``samples`` and returned wrapped in a
    :py:class:`ZstdCompressionDict`.

    The training mechanism is known as *cover*. It is described in the paper
    *Effective Construction of Relative Lempel-Ziv Dictionaries* (authors:
    Liao, Petri, Moffat, Wirth).

    The cover algorithm is driven by ``k`` (*segment size*) and ``d`` (*dmer
    size*). The dictionary returned by this function carries the values that
    were used in its ``k`` and ``d`` attributes. A ``ZstdCompressionDict``
    built directly from raw bytes (a content-only dictionary) has ``0`` for
    both attributes.

    ``k`` and ``d`` can be given explicitly, or ``train_dictionary()`` can try
    several values and keep the best one, where *best* means the smallest
    compressed data size. This latter mode is called *optimization* mode.

    Internally this function always calls
    ``ZDICT_optimizeTrainFromBuffer_fastCover()``. See the corresponding C
    library documentation for more.

    When neither ``steps`` nor ``threads`` is given, defaults for ``d``,
    ``steps`` and ``level`` are chosen to be equivalent to what
    ``ZDICT_trainFromBuffer()`` would use.

    :param dict_size:
       Target size in bytes of the dictionary to generate.
    :param samples:
       A list of bytes holding samples the dictionary will be trained from.
    :param k:
       Segment size : constraint: 0 < k : Reasonable range [16, 2048+]
    :param d:
       dmer size : constraint: 0 < d <= k : Reasonable range [6, 16]
    :param f:
       log of size of frequency array : constraint: 0 < f <= 31 : 1 means
       default(20)
    :param split_point:
       Percentage of samples used for training: Only used for optimization.
       The first # samples * ``split_point`` samples will be used for training.
       The last # samples * (1 - split_point) samples will be used for testing.
       0 means default (0.75), 1.0 when all samples are used for both training
       and testing.
    :param accel:
       Acceleration level: constraint: 0 < accel <= 10. Higher means faster
       and less accurate, 0 means default(1).
    :param notifications:
       Controls writing of informational messages to ``stderr``. ``0`` (the
       default) means to write nothing. ``1`` writes errors. ``2`` writes
       progression info. ``3`` writes more details. And ``4`` writes all info.
    :param dict_id:
       Integer dictionary ID for the produced dictionary. Default is 0, which uses
       a random value.
    :param level:
       Integer target compression level when trying parameter variations.
    :param steps:
       Number of steps through ``k`` values to perform when trying parameter
       variations.
    :param threads:
       Number of threads to use when trying parameter variations. Default is 0,
       which means to use a single thread. A negative value can be specified to
       use as many threads as there are detected logical CPUs.
    :return:
       :py:class:`ZstdCompressionDict` of type ``DICT_TYPE_FULLDICT``.
    :raises TypeError:
       If ``samples`` is not a list.
    :raises ValueError:
       If any element of ``samples`` is not ``bytes``.
    :raises ZstdError:
       If the library reports a training failure.
    """
    if not isinstance(samples, list):
        raise TypeError("samples must be a list")

    # Negative thread count means "one per detected logical CPU".
    if threads < 0:
        threads = _cpu_count()

    # Caller asked for neither optimization knob: fall back to defaults.
    if not (steps or threads):
        d = d or 8
        steps = 4
        level = level or 3

    # All samples are packed back-to-back into a single C buffer, with a
    # parallel array recording each sample's length.
    total_size = sum(len(sample) for sample in samples)
    samples_buffer = new_nonzero("char[]", total_size)
    sample_sizes = new_nonzero("size_t[]", len(samples))

    write_pos = 0
    for index, sample in enumerate(samples):
        if not isinstance(sample, bytes):
            raise ValueError("samples must be bytes")

        sample_len = len(sample)
        ffi.memmove(samples_buffer + write_pos, sample, sample_len)
        sample_sizes[index] = sample_len
        write_pos += sample_len

    dict_data = new_nonzero("char[]", dict_size)

    params = ffi.new("ZDICT_fastCover_params_t *")[0]
    params.k = k
    params.d = d
    params.f = f
    params.steps = steps
    params.nbThreads = threads
    params.splitPoint = split_point
    params.accel = accel
    params.zParams.notificationLevel = notifications
    params.zParams.dictID = dict_id
    params.zParams.compressionLevel = level

    zresult = lib.ZDICT_optimizeTrainFromBuffer_fastCover(
        ffi.addressof(dict_data),
        dict_size,
        ffi.addressof(samples_buffer),
        ffi.addressof(sample_sizes, 0),
        len(samples),
        ffi.addressof(params),
    )

    if lib.ZDICT_isError(zresult):
        reason = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8")
        raise ZstdError("cannot train dict: %s" % reason)

    # On success ``zresult`` is used as the number of dictionary bytes
    # written; ``params`` is read back for the ``k``/``d`` values recorded on
    # the returned dictionary.
    trained = ffi.buffer(dict_data, zresult)[:]
    return ZstdCompressionDict(
        trained,
        dict_type=DICT_TYPE_FULLDICT,
        k=params.k,
        d=params.d,
    )


class ZstdDecompressionObj(object):
    """A standard library API compatible decompressor.

    This type implements a decompressor that conforms to the API by other
    decompressors in Python's standard library. e.g. ``zlib.decompressobj``
    or ``bz2.BZ2Decompressor``. This allows callers to use zstd compression
    while conforming to a similar API.

    Compressed data chunks are fed into ``decompress(data)`` and
    uncompressed output (or an empty bytes) is returned. Output from
    subsequent calls needs to be concatenated to reassemble the full
    decompressed byte sequence.

    Each instance is single use: once an input frame is decoded,
    ``decompress()`` can no longer be called.

    >>> dctx = zstandard.ZstdDecompressor()
    >>> dobj = dctx.decompressobj()
    >>> data = dobj.decompress(compressed_chunk_0)
    >>> data = dobj.decompress(compressed_chunk_1)

    By default, calls to ``decompress()`` write output data in chunks of size
    ``DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE``. These chunks are concatenated
    before being returned to the caller. It is possible to define the size of
    these temporary chunks by passing ``write_size`` to ``decompressobj()``:

    >>> dctx = zstandard.ZstdDecompressor()
    >>> dobj = dctx.decompressobj(write_size=1048576)

    .. note::

       Because calls to ``decompress()`` may need to perform multiple
       memory (re)allocations, this streaming decompression API isn't as
       efficient as other APIs.
    """

    def __init__(self, decompressor, write_size):
        self._decompressor = decompressor
        self._write_size = write_size
        self._finished = False
        self._unused_input = b""

    def decompress(self, data):
        """Send compressed data to the decompressor and obtain decompressed data.

        :param data:
           Data to feed into the decompressor. Any object supporting the
           buffer protocol.
        :return:
           Decompressed bytes.
        :raises ZstdError:
           If a frame has already been fully decoded by this instance or the
           library reports a decompression error.
        """
        if self._finished:
            raise ZstdError("cannot use a decompressobj multiple times")

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        data_buffer = ffi.from_buffer(data)

        if len(data_buffer) == 0:
            return b""

        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        dst_buffer = ffi.new("char[]", self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        chunks = []

        while True:
            zresult = lib.ZSTD_decompressStream(
                self._decompressor._dctx, out_buffer, in_buffer
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "zstd decompressor error: %s" % _zstd_error(zresult)
                )

            # Always record any output from decompressor.
            if out_buffer.pos:
                chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])

            # 0 is only seen when a frame is fully decoded *and* fully flushed.
            # But there may be extra input data: make that available to
            # `unused_input`.
            if zresult == 0:
                self._finished = True
                self._decompressor = None
                # ``in_buffer.pos``/``size`` are byte offsets into the C view
                # of ``data``. Copy the leftover from that view so the result
                # is always ``bytes``, regardless of the caller's buffer type
                # or item size, and does not alias the caller's buffer.
                self._unused_input = ffi.buffer(
                    data_buffer + in_buffer.pos,
                    in_buffer.size - in_buffer.pos,
                )[:]
                break

            # We're not at the end of the frame *or* we're not fully flushed.

            # The decompressor will write out all the bytes it can to the output
            # buffer. So if the output buffer is partially filled and the input
            # is exhausted, there's nothing more to write. So we've done all we
            # can.
            elif (
                in_buffer.pos == in_buffer.size
                and out_buffer.pos < out_buffer.size
            ):
                break
            else:
                # Output buffer was filled (or input remains): reuse the
                # buffer for the next round.
                out_buffer.pos = 0

        return b"".join(chunks)

    def flush(self, length=0):
        """Effectively a no-op.

        Implemented for compatibility with the standard library APIs.

        Safe to call at any time.

        :param length:
           Ignored.
        :return:
           Empty bytes.
        """
        return b""

    @property
    def unused_data(self):
        """Bytes past the end of compressed data.

        If ``decompress()`` is fed additional data beyond the end of a zstd
        frame, this value will be non-empty once ``decompress()`` fully decodes
        the input frame.
        """
        return self._unused_input

    @property
    def unconsumed_tail(self):
        """Data that has not yet been fed into the decompressor.

        Always empty bytes in this implementation.
        """
        return b""

    @property
    def eof(self):
        """Whether the end of the compressed data stream has been reached."""
        return self._finished


class ZstdDecompressionReader(object):
    """Read only decompressor that pulls uncompressed data from another stream.

    This type provides a read-only stream interface for performing transparent
    decompression from another stream or data source. It conforms to the
    ``io.RawIOBase`` interface. Only methods relevant to reading are
    implemented.

    >>> with open(path, 'rb') as fh:
    ...     dctx = zstandard.ZstdDecompressor()
    ...     reader = dctx.stream_reader(fh)
    ...     while True:
    ...         chunk = reader.read(16384)
    ...         if not chunk:
    ...             break
    ...         # Do something with decompressed chunk.

    The stream can also be used as a context manager:

    >>> with open(path, 'rb') as fh:
    ...     dctx = zstandard.ZstdDecompressor()
    ...     with dctx.stream_reader(fh) as reader:
    ...         ...

    When used as a context manager, the stream is closed and the underlying
    resources are released when the context manager exits. Future operations
    against the stream will fail.

    The ``source`` argument to ``stream_reader()`` can be any object with a
    ``read(size)`` method or any object implementing the *buffer protocol*.

    If the ``source`` is a stream, you can specify how large ``read()`` requests
    to that stream should be via the ``read_size`` argument. It defaults to
    ``zstandard.DECOMPRESSION_RECOMMENDED_INPUT_SIZE``:

    >>> with open(path, 'rb') as fh:
    ...     dctx = zstandard.ZstdDecompressor()
    ...     # Will perform fh.read(8192) when obtaining data for the decompressor.
    ...     with dctx.stream_reader(fh, read_size=8192) as reader:
    ...         ...

    Instances are *partially* seekable. Absolute and relative positions
    (``SEEK_SET`` and ``SEEK_CUR``) forward of the current position are
    allowed. Offsets behind the current read position and offsets relative
    to the end of stream are not allowed and will raise ``OSError``
    if attempted. Note that ``seekable()`` nevertheless reports ``False``.

    ``tell()`` returns the number of decompressed bytes read so far.

    Not all I/O methods are implemented. Notably missing is support for
    ``readline()``, ``readlines()``, and linewise iteration support. This is
    because streams operate on binary data - not text data. If you want to
    convert decompressed output to text, you can chain an ``io.TextIOWrapper``
    to the stream:

    >>> with open(path, 'rb') as fh:
    ...     dctx = zstandard.ZstdDecompressor()
    ...     stream_reader = dctx.stream_reader(fh)
    ...     text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
    ...     for line in text_stream:
    ...         ...
    """

    def __init__(
        self,
        decompressor,
        source,
        read_size,
        read_across_frames,
        closefd=True,
    ):
        self._decompressor = decompressor
        self._source = source
        self._read_size = read_size
        self._read_across_frames = bool(read_across_frames)
        self._closefd = bool(closefd)
        self._entered = False
        self._closed = False
        # Running total of decompressed bytes returned; reported by tell().
        self._bytes_decompressed = 0
        # Set once the source has no more input to offer.
        self._finished_input = False
        # NOTE(review): nothing in this class sets this flag to True; it is
        # only read. Confirm whether code elsewhere is expected to set it.
        self._finished_output = False
        self._in_buffer = ffi.new("ZSTD_inBuffer *")
        # Holds a ref to self._in_buffer.src.
        self._source_buffer = None

    def __enter__(self):
        """Enter the context manager; may only be entered once at a time.

        :raises ValueError:
           If already entered or if the stream is closed.
        """
        if self._entered:
            raise ValueError("cannot __enter__ multiple times")

        if self._closed:
            raise ValueError("stream is closed")

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the stream and drop references to decompressor and source.

        Returns ``False`` so exceptions raised in the ``with`` body propagate.
        """
        self._entered = False
        self._decompressor = None
        # close() must run before the source reference is dropped because it
        # looks up ``close`` on ``self._source``.
        self.close()
        self._source = None

        return False

    def readable(self):
        """Always ``True``: this stream supports reading."""
        return True

    def writable(self):
        """Always ``False``: this stream does not support writing."""
        return False

    def seekable(self):
        """Always ``False``, even though seek() supports forward seeks."""
        return False

    def readline(self, size=-1):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def write(self, data):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def writelines(self, lines):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def isatty(self):
        """Always ``False``."""
        return False

    def flush(self):
        """No-op; returns ``None``."""
        return None

    def close(self):
        """Mark the stream closed and optionally close the source.

        Calling this more than once is a no-op. The source's ``close()`` is
        invoked only when the instance was created with ``closefd`` true and
        the source has a truthy ``close`` attribute.
        """
        if self._closed:
            return None

        self._closed = True

        f = getattr(self._source, "close", None)
        if self._closefd and f:
            f()

    @property
    def closed(self):
        """Whether close() has been called on this stream."""
        return self._closed

    def tell(self):
        """Return the number of decompressed bytes returned so far."""
        return self._bytes_decompressed

    def readall(self):
        """Read until read() returns no data and return the joined bytes.

        Data is pulled through read() in requests of 1048576 bytes.
        """
        chunks = []

        while True:
            chunk = self.read(1048576)
            if not chunk:
                break

            chunks.append(chunk)

        return b"".join(chunks)

    def __iter__(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def __next__(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    # Alias under the name ``next`` (presumably for the Python 2 iterator
    # protocol).
    next = __next__

    def _read_input(self):
        """Ensure the input buffer has data, refilling from the source if needed.

        Does nothing when unconsumed input remains or when input is already
        known to be exhausted. Sets ``_finished_input`` when a stream source
        returns no data.
        """
        # We have data left over in the input buffer. Use it.
        if self._in_buffer.pos < self._in_buffer.size:
            return

        # All input data exhausted. Nothing to do.
        if self._finished_input:
            return

        # Else populate the input buffer from our source.
        if hasattr(self._source, "read"):
            data = self._source.read(self._read_size)

            if not data:
                self._finished_input = True
                return

            self._source_buffer = ffi.from_buffer(data)
            self._in_buffer.src = self._source_buffer
            self._in_buffer.size = len(self._source_buffer)
            self._in_buffer.pos = 0
        else:
            # Source has no read(): treat it as a buffer-protocol object and
            # expose all of it as input in one go.
            self._source_buffer = ffi.from_buffer(self._source)
            self._in_buffer.src = self._source_buffer
            self._in_buffer.size = len(self._source_buffer)
            self._in_buffer.pos = 0

    def _decompress_into_buffer(self, out_buffer):
        """Decompress available input into an output buffer.

        Returns True if data in output buffer should be emitted.

        :raises ZstdError:
           If the library reports a decompression error.
        """
        zresult = lib.ZSTD_decompressStream(
            self._decompressor._dctx, out_buffer, self._in_buffer
        )

        # Input fully consumed: reset the input buffer and release the
        # reference to the source bytes. Note this happens before the error
        # check below.
        if self._in_buffer.pos == self._in_buffer.size:
            self._in_buffer.src = ffi.NULL
            self._in_buffer.pos = 0
            self._in_buffer.size = 0
            self._source_buffer = None

            # A buffer-protocol source is supplied in a single shot, so once
            # it is consumed there is no more input.
            if not hasattr(self._source, "read"):
                self._finished_input = True

        if lib.ZSTD_isError(zresult):
            raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult))

        # Emit data if there is data AND either:
        # a) output buffer is full (read amount is satisfied)
        # b) we're at end of a frame and not in frame spanning mode
        return out_buffer.pos and (
            out_buffer.pos == out_buffer.size
            or zresult == 0
            and not self._read_across_frames
        )

    def read(self, size=-1):
        """Read up to ``size`` decompressed bytes.

        ``size == -1`` reads everything via readall(). Otherwise input is
        consumed until the output buffer should be emitted (see
        ``_decompress_into_buffer``) or input is exhausted, in which case
        whatever was produced is returned.

        :raises ValueError:
           If the stream is closed or ``size`` is less than ``-1``.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if size < -1:
            raise ValueError("cannot read negative amounts less than -1")

        if size == -1:
            # This is recursive. But it gets the job done.
            return self.readall()

        if self._finished_output or size == 0:
            return b""

        # We /could/ call into readinto() here. But that introduces more
        # overhead.
        dst_buffer = ffi.new("char[]", size)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        # First attempt runs unconditionally, even if input was already
        # flagged as finished.
        self._read_input()
        if self._decompress_into_buffer(out_buffer):
            self._bytes_decompressed += out_buffer.pos
            return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        while not self._finished_input:
            self._read_input()
            if self._decompress_into_buffer(out_buffer):
                self._bytes_decompressed += out_buffer.pos
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        # Input exhausted: return whatever partial output was produced.
        self._bytes_decompressed += out_buffer.pos
        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def readinto(self, b):
        """Decompress directly into the buffer ``b``.

        Follows the same looping strategy as read().

        :return:
           Number of bytes written into ``b``.
        :raises ValueError:
           If the stream is closed.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if self._finished_output:
            return 0

        # TODO use writable=True once we require CFFI >= 1.12.
        dest_buffer = ffi.from_buffer(b)
        # Zero-length copy into ``b``; presumably a stand-in writability
        # check until ``writable=True`` can be used (see TODO above).
        ffi.memmove(b, b"", 0)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dest_buffer
        out_buffer.size = len(dest_buffer)
        out_buffer.pos = 0

        self._read_input()
        if self._decompress_into_buffer(out_buffer):
            self._bytes_decompressed += out_buffer.pos
            return out_buffer.pos

        while not self._finished_input:
            self._read_input()
            if self._decompress_into_buffer(out_buffer):
                self._bytes_decompressed += out_buffer.pos
                return out_buffer.pos

        self._bytes_decompressed += out_buffer.pos
        return out_buffer.pos

    def read1(self, size=-1):
        """Read up to ``size`` bytes, returning as soon as any output exists.

        ``size == -1`` uses ``DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE`` as the
        output buffer size.

        :raises ValueError:
           If the stream is closed or ``size`` is less than ``-1``.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if size < -1:
            raise ValueError("cannot read negative amounts less than -1")

        if self._finished_output or size == 0:
            return b""

        # -1 returns arbitrary number of bytes.
        if size == -1:
            size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE

        dst_buffer = ffi.new("char[]", size)
        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        # read1() dictates that we can perform at most 1 call to underlying
        # stream to get input. However, we can't satisfy this restriction with
        # decompression because not all input generates output. So we allow
        # multiple read(). But unlike read(), we stop once we have any output.
        while not self._finished_input:
            self._read_input()
            self._decompress_into_buffer(out_buffer)

            if out_buffer.pos:
                break

        self._bytes_decompressed += out_buffer.pos
        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def readinto1(self, b):
        """Decompress into ``b``, returning as soon as any output exists.

        :return:
           Number of bytes written into ``b``.
        :raises ValueError:
           If the stream is closed.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if self._finished_output:
            return 0

        # TODO use writable=True once we require CFFI >= 1.12.
        dest_buffer = ffi.from_buffer(b)
        # Zero-length copy into ``b``; presumably a stand-in writability
        # check until ``writable=True`` can be used (see TODO above).
        ffi.memmove(b, b"", 0)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dest_buffer
        out_buffer.size = len(dest_buffer)
        out_buffer.pos = 0

        while not self._finished_input and not self._finished_output:
            self._read_input()
            self._decompress_into_buffer(out_buffer)

            if out_buffer.pos:
                break

        self._bytes_decompressed += out_buffer.pos
        return out_buffer.pos

    def seek(self, pos, whence=os.SEEK_SET):
        """Move forward in the decompressed stream by reading and discarding.

        Only forward seeks with ``SEEK_SET`` or ``SEEK_CUR`` are supported.
        A ``whence`` value other than the three ``os.SEEK_*`` constants
        results in no movement.

        :return:
           The decompressed position after seeking, which may be short of the
           target if the stream ends first.
        :raises ValueError:
           If the stream is closed.
        :raises OSError:
           For negative or backwards targets, or when ``SEEK_END`` is used.
        """
        if self._closed:
            raise ValueError("stream is closed")

        read_amount = 0

        if whence == os.SEEK_SET:
            if pos < 0:
                raise OSError("cannot seek to negative position with SEEK_SET")

            if pos < self._bytes_decompressed:
                raise OSError(
                    "cannot seek zstd decompression stream " "backwards"
                )

            read_amount = pos - self._bytes_decompressed

        elif whence == os.SEEK_CUR:
            if pos < 0:
                raise OSError(
                    "cannot seek zstd decompression stream " "backwards"
                )

            read_amount = pos
        elif whence == os.SEEK_END:
            raise OSError(
                "zstd decompression streams cannot be seeked " "with SEEK_END"
            )

        # Consume and discard output in bounded chunks until the target is
        # reached or read() yields nothing.
        while read_amount:
            result = self.read(
                min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
            )

            if not result:
                break

            read_amount -= len(result)

        return self._bytes_decompressed


class ZstdDecompressionWriter(object):
    """
    Write-only stream wrapper that performs decompression.

    Compressed bytes handed to ``write(data)`` are decompressed and the
    resulting plain bytes are forwarded to an inner stream by calling its
    ``write(data)`` method.

    This type implements the ``io.RawIOBase`` interface. Only methods that
    involve writing will do useful things.

    Behavior mirrors :py:meth:`ZstdCompressor.stream_writer`:

    >>> dctx = zstandard.ZstdDecompressor()
    >>> decompressor = dctx.stream_writer(fh)
    >>> # Will call fh.write() with uncompressed data.
    >>> decompressor.write(compressed_data)

    Instances can be used as context managers. The context manager adds no
    behavior beyond calling ``close()`` automatically on exit.

    Calling ``close()`` marks the stream as closed; subsequent I/O
    operations raise ``ValueError`` (per the documented behavior of
    ``io.RawIOBase``). ``close()`` also calls ``close()`` on the underlying
    stream if such a method exists and the instance was created with
    ``closefd=True``.

    The size of chunks to ``write()`` to the destination can be specified:

    >>> dctx = zstandard.ZstdDecompressor()
    >>> with dctx.stream_writer(fh, write_size=16384) as decompressor:
    ...    pass

    You can see how much memory is being used by the decompressor:

    >>> dctx = zstandard.ZstdDecompressor()
    >>> with dctx.stream_writer(fh) as decompressor:
    ...    byte_size = decompressor.memory_size()

    ``stream_writer()`` accepts a ``write_return_read`` boolean argument to
    control the return value of ``write()``. When ``True`` (the default),
    ``write()`` returns the number of bytes that were read from the input.
    When ``False``, ``write()`` returns the number of bytes that were written
    to the inner stream.
    """

    def __init__(
        self,
        decompressor,
        writer,
        write_size,
        write_return_read,
        closefd=True,
    ):
        decompressor._ensure_dctx()

        # Collaborators.
        self._decompressor = decompressor
        self._writer = writer

        # Configuration.
        self._write_size = write_size
        self._write_return_read = bool(write_return_read)
        self._closefd = bool(closefd)

        # Lifecycle state.
        self._entered = False
        self._closing = False
        self._closed = False

    def __enter__(self):
        """Enter the context manager.

        :raises ValueError: if the stream is closed.
        :raises ZstdError: if the context manager is already entered.
        """
        if self._closed:
            raise ValueError("stream is closed")

        if self._entered:
            raise ZstdError("cannot __enter__ multiple times")

        self._entered = True

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the stream; never suppresses exceptions."""
        self._entered = False
        self.close()

        return False

    def __iter__(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def __next__(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def memory_size(self):
        """Return the size reported by the library for the decompression context."""
        return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)

    def close(self):
        """Flush, mark the stream closed, and optionally close the inner stream.

        Repeated calls are no-ops. The stream is marked closed even if the
        flush raises.
        """
        if self._closed:
            return

        # ``_closing`` tells flush() not to forward the flush to the inner
        # stream while we are shutting down.
        try:
            self._closing = True
            self.flush()
        finally:
            self._closing = False
            self._closed = True

        inner_close = getattr(self._writer, "close", None)
        if self._closefd and inner_close:
            inner_close()

    @property
    def closed(self):
        """Whether close() has completed on this stream."""
        return self._closed

    def fileno(self):
        """Return the inner stream's file descriptor.

        :raises OSError: if the inner stream has no usable ``fileno``.
        """
        inner_fileno = getattr(self._writer, "fileno", None)
        if not inner_fileno:
            raise OSError("fileno not available on underlying writer")

        return inner_fileno()

    def flush(self):
        """Forward a flush to the inner stream, if it supports one.

        Nothing is forwarded while the stream is in the middle of close().

        :raises ValueError: if the stream is closed.
        """
        if self._closed:
            raise ValueError("stream is closed")

        inner_flush = getattr(self._writer, "flush", None)
        if self._closing or not inner_flush:
            return None

        return inner_flush()

    def isatty(self):
        """Always ``False``."""
        return False

    def readable(self):
        """Always ``False``: this stream is write-only."""
        return False

    def readline(self, size=-1):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def seek(self, offset, whence=None):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def seekable(self):
        """Always ``False``."""
        return False

    def tell(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def truncate(self, size=None):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def writable(self):
        """Always ``True``."""
        return True

    def writelines(self, lines):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def read(self, size=-1):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def readall(self):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def readinto(self, b):
        """Not supported; raises ``io.UnsupportedOperation``."""
        raise io.UnsupportedOperation()

    def write(self, data):
        """Decompress ``data`` and write the output to the inner stream.

        :param data:
           Buffer-protocol object holding compressed bytes.
        :return:
           Bytes consumed from ``data`` when ``write_return_read`` is true,
           otherwise the number of decompressed bytes passed to the inner
           stream's ``write()``.
        :raises ValueError: if the stream is closed.
        :raises ZstdError: if the library reports a decompression error.
        """
        if self._closed:
            raise ValueError("stream is closed")

        zin = ffi.new("ZSTD_inBuffer *")
        zout = ffi.new("ZSTD_outBuffer *")

        # ``source`` must stay referenced for as long as ``zin.src`` is used.
        source = ffi.from_buffer(data)
        zin.src = source
        zin.size = len(source)
        zin.pos = 0

        # Scratch space reused for every decompression round below.
        scratch = ffi.new("char[]", self._write_size)
        zout.dst = scratch
        zout.size = len(scratch)
        zout.pos = 0

        dctx = self._decompressor._dctx
        emitted = 0

        while zin.pos < zin.size:
            status = lib.ZSTD_decompressStream(dctx, zout, zin)
            if lib.ZSTD_isError(status):
                raise ZstdError(
                    "zstd decompress error: %s" % _zstd_error(status)
                )

            if not zout.pos:
                continue

            self._writer.write(ffi.buffer(zout.dst, zout.pos)[:])
            emitted += zout.pos
            zout.pos = 0

        return zin.pos if self._write_return_read else emitted


class ZstdDecompressor(object):
    """
    Context for performing zstandard decompression.

    Each instance is essentially a wrapper around a ``ZSTD_DCtx`` from zstd's
    C API.

    An instance can decompress data various ways. Instances can be used
    multiple times.

    The interface of this class is very similar to
    :py:class:`zstandard.ZstdCompressor` (by design).

    Assume that each ``ZstdDecompressor`` instance can only handle a single
    logical decompression operation at a time. i.e. if you call a method
    like ``decompressobj()`` to obtain multiple objects derived from the same
    ``ZstdDecompressor`` instance and attempt to use them simultaneously, errors
    will likely occur.

    If you need to perform multiple logical decompression operations and you
    can't guarantee those operations are temporally non-overlapping, you need
    to obtain multiple ``ZstdDecompressor`` instances.

    Unless specified otherwise, assume that no two methods of
    ``ZstdDecompressor`` instances can be called from multiple Python
    threads simultaneously. In other words, assume instances are not thread safe
    unless stated otherwise.

    :param dict_data:
       Compression dictionary to use.
    :param max_window_size:
       Sets an upper limit on the window size for decompression operations in
       kibibytes. This setting can be used to prevent large memory allocations
       for inputs using large compression windows.
    :param format:
       Set the format of data for the decoder.

       By default this is ``zstandard.FORMAT_ZSTD1``. It can be set to
       ``zstandard.FORMAT_ZSTD1_MAGICLESS`` to allow decoding frames without
       the 4 byte magic header. Not all decompression APIs support this mode.
    """

    def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
        """Allocate a ``ZSTD_DCtx`` and apply the requested settings to it.

        :param dict_data:
           Dictionary object to use; stored and applied by
           ``_ensure_dctx()``.
        :param max_window_size:
           Upper limit on the decompression window size; ``0`` leaves the
           limit unset.
        :param format:
           Frame format passed to the decoder.
        :raises MemoryError:
           If the decompression context cannot be allocated.
        """
        self._dict_data = dict_data
        self._max_window_size = max_window_size
        self._format = format

        raw_dctx = lib.ZSTD_createDCtx()
        if raw_dctx == ffi.NULL:
            raise MemoryError()

        # _ensure_dctx() operates on self._dctx, so publish the raw handle
        # before configuring it.
        self._dctx = raw_dctx

        try:
            self._ensure_dctx()
        finally:
            # Register the finalizer only after configuration so the size
            # reported to the garbage collector reflects the loaded state.
            # Doing this in ``finally`` means the context is still released
            # if configuration raised.
            self._dctx = ffi.gc(
                raw_dctx,
                lib.ZSTD_freeDCtx,
                size=lib.ZSTD_sizeof_DCtx(raw_dctx),
            )

    def memory_size(self):
        """Return the size of the decompression context, in bytes.

        The value is whatever ``ZSTD_sizeof_DCtx()`` reports for the
        underlying context at the time of the call.

        >>> dctx = zstandard.ZstdDecompressor()
        >>> size = dctx.memory_size()

        :return:
           Integer size in bytes.
        """
        dctx = self._dctx
        return lib.ZSTD_sizeof_DCtx(dctx)

    def decompress(
        self,
        data,
        max_output_size=0,
        read_across_frames=False,
        allow_extra_data=True,
    ):
        """
        Decompress data in a single operation.

        This method will decompress the input data in a single operation and
        return the decompressed data.

        The input bytes are expected to contain at least 1 full Zstandard frame
        (something compressed with :py:meth:`ZstdCompressor.compress` or
        similar). If the input does not contain a full frame, an exception will
        be raised.

        ``read_across_frames`` controls whether to read multiple zstandard
        frames in the input. When False, decompression stops after reading the
        first frame. This feature is not yet implemented but the argument is
        provided for forward API compatibility when the default is changed to
        True in a future release. For now, if you need to decompress multiple
        frames, use an API like :py:meth:`ZstdDecompressor.stream_reader` with
        ``read_across_frames=True``.

        ``allow_extra_data`` controls how to handle extra input data after a
        fully decoded frame. If False, any extra data (which could be a valid
        zstd frame) will result in ``ZstdError`` being raised. If True (the
        current default), extra data is silently ignored. The default will
        likely change to False in a future release when ``read_across_frames``
        defaults to True.

        If the frame header of the compressed data does not contain the content
        size, ``max_output_size`` must be specified or ``ZstdError`` will be
        raised. An allocation of size ``max_output_size`` will be performed and an
        attempt will be made to perform decompression into that buffer. If the
        buffer is too small, ``ZstdError`` will be raised. Only the bytes
        actually produced are returned, so an oversized buffer is harmless
        apart from the temporary allocation.

        Uncompressed data could be much larger than compressed data. As a result,
        calling this function could result in a very large memory allocation
        being performed to hold the uncompressed data. This could potentially
        result in ``MemoryError`` or system memory swapping. If you don't need
        the full output data in a single contiguous array in memory, consider
        using streaming decompression for more resilient memory behavior.

        Usage:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> decompressed = dctx.decompress(data)

        If the compressed data doesn't have its content size embedded within it,
        decompression can be attempted by specifying the ``max_output_size``
        argument:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> uncompressed = dctx.decompress(data, max_output_size=1048576)

        Ideally, ``max_output_size`` will be identical to the decompressed
        output size.

        .. important::

           If the exact size of decompressed data is unknown (not passed in
           explicitly and not stored in the zstd frame), for performance
           reasons it is encouraged to use a streaming API.

        :param data:
           Compressed data to decompress. Must conform to the buffer protocol.
        :param max_output_size:
           Integer size of the output buffer to allocate when the frame header
           does not declare the content size.

           It is only consulted in that case: when the frame declares its
           content size, a buffer of exactly that size is allocated instead.
           ``0`` means no size was provided.
        :param read_across_frames:
           Must be falsy; passing a truthy value raises ``ZstdError`` because
           the feature is not implemented.
        :param allow_extra_data:
           Whether input bytes remaining after the decoded frame are tolerated.
        :return:
           ``bytes`` representing decompressed output.
        :raises ZstdError:
           If the content size cannot be determined, decompression fails, the
           frame is incomplete or does not fit the output buffer, the output
           length differs from the size declared in the frame header, or unused
           input remains while ``allow_extra_data`` is False.
        """

        if read_across_frames:
            raise ZstdError(
                "ZstdDecompressor.read_across_frames=True is not yet implemented"
            )

        self._ensure_dctx()

        data_buffer = ffi.from_buffer(data)

        output_size = lib.ZSTD_getFrameContentSize(
            data_buffer, len(data_buffer)
        )

        if output_size == lib.ZSTD_CONTENTSIZE_ERROR:
            raise ZstdError("error determining content size from frame header")
        elif output_size == 0:
            # The frame declares empty content. Note that this returns before
            # the ``allow_extra_data`` check further down.
            return b""
        elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            if not max_output_size:
                raise ZstdError(
                    "could not determine content size in frame header"
                )

            result_buffer = ffi.new("char[]", max_output_size)
            result_size = max_output_size
            # 0 marks "no declared size" so the exact-length check is skipped.
            output_size = 0
        else:
            result_buffer = ffi.new("char[]", output_size)
            result_size = output_size

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = result_buffer
        out_buffer.size = result_size
        out_buffer.pos = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("decompression error: %s" % _zstd_error(zresult))
        elif zresult:
            # A non-zero, non-error result means the frame was not fully
            # decoded by this single call.
            raise ZstdError(
                "decompression error: did not decompress full frame"
            )
        elif output_size and out_buffer.pos != output_size:
            # Report the number of bytes actually produced. ``zresult`` is
            # always 0 in this branch and would make the message misleading.
            raise ZstdError(
                "decompression error: decompressed %d bytes; expected %d"
                % (out_buffer.pos, output_size)
            )
        elif not allow_extra_data and in_buffer.pos < in_buffer.size:
            count = in_buffer.size - in_buffer.pos

            raise ZstdError(
                "compressed input contains %d bytes of unused data, which is disallowed"
                % count
            )

        # Copy only the bytes that were written into the result buffer.
        return ffi.buffer(result_buffer, out_buffer.pos)[:]

    def stream_reader(
        self,
        source,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        read_across_frames=False,
        closefd=True,
    ):
        """
        Obtain a read-only stream wrapper that performs decompression.

        The returned object conforms to the ``io.RawIOBase`` interface and
        transparently decompresses data as ``read()`` is called on it. The
        compressed input comes either from calling ``read()`` on ``source`` or,
        if ``source`` conforms to the buffer protocol, from its contents.

        The decompression context is reset before the reader is created.

        See :py:class:`zstandard.ZstdDecompressionReader` for more documentation
        and usage examples.

        :param source:
           Source of compressed data to decompress. Can be any object
           with a ``read(size)`` method or that conforms to the buffer protocol.
        :param read_size:
           Integer number of bytes to read from the source and feed into the
           decompressor at a time.
        :param read_across_frames:
           Whether to read data across multiple zstd frames. If False,
           decompression is stopped at frame boundaries.
        :param closefd:
           Whether to close the source stream when this instance is closed.
        :return:
           :py:class:`zstandard.ZstdDecompressionReader`.
        """
        self._ensure_dctx()

        stream = ZstdDecompressionReader(
            self, source, read_size, read_across_frames, closefd=closefd
        )
        return stream

    def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Obtain a standard library compatible incremental decompressor.

        The decompression context is reset before the object is created.

        See :py:class:`ZstdDecompressionObj` for more documentation
        and usage examples.

        :param write_size:
           Positive integer forwarded to the returned object as its
           ``write_size``.
        :return:
           :py:class:`zstandard.ZstdDecompressionObj`
        :raises ValueError:
           If ``write_size`` is less than 1.
        """
        if write_size < 1:
            raise ValueError("write_size must be positive")

        self._ensure_dctx()

        dobj = ZstdDecompressionObj(self, write_size=write_size)
        return dobj

    def read_to_iter(
        self,
        reader,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        skip_bytes=0,
    ):
        """Read compressed data to an iterator of uncompressed chunks.

        Data is pulled from ``reader``, fed to the decompressor, and emitted as
        ``bytes`` chunks holding the decompressed result.

        >>> dctx = zstandard.ZstdDecompressor()
        >>> for chunk in dctx.read_to_iter(fh):
        ...     # Do something with original data.

        ``reader`` may be an object with a ``read(size)`` method returning
        compressed bytes, or an object conforming to the buffer protocol.

        The size of each ``read()`` requested from the source can be specified:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> for chunk in dctx.read_to_iter(fh, read_size=16384):
        ...    pass

        Leading bytes of the input can be skipped:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> for chunk in dctx.read_to_iter(fh, skip_bytes=1):
        ...    pass

        .. tip::

           Skipping leading bytes is useful if the source data contains extra
           *header* data. Traditionally, you would need to create a slice or
           ``memoryview`` of the data you want to decompress. This would create
           overhead. It is more efficient to pass the offset into this API.

        Similarly to :py:meth:`ZstdCompressor.read_to_iter`, the consumer of the
        iterator controls when data is decompressed. If the iterator isn't consumed,
        decompression is put on hold. Because this is a generator, argument
        validation also only runs once iteration starts.

        Iteration ends when the source yields no more data, or as soon as a
        ``ZSTD_decompressStream()`` call returns ``0``.

        When passed an object conforming to the buffer protocol, the behavior
        may seem similar to the simple decompression API. However, this API
        works when the decompressed size is unknown, and large inputs are
        processed in chunks rather than in a single operation.

        :param reader:
           Source of compressed data. Can be any object with a
           ``read(size)`` method or any object conforming to the buffer
           protocol.
        :param read_size:
           Integer size of data chunks to read from ``reader`` and feed into
           the decompressor.
        :param write_size:
           Integer size of data chunks to emit from iterator.
        :param skip_bytes:
           Integer number of bytes to skip over before sending data into
           the decompressor. Must be smaller than ``read_size``.
        :return:
           Iterator of ``bytes`` representing uncompressed data.
        :raises ValueError:
           If ``skip_bytes`` is not smaller than ``read_size``, exceeds the
           length of a buffer-style ``reader``, or ``reader`` is neither
           readable nor indexable.
        :raises ZstdError:
           If zstd reports a decompression error.
        """

        if skip_bytes >= read_size:
            raise ValueError("skip_bytes must be smaller than read_size")

        have_read = hasattr(reader, "read")
        if not have_read:
            if not hasattr(reader, "__getitem__"):
                raise ValueError(
                    "must pass an object with a read() method or "
                    "conforms to buffer protocol"
                )

            # Buffer-style input is consumed by slicing from a moving offset.
            buffer_offset = 0
            size = len(reader)

        if skip_bytes:
            if have_read:
                # Read and discard the leading bytes.
                reader.read(skip_bytes)
            elif skip_bytes > size:
                raise ValueError("skip_bytes larger than first input chunk")
            else:
                buffer_offset = skip_bytes

        self._ensure_dctx()

        dst_buffer = ffi.new("char[]", write_size)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")

        while True:
            assert out_buffer.pos == 0

            if have_read:
                read_result = reader.read(read_size)
            else:
                slice_end = buffer_offset + min(size - buffer_offset, read_size)
                read_result = reader[buffer_offset:slice_end]
                buffer_offset = slice_end

            # The source has nothing more to give: the iterator is done.
            if not read_result:
                return

            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            # Drain this input chunk, emitting output as it becomes available.
            while in_buffer.pos < in_buffer.size:
                assert out_buffer.pos == 0

                zresult = lib.ZSTD_decompressStream(
                    self._dctx, out_buffer, in_buffer
                )
                if lib.ZSTD_isError(zresult):
                    raise ZstdError(
                        "zstd decompress error: %s" % _zstd_error(zresult)
                    )

                if out_buffer.pos:
                    # Copy the output and rewind before yielding, since the
                    # destination buffer is reused on the next call.
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

                # A zero result ends iteration; any remaining input is left
                # unprocessed.
                if zresult == 0:
                    return

    def stream_writer(
        self,
        writer,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        write_return_read=True,
        closefd=True,
    ):
        """
        Obtain a push-based stream wrapper that performs decompression.

        The returned object conforms to the ``io.RawIOBase`` interface.
        Compressed data written to it is transparently decompressed and
        forwarded to ``writer``.

        See :py:class:`zstandard.ZstdDecompressionWriter` for more documentation
        and usage examples.

        :param writer:
           Destination for decompressed output. Can be any object with a
           ``write(data)``.
        :param write_size:
           Integer size of chunks to ``write()`` to ``writer``.
        :param write_return_read:
           Whether ``write()`` should return the number of bytes of input
           consumed. If False, ``write()`` returns the number of bytes sent
           to the inner stream.
        :param closefd:
           Whether to ``close()`` the inner stream when this stream is closed.
        :return:
           :py:class:`zstandard.ZstdDecompressionWriter`
        :raises ValueError:
           If ``writer`` has no ``write`` attribute.
        """
        if hasattr(writer, "write"):
            return ZstdDecompressionWriter(
                self,
                writer,
                write_size,
                write_return_read,
                closefd=closefd,
            )

        raise ValueError("must pass an object with a write() method")

    def copy_stream(
        self,
        ifh,
        ofh,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """
        Copy data between streams, decompressing in the process.

        Compressed data is read from ``ifh`` until ``read()`` returns an empty
        result; the decompressed output is written to ``ofh``.

        >>> dctx = zstandard.ZstdDecompressor()
        >>> dctx.copy_stream(ifh, ofh)

        e.g. to decompress a file to another file:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> with open(input_path, 'rb') as ifh, open(output_path, 'wb') as ofh:
        ...     dctx.copy_stream(ifh, ofh)

        The size of chunks being ``read()`` and ``write()`` from and to the
        streams can be specified:

        >>> dctx = zstandard.ZstdDecompressor()
        >>> dctx.copy_stream(ifh, ofh, read_size=8192, write_size=16384)

        :param ifh:
           Source stream to read compressed data from.

           Must have a ``read()`` method.
        :param ofh:
           Destination stream to write uncompressed data to.

           Must have a ``write()`` method.
        :param read_size:
           The number of bytes to ``read()`` from the source in a single
           operation.
        :param write_size:
           The number of bytes to ``write()`` to the destination in a single
           operation.
        :return:
           2-tuple of integers representing the number of bytes read and
           written, respectively.
        :raises ValueError:
           If ``ifh`` lacks ``read`` or ``ofh`` lacks ``write``.
        :raises ZstdError:
           If zstd reports a decompression error.
        """

        if not hasattr(ifh, "read"):
            raise ValueError("first argument must have a read() method")
        if not hasattr(ofh, "write"):
            raise ValueError("second argument must have a write() method")

        self._ensure_dctx()

        dst_buffer = ffi.new("char[]", write_size)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")

        bytes_in = 0
        bytes_out = 0

        while True:
            data = ifh.read(read_size)
            if not data:
                # Source is exhausted.
                break

            data_buffer = ffi.from_buffer(data)
            chunk_len = len(data_buffer)
            bytes_in += chunk_len

            in_buffer.src = data_buffer
            in_buffer.size = chunk_len
            in_buffer.pos = 0

            # Push the whole chunk through zstd, writing output as it appears.
            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_decompressStream(
                    self._dctx, out_buffer, in_buffer
                )
                if lib.ZSTD_isError(zresult):
                    raise ZstdError(
                        "zstd decompressor error: %s" % _zstd_error(zresult)
                    )

                if not out_buffer.pos:
                    continue

                # The destination receives a view over the reusable output
                # buffer (no intermediate ``bytes`` copy is made here).
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                bytes_out += out_buffer.pos
                out_buffer.pos = 0

        return bytes_in, bytes_out

    def decompress_content_dict_chain(self, frames):
        """
        Decompress a series of frames using the content dictionary chaining technique.

        Such a list of frames is produced by compressing discrete inputs where
        each non-initial input is compressed with a *prefix* dictionary consisting
        of the content of the previous input.

        For example, say you have the following inputs:

        >>> inputs = [b"input 1", b"input 2", b"input 3"]

        The zstd frame chain consists of:

        1. ``b"input 1"`` compressed in standalone/discrete mode
        2. ``b"input 2"`` compressed using ``b"input 1"`` as a *prefix* dictionary
        3. ``b"input 3"`` compressed using ``b"input 2"`` as a *prefix* dictionary

        Each zstd frame **must** have the content size written.

        The following Python code can be used to produce a *prefix dictionary chain*:

        >>> def make_chain(inputs):
        ...    frames = []
        ...
        ...    # First frame is compressed in standalone/discrete mode.
        ...    zctx = zstandard.ZstdCompressor()
        ...    frames.append(zctx.compress(inputs[0]))
        ...
        ...    # Subsequent frames use the previous fulltext as a prefix dictionary
        ...    for i, raw in enumerate(inputs[1:]):
        ...        dict_data = zstandard.ZstdCompressionDict(
        ...            inputs[i], dict_type=zstandard.DICT_TYPE_RAWCONTENT)
        ...        zctx = zstandard.ZstdCompressor(dict_data=dict_data)
        ...        frames.append(zctx.compress(raw))
        ...
        ...    return frames

        ``decompress_content_dict_chain()`` returns the uncompressed data of the last
        element in the input chain.

        .. note::

           It is possible to implement *prefix dictionary chain* decompression
           on top of other APIs. However, this function will likely be faster -
           especially for long input chains - as it avoids the overhead of
           instantiating and passing around intermediate objects between
           multiple functions.

        :param frames:
           List of ``bytes`` holding compressed zstd frames.
        :return:
           ``bytes`` holding the decompressed content of the last frame.
        :raises TypeError:
           If ``frames`` is not a ``list``.
        :raises ValueError:
           If ``frames`` is empty, or an element is not ``bytes``, is not a
           valid or complete zstd frame header, or lacks a content size.
        :raises ZstdError:
           If a frame fails to decompress or is not fully decoded.
        """
        if not isinstance(frames, list):
            raise TypeError("argument must be a list")

        if not frames:
            raise ValueError("empty input chain")

        # These structures are reused for every frame in the chain.
        params = ffi.new("ZSTD_frameHeader *")
        out_buffer = ffi.new("ZSTD_outBuffer *")
        in_buffer = ffi.new("ZSTD_inBuffer *")

        # NOTE(review): no dictionary/prefix is explicitly (re)loaded between
        # frames here; the previous frame's output buffer is only kept alive
        # until the next frame has been decoded. Confirm this matches the
        # intended chaining semantics.
        last_buffer = None

        for i, chunk in enumerate(frames):
            if not isinstance(chunk, bytes):
                raise ValueError("chunk %d must be bytes" % i)

            # All chunks should be zstd frames and should have content size
            # set.
            chunk_buffer = ffi.from_buffer(chunk)
            zresult = lib.ZSTD_getFrameHeader(
                params, chunk_buffer, len(chunk_buffer)
            )
            if lib.ZSTD_isError(zresult):
                raise ValueError("chunk %d is not a valid zstd frame" % i)
            elif zresult:
                raise ValueError(
                    "chunk %d is too small to contain a zstd frame" % i
                )

            if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
                raise ValueError("chunk %d missing content size in frame" % i)

            if i == 0:
                # The context is reset once, after the first chunk has been
                # validated. The instance's own dictionary is deliberately
                # not loaded for this operation.
                self._ensure_dctx(load_dict=False)

            dest_buffer = ffi.new("char[]", params.frameContentSize)

            out_buffer.dst = dest_buffer
            out_buffer.size = len(dest_buffer)
            out_buffer.pos = 0

            in_buffer.src = chunk_buffer
            in_buffer.size = len(chunk_buffer)
            in_buffer.pos = 0

            zresult = lib.ZSTD_decompressStream(
                self._dctx, out_buffer, in_buffer
            )
            if lib.ZSTD_isError(zresult):
                # Both placeholders need a value. Passing only the error
                # string raised TypeError instead of ZstdError.
                raise ZstdError(
                    "could not decompress chunk %d: %s"
                    % (i, _zstd_error(zresult))
                )
            elif zresult:
                raise ZstdError("chunk %d did not decompress full frame" % i)

            last_buffer = dest_buffer

        return ffi.buffer(last_buffer, len(last_buffer))[:]

    def multi_decompress_to_buffer(
        self, frames, decompressed_sizes=None, threads=0
    ):
        """
        Decompress multiple zstd frames to output buffers as a single operation.

        (Experimental. Not available in CFFI backend: this implementation
        always raises ``NotImplementedError``.)

        Compressed frames can be passed to the function as a
        ``BufferWithSegments``, a ``BufferWithSegmentsCollection``, or as a
        list containing objects that conform to the buffer protocol. For best
        performance, pass a ``BufferWithSegmentsCollection`` or a
        ``BufferWithSegments``, as minimal input validation will be done for
        that type. If calling from Python (as opposed to C), constructing one
        of these instances may add overhead that cancels out what is saved by
        skipping the validation performed for list inputs.

        Returns a ``BufferWithSegmentsCollection`` containing the decompressed
        data. All decompressed data is allocated in a single memory buffer. The
        ``BufferWithSegments`` instance tracks which objects are at which offsets
        and their respective lengths.

        >>> dctx = zstandard.ZstdDecompressor()
        >>> results = dctx.multi_decompress_to_buffer([b'...', b'...'])

        The decompressed size of each frame MUST be discoverable. It can either be
        embedded within the zstd frame or passed in via the ``decompressed_sizes``
        argument.

        The ``decompressed_sizes`` argument is an object conforming to the buffer
        protocol which holds an array of 64-bit unsigned integers in the machine's
        native format defining the decompressed sizes of each frame. If this argument
        is passed, it avoids having to scan each frame for its decompressed size.
        This frame scanning can add noticeable overhead in some scenarios.

        >>> frames = [...]
        >>> sizes = struct.pack('=QQQQ', len0, len1, len2, len3)
        >>>
        >>> dctx = zstandard.ZstdDecompressor()
        >>> results = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes)

        .. note::

           It is possible to pass a ``mmap.mmap()`` instance into this function by
           wrapping it with a ``BufferWithSegments`` instance (which will define the
           offsets of frames within the memory mapped region).

        This function is logically equivalent to performing
        :py:meth:`ZstdDecompressor.decompress` on each input frame and returning the
        result.

        This function exists to perform decompression on multiple frames as fast
        as possible by having as little overhead as possible. Since decompression is
        performed as a single operation and since the decompressed output is stored in
        a single buffer, extra memory allocations, Python objects, and Python function
        calls are avoided. This is ideal for scenarios where callers know up front that
        they need to access data for multiple frames, such as when *delta chains* are
        being used.

        Currently, the implementation always spawns multiple threads when requested,
        even if the amount of work to do is small. In the future, it will be smarter
        about avoiding threads and their associated overhead when the amount of
        work to do is small.

        :param frames:
           Source defining zstd frames to decompress.
        :param decompressed_sizes:
           Array of integers representing sizes of decompressed zstd frames.
        :param threads:
           How many threads to use for decompression operations.

           Negative values will use the same number of threads as logical CPUs
           on the machine. Values ``0`` or ``1`` use a single thread.
        :return:
           ``BufferWithSegmentsCollection``
        :raises NotImplementedError:
           Always, in this backend.
        """
        raise NotImplementedError()

    def _ensure_dctx(self, load_dict=True):
        """Reset the decompression context and re-apply this instance's settings.

        The session state of ``self._dctx`` is reset, then the max window
        size (only if one was configured), the frame format, and optionally
        the dictionary are applied to it.

        :param load_dict:
           Whether to reference ``self._dict_data`` on the context. Has no
           effect when no dictionary was configured.
        :raises ZstdError:
           If zstd reports an error while applying the max window size, the
           format, or the dictionary.
        """
        # Reset only the session; the return value of the reset call is not
        # inspected.
        lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only)

        # A falsy max window size (the default 0) leaves the limit untouched.
        if self._max_window_size:
            zresult = lib.ZSTD_DCtx_setMaxWindowSize(
                self._dctx, self._max_window_size
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "unable to set max window size: %s" % _zstd_error(zresult)
                )

        # The format is always (re)applied, even when it is the default.
        zresult = lib.ZSTD_DCtx_setParameter(
            self._dctx, lib.ZSTD_d_format, self._format
        )
        if lib.ZSTD_isError(zresult):
            raise ZstdError(
                "unable to set decoding format: %s" % _zstd_error(zresult)
            )

        # Callers pass load_dict=False to skip the configured dictionary.
        if self._dict_data and load_dict:
            zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict)
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "unable to reference prepared dictionary: %s"
                    % _zstd_error(zresult)
                )