import sys
# Report the interpreter before any dependency is imported: if one of the
# imports further down fails, this line has already been written to stderr.
python_version = ".".join(str(part) for part in sys.version_info[:3])
print(f"Using Python {python_version} at {sys.executable}", file=sys.stderr)
import hmac
import importlib
import json
import os
from typing import TypedDict, cast, List, Optional, Literal
from xmlrpc.client import boolean

import uvicorn
from asgiref.typing import (
    ASGI3Application,
    Scope,
    ASGIReceiveCallable,
    ASGISendCallable,
    ASGISendEvent,
)
from starlette.responses import PlainTextResponse

if sys.version_info >= (3, 8):
    from importlib import metadata
else:
    import importlib_metadata as metadata

from shiny_express import escape_to_var_name, is_express_app
class ShinyInput(TypedDict):
    """Launch configuration decoded from the JSON document read on stdin.

    ``run()`` obtains this mapping with ``json.load(sys.stdin)``; nothing in
    this file validates the document against the fields declared here.
    """

    # Directory containing the app; prepended to sys.path and searched for
    # "app.py".
    appDir: str
    # Port to listen on, as a string; converted with int() before use.
    port: str
    # Value the "shiny-shared-secret" request header must carry.
    sharedSecret: str
    # Rendered as the JavaScript literal "true"/"false" for the injected snippet.
    reconnect: bool
    # Protocol names, rendered as a comma-separated list of quoted strings.
    disableProtocols: List[str]
    # Google Analytics ID; IDs starting with "UA-" select the deprecated
    # Universal Analytics template. Falsy values disable tracking code.
    gaTrackingId: Optional[str]
    # Not read by the code visible in this file.
    shinyServerVersion: str
    # Not read by the code visible in this file.
    workerId: str
    # Declared as the single literal "shiny-python".
    mode: Literal["shiny-python"]
    # When non-empty, exported as the RSTUDIO_PANDOC environment variable.
    pandocPath: str
    # When non-empty, sys.stderr is replaced by this file (opened with "w").
    logFilePath: str
    # When truthy, SHINY_SANITIZE_ERRORS=1 is exported to the environment.
    sanitizeErrors: bool
    # Not read by the code visible in this file.
    bookmarkStateDir: Optional[str]
# Do not allow any HTTP or WebSocket requests to succeed unless the
# shiny-shared-secret header is present and has the correct value
class SharedSecretMiddleware:
    """ASGI middleware that rejects requests lacking the shared secret.

    HTTP and WebSocket scopes must carry a ``shiny-shared-secret`` header whose
    value equals the configured secret; otherwise a 403 plain-text response is
    produced and the wrapped app is never called. Every other scope type is
    passed through unchecked.
    """

    def __init__(self, app: ASGI3Application, sharedSecret: str):
        """Wrap *app* and remember *sharedSecret* for header comparison."""
        self.app = app
        # Header values in an ASGI scope are bytes, so store the secret in the
        # same form and compare without decoding untrusted input.
        self.sharedSecret: bytes = sharedSecret.encode("utf-8")

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        """Forward the request to the wrapped app, or answer 403 if unauthorised."""
        if not self.check_secret(scope):
            resp = cast(
                ASGI3Application,
                PlainTextResponse(
                    "Direct access to this content is not permitted.", 403
                ),
            )
            return await resp(scope, receive, send)
        await self.app(scope, receive, send)

    def check_secret(self, scope: Scope) -> bool:
        """Return True when *scope* may proceed to the wrapped app.

        Only the first ``shiny-shared-secret`` header is examined; a request
        whose first such header is wrong is rejected even if a later duplicate
        carries the right value.
        """
        # We're only responsible for securing HTTP and WebSocket
        if scope["type"] not in ("http", "websocket"):
            return True
        # name and value are bytes, not strings
        for name, value in scope["headers"]:
            if name == b"shiny-shared-secret":
                # Constant-time comparison: the header is attacker-controlled,
                # and `==` would leak how many leading bytes matched.
                return hmac.compare_digest(value, self.sharedSecret)
        return False
