// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/cancel.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing.h"

namespace arrow {

using internal::FnOnce;

namespace util {

/// A utility which keeps track of, and schedules, asynchronous tasks
///
/// An asynchronous task has a synchronous component and an asynchronous component.
/// The synchronous component typically schedules some kind of work on an external
/// resource (e.g. the I/O thread pool or some kind of kernel-based asynchronous
/// resource like io_uring).  The asynchronous part represents the work
/// done on that external resource.  Executing the synchronous part will be referred
/// to as "submitting the task" since this usually includes submitting the asynchronous
/// portion to the external thread pool.
///
/// By default the scheduler will submit the task (execute the synchronous part) as
/// soon as it is added, assuming the underlying thread pool hasn't terminated or the
/// scheduler hasn't aborted.  In this mode, the scheduler is essentially just acting
/// as a task group.
///
/// A task scheduler starts with an initial task.  That task, and all subsequent tasks,
/// are free to add subtasks.  Once all submitted tasks finish, the scheduler will
/// finish.  Note, it is not an error to add additional tasks after a scheduler has
/// aborted.  These tasks will be ignored and never submitted.  The scheduler returns a
/// future which will complete when all submitted tasks have finished executing.  Once
/// all tasks have finished, the scheduler is invalid and should no longer be used.
///
/// Task failure (either the synchronous portion or the asynchronous portion) will cause
/// the scheduler to enter an aborted state.  The first such failure will be reported in
/// the final task future.
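///
/// A minimal usage sketch (illustrative only, not normative; `DoIoWork` is a
/// hypothetical function returning a `Future<>`):
///
///     Future<> all_done = AsyncTaskScheduler::Make(
///         [](AsyncTaskScheduler* scheduler) {
///           // Subtasks are added from the initial task (or from other tasks)
///           scheduler->AddSimpleTask([] { return DoIoWork(); },
///                                    std::string_view("do-io-work"));
///           return Status::OK();
///         });
///
/// The returned future completes once the initial task and every task added from it
/// have finished (or once the first failure has been reported).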
class ARROW_EXPORT AsyncTaskScheduler {
 public:
  /// Destructor for AsyncTaskScheduler
  ///
  /// The lifetime of the task scheduler is managed automatically.  The scheduler
  /// will remain valid while any tasks are running (and can always be safely accessed
  /// within tasks) and will be destroyed as soon as all tasks have finished.
  virtual ~AsyncTaskScheduler() = default;

  /// An interface for a task
  ///
  /// Users may want to override this, for example, to add priority
  /// information for use by a queue.
  class Task {
   public:
    virtual ~Task() = default;
    /// Submit the task
    ///
    /// This will be called by the scheduler at most once when there
    /// is space to run the task.  This is expected to be a fairly quick
    /// function that simply submits the actual task work to an external
    /// resource (e.g. I/O thread pool).
    ///
    /// If this call fails then the scheduler will enter an aborted state.
    virtual Result<Future<>> operator()() = 0;
    /// The cost of the task
    ///
    /// A ThrottledAsyncTaskScheduler can be used to limit the number of concurrent
    /// tasks.  A custom cost may be used, for example, if you would like to limit the
    /// number of tasks based on the total expected RAM usage of the tasks (this is
    /// done in the scanner).
    virtual int cost() const { return 1; }
    /// The name of the task
    ///
    /// This is used for debugging and traceability.  The returned view must remain
    /// valid for the lifetime of the task.
    virtual std::string_view name() const = 0;

    /// A span tied to the lifetime of the task, for internal use only
    tracing::Span span;
  };

  /// Add a task to the scheduler
  ///
  /// If the scheduler is in an aborted state this call will return false and the task
  /// will never be run.  This is harmless and does not need to be guarded against.
  ///
  /// The return value for this call can usually be ignored.  There is little harm in
  /// attempting to add tasks to an aborted scheduler.  It is only included for callers
  /// that want to avoid future task generation to save effort.
  ///
  /// \param task the task to submit
  ///
  /// A task's name must remain valid for the duration of the task.  It is used for
  /// debugging (e.g. when debugging a deadlock to see which tasks still remain) and for
  /// traceability (the name will be used for spans assigned to the task).
  ///
  /// \return true if the task was submitted or queued, false if the task was ignored
  virtual bool AddTask(std::unique_ptr<Task> task) = 0;

  /// Adds an async generator to the scheduler
  ///
  /// The async generator will be visited, one item at a time.  Submitting a task
  /// will consist of polling the generator for the next future.  The generator's future
  /// will then represent the task itself.
  ///
  /// This visits the task serially without readahead.  If readahead or parallelism
  /// is desired then it should be added in the generator itself.
  ///
  /// The generator itself will be kept alive until all tasks have been completed.
  /// However, if the scheduler is aborted, the generator will be destroyed as soon as
  /// the next item would be requested.
  ///
  /// \param generator the generator to submit to the scheduler
  /// \param visitor a function which visits each generator future as it completes
  /// \param name a name which will be used for each submitted task
  template <typename T>
  bool AddAsyncGenerator(std::function<Future<T>()> generator,
                         std::function<Status(const T&)> visitor, std::string_view name);
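
  // A hedged usage sketch for AddAsyncGenerator (illustrative only).  `batch_gen` is
  // assumed to be a std::function<Future<std::shared_ptr<RecordBatch>>()>-style
  // generator and `HandleBatch` a hypothetical visitor returning Status.  Items are
  // visited serially; the generator is destroyed when iteration ends or the scheduler
  // aborts:
  //
  //   scheduler->AddAsyncGenerator<std::shared_ptr<RecordBatch>>(
  //       std::move(batch_gen),
  //       [](const std::shared_ptr<RecordBatch>& batch) { return HandleBatch(batch); },
  //       "visit-batches");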

  template <typename Callable>
  struct SimpleTask : public Task {
    SimpleTask(Callable callable, std::string_view name)
        : callable(std::move(callable)), name_(name) {}
    SimpleTask(Callable callable, std::string name)
        : callable(std::move(callable)), owned_name_(std::move(name)) {
      name_ = *owned_name_;
    }
    Result<Future<>> operator()() override { return callable(); }
    std::string_view name() const override { return name_; }
    Callable callable;
    std::string_view name_;
    std::optional<std::string> owned_name_;
  };

  /// Add a task with cost 1 to the scheduler
  ///
  /// \param callable a "submit" function that should return a future
  /// \param name a name for the task
  ///
  /// `name` must remain valid until the task has been submitted AND the returned
  /// future completes.  It is used for debugging and tracing.
  ///
  /// \see AddTask for more details
  template <typename Callable>
  bool AddSimpleTask(Callable callable, std::string_view name) {
    return AddTask(std::make_unique<SimpleTask<Callable>>(std::move(callable), name));
  }

  /// Add a task with cost 1 to the scheduler
  ///
  /// This is an overload of \see AddSimpleTask that keeps `name` alive
  /// in the task.
  template <typename Callable>
  bool AddSimpleTask(Callable callable, std::string name) {
    return AddTask(
        std::make_unique<SimpleTask<Callable>>(std::move(callable), std::move(name)));
  }

  /// Construct a scheduler
  ///
  /// \param initial_task The initial task which is responsible for adding
  ///        the first subtasks to the scheduler.
  /// \param abort_callback A callback that will be triggered immediately after a task
  ///        fails while other tasks may still be running.  Nothing needs to be done
  ///        here, when a task fails the scheduler will stop accepting new tasks and
  ///        eventually return the error.  However, this callback can be used to more
  ///        quickly end long running tasks that have already been submitted.  Defaults
  ///        to doing nothing.
  /// \param stop_token An optional stop token that will allow cancellation of the
  ///        scheduler.  This will be checked before each task is submitted and, in the
  ///        event of a cancellation, the scheduler will enter an aborted state.  This
  ///        is a graceful cancellation and submitted tasks will still complete.
  /// \return A future that will be completed when the initial task and all subtasks
  ///         have finished.
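  ///
  /// A cancellation sketch (illustrative only; `ScanFile` is a hypothetical function
  /// returning a `Future<>`):
  ///
  ///     StopSource stop_source;
  ///     Future<> done = AsyncTaskScheduler::Make(
  ///         [](AsyncTaskScheduler* scheduler) {
  ///           scheduler->AddSimpleTask([] { return ScanFile(); },
  ///                                    std::string_view("scan-file"));
  ///           return Status::OK();
  ///         },
  ///         /*abort_callback=*/[](const Status&) {},
  ///         stop_source.token());
  ///
  /// Requesting a stop via `stop_source.RequestStop()` aborts the scheduler gracefully:
  /// already-submitted tasks still complete but no new tasks are submitted.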
  static Future<> Make(
      FnOnce<Status(AsyncTaskScheduler*)> initial_task,
      FnOnce<void(const Status&)> abort_callback = [](const Status&) {},
      StopToken stop_token = StopToken::Unstoppable());

  /// A span tracking execution of the scheduler's tasks, for internal use only
  virtual const tracing::Span& span() const = 0;
};

class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
 public:
  /// An interface for a task queue
  ///
  /// A queue's methods will not be called concurrently
  class Queue {
   public:
    virtual ~Queue() = default;
    /// Push a task to the queue
    ///
    /// \param task the task to enqueue
    virtual void Push(std::unique_ptr<Task> task) = 0;
    /// Pop the next task from the queue
    virtual std::unique_ptr<Task> Pop() = 0;
    /// Peek the next task in the queue
    virtual const Task& Peek() = 0;
    /// Check if the queue is empty
    virtual bool Empty() = 0;
    /// Purge the queue of all items
    virtual void Purge() = 0;
    /// Return the number of tasks in the queue
    virtual std::size_t Size() const = 0;
  };

  class Throttle {
   public:
    virtual ~Throttle() = default;
    /// Acquire amt permits
    ///
    /// If nullopt is returned then the permits were immediately
    /// acquired and the caller can proceed.  If a future is returned then the caller
    /// should wait for the future to complete first.  When the returned future
    /// completes the permits have NOT been acquired and the caller must call Acquire
    /// again.
    ///
    /// \param amt the number of permits to acquire
    virtual std::optional<Future<>> TryAcquire(int amt) = 0;
    /// Release amt permits
    ///
    /// This will possibly complete waiting futures and should probably not be
    /// called while holding locks.
    ///
    /// \param amt the number of permits to release
    virtual void Release(int amt) = 0;
    /// The size of the largest task that can run
    ///
    /// Incoming tasks will have their cost latched to this value to ensure
    /// they can still run (although they will be the only thing allowed to
    /// run at that time).
    virtual int Capacity() = 0;
    /// Pause the throttle
    ///
    /// Any tasks that have been submitted already will continue.  However, no new
    /// tasks will be run until the throttle is resumed.
    virtual void Pause() = 0;
    /// Resume the throttle
    ///
    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit
    /// then it will still apply.
    virtual void Resume() = 0;
  };

  /// Pause the throttle
  ///
  /// Any tasks that have been submitted already will continue.  However, no new tasks
  /// will be run until the throttle is resumed.
  virtual void Pause() = 0;
  /// Resume the throttle
  ///
  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
  /// it will still apply.
  virtual void Resume() = 0;
  /// Return the number of tasks queued but not yet submitted
  virtual std::size_t QueueSize() = 0;

  /// Create a throttled view of a scheduler
  ///
  /// Tasks added via this view will be subjected to the throttle and, if the tasks
  /// cannot run immediately, will be placed into a queue.
  ///
  /// Although a shared_ptr is returned it should generally be assumed that the caller
  /// is being given exclusive ownership.  The shared_ptr is used to share the view with
  /// queued and submitted tasks and the lifetime of those is unpredictable.  It is
  /// important the caller keep the returned pointer alive for as long as they plan to
  /// add tasks to the view.
  ///
  /// \param scheduler a scheduler to submit tasks to after throttling
  ///
  /// This can be the root scheduler, another throttled scheduler, or a task group.
  /// These are all composable.
  ///
  /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one
  ///        time
  ///
  /// If a task is added that has a cost greater than max_concurrent_cost then its cost
  /// will be reduced to max_concurrent_cost so that it is still possible for the task
  /// to run.
  ///
  /// \param queue the queue to use when tasks cannot be submitted
  ///
  /// By default a FIFO queue will be used.  However, a custom queue can be provided if
  /// some tasks have higher priority than other tasks.
  static std::shared_ptr<ThrottledAsyncTaskScheduler> Make(
      AsyncTaskScheduler* scheduler, int max_concurrent_cost,
      std::unique_ptr<Queue> queue = NULLPTR);

  /// @brief Create a ThrottledAsyncTaskScheduler using a custom throttle
  ///
  /// \see Make
  static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle(
      AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle,
      std::unique_ptr<Queue> queue = NULLPTR);
};
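
// A hedged throttling sketch (illustrative only; `scheduler` is assumed to be some
// valid AsyncTaskScheduler*, e.g. inside the initial task, and `ReadBlock` a
// hypothetical function returning a `Future<>`).  At most two cost-1 tasks run at
// once; the rest wait in the default FIFO queue:
//
//   std::shared_ptr<ThrottledAsyncTaskScheduler> throttled =
//       ThrottledAsyncTaskScheduler::Make(scheduler, /*max_concurrent_cost=*/2);
//   for (int i = 0; i < 10; i++) {
//     throttled->AddSimpleTask([] { return ReadBlock(); }, std::string_view("read"));
//   }
//
// The throttled view must be kept alive for as long as tasks are added through it.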

/// A utility to keep track of a collection of tasks
///
/// Often it is useful to keep track of some state that only needs to stay alive
/// for some small collection of tasks, or to perform some kind of final cleanup
/// when a collection of tasks is finished.
///
/// For example, when scanning, we need to keep the file reader alive while all scan
/// tasks run for a given file, and then we can gracefully close it when we finish the
/// file.
class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler {
 public:
  /// Destructor for the task group
  ///
  /// The destructor might trigger the finish callback.  If the finish callback fails
  /// then the error will be reported as a task on the scheduler.
  ///
  /// Failure to destroy the async task group will not prevent the scheduler from
  /// finishing.  If the scheduler finishes before the async task group is done then
  /// the finish callback will be run immediately when the async task group finishes.
  ///
  /// If the scheduler has aborted then the finish callback will not run.
  ~AsyncTaskGroup() = default;
  /// Create an async task group
  ///
  /// The finish callback will not run until the task group is destroyed and all
  /// tasks are finished so you will generally want to reset / destroy the returned
  /// unique_ptr at some point.
  ///
  /// \param scheduler The underlying scheduler to submit tasks to
  /// \param finish_callback A callback that will be run only after the task group has
  ///                        been destroyed and all tasks added by the group have
  ///                        finished.
  ///
  /// Note: in error scenarios the finish callback may not run.  However, it will
  /// still, of course, be destroyed.
  static std::unique_ptr<AsyncTaskGroup> Make(AsyncTaskScheduler* scheduler,
                                              FnOnce<Status()> finish_callback);
};

/// Create a task group that is also throttled
///
/// This is a utility factory that creates a throttled view of a scheduler and then
/// wraps that throttled view with a task group that destroys the throttle when
/// finished.
///
/// \see ThrottledAsyncTaskScheduler
/// \see AsyncTaskGroup
/// \param target the underlying scheduler to submit tasks to
/// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
/// \param queue the queue to use when tasks cannot be submitted
/// \param finish_callback A callback that will be run only after the task group has
///                        been destroyed and all tasks added by the group have finished
ARROW_EXPORT std::unique_ptr<ThrottledAsyncTaskScheduler> MakeThrottledAsyncTaskGroup(
    AsyncTaskScheduler* target, int max_concurrent_cost,
    std::unique_ptr<ThrottledAsyncTaskScheduler::Queue> queue,
    FnOnce<Status()> finish_callback);
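
// A hedged task-group sketch (illustrative only; `scheduler` is assumed to be a valid
// AsyncTaskScheduler*, `reader` a hypothetical shared_ptr whose Close() returns Status,
// and `ScanBatch` a hypothetical function returning a `Future<>`).  The finish callback
// runs only after the group is destroyed and every task it added has finished:
//
//   std::unique_ptr<AsyncTaskGroup> group =
//       AsyncTaskGroup::Make(scheduler, [reader] { return reader->Close(); });
//   group->AddSimpleTask([reader] { return ScanBatch(reader); },
//                        std::string_view("scan-batch"));
//   group.reset();  // Allow the finish callback to run once the tasks complete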

// Defined down here to avoid circular dependency between AsyncTaskScheduler and
// AsyncTaskGroup
template <typename T>
bool AsyncTaskScheduler::AddAsyncGenerator(std::function<Future<T>()> generator,
                                           std::function<Status(const T&)> visitor,
                                           std::string_view name) {
  struct State {
    State(std::function<Future<T>()> generator, std::function<Status(const T&)> visitor,
          std::unique_ptr<AsyncTaskGroup> task_group, std::string_view name)
        : generator(std::move(generator)),
          visitor(std::move(visitor)),
          task_group(std::move(task_group)),
          name(name) {}
    std::function<Future<T>()> generator;
    std::function<Status(const T&)> visitor;
    std::unique_ptr<AsyncTaskGroup> task_group;
    std::string_view name;
  };
  struct SubmitTask : public Task {
    explicit SubmitTask(std::unique_ptr<State> state_holder)
        : state_holder(std::move(state_holder)) {}

    struct SubmitTaskCallback {
      SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion)
          : state_holder(std::move(state_holder)),
            task_completion(std::move(task_completion)) {}
      void operator()(const Result<T>& maybe_item) {
        if (!maybe_item.ok()) {
          task_completion.MarkFinished(maybe_item.status());
          return;
        }
        const auto& item = *maybe_item;
        if (IsIterationEnd(item)) {
          task_completion.MarkFinished();
          return;
        }
        Status visit_st = state_holder->visitor(item);
        if (!visit_st.ok()) {
          task_completion.MarkFinished(std::move(visit_st));
          return;
        }
        state_holder->task_group->AddTask(
            std::make_unique<SubmitTask>(std::move(state_holder)));
        task_completion.MarkFinished();
      }
      std::unique_ptr<State> state_holder;
      Future<> task_completion;
    };

    Result<Future<>> operator()() {
      Future<> task = Future<>::Make();
      // Consume as many items as we can (those that are already finished)
      // synchronously to avoid recursion / stack overflow.
      while (true) {
        Future<T> next = state_holder->generator();
        if (next.TryAddCallback(
                [&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
          return task;
        }
        ARROW_ASSIGN_OR_RAISE(T item, next.result());
        if (IsIterationEnd(item)) {
          task.MarkFinished();
          return task;
        }
        ARROW_RETURN_NOT_OK(state_holder->visitor(item));
      }
    }

    std::string_view name() const { return state_holder->name; }

    std::unique_ptr<State> state_holder;
  };
  std::unique_ptr<AsyncTaskGroup> task_group =
      AsyncTaskGroup::Make(this, [] { return Status::OK(); });
  AsyncTaskGroup* task_group_view = task_group.get();
  std::unique_ptr<State> state_holder = std::make_unique<State>(
      std::move(generator), std::move(visitor), std::move(task_group), name);
  task_group_view->AddTask(std::make_unique<SubmitTask>(std::move(state_holder)));
  return true;
}

}  // namespace util
}  // namespace arrow