"""Launcher that Shiny Server uses to host a Shiny for Python application.

Reads a JSON configuration object from stdin, reports launch information on
stdout, wraps the user's ASGI app in two middlewares (shared-secret check and
``</head>`` script injection) and serves it with uvicorn on 127.0.0.1.
"""

import sys

# This needs to be very early, dependency loading can fail
python_version = (
    f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
)
print(f"Using Python {python_version} at {sys.executable}", file=sys.stderr)

import hmac
import os
import importlib
import json
from typing import TypedDict, cast, List, Optional, Literal

# `boolean` is merely an alias of the builtin `bool`; the annotations below use
# `bool` directly. The import is kept so the module's namespace is unchanged.
from xmlrpc.client import boolean

import uvicorn
from asgiref.typing import (
    ASGI3Application,
    Scope,
    ASGIReceiveCallable,
    ASGISendCallable,
    ASGISendEvent,
)
from starlette.responses import PlainTextResponse

if sys.version_info >= (3, 8):
    from importlib import metadata
else:
    import importlib_metadata as metadata

from shiny_express import escape_to_var_name, is_express_app


class ShinyInput(TypedDict):
    """Shape of the JSON configuration object read from stdin by `run`."""

    # Directory prepended to sys.path; `app.py` is loaded from it.
    appDir: str
    # Parsed with int() and used as the uvicorn listen port.
    port: str
    # Value every HTTP/WebSocket request must carry in `shiny-shared-secret`.
    sharedSecret: str
    # Rendered as a JavaScript boolean into the injected init script.
    reconnect: bool
    # Rendered as a list of quoted strings into the injected init script.
    disableProtocols: List[str]
    # Google Analytics ID; falsy disables the tracking snippet.
    gaTrackingId: Optional[str]
    # Not read anywhere in this file.
    shinyServerVersion: str
    # Not read anywhere in this file.
    workerId: str
    # Not read anywhere in this file.
    mode: Literal["shiny-python"]
    # Exported as RSTUDIO_PANDOC when non-empty.
    pandocPath: str
    # When non-empty, sys.stderr is replaced by a handle to this file.
    logFilePath: str
    # When true, SHINY_SANITIZE_ERRORS=1 is exported.
    sanitizeErrors: bool
    # Not read anywhere in this file.
    bookmarkStateDir: Optional[str]


# Do not allow any HTTP or WebSocket requests to succeed unless the
# shiny-shared-secret header is present and has the correct value
class SharedSecretMiddleware:
    """ASGI middleware that answers 403 unless the shared secret matches."""

    def __init__(self, app: ASGI3Application, sharedSecret: str):
        self.app = app
        # Header values arrive as bytes, so keep the secret encoded once.
        self.sharedSecret: bytes = sharedSecret.encode("utf-8")

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        if not self.check_secret(scope):
            resp = cast(
                ASGI3Application,
                PlainTextResponse(
                    "Direct access to this content is not permitted.", 403
                ),
            )
            return await resp(scope, receive, send)

        await self.app(scope, receive, send)

    def check_secret(self, scope: Scope) -> bool:
        """Return True when `scope` may proceed to the wrapped application.

        Non-HTTP/WebSocket scopes are always allowed. For the others, only the
        first `shiny-shared-secret` header is considered.
        """
        # We're only responsible for securing HTTP and WebSocket
        if scope["type"] not in ("http", "websocket"):
            return True

        # name and value are bytes, not strings
        for name, value in scope["headers"]:
            if name == b"shiny-shared-secret":
                # Constant-time comparison so response timing does not leak
                # how much of a guessed secret was correct.
                return hmac.compare_digest(value, self.sharedSecret)
        return False


# NOTE(review): in the reviewed copy of this file every piece of HTML markup in
# the literals below was missing (the templates were whitespace-only and the
# search marker was b""), which made the injection splice the script between
# every byte of the page. The markup has been reconstructed from the standard
# Shiny Server head injection and Google's published snippets -- confirm the
# asset paths and snippet text against the upstream launcher before merging.

# Closing tag the injected markup is placed in front of.
_HEAD_CLOSE = b"</head>"

# Deprecated Google Analytics with Universal Analytics ID ({0} = tracking ID).
_GA_LEGACY_TEMPLATE = """
<script type="text/javascript">

  var _gaq = _gaq || [];
  _gaq.push(['_setAccount', '{0}']);
  _gaq.push(['_trackPageview']);

  (function() {{
    var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
    ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
    var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
  }})();

</script>
"""

# Global site tag ({0} and {1} are both the tracking ID).
_GA_GTAG_TEMPLATE = """
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id={0}"></script>
<script>
  window.dataLayer = window.dataLayer || [];
  function gtag(){{dataLayer.push(arguments);}}
  gtag('js', new Date());
  gtag('config', '{1}');
</script>
"""

# {0} = reconnect flag, {1} = disabled protocol list, {2} = GA snippet.
# Ends with the closing tag because the replacement consumes the original one.
_HEAD_INJECT_TEMPLATE = """  <script src="__assets__/sockjs.min.js"></script>
  <script src="__assets__/shiny-server-client.min.js"></script>
  <script>preShinyInit({{reconnect:{0},disableProtocols:[{1}]}});</script>
  <link rel="stylesheet" type="text/css" href="__assets__/shiny-server.css"/>{2}
</head>
"""