class ShinyInjectHeadMiddleware:
    """ASGI middleware that splices a prepared snippet into the app's root page.

    Only ``http`` requests whose path is exactly ``/`` are touched; everything
    else goes straight to the wrapped app. For a 200 response the body is
    buffered until the closing head tag is seen, the snippet is inserted
    immediately before the first such tag, and the rest of the response is
    streamed unchanged. If the response ends without the tag, the buffered
    body is sent as-is.
    """

    # Insertion point for the snippet.
    # NOTE(review): the literal found here was empty (b""), which is contained
    # in every body and made bytes.replace() insert the snippet between every
    # byte. `</head>` is inferred from the class name and the surrounding
    # comments - confirm against the original file.
    _HEAD_CLOSE = b"</head>"

    def __init__(self, app: ASGI3Application, input: ShinyInput):
        """Wrap *app* and pre-render the snippet from the launch configuration."""
        self.app = app
        reconnect = "true" if input["reconnect"] else "false"

        protocols = input["disableProtocols"]
        if protocols:
            # Rendered as "a","b","c" for use inside a JavaScript array literal
            disable_protocols = '"' + '","'.join(protocols) + '"'
        else:
            disable_protocols = ""

        # NOTE(review): the three templates below contain no markup and no
        # placeholders for most of the values handed to format(); it looks as
        # if their HTML content was lost. They are reproduced exactly as found
        # and must be restored before the injected snippet does anything.
        gaTrackingCode = ""
        if input["gaTrackingId"]:
            gaID = input["gaTrackingId"]
            if gaID.startswith("UA-"):
                # Deprecated Google Analytics with Universal Analytics ID
                gaTrackingCode = "\n".format(gaID)
            else:
                gaTrackingCode = "\n".format(gaID, gaID)

        self.script = (
            "\n{2}\n".format(reconnect, disable_protocols, gaTrackingCode)
        ).encode("ascii")

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        """Serve the request, rewriting the root page's body on the way out."""
        if scope["type"] != "http" or scope["path"] != "/":
            return await self.app(scope, receive, send)

        marker = self._HEAD_CLOSE
        intercept = True
        body = b""

        async def sockjs_send(event: ASGISendEvent) -> None:
            nonlocal intercept, body
            if intercept:
                if event["type"] == "http.response.start":
                    if event["status"] != 200:
                        intercept = False
                    # Must remove Content-Length, if present; if we insert our
                    # scripts, it won't be correct anymore. "headers" is
                    # optional in the ASGI message, hence .get().
                    event["headers"] = [
                        (name, value)
                        for (name, value) in event.get("headers", ())
                        if name.lower() != b"content-length"
                    ]
                elif event["type"] == "http.response.body":
                    # "body" and "more_body" are optional ASGI keys; indexing
                    # them raised KeyError for apps that omit them.
                    body += event.get("body", b"")
                    if marker in body:
                        # Insert before the first closing tag only, and keep
                        # the tag itself in the output.
                        event["body"] = body.replace(
                            marker, self.script + marker, 1
                        )
                        body = b""  # Allow gc
                        intercept = False
                    elif event.get("more_body", False):
                        # DO NOT send the response; wait for more data
                        return
                    else:
                        # The entire response was seen, and we never
                        # encountered the marker. Just send everything we have
                        event["body"] = body
                        body = b""  # Allow gc
            return await send(event)

        await self.app(scope, receive, sockjs_send)
def wrap_shiny_app(app: ASGI3Application, input: ShinyInput) -> ASGI3Application:
    """Return *app* wrapped in both middlewares defined in this file.

    The shared-secret check sits directly around *app*; the head-injecting
    middleware is the outermost layer and therefore sees each request first.
    """
    guarded = SharedSecretMiddleware(app, input["sharedSecret"])
    return ShinyInjectHeadMiddleware(guarded, input)
def run():
    """Announce this process, read its configuration from stdin, and serve the app.

    Steps, in order: print a one-line JSON launch record followed by
    ``==END==`` on stdout; point stdout's file descriptor at stderr; load the
    configuration from stdin; apply the log-file and environment settings;
    import the app from ``appDir``; wrap it with this file's middlewares and
    hand it to uvicorn on 127.0.0.1.
    """
    launch_info = {
        "pid": os.getpid(),
        "versions": {
            "python": f"{python_version} ({sys.executable})",
            "shiny": metadata.version("shiny"),
        },
    }
    print(f"shiny_launch_info: {json.dumps(launch_info)}")
    print("==END==")

    # Shiny for Python currently logs some important info to stdout, so stdout
    # is redirected to stderr to get those messages into the app logs. Flush
    # both streams first so nothing already buffered ends up on the wrong side
    # of the switch.
    for stream in (sys.stderr, sys.stdout):
        stream.flush()
    # After dup2, stdout's descriptor refers to the same file as stderr's; the
    # file previously open on stdout's descriptor is closed in the process.
    stderr_fd = sys.stderr.fileno()
    stdout_fd = sys.stdout.fileno()
    os.dup2(stderr_fd, stdout_fd)

    config: ShinyInput = json.load(sys.stdin)

    log_path = config["logFilePath"]
    if log_path != "":
        # Replaces the Python-level stderr object only; "w" truncates any
        # existing file at that path.
        log_stream = open(log_path, "w")
        sys.stderr = log_stream

    if config["sanitizeErrors"]:
        os.environ["SHINY_SANITIZE_ERRORS"] = "1"

    pandoc_path = config["pandocPath"]
    if pandoc_path != "":
        os.environ["RSTUDIO_PANDOC"] = pandoc_path

    # Make the app directory importable ahead of everything else on the path.
    app_dir = config["appDir"]
    sys.path.insert(0, app_dir)
    app_file = os.path.join(app_dir, "app.py")

    if is_express_app(app_file, None):
        # Express apps are looked up as an attribute of shiny.express.app,
        # named after the escaped app file path.
        express_module = importlib.import_module("shiny.express.app")
        app = getattr(express_module, escape_to_var_name(app_file))
    else:
        # Otherwise the module named "app" must expose an attribute "app".
        app = importlib.import_module("app").app

    uvicorn.run(
        wrap_shiny_app(app, config),
        host="127.0.0.1",
        port=int(config["port"]),
    )
# Runs unconditionally whenever this module is executed or imported; there is
# no `if __name__ == "__main__"` guard around the call.
run()