properties:
  distributed:
    type: object
    properties:
      version:
        type: integer

      scheduler:
        type: object
        properties:
          allowed-failures:
            type: integer
            minimum: 0
            description: |
              The number of retries before a task is considered bad

              When a worker dies while a task is running, that task is rerun
              elsewhere. If many workers die while running this same task
              then we call the task bad, and raise a KilledWorker exception.
              This is the number of workers that are allowed to die before
              this task is marked as bad.
          bandwidth:
            type:
            - integer
            - string
            description: |
              The expected bandwidth between any pair of workers

              This is used when making scheduling decisions. The scheduler
              will use this value as a baseline, but also learn it over time.
          blocked-handlers:
            type: array
            description: |
              A list of handlers to exclude

              The scheduler operates by receiving messages from various
              workers and clients and then performing operations based on
              those messages. Each message has an operation like
              "close-worker" or "task-finished". In some high security
              situations administrators may choose to block certain handlers
              from running. Those handlers can be listed here.

              For a list of handlers see the
              `dask.distributed.Scheduler.handlers` attribute.
          contact-address:
            type:
            - string
            - "null"
            description: |
              The address that the scheduler advertises to workers for
              communication with it.

              To be specified when the address to which the scheduler binds
              cannot be the same as the address that workers use to contact
              the scheduler (e.g. because the former is private and the
              scheduler is in a different network than the workers).
          default-data-size:
            type:
            - string
            - integer
            description: |
              The default size of a piece of data if we don't know anything
              about it.

              This is used by the scheduler in some scheduling decisions.
          events-cleanup-delay:
            type: string
            description: |
              The amount of time to wait until workers or clients are removed
              from the event log after they have been removed from the
              scheduler
          idle-timeout:
            type:
            - string
            - "null"
            description: |
              Shut down the scheduler after this duration if no activity has
              occurred
          no-workers-timeout:
            type:
            - string
            - "null"
            description: |
              Shut down the scheduler after this duration if there are
              pending tasks, but no workers that can process them. This can
              either mean that there are no workers running at all, or that
              there are idle workers but they've been excluded through worker
              or resource restrictions.

              In adaptive clusters, this timeout must be set to be safely
              higher than the time it takes for workers to spin up.

              Works in conjunction with idle-timeout.
          work-stealing:
            type: boolean
            description: |
              Whether or not to balance work between workers dynamically

              Sometimes one worker has more work than we expected. The
              scheduler will move these tasks around as necessary by default.
              Set this to false to disable this behavior
          work-stealing-interval:
            type: string
            description: |
              How frequently to balance worker loads
          worker-saturation:
            oneOf:
              - type: number
                exclusiveMinimum: 0
              # String "inf", not to be confused with .inf which in YAML
              # means float infinity. This is necessary because there's no
              # way to parse a float infinity from a DASK_* environment
              # variable.
              - enum: [inf]
            description: |
              Controls how many root tasks are sent to workers (like a
              `readahead`).

              Up to worker-saturation * nthreads root tasks are sent to a
              worker at a time. If `.inf`, all runnable tasks are immediately
              sent to workers.

              The target number is rounded up, so any `worker-saturation`
              value > 1.0 guarantees at least one extra task will be sent to
              workers.

              Allowing oversaturation (> 1.0) means a worker may start
              running a new root task as soon as it completes the previous,
              even if there is a higher-priority downstream task to run. This
              reduces worker idleness, by letting workers do something while
              waiting for further instructions from the scheduler, even if
              it's not the most efficient thing to do.

              This generally comes at the expense of increased memory usage.
              It leads to "wider" (more breadth-first) execution of the
              graph.

              Compute-bound workloads may benefit from oversaturation.
              Memory-bound workloads should generally leave
              `worker-saturation` at 1.0, though 1.25-1.5 could slightly
              improve performance if ample memory is available.
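          # A hedged example of how a user's config file might tune these
          # scheduler options; the values are illustrative assumptions, not
          # recommendations:
          #
          #   distributed:
          #     scheduler:
          #       allowed-failures: 5
          #       idle-timeout: "1 hour"
          #       work-stealing: true
          #       worker-saturation: 1.1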
          worker-ttl:
            type:
            - string
            - "null"
            description: |
              Time to live for workers.

              If we don't receive a heartbeat faster than this then we assume
              that the worker has died.
          preload:
            type: array
            description: |
              Run custom modules during the lifetime of the scheduler

              You can run custom modules when the scheduler starts up and
              closes down.
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          preload-argv:
            type: array
            description: |
              Arguments to pass into the preload scripts described above
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          unknown-task-duration:
            type: string
            description: |
              Default duration for all tasks with unknown durations

              Over time the scheduler learns a duration for tasks. However
              when it sees a new type of task for the first time it has to
              make a guess as to how long it will take. This value is that
              guess.
          default-task-durations:
            type: object
            description: |
              How long we expect function names to run

              Over time the scheduler will learn these values, but these give
              it a good starting point.
          validate:
            type: boolean
            description: |
              Whether or not to run consistency checks during execution.
              This is typically only used for debugging.
          dashboard:
            type: object
            description: |
              Configuration options for Dask's real-time dashboard
            properties:
              status:
                type: object
                description: The main status page of the dashboard
                properties:
                  task-stream-length:
                    type: integer
                    minimum: 0
                    description: |
                      The maximum number of tasks to include in the task
                      stream plot
              tasks:
                type: object
                description: |
                  The page which includes the full task stream history
                properties:
                  task-stream-length:
                    type: integer
                    minimum: 0
                    description: |
                      The maximum number of tasks to include in the task
                      stream plot
              tls:
                type: object
                description: |
                  Settings around securing the dashboard
                properties:
                  ca-file:
                    type:
                    - string
                    - "null"
                  key:
                    type:
                    - string
                    - "null"
                  cert:
                    type:
                    - string
                    - "null"
              bokeh-application:
                type: object
                description: |
                  Keywords to pass to the BokehTornado application
          locks:
            type: object
            description: |
              Settings for Dask's distributed Lock object

              See https://docs.dask.org/en/latest/futures.html#locks for more
              information
            properties:
              lease-validation-interval:
                type: string
                description: |
                  The interval at which the scheduler validates staleness of
                  all acquired leases. Must always be smaller than the
                  lease-timeout itself.
              lease-timeout:
                type: string
                description: |
                  Maximum interval to wait for a Client refresh before a
                  lease is invalidated and released.
          http:
            type: object
            description: Settings for Dask's embedded HTTP Server
            properties:
              routes:
                type: array
                description: |
                  A list of modules like "prometheus" and "health" that can
                  be included or excluded as desired

                  These modules will have a ``routes`` keyword that gets
                  added to the main HTTP Server.

                  This is also a list that can be extended with user defined
                  modules.
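          # A hedged sketch of extending the scheduler's HTTP routes with a
          # user-defined module; ``mypackage.routes`` is a hypothetical
          # module exposing a ``routes`` list, and the built-in module names
          # shown are assumptions to verify against your installed version:
          #
          #   distributed:
          #     scheduler:
          #       http:
          #         routes:
          #           - distributed.http.scheduler.prometheus
          #           - distributed.http.health
          #           - mypackage.routes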
          allowed-imports:
            type: array
            description: |
              A list of trusted root modules the scheduler is allowed to
              import (incl. submodules). For security reasons, the scheduler
              does not import arbitrary Python modules.
          active-memory-manager:
            type: object
            required: [start, interval, measure, policies]
            additionalProperties: false
            properties:
              start:
                type: boolean
                description: set to true to auto-start the AMM on Scheduler init
              interval:
                type: string
                description: >-
                  Time expression, e.g. "2s". Run the AMM cycle every
                  <interval>.
              measure:
                enum:
                - process
                - optimistic
                - managed
                - managed_total
                description: One of the attributes of distributed.scheduler.MemoryState
              policies:
                type: array
                items:
                  type: object
                  required: [class]
                  properties:
                    class:
                      type: string
                      description: fully qualified name of an ActiveMemoryManagerPolicy subclass
                  additionalProperties:
                    description: keyword arguments to the policy constructor, if any
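          # A hedged example of enabling the AMM with one policy; the class
          # path shown, distributed.active_memory_manager.ReduceReplicas, is
          # assumed to be available in your installed version, and the values
          # are illustrative:
          #
          #   distributed:
          #     scheduler:
          #       active-memory-manager:
          #         start: true
          #         interval: 2s
          #         measure: optimistic
          #         policies:
          #           - class: distributed.active_memory_manager.ReduceReplicas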
      worker:
        type: object
        description: |
          Configuration settings for Dask Workers
        properties:
          blocked-handlers:
            type: array
            description: |
              A list of handlers to exclude

              The scheduler operates by receiving messages from various
              workers and clients and then performing operations based on
              those messages. Each message has an operation like
              "close-worker" or "task-finished". In some high security
              situations administrators may choose to block certain handlers
              from running. Those handlers can be listed here.

              For a list of handlers see the
              `dask.distributed.Scheduler.handlers` attribute.
          multiprocessing-method:
            enum:
            - spawn
            - fork
            - forkserver
            description: |
              How we create new workers, one of "spawn", "forkserver", or
              "fork"

              This is passed to the ``multiprocessing.get_context`` function.
          use-file-locking:
            type: boolean
            description: |
              Whether or not to use lock files when creating workers

              Workers create a local directory in which to place temporary
              files. When many workers are created on the same process at
              once these workers can conflict with each other by trying to
              create this directory all at the same time.

              To avoid this, Dask usually uses a file-based lock. However, on
              some systems file-based locks don't work. This is particularly
              common on HPC NFS systems, where users may want to set this to
              false.
          transfer:
            type: object
            description: |
              Configuration settings for data transfer between workers
            properties:
              message-bytes-limit:
                type:
                - string
                - integer
                description: |
                  The maximum amount of data for a worker to request from
                  another in a single gather operation

                  Tasks are gathered in batches, and if the first task in a
                  batch is larger than this value, the task will still be
                  gathered to ensure progress. Hence, this limit is not
                  absolute. Note that this limit applies to a single gather
                  operation and a worker may gather data from multiple
                  workers in parallel.
          connections:
            type: object
            description: |
              The number of concurrent connections to allow to other workers
            properties:
              incoming:
                type: integer
                minimum: 0
              outgoing:
                type: integer
                minimum: 0
          preload:
            type: array
            description: |
              Run custom modules during the lifetime of the worker

              You can run custom modules when the worker starts up and closes
              down.
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          preload-argv:
            type: array
            description: |
              Arguments to pass into the preload scripts described above
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          daemon:
            type: boolean
            description: |
              Whether or not to run our process as a daemon process
          validate:
            type: boolean
            description: |
              Whether or not to run consistency checks during execution.
              This is typically only used for debugging.
          resources:
            type: object
            description: |
              A dictionary specifying resources for workers.

              See https://distributed.dask.org/en/latest/resources.html for
              more information.
            properties: {}
          lifetime:
            type: object
            description: |
              The worker may choose to gracefully close itself down after
              some pre-determined time. This is particularly useful if you
              know that your worker job has a time limit on it. This is
              particularly common in HPC job schedulers.

              For example if your worker has a walltime of one hour, then you
              may want to set the lifetime.duration to "55 minutes"
            properties:
              duration:
                type:
                - string
                - "null"
                description: |
                  The time after creation to close the worker, like "1 hour"
              stagger:
                type: string
                description: |
                  Random amount by which to stagger lifetimes

                  If you create many workers at the same time, you may want
                  to avoid having them kill themselves all at the same time.
                  To avoid this you might want to set a stagger time, so that
                  they close themselves with some random variation, like
                  "5 minutes"

                  That way some workers can die, new ones can be brought up,
                  and data can be transferred over smoothly.
              restart:
                type: boolean
                description: |
                  Do we try to resurrect the worker after the lifetime
                  deadline?
          profile:
            type: object
            description: |
              The workers periodically poll every worker thread to see what
              they are working on. This data gets collected into statistical
              profiling information, which is then periodically bundled
              together and sent along to the scheduler.
            properties:
              enabled:
                type: boolean
                description: |
                  Whether or not to enable profiling
              interval:
                type: string
                description: |
                  The time between polling the worker threads, typically
                  short like 10ms
              cycle:
                type: string
                description: |
                  The time between bundling together this data and sending it
                  to the scheduler

                  This controls the granularity at which people can query the
                  profile information on the time axis.
              low-level:
                type: boolean
                description: |
                  Whether or not to use the libunwind and stacktrace
                  libraries to gather profiling information at the lower
                  level (beneath Python)

                  To get this to work you will need to install the
                  experimental stacktrace library with
                  ``conda install -c numba stacktrace``

                  See https://github.com/numba/stacktrace
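          # A hedged example for HPC-style deployments where worker jobs have
          # a one-hour walltime, mirroring the scenario in the lifetime
          # description above; the values are illustrative:
          #
          #   distributed:
          #     worker:
          #       lifetime:
          #         duration: "55 minutes"
          #         stagger: "5 minutes"
          #         restart: true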
          memory:
            type: object
            description: >-
              Settings for memory management
            properties:
              recent-to-old-time:
                type: string
                description: >-
                  When there is an increase in process memory (as observed by
                  the operating system) that is not accounted for by the dask
                  keys stored on the worker, ignore it for this long before
                  considering it in non-time-sensitive heuristics. This
                  should be set to be longer than the duration of most dask
                  tasks.
              rebalance:
                type: object
                description: >-
                  Settings for memory rebalance operations
                properties:
                  measure:
                    enum:
                    - process
                    - optimistic
                    - managed
                    - managed_total
                    description: >-
                      Which of the properties of
                      distributed.scheduler.MemoryState should be used for
                      measuring worker memory usage
                  sender-min:
                    type: number
                    minimum: 0
                    maximum: 1
                    description: >-
                      Fraction of worker process memory at which we start
                      potentially transferring data to other workers.
                  recipient-max:
                    type: number
                    minimum: 0
                    maximum: 1
                    description: >-
                      Fraction of worker process memory at which we stop
                      potentially receiving data from other workers. Ignored
                      when max_memory is not set.
                  sender-recipient-gap:
                    type: number
                    minimum: 0
                    maximum: 1
                    description: >-
                      Fraction of worker process memory, around the cluster
                      mean, where a worker is neither a sender nor a
                      recipient of data during a rebalance operation. E.g. if
                      the mean cluster occupation is 50%,
                      sender-recipient-gap=0.1 means that only nodes above
                      55% will donate data and only nodes below 45% will
                      receive them. This helps avoid data from bouncing
                      around the cluster repeatedly.
              transfer:
                oneOf:
                - {type: number, exclusiveMinimum: 0, maximum: 1}
                - {enum: [false]}
                description: >-
                  When the total size of incoming data transfers gets above
                  this amount, we start throttling incoming data transfers
              target:
                oneOf:
                - {type: number, exclusiveMinimum: 0, maximum: 1}
                - {enum: [false]}
                description: >-
                  When the process memory (as observed by the operating
                  system) gets above this amount, we start spilling the dask
                  keys holding the oldest chunks of data to disk
              spill:
                oneOf:
                - {type: number, exclusiveMinimum: 0, maximum: 1}
                - {enum: [false]}
                description: >-
                  When the process memory (as observed by the operating
                  system) gets above this amount, we spill data to disk,
                  starting from the dask keys holding the oldest chunks of
                  data, until the process memory falls below the target
                  threshold.
              pause:
                oneOf:
                - {type: number, exclusiveMinimum: 0, maximum: 1}
                - {enum: [false]}
                description: >-
                  When the process memory (as observed by the operating
                  system) gets above this amount, we no longer start new
                  tasks or fetch new data on the worker.
              terminate:
                oneOf:
                - {type: number, exclusiveMinimum: 0, maximum: 1}
                - {enum: [false]}
                description: >-
                  When the process memory reaches this level the nanny
                  process will kill the worker (if a nanny is present)
              max-spill:
                oneOf:
                - type: string
                - {type: number, minimum: 0}
                - enum: [false]
                description: >-
                  Limit on the number of bytes to be spilled on disk.
              monitor-interval:
                type: string
                description: >-
                  Interval between checks for the spill, pause, and terminate
                  thresholds
              spill-compression:
                enum: [null, false, auto, zlib, lz4, snappy, zstd]
                description: >-
                  The compression algorithm to use. 'auto' defaults to lz4 if
                  installed, otherwise to snappy if installed, otherwise to
                  false. zlib and zstd are only used if explicitly requested
                  here. Uncompressible data is always uncompressed,
                  regardless of this setting. See also
                  distributed.comm.compression.
          http:
            type: object
            description: Settings for Dask's embedded HTTP Server
            properties:
              routes:
                type: array
                description: |
                  A list of modules like "prometheus" and "health" that can
                  be included or excluded as desired

                  These modules will have a ``routes`` keyword that gets
                  added to the main HTTP Server.

                  This is also a list that can be extended with user defined
                  modules.
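          # A hedged sketch of the worker memory thresholds, expressed as
          # fractions of the memory limit; these mirror commonly documented
          # defaults, but verify against your installed version:
          #
          #   distributed:
          #     worker:
          #       memory:
          #         target: 0.6
          #         spill: 0.7
          #         pause: 0.8
          #         terminate: 0.95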
      nanny:
        type: object
        description: |
          Configuration settings for Dask Nannies
        properties:
          preload:
            type: array
            description: |
              Run custom modules during the lifetime of the nanny

              You can run custom modules when the nanny starts up and closes
              down.
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          preload-argv:
            type: array
            description: |
              Arguments to pass into the preload scripts described above
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          environ:
            type: object
            description: |
              Environment variables to set on all worker processes started by
              nannies. These variables are set in the worker process after it
              has started.

              To unset a variable in a config override, set its value to null
              (if using a YAML config file) or None (if using
              dask.config.set) or "None" (if using a DASK_* environment
              variable).
          pre-spawn-environ:
            type: object
            description: |
              Environment variables to set on all worker processes started by
              nannies. These variables are set within the Nanny process,
              before spawning the worker process. Should be used for
              variables that must be set before process startup, interpreter
              startup, or imports.

              To unset a variable in a config override, set its value to null
              (if using a YAML config file) or None (if using
              dask.config.set) or "None" (if using a DASK_* environment
              variable).

      client:
        type: object
        description: |
          Configuration settings for Dask Clients
        properties:
          heartbeat:
            type: string
            description: |
              This value is the time between heartbeats

              The client sends a periodic heartbeat message to the scheduler.
              If it misses enough of these then the scheduler assumes that
              the client has gone.
          scheduler-info-interval:
            type: string
            description: Interval between scheduler-info updates
          security-loader:
            type: [string, 'null']
            description: |
              A fully qualified name (e.g. ``module.submodule.function``) of
              a callback to use for loading security credentials for the
              client.

              If no security object is explicitly passed when creating a
              ``Client``, this callback is called with a dict containing
              client information (currently just ``address``), and should
              return a ``Security`` object to use for this client, or
              ``None`` to fall back to the default security configuration.
          preload:
            type: array
            description: |
              Run custom modules during the lifetime of the client

              You can run custom modules when the client starts up and closes
              down.
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
          preload-argv:
            type: array
            description: |
              Arguments to pass into the preload scripts described above
              See https://docs.dask.org/en/latest/how-to/customize-initialization.html
              for more information
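          # A hedged sketch of a client security callback;
          # ``mypackage.security.load_credentials`` is a hypothetical
          # function assumed to return a distributed Security object:
          #
          #   distributed:
          #     client:
          #       security-loader: mypackage.security.load_credentials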
      deploy:
        type: object
        description: Configuration settings for general Dask deployment
        properties:
          lost-worker-timeout:
            type: string
            description: |
              Interval after which to hard-close a lost worker job

              Otherwise we wait for a while to see if a worker will reappear
          cluster-repr-interval:
            type: string
            description: Interval between calls to update cluster-repr for the widget

      adaptive:
        type: object
        description: Configuration settings for Dask's adaptive scheduling
        properties:
          interval:
            type: string
            description: |
              The duration between checking in with adaptive scheduling load

              The adaptive system periodically checks scheduler load and
              determines if it should scale the cluster up or down. This is
              the timing between those checks.
          target-duration:
            type: string
            description: |
              The desired time for the entire computation to run

              The adaptive system will try to start up enough workers to run
              the computation in about this time.
          minimum:
            type: integer
            minimum: 0
            description: |
              The minimum number of workers to keep around
          maximum:
            type: number
            minimum: 0
            description: |
              The maximum number of workers to keep around
          wait-count:
            type: integer
            minimum: 1
            description: |
              The number of times a worker should be suggested for removal
              before removing it

              This helps to smooth out the number of deployed workers

      comm:
        type: object
        description: Configuration settings for Dask communications
        properties:
          retry:
            type: object
            description: |
              Some operations (such as gathering data) are subject to
              re-tries with the below parameters
            properties:
              count:
                type: integer
                minimum: 0
                description: |
                  The number of times to retry a connection
              delay:
                type: object
                properties:
                  min:
                    type: string
                    description: The first non-zero delay between retry attempts
                  max:
                    type: string
                    description: The maximum delay between retries
          compression:
            enum: [null, false, auto, zlib, lz4, snappy, zstd]
            description: >-
              The compression algorithm to use. 'auto' defaults to lz4 if
              installed, otherwise to snappy if installed, otherwise to
              false. zlib and zstd are only used if explicitly requested
              here. Uncompressible data and transfers on localhost are always
              uncompressed, regardless of this setting. See also
              distributed.worker.memory.spill-compression.
          offload:
            type:
            - boolean
            - string
            description: |
              The size of message after which we choose to offload
              serialization to another thread

              In some cases, you may also choose to disable this altogether
              with the value false

              This is useful if you want to include serialization in
              profiling data, or if you have data types that are particularly
              sensitive to deserialization
          shard:
            type: string
            description: |
              The maximum size of a frame to send through a comm

              Some network infrastructure doesn't like sending through very
              large messages. Dask comms will cut up these large messages
              into many small ones. This attribute determines the maximum
              size of such a shard.
          socket-backlog:
            type: integer
            description: |
              When shuffling data between workers, there can really be
              O(cluster size) connection requests on a single worker socket;
              make sure the backlog is large enough not to lose any.
          zstd:
            type: object
            description: Options for the Z Standard compression scheme
            properties:
              level:
                type: integer
                minimum: 1
                maximum: 22
                description: Compression level, between 1 and 22.
              threads:
                type: integer
                minimum: -1
                description: |
                  Number of threads to use. 0 for single-threaded, -1 to
                  infer from cpu count.
          timeouts:
            type: object
            properties:
              connect:
                type: string
              tcp:
                type: string
          require-encryption:
            type:
            - boolean
            - "null"
            description: |
              Whether to require encryption on non-local comms
          default-scheme:
            type: string
            description: The default protocol to use, like tcp or tls
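          # A hedged example of tuning comm retries and compression; the
          # values are illustrative assumptions, not recommendations:
          #
          #   distributed:
          #     comm:
          #       compression: lz4
          #       retry:
          #         count: 3
          #         delay:
          #           min: 1s
          #           max: 20s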
          tls:
            type: object
            properties:
              ciphers:
                type:
                - string
                - "null"
                description: Allowed ciphers, specified as an OpenSSL cipher string.
              min-version:
                enum: [null, 1.2, 1.3]
                description: The minimum TLS version to support. Defaults to TLS 1.2.
              max-version:
                enum: [null, 1.2, 1.3]
                description: |
                  The maximum TLS version to support. Defaults to the maximum
                  version supported by the platform.
              ca-file:
                type:
                - string
                - "null"
                description: Path to a CA file, in pem format
              scheduler:
                type: object
                description: TLS information for the scheduler
                properties:
                  cert:
                    type:
                    - string
                    - "null"
                    description: Path to certificate file
                  key:
                    type:
                    - string
                    - "null"
                    description: |
                      Path to key file. Alternatively, the key can be
                      appended to the cert file above, and this field left
                      blank
              worker:
                type: object
                description: TLS information for the worker
                properties:
                  cert:
                    type:
                    - string
                    - "null"
                    description: Path to certificate file
                  key:
                    type:
                    - string
                    - "null"
                    description: |
                      Path to key file. Alternatively, the key can be
                      appended to the cert file above, and this field left
                      blank
              client:
                type: object
                description: TLS information for the client
                properties:
                  cert:
                    type:
                    - string
                    - "null"
                    description: Path to certificate file
                  key:
                    type:
                    - string
                    - "null"
                    description: |
                      Path to key file. Alternatively, the key can be
                      appended to the cert file above, and this field left
                      blank
          ucx:
            type: object
            description: |
              UCX provides access to other transport methods including NVLink
              and InfiniBand.
            properties:
              cuda-copy:
                type: [boolean, 'null']
                description: |
                  Set environment variables to enable CUDA support over UCX.
                  This may be used even if InfiniBand and NVLink are not
                  supported or disabled, in which case data is transferred
                  over TCP.
              tcp:
                type: [boolean, 'null']
                description: |
                  Set environment variables to enable TCP over UCX, even if
                  InfiniBand and NVLink are not supported or disabled.
              nvlink:
                type: [boolean, 'null']
                description: |
                  Set environment variables to enable UCX over NVLink,
                  implies ``distributed.comm.ucx.tcp=True``.
              infiniband:
                type: [boolean, 'null']
                description: |
                  Set environment variables to enable UCX over InfiniBand,
                  implies ``distributed.comm.ucx.tcp=True``.
              rdmacm:
                type: [boolean, 'null']
                description: |
                  Set environment variables to enable UCX RDMA connection
                  manager support, requires
                  ``distributed.comm.ucx.infiniband=True``.
              create-cuda-context:
                type: [boolean, 'null']
                description: |
                  Creates a CUDA context before UCX is initialized. This is
                  necessary to enable UCX to properly identify connectivity
                  of GPUs with specialized networking hardware, such as
                  InfiniBand. This permits UCX to choose transports
                  automatically, without specifying additional variables for
                  each transport, while ensuring optimal connectivity. When
                  ``True``, a CUDA context will be created on the first
                  device listed in ``CUDA_VISIBLE_DEVICES``.
              environment:
                type: object
                description: |
                  Mapping for setting arbitrary UCX environment variables.
                  Names here are translated via the following rules to map to
                  the relevant UCX environment variable:

                  - hyphens are replaced with underscores
                  - words are uppercased
                  - UCX_ is prepended

                  So, for example, setting ``some-option=value`` is
                  equivalent to setting ``UCX_SOME_OPTION=value`` in the
                  calling environment.

                  For a full list of supported UCX environment variables, run
                  ``ucx_info -f``.
          websockets:
            type: object
            properties:
              shard:
                type: string
                description: |
                  The maximum size of a websocket frame to send through a
                  comm.

                  This is somewhat duplicative of distributed.comm.shard, but
                  websockets often have much smaller maximum message sizes
                  than other protocols, so this attribute is used to set a
                  smaller default shard size and to allow separate control of
                  websocket message sharding.
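          # A hedged example of a TLS setup where every component shares one
          # CA and each has its own cert with the key appended to the cert
          # file (so the key fields stay blank); all paths are hypothetical:
          #
          #   distributed:
          #     comm:
          #       require-encryption: true
          #       tls:
          #         ca-file: /etc/dask/ca.pem
          #         scheduler:
          #           cert: /etc/dask/scheduler.pem
          #         worker:
          #           cert: /etc/dask/worker.pem
          #         client:
          #           cert: /etc/dask/client.pem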
      diagnostics:
        type: object
        properties:
          nvml:
            type: boolean
            description: |
              If ``True``, enables GPU diagnostics with NVML.

              Generally leaving it enabled is not a problem; it is disabled
              automatically if no GPUs are found in the system, but in
              certain cases it may be desirable to completely disable NVML
              diagnostics.
          cudf:
            type: boolean
            description: |
              If ``True``, enables tracking of GPU spilling and unspilling
              managed by cuDF (if it is enabled).

              Note that this forces a cuDF import at worker startup, which
              may be undesirable for performance and memory footprint.
          computations:
            type: object
            properties:
              max-history:
                type: integer
                minimum: 0
                description: |
                  The maximum number of computations to remember.
              nframes:
                type: integer
                minimum: 0
                description: |
                  The number of frames of code to capture, starting from the
                  innermost frame.
              ignore-modules:
                type: array
                description: |
                  A list of modules which are ignored when trying to collect
                  the code context when submitting a computation. Accepts
                  regular expressions.
              ignore-files:
                type: array
                description: |
                  A list of files and directories which are ignored when
                  trying to collect the code context when submitting a
                  computation. Accepts regular expressions.
          erred-tasks:
            type: object
            properties:
              max-history:
                type: integer
                minimum: 0
                description: |
                  The maximum number of erred tasks to remember.

      p2p:
        type: object
        description: Configuration for P2P shuffles
        properties:
          comm:
            type: object
            description: Configuration settings for Dask communications specific to P2P
            properties:
              retry:
                type: object
                description: |
                  Sending data during P2P is subject to re-tries with the
                  below parameters
                properties:
                  count:
                    type: integer
                    minimum: 0
                    description: |
                      The number of times to retry a connection
                  delay:
                    type: object
                    properties:
                      min:
                        type: string
                        description: The first non-zero delay between retry attempts
                      max:
                        type: string
                        description: The maximum delay between retries
          disk:
            type: boolean
            description: |
              Whether or not P2P stores intermediate data on disk instead of
              memory

      dashboard:
        type: object
        properties:
          link:
            type: string
            description: |
              The form for the dashboard links

              This is used wherever we print out the link for the dashboard.
              It is filled in with relevant information like the schema,
              host, and port number.
          graph-max-items:
            type: integer
            minimum: 0
            description: maximum number of tasks to try to plot in "graph" view
          export-tool:
            type: boolean

      prometheus:
        type: object
        properties:
          namespace:
            type: string
            description: Namespace prefix to use for all prometheus metrics.
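      # A hedged example keeping P2P shuffle intermediates in memory rather
      # than on disk, as the schema above allows; whether this is wise
      # depends on available memory, and the value shown is illustrative:
      #
      #   distributed:
      #     p2p:
      #       disk: false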
      admin:
        type: object
        description: |
          Options for logs, event loops, and so on
        properties:
          large-graph-warning-threshold:
            type: string
            description: |
              Threshold in bytes for when a warning is raised about a large
              submitted task graph. Default is 10MB.
          tick:
            type: object
            description: |
              Time between event loop health checks

              We set up a periodic callback to run on the event loop and
              check in fairly frequently (by default, this is every 20
              milliseconds). If this periodic callback sees that the last
              time it checked in was several seconds ago (by default, this is
              3 seconds) then it logs a warning saying that something has
              been stopping the event loop from smooth operation. This is
              typically caused by GIL holding operations, but could also be
              several other things.
            properties:
              interval:
                type: string
                description: The time between ticks, default 20ms
              limit:
                type: string
                description: The time allowed before triggering a warning
              cycle:
                type: string
                description: The time in between verifying event loop speed
          max-error-length:
            type: integer
            minimum: 0
            description: |
              Maximum length of traceback as text

              Some Python tracebacks can be very very long (particularly in
              stack overflow errors). If the traceback is larger than this
              size (in bytes) then we truncate it.
          log-length:
            type: [integer, 'null']
            minimum: 0
            description: |
              Maximum length of worker/scheduler logs to keep in memory.
              They can be retrieved with get_scheduler_logs() /
              get_worker_logs(). Set to null for unlimited.
          log-format:
            type: string
            description: |
              The log format to emit.
              See https://docs.python.org/3/library/logging.html#logrecord-attributes
          low-level-log-length:
            type: [integer, 'null']
            minimum: 0
            description: |
              Maximum length of various event logs for developers.
              Set to null for unlimited.
          event-loop:
            type: string
            description: |
              The event loop to use. Must be one of tornado, asyncio, or
              uvloop.
          pdb-on-err:
            type: boolean
            description: Enter Python Debugger on scheduling error
          system-monitor:
            type: object
            description: |
              Options for the periodic system monitor
            properties:
              interval:
                type: string
                description: Polling time to query cpu/memory statistics, default 500ms
              log-length:
                type: [integer, 'null']
                minimum: 0
                description: |
                  Maximum number of samples to keep in memory. Multiply by
                  `interval` to obtain log duration. Set to null for
                  unlimited.
              disk:
                type: boolean
                description: Should we include disk metrics? (they can cause issues in some systems)
              host-cpu:
                type: boolean
                description: Should we include host-wide CPU usage, with very granular breakdown?
              gil:
                type: object
                description: |
                  Should we include GIL contention metrics? Requires
                  `gilknocker` to be installed.
                properties:
                  enabled:
                    type: boolean
                    description: Enable monitoring of GIL contention
                  interval:
                    type: string
                    description: |
                      GIL polling interval. More frequent polling will
                      reflect a more accurate GIL contention metric but will
                      be more likely to impact runtime performance.

      rmm:
        type: object
        description: |
          Configuration options for the RAPIDS Memory Manager.
        properties:
          pool-size:
            type: [integer, 'null']
            description: |
              The size of the memory pool in bytes.
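      # A hedged example of loosening the event loop health check for
      # debugging sessions where long GIL-holding pauses are expected; the
      # stock defaults are roughly 20ms / 3s per the tick description above,
      # and the limit shown is an illustrative assumption:
      #
      #   distributed:
      #     admin:
      #       tick:
      #         interval: 20ms
      #         limit: 10s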