class ShinyInjectHeadMiddleware:
    """ASGI middleware that injects client scripts into the root HTML page.

    Only HTTP requests for path "/" that answer with status 200 are rewritten;
    everything else is passed through untouched.
    """

    def __init__(self, app: ASGI3Application, input: ShinyInput):
        self.app = app

        reconnect = "true" if input["reconnect"] else "false"
        # Produces e.g. "websocket","xhr-streaming" (empty when nothing is
        # disabled).
        disable_protocols = ",".join(
            f'"{protocol}"' for protocol in (input["disableProtocols"] or [])
        )

        ga_tracking_code = ""
        ga_id = input["gaTrackingId"]
        if ga_id:
            if ga_id.startswith("UA-"):
                ga_tracking_code = _GA_LEGACY_TEMPLATE.format(ga_id)
            else:
                ga_tracking_code = _GA_GTAG_TEMPLATE.format(ga_id, ga_id)

        # Raises UnicodeEncodeError if a configured value is not ASCII.
        self.script = _HEAD_INJECT_TEMPLATE.format(
            reconnect, disable_protocols, ga_tracking_code
        ).encode("ascii")

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        if scope["type"] != "http" or scope["path"] != "/":
            return await self.app(scope, receive, send)

        intercept = True
        body = b""

        async def sockjs_send(event: ASGISendEvent) -> None:
            nonlocal intercept
            nonlocal body

            if intercept:
                if event["type"] == "http.response.start":
                    if event["status"] != 200:
                        intercept = False

                    # Must remove Content-Length, if present; if we insert our
                    # scripts, it won't be correct anymore.
                    # `headers` is optional in the ASGI spec, and names are
                    # bytes, so compare as bytes instead of decoding.
                    event["headers"] = [
                        (name, value)
                        for (name, value) in event.get("headers", [])
                        if name.lower() != b"content-length"
                    ]
                elif event["type"] == "http.response.body":
                    # `body` and `more_body` are optional in the ASGI spec
                    # (defaults b"" and False); indexing them directly raised
                    # KeyError for apps that omit them.
                    body += event.get("body", b"")

                    if _HEAD_CLOSE in body:
                        # Inject once, in front of the first closing tag only.
                        event["body"] = body.replace(_HEAD_CLOSE, self.script, 1)
                        body = b""  # Allow gc
                        intercept = False
                    elif event.get("more_body", False):
                        # DO NOT send the response; wait for more data
                        return
                    else:
                        # The entire response was seen, and we never
                        # encountered any </head>. Just send everything we have
                        event["body"] = body
                        body = b""  # Allow gc

            return await send(event)

        await self.app(scope, receive, sockjs_send)


def wrap_shiny_app(app: ASGI3Application, input: ShinyInput) -> ASGI3Application:
    """Wrap `app` with the shared-secret check and the head injection.

    The injection middleware is outermost, so it also sees the 403 responses
    produced by the shared-secret middleware (which it leaves unmodified
    because their status is not 200).
    """
    app = SharedSecretMiddleware(app, input["sharedSecret"])
    app = ShinyInjectHeadMiddleware(app, input)
    return app


def run():
    """Report launch info, read the configuration from stdin and serve the app.

    Blocks in `uvicorn.run` until the server stops.
    """
    shiny_output = {
        "pid": os.getpid(),
        "versions": {
            "python": f"{python_version} ({sys.executable})",
            "shiny": metadata.version("shiny"),
        },
    }
    print("shiny_launch_info: " + json.dumps(shiny_output, indent=None))
    print("==END==")

    # Shiny for Python currently logs some important info to stdout.
    # Redirect stdout to stderr, so that those messages get captured
    # by the app logs.
    sys.stderr.flush()
    sys.stdout.flush()
    # Causes the file description at sys.stderr.fileno() to be pointed to by
    # sys.stdout.fileno() as well. The original file descriptor at stdout is
    # closed before reuse.
    os.dup2(sys.stderr.fileno(), sys.stdout.fileno())

    # Named `config` rather than `input` to avoid shadowing the builtin.
    config: ShinyInput = json.load(sys.stdin)

    if config["logFilePath"]:
        # The handle is never closed here: it backs sys.stderr for the rest of
        # the process, and uvicorn.run() below blocks until shutdown.
        log_file_handle = open(config["logFilePath"], "w")
        sys.stderr = log_file_handle

    if config["sanitizeErrors"]:
        os.environ["SHINY_SANITIZE_ERRORS"] = "1"

    if config["pandocPath"]:
        os.environ["RSTUDIO_PANDOC"] = config["pandocPath"]

    sys.path.insert(0, config["appDir"])
    app_file = os.path.join(config["appDir"], "app.py")

    if is_express_app(app_file, None):
        app_module = importlib.import_module("shiny.express.app")
        app = getattr(app_module, escape_to_var_name(app_file))
    else:
        app_module = importlib.import_module("app")
        app = getattr(app_module, "app")

    app = wrap_shiny_app(app, config)

    uvicorn.run(app, host="127.0.0.1", port=int(config["port"]))


run()