// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include "arrow/acero/visibility.h" #include "arrow/status.h" #include "arrow/util/config.h" #include "arrow/util/logging.h" namespace arrow { namespace acero { // Atomic value surrounded by padding bytes to avoid cache line invalidation // whenever it is modified by a concurrent thread on a different CPU core. // template class AtomicWithPadding { private: static constexpr int kCacheLineSize = 64; uint8_t padding_before[kCacheLineSize]; public: std::atomic value; private: uint8_t padding_after[kCacheLineSize]; }; // Used for asynchronous execution of operations that can be broken into // a fixed number of symmetric tasks that can be executed concurrently. // // Implements priorities between multiple such operations, called task groups. // // Allows to specify the maximum number of in-flight tasks at any moment. // // Also allows for executing next pending tasks immediately using a caller thread. // class ARROW_ACERO_EXPORT TaskScheduler { public: using TaskImpl = std::function; using TaskGroupContinuationImpl = std::function; using ScheduleImpl = std::function; using AbortContinuationImpl = std::function; virtual ~TaskScheduler() = default; // Order in which task groups are registered represents priorities of their tasks // (the first group has the highest priority). // // Returns task group identifier that is used to request operations on the task group. virtual int RegisterTaskGroup(TaskImpl task_impl, TaskGroupContinuationImpl cont_impl) = 0; virtual void RegisterEnd() = 0; // total_num_tasks may be zero, in which case task group continuation will be executed // immediately virtual Status StartTaskGroup(size_t thread_id, int group_id, int64_t total_num_tasks) = 0; // Execute given number of tasks immediately using caller thread virtual Status ExecuteMore(size_t thread_id, int num_tasks_to_execute, bool execute_all) = 0; // Begin scheduling tasks using provided callback and // the limit on the number of in-flight tasks at any moment. // // Scheduling will continue as long as there are waiting tasks. // // It will automatically resume whenever new task group gets started. virtual Status StartScheduling(size_t thread_id, ScheduleImpl schedule_impl, int num_concurrent_tasks, bool use_sync_execution) = 0; // Abort scheduling and execution. // Used in case of being notified about unrecoverable error for the entire query. virtual void Abort(AbortContinuationImpl impl) = 0; static std::unique_ptr Make(); }; } // namespace acero } // namespace arrow