from __future__ import annotations
import os
import pathlib
import re
import sys
from functools import lru_cache
from typing import Any
from ._file_contents import _VIRTUAL_FILES, FileContents, VirtualFileContents
_BINARY_TYPES = (memoryview, bytearray, bytes)
_WIDGET_MIME_TYPE = "application/vnd.jupyter.widget-view+json"
_PROTOCOL_VERSION_MAJOR = 2
_PROTOCOL_VERSION_MINOR = 1
_PROTOCOL_VERSION = f"{_PROTOCOL_VERSION_MAJOR}.{_PROTOCOL_VERSION_MINOR}.0"
_ANYWIDGET_ID_KEY = "_anywidget_id"
_ESM_KEY = "_esm"
_CSS_KEY = "_css"
_DEFAULT_ESM = """
function render(view) {
console.log("Dev note: No _esm defined for this widget:", view);
let url = "https://anywidget.dev/en/getting-started/";
view.el.innerHTML = `
Dev note:
Implement an _esm
attribute
on AnyWidget subclass ${view.model.get('_anywidget_id')}
to customize this widget.
`;
}
export default { render };
"""
# next 3 functions vendored with modifications from ipywidgets
# BSD-3-Clause
# Copyright (c) 2015 Project Jupyter Contributors
# https://github.com/jupyter-widgets/ipywidgets/blob/7325e5952efb71bd69692b2d7ed815646c0ac521/python/ipywidgets/ipywidgets/widgets/widget.py
def _separate_buffers(
substate: Any, path: list, buffer_paths: list, buffers: list
) -> Any:
"""For internal, see _remove_buffers.
remove binary types from dicts and lists, but keep track of their paths any part of
the dict/list that needs modification will be cloned, so the original stays
untouched e.g. {'x': {'ar': ar}, 'y': [ar2, ar3]}, where ar/ar2/ar3 are binary types
will result in {'x': {}, 'y': [None, None]}, [ar, ar2, ar3], [['x', 'ar'], ['y', 0],
['y', 1]] instead of removing elements from the list, this will make replacing the
buffers on the js side much easier
"""
_t = type(substate)
_sub: list | dict | None = None
if isinstance(substate, (list, tuple)):
for i, v in enumerate(substate):
if isinstance(v, _BINARY_TYPES):
if _sub is None:
_sub = list(substate) # shallow clone list/tuple
_sub[i] = None
buffers.append(v)
buffer_paths.append([*path, i])
elif isinstance(v, (dict, list, tuple)):
_v = _separate_buffers(v, [*path, i], buffer_paths, buffers)
if v is not _v: # only assign when value changed
if _sub is None:
_sub = list(substate) # shallow clone list/tuple
_sub[i] = _v
elif isinstance(substate, dict):
for k, v in substate.items():
if isinstance(v, _BINARY_TYPES):
if _sub is None:
_sub = dict(substate) # shallow clone dict
del _sub[k]
buffers.append(v)
buffer_paths.append([*path, k])
elif isinstance(v, (dict, list, tuple)):
_v = _separate_buffers(v, [*path, k], buffer_paths, buffers)
if v is not _v: # only assign when value changed
if _sub is None:
_sub = dict(substate) # shallow clone dict
_sub[k] = _v
else: # pragma: no cover
raise ValueError(f"expected state to be a list or dict, not {substate!r}")
return _sub if _sub is not None else substate
def remove_buffers(state: Any) -> tuple[Any, list[list], list[memoryview]]:
"""Return (state_without_buffers, buffer_paths, buffers) for binary message parts.
A binary message part is a memoryview, bytearray, or python 3 bytes object.
Examples
--------
>>> ar1 = np.arange(8).reshape(4, 2)
>>> ar2 = np.arange(100).reshape(10, 10)
>>> state = {
'plain': [0, 'text'],
'x': {'ar': memoryview(ar1)},
'y': {'shape': (10,10), 'data': memoryview(ar2)}
}
>>> _remove_buffers(state)
(
{
'plain': [0, 'text'],
'x': {},
'y': {'shape': (10, 10)}
},
[['x', 'ar'], ['y', 'data']],
[, ]
)
"""
buffer_paths: list = []
buffers: list[memoryview] = []
state = _separate_buffers(state, [], buffer_paths, buffers)
return state, buffer_paths, buffers
def put_buffers(
state: dict,
buffer_paths: list[list[str | int]],
buffers: list[memoryview],
) -> None:
"""The inverse of _remove_buffers.
...except here we modify the existing dict/lists.
Modifying should be fine, since this is used when state comes from the wire.
"""
for buffer_path, buffer in zip(buffer_paths, buffers):
# we'd like to set say sync_data['x'][0]['y'] = buffer
# where buffer_path in this example would be ['x', 0, 'y']
obj = state
for key in buffer_path[:-1]:
obj = obj[key]
obj[buffer_path[-1]] = buffer
def in_colab() -> bool:
"""Determines whether in Google Colab."""
return "google.colab.output" in sys.modules
@lru_cache(maxsize=None)
def enable_custom_widget_manager_once() -> None:
"""Enables Google Colab's custom widget manager so third-party widgets display.
See https://github.com/googlecolab/colabtools/issues/498#issuecomment-998308485
"""
sys.modules["google.colab.output"].enable_custom_widget_manager()
def get_repr_metadata() -> dict:
"""Creates metadata dict for _repr_mimebundle_.
If in Google Colab, enables custom widgets as a side effect
and injects the `custom_widget_manager` metadata for more
consistent rendering.
See https://github.com/manzt/anywidget/issues/63#issuecomment-1427194000.
"""
if not in_colab():
return {}
enable_custom_widget_manager_once()
url = sys.modules["google.colab.output"]._widgets._installed_url
if url is None:
return {}
return {_WIDGET_MIME_TYPE: {"colab": {"custom_widget_manager": {"url": url}}}}
def _is_hmr_enabled() -> bool:
return os.getenv("ANYWIDGET_HMR") == "1"
def _should_start_thread(path: pathlib.Path) -> bool:
if "site-packages" in path.parts:
# File is inside site-packages, likely not a local development install
return False
if "dist-packages" in path.parts:
# Debian-specific directory, where python packages are installed in Colab.
return False
# If we're in dev mode, we should start a thread to watch the file for changes.
if not _is_hmr_enabled():
return False
try:
import watchfiles # noqa: F401
except ImportError:
import warnings
warnings.warn(
"anywidget: Live-reloading feature is disabled."
" To enable, please install the 'watchfiles' package.",
stacklevel=2,
)
return False
return True
def try_file_path(x: Any) -> pathlib.Path | None:
"""If possible coerce x into a pathlib.Path object.
If a string, we handle the following cases:
- If it's a URL, return None.
- If it's a multi-line string, return None.
- If it's a single-line string with a file extension, return a pathlib.Path object.
- Otherwise, return None.
Parameters
----------
x : Any
The object to try to coerce into a pathlib.Path object.
Returns
-------
pathlib.Path | None
A pathlib.Path object if x is a file path, otherwise None.
"""
# Already a pathlib.Path
if isinstance(x, pathlib.Path):
return x
if not isinstance(x, str):
return None
# Handle the string
# Ignore URLs
if x.startswith("http://") or x.startswith("https://"):
return None
# Ignore multi-line strings (probably raw file contents)
is_multi_line = "\n" in x or "\r" in x
if is_multi_line:
return None
# Is a single line string, but we don't know if it's a file path or raw contents.
# Just check if it has a file extension for now.
includes_file_suffix = re.search(r"[a-zA-Z0-9]\.[a-zA-Z0-9]+$", x) is not None
if includes_file_suffix:
return pathlib.Path(x).resolve().absolute()
return None
def try_file_contents(x: Any) -> FileContents | VirtualFileContents | None:
"""Try to coerce x into a FileContents object."""
if x in _VIRTUAL_FILES:
return _VIRTUAL_FILES[x]
maybe_path = try_file_path(x)
if maybe_path is None:
return None
path = maybe_path
if not path.is_file():
raise FileNotFoundError(f"File not found: {path}")
return FileContents(
path=path,
start_thread=_should_start_thread(path),
)
def repr_mimebundle(
model_id: str,
repr_text: str,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Create a MIME bundle for a widget representation."""
data = {
"text/plain": repr_text,
_WIDGET_MIME_TYPE: {
"version_major": _PROTOCOL_VERSION_MAJOR,
"version_minor": _PROTOCOL_VERSION_MINOR,
"model_id": model_id,
},
}
return data, get_repr_metadata()