// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cassert>
#include <cstring>
#include <deque>
#include <limits>
#include <memory>
#include <optional>
#include <queue>

#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/async_util.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

// The methods in this file create, modify, and utilize AsyncGenerator which is an
// iterator of futures.  This allows an asynchronous source (like file input) to be run
// through a pipeline in the same way that iterators can be used to create pipelined
// workflows.
//
// In order to support pipeline parallelism we introduce the concept of asynchronous
// reentrancy.  This is different from synchronous reentrancy.  With synchronous code a
// function is reentrant if the function can be called again while a previous call to
// that function is still running.  Unless otherwise specified none of these generators
// are synchronously reentrant.  Care should be taken to avoid calling them in such a
// way (and the utilities Visit/Collect/Await take care to do this).
//
// Asynchronous reentrancy on the other hand means the function is called again before
// the future returned by the function is marked finished (but after the call to get the
// future returns).  Some of these generators are async-reentrant while others (e.g.
// those that depend on ordered processing like decompression) are not.  Read the MakeXYZ
// function comments to determine which generators support async reentrancy.
//
// Note: Generators that are not asynchronously reentrant can still support readahead
// (\see MakeSerialReadaheadGenerator).
//
// Readahead operators, and some other operators, may introduce queueing.  Any operators
// that introduce buffering should detail the amount of buffering they introduce in
// their MakeXYZ function comments.
//
// A generator should always be fully consumed before it is destroyed.
// A generator should not mark a future complete with an error status or a terminal
// value until all outstanding futures have completed.  Generators that spawn multiple
// concurrent futures may need to hold onto an error while other concurrent futures wrap
// up.
template <typename T>
struct IterationTraits<AsyncGenerator<T>> {
  /// \brief by default when iterating through a sequence of AsyncGenerator<T>,
  /// an empty function indicates the end of iteration.
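  ///
  /// A minimal sketch of what this means in practice (the generator-of-generators
  /// below is illustrative, not part of this header):
  ///
  /// \code
  /// AsyncGenerator<AsyncGenerator<int>> outer = /* e.g. a stream of file scans */;
  /// outer().Then([](const AsyncGenerator<int>& inner) {
  ///   if (IsIterationEnd(inner)) {
  ///     // `inner` is a default-constructed (empty) std::function, so the
  ///     // outer stream is exhausted
  ///   }
  /// });
  /// \endcode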
  static AsyncGenerator<T> End() { return AsyncGenerator<T>(); }
  static bool IsEnd(const AsyncGenerator<T>& val) { return !val; }
};

template <typename T>
Future<T> AsyncGeneratorEnd() {
  return Future<T>::MakeFinished(IterationTraits<T>::End());
}

/// Iterates through a generator of futures, visiting the result of each one and
/// returning a future that completes when all have been visited
template <typename T, typename Visitor>
Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
  struct LoopBody {
    struct Callback {
      Result<ControlFlow<>> operator()(const T& next) {
        if (IsIterationEnd(next)) {
          return Break();
        } else {
          auto visited = visitor(next);
          if (visited.ok()) {
            return Continue();
          } else {
            return visited;
          }
        }
      }

      Visitor visitor;
    };

    Future<ControlFlow<>> operator()() {
      Callback callback{visitor};
      auto next = generator();
      return next.Then(std::move(callback));
    }

    AsyncGenerator<T> generator;
    Visitor visitor;
  };

  return Loop(LoopBody{std::move(generator), std::move(visitor)});
}

/// \brief Wait for an async generator to complete, discarding results.
template <typename T>
Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
  std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
  return VisitAsyncGenerator(generator, visitor);
}

/// \brief Collect the results of an async generator into a vector
template <typename T>
Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
  auto vec = std::make_shared<std::vector<T>>();
  auto loop_body = [generator = std::move(generator),
                    vec = std::move(vec)]() -> Future<ControlFlow<std::vector<T>>> {
    auto next = generator();
    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
      if (IsIterationEnd(result)) {
        return Break(*vec);
      } else {
        vec->push_back(result);
        return Continue();
      }
    });
  };
  return Loop(std::move(loop_body));
}

/// \see MakeMappedGenerator
template <typename T, typename V>
class MappingGenerator {
 public:
  MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
      : state_(std::make_shared<State>(std::move(source), std::move(map))) {}

  Future<V> operator()() {
    auto future = Future<V>::Make();
    bool should_trigger;
    {
      auto guard = state_->mutex.Lock();
      if (state_->finished) {
        return AsyncGeneratorEnd<V>();
      }
      should_trigger = state_->waiting_jobs.empty();
      state_->waiting_jobs.push_back(future);
    }
    if (should_trigger) {
      state_->source().AddCallback(Callback{state_});
    }
    return future;
  }

 private:
  struct State {
    State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
        : source(std::move(source)),
          map(std::move(map)),
          waiting_jobs(),
          mutex(),
          finished(false) {}

    void Purge() {
      // This might be called by an original callback (if the source iterator fails or
      // ends) or by a mapped callback (if the map function fails or ends prematurely).
      // Either way it should only be called once and after finished is set so there is
      // no need to guard access to `waiting_jobs`.
      while (!waiting_jobs.empty()) {
        waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
        waiting_jobs.pop_front();
      }
    }

    AsyncGenerator<T> source;
    std::function<Future<V>(const T&)> map;
    std::deque<Future<V>> waiting_jobs;
    util::Mutex mutex;
    bool finished;
  };

  struct Callback;

  struct MappedCallback {
    void operator()(const Result<V>& maybe_next) {
      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
      bool should_purge = false;
      if (end) {
        {
          auto guard = state->mutex.Lock();
          should_purge = !state->finished;
          state->finished = true;
        }
      }
      sink.MarkFinished(maybe_next);
      if (should_purge) {
        state->Purge();
      }
    }

    std::shared_ptr<State> state;
    Future<V> sink;
  };

  struct Callback {
    void operator()(const Result<T>& maybe_next) {
      Future<V> sink;
      bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
      bool should_purge = false;
      bool should_trigger;
      {
        auto guard = state->mutex.Lock();
        // A MappedCallback may have purged or be purging the queue;
        // we shouldn't do anything here.
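        // (`finished` is only ever written while the mutex is held, and we hold it
        // here, so this check cannot race with MappedCallback.)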
        if (state->finished) return;
        if (end) {
          should_purge = !state->finished;
          state->finished = true;
        }
        sink = state->waiting_jobs.front();
        state->waiting_jobs.pop_front();
        should_trigger = !end && !state->waiting_jobs.empty();
      }
      if (should_purge) {
        state->Purge();
      }
      if (should_trigger) {
        state->source().AddCallback(Callback{state});
      }
      if (maybe_next.ok()) {
        const T& val = maybe_next.ValueUnsafe();
        if (IsIterationEnd(val)) {
          sink.MarkFinished(IterationTraits<V>::End());
        } else {
          Future<V> mapped_fut = state->map(val);
          mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
        }
      } else {
        sink.MarkFinished(maybe_next.status());
      }
    }

    std::shared_ptr<State> state;
  };

  std::shared_ptr<State> state_;
};

/// \brief Create a generator that will apply the map function to each element of
/// source.  The map function is not called on the end token.
///
/// Note: This function makes a copy of `map` for each item
/// Note: Errors returned from the `map` function will be propagated
///
/// If the source generator is async-reentrant then this generator will be also
template <typename T, typename MapFn,
          typename Mapped = detail::result_of_t<MapFn(const T&)>,
          typename V = typename EnsureFuture<Mapped>::type::ValueType>
AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
  auto map_callback = [map = std::move(map)](const T& val) mutable -> Future<V> {
    return ToFuture(map(val));
  };
  return MappingGenerator<T, V>(std::move(source_generator), std::move(map_callback));
}

/// \brief Create a generator that will apply the map function to
/// each element of source.  The map function is not called on the end
/// token.  The result of the map function should be another
/// generator; all these generators will then be flattened to produce
/// a single stream of items.
///
/// Note: This function makes a copy of `map` for each item
/// Note: Errors returned from the `map` function will be propagated
///
/// If the source generator is async-reentrant then this generator will be also
template <typename T, typename MapFn,
          typename Mapped = detail::result_of_t<MapFn(const T&)>,
          typename V = typename EnsureFuture<Mapped>::type::ValueType>
AsyncGenerator<typename V::ValueType> MakeFlatMappedGenerator(
    AsyncGenerator<T> source_generator, MapFn map) {
  return MakeConcatenatedGenerator(
      MakeMappedGenerator(std::move(source_generator), std::move(map)));
}

/// \see MakeSequencingGenerator
template <typename T, typename ComesAfter, typename IsNext>
class SequencingGenerator {
 public:
  SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
                      T initial_value)
      : state_(std::make_shared<State>(std::move(source), std::move(compare),
                                       std::move(is_next), std::move(initial_value))) {}

  Future<T> operator()() {
    {
      auto guard = state_->mutex.Lock();
      // We can send a result immediately if the top of the queue is either an
      // error or the next item
      if (!state_->queue.empty() &&
          (!state_->queue.top().ok() ||
           state_->is_next(state_->previous_value, *state_->queue.top()))) {
        auto result = std::move(state_->queue.top());
        if (result.ok()) {
          state_->previous_value = *result;
        }
        state_->queue.pop();
        return Future<T>::MakeFinished(result);
      }
      if (state_->finished) {
        return AsyncGeneratorEnd<T>();
      }
      // The next item is not in the queue so we will need to wait
      auto new_waiting_fut = Future<T>::Make();
      state_->waiting_future = new_waiting_fut;
      guard.Unlock();
      state_->source().AddCallback(Callback{state_});
      return new_waiting_fut;
    }
  }

 private:
  struct WrappedComesAfter {
    bool operator()(const Result<T>& left, const Result<T>& right) {
      if (!left.ok() || !right.ok()) {
        // Should never happen
        return false;
      }
      return compare(*left, *right);
    }

    ComesAfter compare;
  };

  struct State {
    State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
        : source(std::move(source)),
          is_next(std::move(is_next)),
          previous_value(std::move(initial_value)),
          waiting_future(),
          queue(WrappedComesAfter{compare}),
          finished(false),
          mutex() {}

    AsyncGenerator<T> source;
    IsNext is_next;
    T previous_value;
    Future<T> waiting_future;
    std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
    bool finished;
    util::Mutex mutex;
  };

  class Callback {
   public:
    explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}

    void operator()(const Result<T> result) {
      Future<T> to_deliver;
      bool finished;
      {
        auto guard = state_->mutex.Lock();
        bool ready_to_deliver = false;
        if (!result.ok()) {
          // Clear any cached results
          while (!state_->queue.empty()) {
            state_->queue.pop();
          }
          ready_to_deliver = true;
          state_->finished = true;
        } else if (IsIterationEnd(result.ValueUnsafe())) {
          ready_to_deliver = state_->queue.empty();
          state_->finished = true;
        } else {
          ready_to_deliver = state_->is_next(state_->previous_value, *result);
        }

        if (ready_to_deliver && state_->waiting_future.is_valid()) {
          to_deliver = state_->waiting_future;
          if (result.ok()) {
            state_->previous_value = *result;
          }
        } else {
          state_->queue.push(result);
        }
        // Capture state_->finished so we can access it outside the mutex
        finished = state_->finished;
      }
      // Must deliver result outside of the mutex
      if (to_deliver.is_valid()) {
        to_deliver.MarkFinished(result);
      } else {
        // Otherwise, if we didn't get the next item (or a terminal item), we
        // need to keep looking
        if (!finished) {
          state_->source().AddCallback(Callback{state_});
        }
      }
    }

   private:
    const std::shared_ptr<State> state_;
  };

  const std::shared_ptr<State> state_;
};

/// \brief Buffer an AsyncGenerator to return values in sequence order.  ComesAfter
/// and IsNext determine the sequence order.
///
/// ComesAfter should be a BinaryPredicate that only returns true if `a` comes after `b`
///
/// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
/// `b` follows immediately after `a`.  It should return true given `initial_value` and
/// `b` if `b` is the first item in the sequence.
///
/// This operator will queue unboundedly while waiting for the next item.  It is
/// intended for jittery sources that might scatter an ordered sequence.  It is NOT
/// intended to sort.  Using it to try and sort could result in excessive RAM usage.
/// This generator will queue up to N blocks where N is the max "out of order"ness of
/// the source.
///
/// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
/// blocks beyond where it belongs.
///
/// This generator is not async-reentrant but it consists only of a simple log(n)
/// insertion into a priority queue.
template <typename T, typename ComesAfter, typename IsNext>
AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
                                          ComesAfter compare, IsNext is_next,
                                          T initial_value) {
  return SequencingGenerator<T, ComesAfter, IsNext>(
      std::move(source_generator), std::move(compare), std::move(is_next),
      std::move(initial_value));
}

/// \see MakeTransformedGenerator
template <typename T, typename V>
class TransformingGenerator {
  // The transforming generator state will be referenced as an async generator but will
  // also be referenced via callback to various futures.  If the async generator owner
  // moves it around we need the state to be consistent for future callbacks.
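  // Sharing the state through a shared_ptr (and enable_shared_from_this below) keeps
  // it alive for any pending future callbacks even if this generator is moved.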
  struct TransformingGeneratorState
      : std::enable_shared_from_this<TransformingGeneratorState> {
    TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
        : generator_(std::move(generator)),
          transformer_(std::move(transformer)),
          last_value_(),
          finished_() {}

    Future<V> operator()() {
      while (true) {
        auto maybe_next_result = Pump();
        if (!maybe_next_result.ok()) {
          return Future<V>::MakeFinished(maybe_next_result.status());
        }
        auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
        if (maybe_next.has_value()) {
          return Future<V>::MakeFinished(*std::move(maybe_next));
        }

        auto next_fut = generator_();
        // If finished already, process results immediately inside the loop to avoid
        // stack overflow
        if (next_fut.is_finished()) {
          auto next_result = next_fut.result();
          if (next_result.ok()) {
            last_value_ = *next_result;
          } else {
            return Future<V>::MakeFinished(next_result.status());
          }
          // Otherwise, if not finished immediately, add callback to process results
        } else {
          auto self = this->shared_from_this();
          return next_fut.Then([self](const T& next_result) {
            self->last_value_ = next_result;
            return (*self)();
          });
        }
      }
    }

    // See comment on TransformingIterator::Pump
    Result<std::optional<V>> Pump() {
      if (!finished_ && last_value_.has_value()) {
        ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
        if (next.ReadyForNext()) {
          if (IsIterationEnd(*last_value_)) {
            finished_ = true;
          }
          last_value_.reset();
        }
        if (next.Finished()) {
          finished_ = true;
        }
        if (next.HasValue()) {
          return next.Value();
        }
      }
      if (finished_) {
        return IterationTraits<V>::End();
      }
      return std::nullopt;
    }

    AsyncGenerator<T> generator_;
    Transformer<T, V> transformer_;
    std::optional<T> last_value_;
    bool finished_;
  };

 public:
  explicit TransformingGenerator(AsyncGenerator<T> generator,
                                 Transformer<T, V> transformer)
      : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
                                                            std::move(transformer))) {}

  Future<V> operator()() { return (*state_)(); }

 protected:
  std::shared_ptr<TransformingGeneratorState> state_;
};

/// \brief Transform an async generator using a transformer function returning a new
/// AsyncGenerator
///
/// The transform function here behaves exactly the same as the transform function in
/// MakeTransformedIterator and you can safely use the same transform function to
/// transform both synchronous and asynchronous streams.
///
/// This generator is not async-reentrant
///
/// This generator may queue up to 1 instance of T but will not delay
template <typename T, typename V>
AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
                                           Transformer<T, V> transformer) {
  return TransformingGenerator<T, V>(generator, transformer);
}

/// \see MakeSerialReadaheadGenerator
template <typename T>
class SerialReadaheadGenerator {
 public:
  SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
      : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}

  Future<T> operator()() {
    if (state_->first_) {
      // Lazy generator, need to wait for the first ask to prime the pump
      state_->first_ = false;
      auto next = state_->source_();
      return next.Then(Callback{state_}, ErrCallback{state_});
    }
    // This generator is not async-reentrant.
    // We won't be called until the last future has finished so we know there is
    // something in the queue
    auto finished = state_->finished_.load();
    if (finished && state_->readahead_queue_.IsEmpty()) {
      return AsyncGeneratorEnd<T>();
    }

    std::shared_ptr<Future<T>> next;
    if (!state_->readahead_queue_.Read(next)) {
      return Status::UnknownError("Could not read from readahead_queue");
    }

    auto last_available = state_->spaces_available_.fetch_add(1);
    if (last_available == 0 && !finished) {
      // Reader idled out, we need to restart it
      ARROW_RETURN_NOT_OK(state_->Pump(state_));
    }
    return *next;
  }

 private:
  struct State {
    State(AsyncGenerator<T> source, int max_readahead)
        : first_(true),
          source_(std::move(source)),
          finished_(false),
          // There is one extra "space" for the in-flight request
          spaces_available_(max_readahead + 1),
          // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
          readahead_queue_(max_readahead + 1) {}

    Status Pump(const std::shared_ptr<State>& self) {
      // Can't do readahead_queue.write(source().Then(...)) because then the
      // callback might run immediately and add itself to the queue before this gets
      // added to the queue messing up the order.
      auto next_slot = std::make_shared<Future<T>>();
      auto written = readahead_queue_.Write(next_slot);
      if (!written) {
        return Status::UnknownError("Could not write to readahead_queue");
      }
      // If this Pump is being called from a callback it is possible for the source to
      // poll and read from the queue between the Write and this spot where we fill the
      // value in.  However, it is not possible for the future to read this value we
      // are writing.  That is because this callback (the callback for future X) must
      // be finished before future X is marked complete and this source is not pulled
      // reentrantly so it will not poll for future X+1 until this callback has
      // completed.
      *next_slot = source_().Then(Callback{self}, ErrCallback{self});
      return Status::OK();
    }

    // Only accessed by the consumer end
    bool first_;
    // Accessed by both threads
    AsyncGenerator<T> source_;
    std::atomic<bool> finished_;
    // The queue has a size but it is not atomic.
    // We keep track of how many spaces are left in the queue here so we know if we've
    // just written the last value and we need to stop reading ahead or if we've just
    // read from a full queue and we need to restart reading ahead
    std::atomic<uint32_t> spaces_available_;
    // Needs to be a queue of shared_ptr and not Future because we set the value of the
    // future after we add it to the queue
    util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
  };

  struct Callback {
    Result<T> operator()(const T& next) {
      if (IsIterationEnd(next)) {
        state_->finished_.store(true);
        return next;
      }
      auto last_available = state_->spaces_available_.fetch_sub(1);
      if (last_available > 1) {
        ARROW_RETURN_NOT_OK(state_->Pump(state_));
      }
      return next;
    }

    std::shared_ptr<State> state_;
  };

  struct ErrCallback {
    Result<T> operator()(const Status& st) {
      state_->finished_.store(true);
      return st;
    }

    std::shared_ptr<State> state_;
  };

  std::shared_ptr<State> state_;
};

/// \see MakeFromFuture
template <typename T>
class FutureFirstGenerator {
 public:
  explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
      : state_(std::make_shared<State>(std::move(future))) {}

  Future<T> operator()() {
    if (state_->source_) {
      return state_->source_();
    } else {
      auto state = state_;
      return state_->future_.Then([state](const AsyncGenerator<T>& source) {
        state->source_ = source;
        return state->source_();
      });
    }
  }

 private:
  struct State {
    explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}

    Future<AsyncGenerator<T>> future_;
    AsyncGenerator<T> source_;
  };

  std::shared_ptr<State> state_;
};

/// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
/// that waits for the future to complete as part of the first item.
///
/// This generator is not async-reentrant (even if the generator yielded by future is)
///
/// This generator does not queue
template <typename T>
AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
  return FutureFirstGenerator<T>(std::move(future));
}

/// \brief Create a generator that will pull from the source into a queue.  Unlike
/// MakeReadaheadGenerator this will not pull reentrantly from the source.
///
/// The source generator does not need to be async-reentrant
///
/// This generator is not async-reentrant (even if the source is)
///
/// This generator may queue up to max_readahead additional instances of T
template <typename T>
AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
                                               int max_readahead) {
  return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
}

/// \brief Create a generator that immediately pulls from the source
///
/// Typical generators do not pull from their source until they themselves
/// are pulled.  This generator does not follow that convention and will call
/// generator() once before it returns.  The returned generator will otherwise
/// mirror the source.
///
/// This generator forwards async-reentrant pressure to the source
/// This generator buffers one item (the first result) until it is delivered.
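///
/// A minimal sketch:
///
/// \code
/// AsyncGenerator<int> source = MakeVectorGenerator<int>({1, 2, 3});
/// // source() is invoked once right here, before the wrapper is returned
/// AsyncGenerator<int> eager = MakeAutoStartingGenerator(std::move(source));
/// \endcode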
template <typename T>
AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
  struct AutostartGenerator {
    Future<T> operator()() {
      if (first_future->is_valid()) {
        Future<T> result = *first_future;
        *first_future = Future<T>();
        return result;
      }
      return source();
    }

    std::shared_ptr<Future<T>> first_future;
    AsyncGenerator<T> source;
  };

  std::shared_ptr<Future<T>> first_future = std::make_shared<Future<T>>(generator());
  return AutostartGenerator{std::move(first_future), std::move(generator)};
}

/// \see MakeReadaheadGenerator
template <typename T>
class ReadaheadGenerator {
 public:
  ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
      : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}

  Future<T> AddMarkFinishedContinuation(Future<T> fut) {
    auto state = state_;
    return fut.Then(
        [state](const T& result) -> Future<T> {
          state->MarkFinishedIfDone(result);
          if (state->finished.load()) {
            if (state->num_running.fetch_sub(1) == 1) {
              state->final_future.MarkFinished();
            }
          } else {
            state->num_running.fetch_sub(1);
          }
          return result;
        },
        [state](const Status& err) -> Future<T> {
          // If there is an error we need to make sure all running
          // tasks finish before we return the error.
          state->finished.store(true);
          if (state->num_running.fetch_sub(1) == 1) {
            state->final_future.MarkFinished();
          }
          return state->final_future.Then([err]() -> Result<T> { return err; });
        });
  }

  Future<T> operator()() {
    if (state_->readahead_queue.empty()) {
      // This is the first request, let's pump the underlying queue
      state_->num_running.store(state_->max_readahead);
      for (int i = 0; i < state_->max_readahead; i++) {
        auto next = state_->source_generator();
        auto next_after_check = AddMarkFinishedContinuation(std::move(next));
        state_->readahead_queue.push(std::move(next_after_check));
      }
    }
    // Pop one and add one
    auto result = state_->readahead_queue.front();
    state_->readahead_queue.pop();
    if (state_->finished.load()) {
      state_->readahead_queue.push(AsyncGeneratorEnd<T>());
    } else {
      state_->num_running.fetch_add(1);
      auto back_of_queue = state_->source_generator();
      auto back_of_queue_after_check =
          AddMarkFinishedContinuation(std::move(back_of_queue));
      state_->readahead_queue.push(std::move(back_of_queue_after_check));
    }
    return result;
  }

 private:
  struct State {
    State(AsyncGenerator<T> source_generator, int max_readahead)
        : source_generator(std::move(source_generator)), max_readahead(max_readahead) {}

    void MarkFinishedIfDone(const T& next_result) {
      if (IsIterationEnd(next_result)) {
        finished.store(true);
      }
    }

    AsyncGenerator<T> source_generator;
    int max_readahead;
    Future<> final_future = Future<>::Make();
    std::atomic<int> num_running{0};
    std::atomic<bool> finished{false};
    std::queue<Future<T>> readahead_queue;
  };

  std::shared_ptr<State> state_;
};

/// \brief A generator where the producer pushes items on a queue.
///
/// No back-pressure is applied, so this generator is mostly useful when
/// producing the values is neither CPU- nor memory-expensive (e.g. fetching
/// filesystem metadata).
///
/// This generator is not async-reentrant.
template <typename T>
class PushGenerator {
  struct State {
    State() {}

    util::Mutex mutex;
    std::deque<Result<T>> result_q;
    std::optional<Future<T>> consumer_fut;
    bool finished = false;
  };

 public:
  /// Producer API for PushGenerator
  class Producer {
   public:
    explicit Producer(const std::shared_ptr<State>& state) : weak_state_(state) {}

    /// \brief Push a value on the queue
    ///
    /// True is returned if the value was pushed, false if the generator is
    /// already closed or destroyed.  If the latter, it is recommended to stop
    /// producing any further values.
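    ///
    /// A minimal producer-side sketch (real producers typically push from an
    /// I/O callback):
    ///
    /// \code
    /// PushGenerator<int> gen;
    /// auto producer = gen.producer();
    /// if (!producer.Push(42)) {
    ///   // the consumer is gone; stop producing
    /// }
    /// producer.Close();
    /// \endcode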
    bool Push(Result<T> result) {
      auto state = weak_state_.lock();
      if (!state) {
        // Generator was destroyed
        return false;
      }
      auto lock = state->mutex.Lock();
      if (state->finished) {
        // Closed early
        return false;
      }
      if (state->consumer_fut.has_value()) {
        auto fut = std::move(state->consumer_fut.value());
        state->consumer_fut.reset();
        lock.Unlock();  // unlock before potentially invoking a callback
        fut.MarkFinished(std::move(result));
      } else {
        state->result_q.push_back(std::move(result));
      }
      return true;
    }

    /// \brief Tell the consumer we have finished producing
    ///
    /// It is allowed to call this and later call Push() again ("early close").
    /// In this case, calls to Push() after the queue is closed are silently
    /// ignored.  This can help implementing non-trivial cancellation cases.
    ///
    /// True is returned on success, false if the generator is already closed
    /// or destroyed.
    bool Close() {
      auto state = weak_state_.lock();
      if (!state) {
        // Generator was destroyed
        return false;
      }
      auto lock = state->mutex.Lock();
      if (state->finished) {
        // Already closed
        return false;
      }
      state->finished = true;
      if (state->consumer_fut.has_value()) {
        auto fut = std::move(state->consumer_fut.value());
        state->consumer_fut.reset();
        lock.Unlock();  // unlock before potentially invoking a callback
        fut.MarkFinished(IterationTraits<T>::End());
      }
      return true;
    }

    /// Return whether the generator was closed or destroyed.
    bool is_closed() const {
      auto state = weak_state_.lock();
      if (!state) {
        // Generator was destroyed
        return true;
      }
      auto lock = state->mutex.Lock();
      return state->finished;
    }

   private:
    const std::weak_ptr<State> weak_state_;
  };

  PushGenerator() : state_(std::make_shared<State>()) {}

  /// Read an item from the queue
  Future<T> operator()() const {
    auto lock = state_->mutex.Lock();
    assert(!state_->consumer_fut.has_value());  // Non-reentrant
    if (!state_->result_q.empty()) {
      auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
      state_->result_q.pop_front();
      return fut;
    }
    if (state_->finished) {
      return AsyncGeneratorEnd<T>();
    }
    auto fut = Future<T>::Make();
    state_->consumer_fut = fut;
    return fut;
  }

  /// \brief Return producer-side interface
  ///
  /// The returned object must be used by the producer to push values on the queue.
  /// Only a single Producer object should be instantiated.
  Producer producer() { return Producer{state_}; }

 private:
  const std::shared_ptr<State> state_;
};

/// \brief Create a generator that pulls reentrantly from a source
/// This generator will pull reentrantly from a source, ensuring that max_readahead
/// requests are active at any given time.
///
/// The source generator must be async-reentrant
///
/// This generator itself is async-reentrant.
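///
/// A minimal sketch, assuming `reentrant_source` is an async-reentrant
/// AsyncGenerator<int>:
///
/// \code
/// AsyncGenerator<int> with_readahead =
///     MakeReadaheadGenerator(std::move(reentrant_source), /*max_readahead=*/8);
/// // up to 8 futures are now kept in flight against the source
/// \endcode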
///
/// This generator may queue up to max_readahead instances of T
template <typename T>
AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                         int max_readahead) {
  return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
}

/// \brief Creates a generator that will yield finished futures from a vector
///
/// This generator is async-reentrant
template <typename T>
AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
  struct State {
    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}

    std::vector<T> vec;
    std::atomic<std::size_t> vec_idx;
  };

  auto state = std::make_shared<State>(std::move(vec));
  return [state]() {
    auto idx = state->vec_idx.fetch_add(1);
    if (idx >= state->vec.size()) {
      // Eagerly return memory
      state->vec.clear();
      return AsyncGeneratorEnd<T>();
    }
    return Future<T>::MakeFinished(state->vec[idx]);
  };
}

/// \see MakeMergedGenerator
template <typename T>
class MergedGenerator {
  // Note, the implementation of this class is quite complex at the moment (PRs to
  // simplify are always welcome)
  //
  // Terminology is borrowed from rxjs.  This is a pull based implementation of the
  // mergeAll operator.  The "outer subscription" refers to the async
  // generator that the caller provided when creating this.  The outer subscription
  // yields generators.
  //
  // Each of these generators is then subscribed to (up to max_subscriptions) and these
  // are referred to as "inner subscriptions".
  //
  // As soon as we start we try and establish `max_subscriptions` inner subscriptions.
  // For each inner subscription we will cache up to 1 value.  This means we may have
  // more values than we have been asked for.  In our example, if a caller asks for one
  // record batch we will start scanning `max_subscriptions` different files.  For each
  // file we will only queue up to 1 batch (so a separate readahead is needed on the
  // file if batch readahead is desired).
  //
  // If the caller is slow we may accumulate ready-to-deliver items.  These are stored
  // in `delivered_jobs`.
  //
  // If the caller is very quick we may accumulate requests.  These are stored in
  // `waiting_jobs`.
  //
  // It may be helpful to consider an example, in the scanner the outer subscription
  // is some kind of asynchronous directory listing.  The inner subscription is
  // then a scan on a file yielded by the directory listing.
  //
  // An "outstanding" request is when we have polled either the inner or outer
  // subscription but that future hasn't completed yet.
  //
  // There are three possible "events" that can happen.
  // * A caller could request the next future
  // * An outer callback occurs when the next subscription is ready (e.g. the directory
  //   listing has produced a new file)
  // * An inner callback occurs when one of the inner subscriptions emits a value (e.g.
  //   a file scan emits a record batch)
  //
  // Any time an event happens the logic is broken into two phases.  First, we grab the
  // lock and modify the shared state.  While doing this we figure out what callbacks
  // we will need to execute.  Then, we give up the lock and execute these callbacks.
  // It is important to execute these callbacks without the lock to avoid deadlock.
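  // In other words: decide what to do while holding the lock, then actually do it
  // after releasing the lock.  Completing a future may run arbitrary user callbacks,
  // which could re-enter this generator and try to take the lock again.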
 public:
  explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
                           int max_subscriptions)
      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}

  Future<T> operator()() {
    // A caller has requested a future
    Future<T> waiting_future;
    std::shared_ptr<DeliveredJob> delivered_job;
    bool mark_generator_complete = false;
    {
      auto guard = state_->mutex.Lock();
      if (!state_->delivered_jobs.empty()) {
        // If we have a job sitting around we can deliver it
        delivered_job = std::move(state_->delivered_jobs.front());
        state_->delivered_jobs.pop_front();
        if (state_->IsCompleteUnlocked(guard)) {
          // It's possible this waiting job was the only thing left to handle and
          // we have now completed the generator.
          mark_generator_complete = true;
        } else {
          // Since we had a job sitting around we also had an inner subscription
          // that had paused.  We are going to restart this inner subscription and
          // so there will be a new outstanding request.
          state_->outstanding_requests++;
        }
      } else if (state_->broken ||
                 (!state_->first && state_->num_running_subscriptions == 0)) {
        // If we are broken or exhausted then prepare a terminal item but
        // we won't complete it until we've finished.
        Result<T> end_res = IterationEnd<T>();
        if (!state_->final_error.ok()) {
          end_res = state_->final_error;
          state_->final_error = Status::OK();
        }
        return state_->all_finished.Then([end_res]() -> Result<T> { return end_res; });
      } else {
        // Otherwise we just queue the request and it will be completed when one of
        // the ongoing inner subscriptions delivers a result
        waiting_future = Future<T>::Make();
        state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
      }
      if (state_->first) {
        // On the first request we are going to try and immediately fill our queue
        // of subscriptions.  We assume we are going to be able to start them all.
        state_->outstanding_requests +=
            static_cast<int>(state_->active_subscriptions.size());
        state_->num_running_subscriptions +=
            static_cast<int>(state_->active_subscriptions.size());
      }
    }
    // If we grabbed a finished item from the delivered_jobs queue then we may need
    // to mark the generator finished or issue a request for a new item to fill in
    // the spot we just vacated.  Notice that we issue that request to the same
    // subscription that delivered it (deliverer).
    if (delivered_job) {
      if (mark_generator_complete) {
        state_->all_finished.MarkFinished();
      } else {
        delivered_job->deliverer().AddCallback(
            InnerCallback(state_, delivered_job->index));
      }
      return std::move(delivered_job->value);
    }
    // On the first call we try and fill up our subscriptions.  It's possible the
    // outer generator only has a few items and we can't fill up to what we were
    // hoping.  In that case we have to bail early.
    if (state_->first) {
      state_->first = false;
      mark_generator_complete = false;
      for (int i = 0; i < static_cast<int>(state_->active_subscriptions.size()); i++) {
        state_->PullSource().AddCallback(
            OuterCallback{state_, static_cast<std::size_t>(i)});
        // If we have to bail early then we need to update the shared state again so
        // we need to reacquire the lock.
        auto guard = state_->mutex.Lock();
        if (state_->source_exhausted) {
          int excess_requests =
              static_cast<int>(state_->active_subscriptions.size()) - i - 1;
          state_->outstanding_requests -= excess_requests;
          state_->num_running_subscriptions -= excess_requests;
          if (excess_requests > 0) {
            // It's possible that we are completing the generator by reducing the
            // number of outstanding requests (e.g.
            // this happens when the outer subscription and all inner subscriptions
            // are synchronous)
            mark_generator_complete = state_->IsCompleteUnlocked(guard);
          }
          break;
        }
      }
      if (mark_generator_complete) {
        state_->MarkFinishedAndPurge();
      }
    }
    return waiting_future;
  }

 private:
  struct DeliveredJob {
    explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
                          std::size_t index_)
        : deliverer(deliverer_), value(std::move(value_)), index(index_) {}

    // The generator that delivered this result, we will request another item
    // from this generator once the result is delivered
    AsyncGenerator<T> deliverer;
    // The result we received from the generator
    Result<T> value;
    // The index of the generator (in active_subscriptions) that delivered this
    // result.  This is used if we need to replace a finished generator.
    std::size_t index;
  };

  struct State {
    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
        : source(std::move(source)),
          active_subscriptions(max_subscriptions),
          delivered_jobs(),
          waiting_jobs(),
          mutex(),
          first(true),
          broken(false),
          source_exhausted(false),
          outstanding_requests(0),
          num_running_subscriptions(0),
          final_error(Status::OK()) {}

    Future<AsyncGenerator<T>> PullSource() {
      // Need to guard access to source() so we don't pull sync-reentrantly which
      // is never valid.
      auto lock = mutex.Lock();
      return source();
    }

    void SignalErrorUnlocked(const util::Mutex::Guard& guard) {
      broken = true;
      // Empty any results that have arrived but have not been asked for.
      while (!delivered_jobs.empty()) {
        delivered_jobs.pop_front();
      }
    }

    // This function is called outside the mutex but it will only ever be
    // called once
    void MarkFinishedAndPurge() {
      all_finished.MarkFinished();
      while (!waiting_jobs.empty()) {
        waiting_jobs.front()->MarkFinished(IterationEnd<T>());
        waiting_jobs.pop_front();
      }
    }

    // This is called outside the mutex but it is only ever called
    // once and Future<>::AddCallback is thread-safe
    void MarkFinalError(const Status& err, Future<T> maybe_sink) {
      if (maybe_sink.is_valid()) {
        // Someone is waiting for this error so lets mark it complete when
        // all the work is done
        all_finished.AddCallback([maybe_sink, err](const Status& status) mutable {
          maybe_sink.MarkFinished(err);
        });
      } else {
        // No one is waiting for this error right now so it will be delivered
        // next.
        final_error = err;
      }
    }

    bool IsCompleteUnlocked(const util::Mutex::Guard& guard) {
      return outstanding_requests == 0 &&
             (broken || (source_exhausted && num_running_subscriptions == 0 &&
                         delivered_jobs.empty()));
    }

    bool MarkTaskFinishedUnlocked(const util::Mutex::Guard& guard) {
      --outstanding_requests;
      return IsCompleteUnlocked(guard);
    }

    // The outer generator.  Each item we pull from this will be its own generator
    // and become an inner subscription
    AsyncGenerator<AsyncGenerator<T>> source;
    // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
    std::vector<AsyncGenerator<T>> active_subscriptions;
    // Results delivered by the inner subscriptions that weren't yet asked for by the
    // caller
    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
    // backpressure
    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
    // A future that will be marked complete when the terminal item has arrived and
    // all outstanding futures have completed.  It is used to hold off emission of an
    // error until all outstanding work is done.
    Future<> all_finished = Future<>::Make();
    util::Mutex mutex;
    // A flag cleared when the caller first asks for a future.  Used to start polling.
    bool first;
    // A flag set when an error arrives, prevents us from issuing new requests.
    bool broken;
    // A flag set when the outer subscription has been exhausted.  Prevents us from
    // pulling it further (even though it would be generally harmless) and lets us
    // know we are finishing up.
    bool source_exhausted;
    // The number of futures that we have requested from either the outer or inner
    // subscriptions that have not yet completed.  We cannot mark all_finished until
    // this reaches 0.  This will never be greater than max_subscriptions
    int outstanding_requests;
    // The number of running subscriptions.  We ramp this up to `max_subscriptions` as
    // soon as the first item is requested and then it stays at that level (each
    // exhausted inner subscription is replaced by a new inner subscription) until the
    // outer subscription is exhausted at which point this descends to 0 (and
    // source_exhausted is then set to true).
    int num_running_subscriptions;
    // If an error arrives, and the caller hasn't asked for that item, we store the
    // error here.  It is analogous to delivered_jobs but for errors instead of
    // finished results.
    Status final_error;
  };

  struct InnerCallback {
    InnerCallback(std::shared_ptr<State> state, std::size_t index,
                  bool recursive = false)
        : state(std::move(state)), index(index), recursive(recursive) {}

    void operator()(const Result<T>& maybe_next_ref) {
      // An item has been delivered by one of the inner subscriptions
      Future<T> next_fut;
      const Result<T>* maybe_next = &maybe_next_ref;

      // When an item is delivered (and the caller has asked for it) we grab the
      // next item from the inner subscription.  To avoid this behavior leading to an
      // infinite loop (this can happen if the caller's callback asks for the next
      // item) we use a while loop.
      while (true) {
        Future<T> sink;
        bool sub_finished = maybe_next->ok() && IsIterationEnd(**maybe_next);
        bool pull_next_sub = false;
        bool was_broken = false;
        bool should_mark_gen_complete = false;
        bool should_mark_final_error = false;
        {
          auto guard = state->mutex.Lock();
          if (state->broken) {
            // We've errored out previously so ignore the result.  If anyone was
            // waiting for this they will get IterationEnd when we purge
            was_broken = true;
          } else {
            if (!sub_finished) {
              // There is a result to deliver.  Either we can deliver it now or we
              // will queue it up
              if (state->waiting_jobs.empty()) {
                state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
                    state->active_subscriptions[index], *maybe_next, index));
              } else {
                sink = std::move(*state->waiting_jobs.front());
                state->waiting_jobs.pop_front();
              }
            }
            // If this is the first error then we transition the state to a broken
            // state
            if (!maybe_next->ok()) {
              should_mark_final_error = true;
              state->SignalErrorUnlocked(guard);
            }
          }
          // If we finished this inner subscription then we need to grab a new inner
          // subscription to take its spot.  If we can't (because we're broken or
          // exhausted) then we aren't going to be starting any new futures and so
          // the number of running subscriptions drops.
          pull_next_sub = sub_finished && !state->source_exhausted && !was_broken;
          if (sub_finished && !pull_next_sub) {
            state->num_running_subscriptions--;
          }
          // There are three situations we won't pull again.  If an error occurred or
          // we are already finished or if no one was waiting for our result and so
          // we queued it up.  We will decrement outstanding_requests and possibly
          // mark the generator completed.
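          // (In the condition below, `state->broken` covers the error case,
          // `!sink.is_valid() && !sub_finished` the queued-up case, and
          // `sub_finished && state->source_exhausted` the finished case.)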
          if (state->broken || (!sink.is_valid() && !sub_finished) ||
              (sub_finished && state->source_exhausted)) {
            if (state->MarkTaskFinishedUnlocked(guard)) {
              should_mark_gen_complete = true;
            }
          }
        }

        // Now we have given up the lock and we can take all the actions we decided
        // we need to take.
        if (should_mark_final_error) {
          state->MarkFinalError(maybe_next->status(), std::move(sink));
        }

        if (should_mark_gen_complete) {
          state->MarkFinishedAndPurge();
        }

        // An error occurred elsewhere so there is no need to mark any future
        // finished (will happen during the purge) or pull from anything
        if (was_broken) {
          return;
        }

        if (pull_next_sub) {
          if (recursive) {
            was_empty = true;
            return;
          }
          // We pulled an end token so we need to start a new subscription
          // in our spot
          state->PullSource().AddCallback(OuterCallback{state, index});
        } else if (sink.is_valid()) {
          // We pulled a valid result and there was someone waiting for it
          // so lets fetch the next result from our subscription
          sink.MarkFinished(*maybe_next);
          next_fut = state->active_subscriptions[index]();
          if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
            return;
          }
          // Already completed.  Avoid very deep recursion by looping
          // here instead of relying on the callback.
          maybe_next = &next_fut.result();
          continue;
        }
        // else: We pulled a valid result but no one was waiting for it so
        // we can just stop.
        return;
      }
    }

    std::shared_ptr<State> state;
    std::size_t index;
    bool recursive;
    bool was_empty = false;
  };

  struct OuterCallback {
    void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
      Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
      while (true) {
        // We have been given a new inner subscription
        bool should_continue = false;
        bool should_mark_gen_complete = false;
        bool should_deliver_error = false;
        bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
        Future<T> error_sink;
        {
          auto guard = state->mutex.Lock();
          if (!maybe_next.ok() || source_exhausted || state->broken) {
            // If here then we will not pull any more from the outer source
            if (!state->broken && !maybe_next.ok()) {
              state->SignalErrorUnlocked(guard);
              // If here then we are the first error so we need to deliver it
              should_deliver_error = true;
              if (!state->waiting_jobs.empty()) {
                error_sink = std::move(*state->waiting_jobs.front());
                state->waiting_jobs.pop_front();
              }
            }
            if (source_exhausted) {
              state->source_exhausted = true;
              state->num_running_subscriptions--;
            }
            if (state->MarkTaskFinishedUnlocked(guard)) {
              should_mark_gen_complete = true;
            }
          } else {
            state->active_subscriptions[index] = *maybe_next;
            should_continue = true;
          }
        }
        if (should_deliver_error) {
          state->MarkFinalError(maybe_next.status(), std::move(error_sink));
        }
        if (should_mark_gen_complete) {
          state->MarkFinishedAndPurge();
        }
        if (should_continue) {
          // There is a possibility that a large sequence of immediately available
          // inner callbacks could lead to a stack overflow.  To avoid this we need
          // to synchronously loop through inner/outer callbacks until we either find
          // an unfinished future or we find an actual item to deliver.
          Future<T> next_item = (*maybe_next)();
          if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
            // By setting recursive to true we signal to the inner callback that, if
            // it is empty, instead of adding a new outer callback, it should just
            // immediately return, flagging was_empty so that we know we need to
            // check the next subscription.
            InnerCallback immediate_inner(state, index, /*recursive=*/true);
            immediate_inner(next_item.result());
            if (immediate_inner.was_empty) {
              Future<AsyncGenerator<T>> next_source = state->PullSource();
              if (next_source.TryAddCallback(
                      [this] { return OuterCallback{state, index}; })) {
                // We hit an unfinished future so we can stop looping
                return;
              }
              // The current subscription was immediately and synchronously empty
              // and we were able to synchronously pull the next subscription so we
              // can keep looping.
              maybe_next = next_source.result();
              continue;
            }
          }
        }
        return;
      }
    }

    std::shared_ptr<State> state;
    std::size_t index;
  };

  std::shared_ptr<State> state_;
};

/// \brief Create a generator that takes in a stream of generators and pulls from up to
/// max_subscriptions at a time
///
/// Note: This may deliver items out of sequence.  For example, items from the third
/// AsyncGenerator generated by the source may be emitted before some items from the
/// first AsyncGenerator generated by the source.
///
/// This generator will pull from source async-reentrantly unless max_subscriptions is 1
/// This generator will not pull from the individual subscriptions reentrantly.  Add
/// readahead to the individual subscriptions if that is desired.
/// This generator is async-reentrant
///
/// This generator may queue up to max_subscriptions instances of T
template <typename T>
AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
                                      int max_subscriptions) {
  return MergedGenerator<T>(std::move(source), max_subscriptions);
}

template <typename T>
Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
    AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
  if (max_subscriptions <= 0) {
    return Status::Invalid("max_subscriptions must be a positive integer");
  }
  if (max_subscriptions == 1) {
    return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
  }
  AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
      std::move(source),
      [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
  AsyncGenerator<AsyncGenerator<T>> sub_readahead =
      MakeSerialReadaheadGenerator(std::move(autostarting_source),
                                   max_subscriptions - 1);
  return MakeConcatenatedGenerator(std::move(sub_readahead));
}

/// \brief Create a generator that takes in a stream of generators and pulls from each
/// one in sequence.
///
/// This generator is async-reentrant but will never pull from source reentrantly and
/// will never pull from any subscription reentrantly.
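///
/// A minimal sketch, concatenating two vector generators:
///
/// \code
/// auto gen_of_gens = MakeVectorGenerator<AsyncGenerator<int>>(
///     {MakeVectorGenerator<int>({1, 2}), MakeVectorGenerator<int>({3, 4})});
/// AsyncGenerator<int> flat = MakeConcatenatedGenerator(std::move(gen_of_gens));
/// // yields 1, 2, 3, 4
/// \endcode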
///
/// This generator may queue 1 instance of T
///
/// TODO: Could potentially make a bespoke implementation instead of MergedGenerator
/// that forwards async-reentrant requests instead of buffering them (which is what
/// MergedGenerator does)
template <typename T>
AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
  return MergedGenerator<T>(std::move(source), 1);
}

template <typename T>
struct Enumerated {
  T value;
  int index;
  bool last;
};

template <typename T>
struct IterationTraits<Enumerated<T>> {
  static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
  static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
};

/// \see MakeEnumeratedGenerator
template <typename T>
class EnumeratingGenerator {
 public:
  EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
      : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}

  Future<Enumerated<T>> operator()() {
    if (state_->finished) {
      return AsyncGeneratorEnd<Enumerated<T>>();
    } else {
      auto state = state_;
      return state->source().Then([state](const T& next) {
        auto finished = IsIterationEnd(next);
        auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
        state->prev_value = next;
        state->prev_index++;
        state->finished = finished;
        return prev;
      });
    }
  }

 private:
  struct State {
    State(AsyncGenerator<T> source, T initial_value)
        : source(std::move(source)),
          prev_value(std::move(initial_value)),
          prev_index(0) {
      finished = IsIterationEnd(prev_value);
    }

    AsyncGenerator<T> source;
    T prev_value;
    int prev_index;
    bool finished;
  };

  std::shared_ptr<State> state_;
};

/// Wrap items from a source generator with positional information
///
/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items
/// to be processed in a "first-available" fashion and later resequenced which can
/// reduce the impact of sources with erratic performance (e.g. a filesystem where
/// some items may take longer to read than others).
///
/// TODO(ARROW-12371) Would require this generator to be async-reentrant
///
/// \see MakeSequencingGenerator for an example of putting items back in order
///
/// This generator is not async-reentrant
///
/// This generator buffers one item (so it knows which item is the last item)
template <typename T>
AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
  return FutureFirstGenerator<Enumerated<T>>(
      source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
        return EnumeratingGenerator<T>(std::move(source), initial_value);
      }));
}

/// \see MakeTransferredGenerator
template <typename T>
class TransferringGenerator {
 public:
  explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
      : source_(std::move(source)), executor_(executor) {}

  Future<T> operator()() { return executor_->Transfer(source_()); }

 private:
  AsyncGenerator<T> source_;
  internal::Executor* executor_;
};

/// \brief Transfer a future to an underlying executor.
///
/// Continuations run on the returned future will be run on the given executor
/// if they cannot be run synchronously.
///
/// This is often needed to move computation off I/O threads or other external
/// completion sources and back on to the CPU executor so the I/O thread can
/// stay busy and focused on I/O
///
/// Keep in mind that continuations called on an already completed future will
/// always be run synchronously and so no transfer will happen in that case.
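///
/// A typical pattern (sketch; `io_gen` is assumed to come from
/// MakeBackgroundGenerator, and internal::GetCpuThreadPool() is Arrow's default
/// CPU executor):
///
/// \code
/// AsyncGenerator<int> on_cpu =
///     MakeTransferredGenerator(std::move(io_gen), internal::GetCpuThreadPool());
/// \endcode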
///
/// This generator is async reentrant if the source is
///
/// This generator will not queue
template <typename T>
AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
                                           internal::Executor* executor) {
  return TransferringGenerator<T>(std::move(source), executor);
}

/// \see MakeBackgroundGenerator
template <typename T>
class BackgroundGenerator {
 public:
  explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor,
                               int max_q, int q_restart)
      : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
        cleanup_(std::make_shared<Cleanup>(state_.get())) {}

  Future<T> operator()() {
    auto guard = state_->mutex.Lock();
    Future<T> waiting_future;
    if (state_->queue.empty()) {
      if (state_->finished) {
        return AsyncGeneratorEnd<T>();
      } else {
        waiting_future = Future<T>::Make();
        state_->waiting_future = waiting_future;
      }
    } else {
      auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
      state_->queue.pop();
      if (state_->NeedsRestart()) {
        return state_->RestartTask(state_, std::move(guard), std::move(next));
      }
      return next;
    }
    // This should only trigger the very first time this method is called
    if (state_->NeedsRestart()) {
      return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
    }
    return waiting_future;
  }

 protected:
  static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};

  struct State {
    State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
        : io_executor(io_executor),
          max_q(max_q),
          q_restart(q_restart),
          it(std::move(it)),
          reading(false),
          finished(false),
          should_shutdown(false) {}

    void ClearQueue() {
      while (!queue.empty()) {
        queue.pop();
      }
    }

    bool TaskIsRunning() const { return task_finished.is_valid(); }

    bool NeedsRestart() const {
      return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
    }

    void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
      // If we get here we are actually going to start a new task so let's create a
      // task_finished future for it
      state->task_finished = Future<>::Make();
      state->reading = true;
      auto spawn_status = io_executor->Spawn(
          [state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
      if (!spawn_status.ok()) {
        // If we can't spawn a new task then send an error to the consumer (either via
        // a waiting future or the queue) and mark ourselves finished
        state->finished = true;
        state->task_finished = Future<>();
        if (waiting_future.has_value()) {
          auto to_deliver = std::move(waiting_future.value());
          waiting_future.reset();
          guard.Unlock();
          to_deliver.MarkFinished(spawn_status);
        } else {
          ClearQueue();
          queue.push(spawn_status);
        }
      }
    }

    Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
                          Future<T> next) {
      if (TaskIsRunning()) {
        // If the task is still cleaning up we need to wait for it to finish before
        // restarting.  We also want to block the consumer until we've restarted the
        // reader to avoid multiple restarts
        return task_finished.Then([state, next]() {
          // This may appear dangerous (recursive mutex) but we should be guaranteed
          // the outer guard has been released by this point.  We know...
          // * task_finished is not already finished (it would be invalid in that
          //   case)
          // * task_finished will not be marked complete until we've given up the
          //   mutex
          auto guard_ = state->mutex.Lock();
          state->DoRestartTask(state, std::move(guard_));
          return next;
        });
      }
      // Otherwise we can restart immediately
      DoRestartTask(std::move(state), std::move(guard));
      return next;
    }

    internal::Executor* io_executor;
    const int max_q;
    const int q_restart;
    Iterator<T> it;
    std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};
    // If true, the task is actively pumping items from the queue and does not need a
    // restart
    bool reading;
    // Set to true when a terminal item arrives
    bool finished;
    // Signal to the background task to end early because consumers have given up on
    // it
    bool should_shutdown;
    // If the queue is empty, the consumer will create a waiting future and wait for it
    std::queue<Result<T>> queue;
    std::optional<Future<T>> waiting_future;
    // Every background task is given a future to complete when it is entirely
    // finished processing and ready for the next task to start or for State to be
    // destroyed
    Future<> task_finished;
    util::Mutex mutex;
  };

  // Cleanup task that will be run when all consumer references to the generator are
  // lost
  struct Cleanup {
    explicit Cleanup(State* state) : state(state) {}
    ~Cleanup() {
      /// TODO: Once ARROW-13109 is available then we can force consumers to spawn and
      /// there is no need to perform this check.
      ///
      /// It's a deadlock if we enter cleanup from
      /// the worker thread but it can happen if the consumer doesn't transfer away
      assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
      Future<> finish_fut;
      {
        auto lock = state->mutex.Lock();
        if (!state->TaskIsRunning()) {
          return;
        }
        // Signal the current task to stop and wait for it to finish
        state->should_shutdown = true;
        finish_fut = state->task_finished;
      }
      // Using future as a condition variable here
      Status st = finish_fut.status();
      ARROW_UNUSED(st);
    }
    State* state;
  };

  static void WorkerTask(std::shared_ptr<State> state) {
    state->worker_thread_id.store(::arrow::internal::GetThreadId());
    // We need to capture the state to read while outside the mutex
    bool reading = true;
    while (reading) {
      auto next = state->it.Next();
      // Need to capture state->waiting_future inside the mutex to mark finished
      // outside
      Future<T> waiting_future;
      {
        auto guard = state->mutex.Lock();
        if (state->should_shutdown) {
          state->finished = true;
          break;
        }
        if (!next.ok() || IsIterationEnd(*next)) {
          // Terminal item.  Set finished to true, send this last item, and quit
          state->finished = true;
          if (!next.ok()) {
            state->ClearQueue();
          }
        }
        // At this point we are going to send an item.  Either we will add it to the
        // queue or deliver it to a waiting future.
        if (state->waiting_future.has_value()) {
          waiting_future = std::move(state->waiting_future.value());
          state->waiting_future.reset();
        } else {
          state->queue.push(std::move(next));
          // We just filled up the queue so it is time to quit.  We may need to
          // notify a cleanup task so we transition to Quitting
          if (static_cast<int>(state->queue.size()) >= state->max_q) {
            state->reading = false;
          }
        }
        reading = state->reading && !state->finished;
      }
      // This should happen outside the mutex.  Presumably there is a
      // transferring generator on the other end that will quickly transfer any
      // callbacks off of this thread so we can continue looping.
      // Still, best not to rely on that
      if (waiting_future.is_valid()) {
        waiting_future.MarkFinished(next);
      }
    }
    // Once we've sent our last item we can notify any waiters that we are done and
    // so either state can be cleaned up or a new background task can be started
    Future<> task_finished;
    {
      auto guard = state->mutex.Lock();
      // After we give up the mutex state can be safely deleted.  We will no longer
      // reference it.  We can safely transition to idle now.
      task_finished = state->task_finished;
      state->task_finished = Future<>();
      state->worker_thread_id.store(kUnlikelyThreadId);
    }
    task_finished.MarkFinished();
  }

  std::shared_ptr<State> state_;
  // state_ is held by both the generator and the background thread so it won't be
  // cleaned up when all consumer references are relinquished.  cleanup_ is only held
  // by the generator so it will be destructed when the last consumer reference is
  // gone.  We use this to cleanup / stop the background generator in case the
  // consuming end stops listening (e.g. due to a downstream error)
  std::shared_ptr<Cleanup> cleanup_;
};

constexpr int kDefaultBackgroundMaxQ = 32;
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
/// thread
///
/// The parameters max_q and q_restart control queue size and background thread task
/// management.  If the background task is fast you typically don't want it creating a
/// thread task for every item.  Instead the background thread will run until it fills
/// up a readahead queue.
///
/// Once the queue has filled up the background thread task will terminate (allowing
/// other I/O tasks to use the thread).  Once the queue has been drained enough
/// (specified by q_restart) then the background thread task will be restarted.  If
/// q_restart is too low then you may exhaust the queue waiting for the background
/// thread task to start running again.  If it is too high then it will be constantly
/// stopping and restarting the background queue task.
///
/// The "background thread" is a logical thread and will run as tasks on the
/// io_executor.  This thread may stop and start when the queue fills up but there
/// will only be one active background thread task at any given time.  You MUST
/// transfer away from this background generator.  Otherwise there could be a race
/// condition if a callback on the background thread deletes the last consumer
/// reference to the background generator.  You can transfer onto the same executor
/// as the background thread, it is only necessary to create a new thread task, not
/// to switch executors.
///
/// This generator is not async-reentrant
///
/// This generator will queue up to max_q blocks
template <typename T>
static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
    Iterator<T> iterator, internal::Executor* io_executor,
    int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
  if (max_q < q_restart) {
    return Status::Invalid("max_q must be >= q_restart");
  }
  return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
}

/// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> synchronously
///
/// This should only be used if you know the source iterator does not involve any
/// I/O (or other blocking calls).  Otherwise a CPU thread will be blocked and,
/// depending on the complexity of the iterator, it may lead to deadlock.
///
/// If you are not certain if there will be I/O then it is better to use
/// MakeBackgroundGenerator.

/// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> synchronously
///
/// This should only be used if you know the source iterator does not involve any
/// I/O (or other blocking calls).  Otherwise a CPU thread will be blocked and,
/// depending on the complexity of the iterator, it may lead to deadlock.
///
/// If you are not certain if there will be I/O then it is better to use
/// MakeBackgroundGenerator.  If helpful, you can think of this as the AsyncGenerator
/// equivalent of Future::MakeFinished
///
/// It is impossible to call this in an async-reentrant manner since the returned
/// future will be completed by the time it is polled.
///
/// This generator does not queue
template <typename T>
static Result<AsyncGenerator<T>> MakeBlockingGenerator(
    std::shared_ptr<Iterator<T>> iterator) {
  return [it = std::move(iterator)]() mutable -> Future<T> {
    return Future<T>::MakeFinished(it->Next());
  };
}

template <typename T>
static Result<AsyncGenerator<T>> MakeBlockingGenerator(Iterator<T> iterator) {
  return MakeBlockingGenerator(std::make_shared<Iterator<T>>(std::move(iterator)));
}

/// \see MakeGeneratorIterator
template <typename T>
class GeneratorIterator {
 public:
  explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}

  Result<T> Next() { return source_().result(); }

 private:
  AsyncGenerator<T> source_;
};

/// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
/// is finished
template <typename T>
Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
  return Iterator<T>(GeneratorIterator<T>(std::move(source)));
}

/// \brief Add readahead to an iterator using a background thread.
///
/// Under the hood this is converting the iterator to a generator using
/// MakeBackgroundGenerator, adding readahead to the converted generator with
/// MakeReadaheadGenerator, and then converting back to an iterator using
/// MakeGeneratorIterator.
template <typename T>
Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
  ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
  auto max_q = readahead_queue_size;
  auto q_restart = std::max(1, max_q / 2);
  ARROW_ASSIGN_OR_RAISE(
      auto background_generator,
      MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
  // Capture io_executor to keep it alive as long as owned_bg_generator is still
  // referenced
  AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
    return background_generator();
  };
  return MakeGeneratorIterator(std::move(owned_bg_generator));
}

/// \brief Make a generator that returns a single pre-generated future
///
/// This generator is async-reentrant.
template <typename T>
std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
  assert(future.is_valid());
  auto state = std::make_shared<Future<T>>(std::move(future));
  return [state]() -> Future<T> {
    auto fut = std::move(*state);
    if (fut.is_valid()) {
      return fut;
    } else {
      return AsyncGeneratorEnd<T>();
    }
  };
}

/// \brief Make a generator that immediately ends.
///
/// This generator is async-reentrant.
template <typename T>
std::function<Future<T>()> MakeEmptyGenerator() {
  return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
}

/// \brief Make a generator that always fails with a given error
///
/// This generator is async-reentrant.
template <typename T>
AsyncGenerator<T> MakeFailingGenerator(Status st) {
  assert(!st.ok());
  auto state = std::make_shared<Status>(std::move(st));
  return [state]() -> Future<T> {
    auto st = std::move(*state);
    if (!st.ok()) {
      return std::move(st);
    } else {
      return AsyncGeneratorEnd<T>();
    }
  };
}

/// \brief Make a generator that always fails with a given error
///
/// This overload allows inferring the return type from the argument.
template <typename T>
AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
  return MakeFailingGenerator<T>(result.status());
}
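
// Usage sketch (illustrative comment): wrapping a blocking Iterator<int> `slow_it`
// (hypothetical) with background readahead and then consuming it synchronously.
// Assumes a surrounding function that returns Status so ARROW_ASSIGN_OR_RAISE works.
//
//   ARROW_ASSIGN_OR_RAISE(
//       Iterator<int> readahead_it,
//       MakeReadaheadIterator(std::move(slow_it), /*readahead_queue_size=*/8));
//   for (auto maybe_value : readahead_it) {
//     ARROW_ASSIGN_OR_RAISE(int value, maybe_value);
//     // Process `value` while the background thread prefetches further items
//   }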

/// \brief Prepend initial_values onto a generator
///
/// This generator is async-reentrant but will buffer requests and will not
/// pull from following_values async-reentrantly.
template <typename T>
AsyncGenerator<T> MakeGeneratorStartsWith(std::vector<T> initial_values,
                                          AsyncGenerator<T> following_values) {
  auto initial_values_vec_gen = MakeVectorGenerator(std::move(initial_values));
  auto gen_gen = MakeVectorGenerator<AsyncGenerator<T>>(
      {std::move(initial_values_vec_gen), std::move(following_values)});
  return MakeConcatenatedGenerator(std::move(gen_gen));
}

template <typename T>
struct CancellableGenerator {
  Future<T> operator()() {
    if (stop_token.IsStopRequested()) {
      return stop_token.Poll();
    }
    return source();
  }

  AsyncGenerator<T> source;
  StopToken stop_token;
};

/// \brief Allow an async generator to be cancelled
///
/// This generator is async-reentrant
template <typename T>
AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token) {
  return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
}

template <typename T>
class DefaultIfEmptyGenerator {
 public:
  DefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value)
      : state_(std::make_shared<State>(std::move(source), std::move(or_value))) {}

  Future<T> operator()() {
    if (state_->first) {
      state_->first = false;
      struct {
        T or_value;

        Result<T> operator()(const T& value) {
          if (IterationTraits<T>::IsEnd(value)) {
            return std::move(or_value);
          }
          return value;
        }
      } Continuation;
      Continuation.or_value = std::move(state_->or_value);
      return state_->source().Then(std::move(Continuation));
    }
    return state_->source();
  }

 private:
  struct State {
    AsyncGenerator<T> source;
    T or_value;
    bool first;

    State(AsyncGenerator<T> source_, T or_value_)
        : source(std::move(source_)), or_value(std::move(or_value_)), first(true) {}
  };
  std::shared_ptr<State> state_;
};

/// \brief If the generator is empty, return the given value, else
/// forward the values from the generator.
///
/// This generator is async-reentrant.
template <typename T>
AsyncGenerator<T> MakeDefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value) {
  return DefaultIfEmptyGenerator<T>(std::move(source), std::move(or_value));
}

}  // namespace arrow
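
// Usage sketch (illustrative comment) combining two adapters from this file: making a
// hypothetical AsyncGenerator<int> `gen` cancellable via a StopSource (from
// arrow/util/cancel.h) and falling back to a default value if it is empty.
//
//   arrow::StopSource stop_source;
//   auto cancellable = arrow::MakeCancellable(std::move(gen), stop_source.token());
//   auto with_default =
//       arrow::MakeDefaultIfEmptyGenerator(std::move(cancellable), /*or_value=*/0);
//   // Later, from any thread; subsequent polls fail (Status::Cancelled by default):
//   stop_source.RequestStop();
//
// Note the design choice in CancellableGenerator: the stop token is checked when the
// generator is polled, so an already in-flight future is not interrupted.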