// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/result.h"

namespace arrow {
namespace acero {
namespace util {
using arrow::compute::ExecBatch;

/// \brief A container that accumulates batches until they are ready to
/// be processed.
class AccumulationQueue {
 public:
  AccumulationQueue() : row_count_(0) {}
  ~AccumulationQueue() = default;

  // We should never be copying ExecBatch around
  AccumulationQueue(const AccumulationQueue&) = delete;
  AccumulationQueue& operator=(const AccumulationQueue&) = delete;

  AccumulationQueue(AccumulationQueue&& that);
  AccumulationQueue& operator=(AccumulationQueue&& that);

  void Concatenate(AccumulationQueue&& that);
  void InsertBatch(ExecBatch batch);
  int64_t row_count() { return row_count_; }
  size_t batch_count() { return batches_.size(); }
  bool empty() const { return batches_.empty(); }
  void Clear();
  ExecBatch& operator[](size_t i);

 private:
  int64_t row_count_;
  std::vector<ExecBatch> batches_;
};
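
// Example (illustrative sketch, not part of the API above): a node that
// buffers its input and processes it once the stream is finished.  The
// `MakeBatch` and `ProcessAccumulated` helpers are hypothetical stand-ins
// for node-specific logic.
//
//   AccumulationQueue queue;
//   queue.InsertBatch(MakeBatch());   // batches are moved in, never copied
//   queue.InsertBatch(MakeBatch());
//   for (size_t i = 0; i < queue.batch_count(); ++i) {
//     ProcessAccumulated(queue[i]);   // indexed access, in insertion order
//   }
//   queue.Clear();                    // release the buffered batches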

/// A queue that sequences incoming batches
///
/// This can be used when a node needs to do some kind of ordered processing on
/// the stream.
///
/// Batches can be inserted in any order.  The process_callback will be called on
/// the batches, in order, without reentrant calls.  For this reason the callback
/// should be quick.
///
/// For example, in a top-n node, the process callback should determine how many
/// rows need to be delivered for the given batch, and then return a task to
/// actually deliver those rows.
class SequencingQueue {
 public:
  using Task = std::function<Status()>;

  /// Strategy that describes how to handle items
  class Processor {
   public:
    /// Process the batch, potentially generating a task
    ///
    /// This method will be called on each batch in order.  Calls to this method
    /// will be serialized and it will not be called reentrantly.  This makes it
    /// safe to do things that rely on order but minimal time should be spent here
    /// to avoid becoming a bottleneck.
    ///
    /// \return a follow-up task that will be scheduled.  The follow-up task(s)
    ///         are not guaranteed to run in any particular order.  If nullopt is
    ///         returned then nothing will be scheduled.
    virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;

    /// Schedule a task
    virtual void Schedule(Task task) = 0;
  };

  virtual ~SequencingQueue() = default;

  /// Insert a batch into the queue
  ///
  /// This will insert the batch into the queue.  If this batch was the next batch
  /// to deliver then this will trigger 1+ calls to the process callback to
  /// generate 1+ tasks.
  ///
  /// The task generated by this call will be executed immediately.  The remaining
  /// tasks will be scheduled using the schedule callback.
  ///
  /// From a data pipeline perspective the sequencing queue is a "sometimes"
  /// breaker.  If a batch arrives in order then this call will usually execute
  /// the downstream pipeline.  If a batch arrives early then this call will only
  /// queue the data.
  virtual Status InsertBatch(ExecBatch batch) = 0;

  /// Create a queue
  /// \param processor describes how to process the batches, must outlive the queue
  static std::unique_ptr<SequencingQueue> Make(Processor* processor);
};

/// A queue that sequences incoming batches
///
/// Unlike SequencingQueue, the Process method is not expected to schedule new
/// tasks.
///
/// If a batch arrives and another thread is currently processing then the batch
/// will be queued and control will return.  In other words, delivery of batches
/// will not block on the Process method.
///
/// It can be helpful to think of this as if a dedicated thread is running Process
/// as batches arrive.
class SerialSequencingQueue {
 public:
  /// Strategy that describes how to handle items
  class Processor {
   public:
    /// Process the batch
    ///
    /// This method will be called on each batch in order.  Calls to this method
    /// will be serialized and it will not be called reentrantly.  This makes it
    /// safe to do things that rely on order.
    ///
    /// If this falls behind then data may accumulate.
    ///
    /// TODO: Could add backpressure if needed but right now all uses of this
    /// should be pretty fast and so are unlikely to block.
    virtual Status Process(ExecBatch batch) = 0;
  };

  virtual ~SerialSequencingQueue() = default;

  /// Insert a batch into the queue
  ///
  /// This will insert the batch into the queue.  If this batch was the next batch
  /// to deliver then this may trigger calls to the processor which will be run
  /// as part of this call.
  virtual Status InsertBatch(ExecBatch batch) = 0;

  /// Create a queue
  /// \param processor describes how to process the batches, must outlive the queue
  static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor);
};

}  // namespace util
}  // namespace acero
}  // namespace arrow
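
// Example (illustrative sketch, not part of this header): a
// SequencingQueue::Processor in the spirit of the top-n example above.  The
// `RowSelection` type and the `PlanDelivery`, `DeliverRows`, and
// `RunOnExecutor` helpers are hypothetical stand-ins for node-specific logic
// and the engine's scheduler.
//
//   using arrow::acero::util::SequencingQueue;
//   using arrow::compute::ExecBatch;
//
//   class TopNProcessor : public SequencingQueue::Processor {
//    public:
//     Result<std::optional<SequencingQueue::Task>> Process(ExecBatch batch) override {
//       // Runs serially, in batch order; keep this part quick.
//       RowSelection rows = PlanDelivery(batch);
//       // Return the slow part as a follow-up task (may run in any order).
//       return SequencingQueue::Task([rows]() { return DeliverRows(rows); });
//     }
//     void Schedule(SequencingQueue::Task task) override {
//       RunOnExecutor(std::move(task));  // hand off to the engine's scheduler
//     }
//   };
//
//   TopNProcessor processor;
//   std::unique_ptr<SequencingQueue> queue = SequencingQueue::Make(&processor);
//   // Batches may be inserted from multiple threads, in any order.
//   ARROW_RETURN_NOT_OK(queue->InsertBatch(std::move(batch)));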
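
// Example (illustrative sketch, not part of this header): a
// SerialSequencingQueue::Processor that simply forwards batches downstream in
// sequence order.  `ForwardDownstream` is a hypothetical helper.
//
//   using arrow::acero::util::SerialSequencingQueue;
//
//   class OrderedForwarder : public SerialSequencingQueue::Processor {
//    public:
//     Status Process(arrow::compute::ExecBatch batch) override {
//       // Called serially and in order; early batches are queued internally,
//       // so inserting threads are not blocked on this method.
//       return ForwardDownstream(std::move(batch));
//     }
//   };
//
//   OrderedForwarder forwarder;
//   auto queue = SerialSequencingQueue::Make(&forwarder);
//   ARROW_RETURN_NOT_OK(queue->InsertBatch(std::move(batch)));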