from __future__ import annotations

import os
import pathlib
import re
import sys
from functools import lru_cache
from typing import Any

from ._file_contents import _VIRTUAL_FILES, FileContents, VirtualFileContents

_BINARY_TYPES = (memoryview, bytearray, bytes)
_WIDGET_MIME_TYPE = "application/vnd.jupyter.widget-view+json"
_PROTOCOL_VERSION_MAJOR = 2
_PROTOCOL_VERSION_MINOR = 1
_PROTOCOL_VERSION = f"{_PROTOCOL_VERSION_MAJOR}.{_PROTOCOL_VERSION_MINOR}.0"
_ANYWIDGET_ID_KEY = "_anywidget_id"
_ESM_KEY = "_esm"
_CSS_KEY = "_css"
_DEFAULT_ESM = """
function render(view) {
  console.log("Dev note: No _esm defined for this widget:", view);
  let url = "https://anywidget.dev/en/getting-started/";
  view.el.innerHTML = `<p>
    <strong>Dev note</strong>:
    <a href='${url}' target='blank'>Implement an <code>_esm</code> attribute</a>
    on AnyWidget subclass <code>${view.model.get('_anywidget_id')}</code>
    to customize this widget.
  </p>`;
}
export default { render };
"""

# next 3 functions vendored with modifications from ipywidgets
# BSD-3-Clause
# Copyright (c) 2015 Project Jupyter Contributors
# https://github.com/jupyter-widgets/ipywidgets/blob/7325e5952efb71bd69692b2d7ed815646c0ac521/python/ipywidgets/ipywidgets/widgets/widget.py


def _separate_buffers(
    substate: Any, path: list, buffer_paths: list, buffers: list
) -> Any:
    """For internal, see _remove_buffers.

    remove binary types from dicts and lists, but keep track of their paths any part of
    the dict/list that needs modification will be cloned, so the original stays
    untouched e.g. {'x': {'ar': ar}, 'y': [ar2, ar3]}, where ar/ar2/ar3 are binary types
    will result in {'x': {}, 'y': [None, None]}, [ar, ar2, ar3], [['x', 'ar'], ['y', 0],
    ['y', 1]] instead of removing elements from the list, this will make replacing the
    buffers on the js side much easier
    """
    _t = type(substate)
    _sub: list | dict | None = None
    if isinstance(substate, (list, tuple)):
        for i, v in enumerate(substate):
            if isinstance(v, _BINARY_TYPES):
                if _sub is None:
                    _sub = list(substate)  # shallow clone list/tuple
                _sub[i] = None
                buffers.append(v)
                buffer_paths.append([*path, i])
            elif isinstance(v, (dict, list, tuple)):
                _v = _separate_buffers(v, [*path, i], buffer_paths, buffers)
                if v is not _v:  # only assign when value changed
                    if _sub is None:
                        _sub = list(substate)  # shallow clone list/tuple
                    _sub[i] = _v
    elif isinstance(substate, dict):
        for k, v in substate.items():
            if isinstance(v, _BINARY_TYPES):
                if _sub is None:
                    _sub = dict(substate)  # shallow clone dict
                del _sub[k]
                buffers.append(v)
                buffer_paths.append([*path, k])
            elif isinstance(v, (dict, list, tuple)):
                _v = _separate_buffers(v, [*path, k], buffer_paths, buffers)
                if v is not _v:  # only assign when value changed
                    if _sub is None:
                        _sub = dict(substate)  # shallow clone dict
                    _sub[k] = _v
    else:  # pragma: no cover
        raise ValueError(f"expected state to be a list or dict, not {substate!r}")
    return _sub if _sub is not None else substate


def remove_buffers(state: Any) -> tuple[Any, list[list], list[memoryview]]:
    """Return (state_without_buffers, buffer_paths, buffers) for binary message parts.

    A binary message part is a memoryview, bytearray, or python 3 bytes object.

    Examples
    --------
    >>> ar1 = np.arange(8).reshape(4, 2)
    >>> ar2 = np.arange(100).reshape(10, 10)
    >>> state = {
            'plain': [0, 'text'],
            'x': {'ar': memoryview(ar1)},
            'y': {'shape': (10,10), 'data': memoryview(ar2)}
        }
    >>> _remove_buffers(state)
    (
        {
            'plain': [0, 'text'],
            'x': {},
            'y': {'shape': (10, 10)}
        },
        [['x', 'ar'], ['y', 'data']],
        [<memory at 0x114e7fac0>, <memory at 0x114e7fed0>]
    )
    """
    buffer_paths: list = []
    buffers: list[memoryview] = []
    state = _separate_buffers(state, [], buffer_paths, buffers)
    return state, buffer_paths, buffers


def put_buffers(
    state: dict,
    buffer_paths: list[list[str | int]],
    buffers: list[memoryview],
) -> None:
    """The inverse of _remove_buffers.

    ...except here we modify the existing dict/lists.
    Modifying should be fine, since this is used when state comes from the wire.
    """
    for buffer_path, buffer in zip(buffer_paths, buffers):
        # we'd like to set say sync_data['x'][0]['y'] = buffer
        # where buffer_path in this example would be ['x', 0, 'y']
        obj = state
        for key in buffer_path[:-1]:
            obj = obj[key]
        obj[buffer_path[-1]] = buffer


def in_colab() -> bool:
    """Determines whether in Google Colab."""
    return "google.colab.output" in sys.modules


@lru_cache(maxsize=None)
def enable_custom_widget_manager_once() -> None:
    """Enables Google Colab's custom widget manager so third-party widgets display.

    See https://github.com/googlecolab/colabtools/issues/498#issuecomment-998308485
    """
    sys.modules["google.colab.output"].enable_custom_widget_manager()


def get_repr_metadata() -> dict:
    """Creates metadata dict for _repr_mimebundle_.

    If in Google Colab, enables custom widgets as a side effect
    and injects the `custom_widget_manager` metadata for more
    consistent rendering.

    See https://github.com/manzt/anywidget/issues/63#issuecomment-1427194000.
    """
    if not in_colab():
        return {}

    enable_custom_widget_manager_once()
    url = sys.modules["google.colab.output"]._widgets._installed_url

    if url is None:
        return {}

    return {_WIDGET_MIME_TYPE: {"colab": {"custom_widget_manager": {"url": url}}}}


def _is_hmr_enabled() -> bool:
    return os.getenv("ANYWIDGET_HMR") == "1"


def _should_start_thread(path: pathlib.Path) -> bool:
    if "site-packages" in path.parts:
        # File is inside site-packages, likely not a local development install
        return False

    if "dist-packages" in path.parts:
        # Debian-specific directory, where python packages are installed in Colab.
        return False

    # If we're in dev mode, we should start a thread to watch the file for changes.
    if not _is_hmr_enabled():
        return False

    try:
        import watchfiles  # noqa: F401
    except ImportError:
        import warnings

        warnings.warn(
            "anywidget: Live-reloading feature is disabled."
            " To enable, please install the 'watchfiles' package.",
            stacklevel=2,
        )

        return False

    return True


def try_file_path(x: Any) -> pathlib.Path | None:
    """If possible coerce x into a pathlib.Path object.

    If a string, we handle the following cases:

    - If it's a URL, return None.
    - If it's a multi-line string, return None.
    - If it's a single-line string with a file extension, return a pathlib.Path object.
    - Otherwise, return None.

    Parameters
    ----------
    x : Any
        The object to try to coerce into a pathlib.Path object.

    Returns
    -------
    pathlib.Path | None
        A pathlib.Path object if x is a file path, otherwise None.
    """
    # Already a pathlib.Path
    if isinstance(x, pathlib.Path):
        return x

    if not isinstance(x, str):
        return None

    # Handle the string

    # Ignore URLs
    if x.startswith("http://") or x.startswith("https://"):
        return None

    # Ignore multi-line strings (probably raw file contents)
    is_multi_line = "\n" in x or "\r" in x
    if is_multi_line:
        return None

    # Is a single line string, but we don't know if it's a file path or raw contents.

    # Just check if it has a file extension for now.
    includes_file_suffix = re.search(r"[a-zA-Z0-9]\.[a-zA-Z0-9]+$", x) is not None
    if includes_file_suffix:
        return pathlib.Path(x).resolve().absolute()

    return None


def try_file_contents(x: Any) -> FileContents | VirtualFileContents | None:
    """Try to coerce x into a FileContents object."""
    if x in _VIRTUAL_FILES:
        return _VIRTUAL_FILES[x]

    maybe_path = try_file_path(x)
    if maybe_path is None:
        return None

    path = maybe_path
    if not path.is_file():
        raise FileNotFoundError(f"File not found: {path}")
    return FileContents(
        path=path,
        start_thread=_should_start_thread(path),
    )


def repr_mimebundle(
    model_id: str,
    repr_text: str,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Create a MIME bundle for a widget representation."""
    data = {
        "text/plain": repr_text,
        _WIDGET_MIME_TYPE: {
            "version_major": _PROTOCOL_VERSION_MAJOR,
            "version_minor": _PROTOCOL_VERSION_MINOR,
            "model_id": model_id,
        },
    }
    return data, get_repr_metadata()