// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "arrow/acero/type_fwd.h"
#include "arrow/acero/visibility.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"

namespace arrow {

using compute::Aggregate;
using compute::ExecBatch;
using compute::Expression;
using compute::literal;
using compute::Ordering;
using compute::SelectKOptions;
using compute::SortOptions;

namespace internal {

class Executor;

}  // namespace internal

namespace acero {

/// \brief This must not be used in release-mode
struct DebugOptions;

using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;

/// \addtogroup acero-nodes
/// @{

/// \brief A base class for all options objects
///
/// The only time this is used directly is when a node has no configuration
class ARROW_ACERO_EXPORT ExecNodeOptions {
 public:
  virtual ~ExecNodeOptions() = default;

  /// \brief This must not be used in release-mode
  std::shared_ptr<DebugOptions> debug_opts;
};

/// \brief A node representing a generic source of data for Acero
///
/// The source node will start calling `generator` during StartProducing.  An initial
/// task will be created that will call `generator`.  It will not call `generator`
/// reentrantly.  If the source can be read in parallel then those details should be
/// encapsulated within `generator`.
///
/// For each batch received a new task will be created to push that batch downstream.
/// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the
/// parent batch and call InputReceived.  Thus, if the `generator` yields a large
/// batch it may result in several calls to InputReceived.
///
/// The SourceNode will, by default, assign an implicit ordering to outgoing batches.
/// This is valid as long as the generator generates batches in a deterministic fashion.
/// Currently, the only way to override this is to subclass the SourceNode.
///
/// This node is not generally used directly but can serve as the basis for various
/// specialized nodes.
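///
/// A minimal usage sketch (illustrative, not part of the API): the names
/// `schema` and `batches` are assumed to already exist; the generator ends the
/// stream by yielding std::nullopt.
///
/// \code{.cpp}
/// // Serve batches from an in-memory vector, one batch per call.
/// auto state = std::make_shared<std::pair<std::vector<ExecBatch>, size_t>>(
///     std::move(batches), 0);
/// auto gen = [state]() -> Future<std::optional<ExecBatch>> {
///   if (state->second >= state->first.size()) {
///     // End of stream
///     return Future<std::optional<ExecBatch>>::MakeFinished(std::nullopt);
///   }
///   return Future<std::optional<ExecBatch>>::MakeFinished(
///       state->first[state->second++]);
/// };
/// SourceNodeOptions options(schema, std::move(gen));
/// \endcode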
class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  SourceNodeOptions(std::shared_ptr<Schema> output_schema,
                    std::function<Future<std::optional<ExecBatch>>()> generator)
      : output_schema(std::move(output_schema)), generator(std::move(generator)) {}

  /// \brief the schema for batches that will be generated by this source
  std::shared_ptr<Schema> output_schema;
  /// \brief an asynchronous stream of batches ending with std::nullopt
  std::function<Future<std::optional<ExecBatch>>()> generator;
};

/// \brief a node that generates data from a table already loaded in memory
///
/// The table source node will slice the table into chunks of at most `max_batch_size`
/// rows for parallel processing.  The table source node extends the source node, so
/// these chunks will be iteratively processed in small batches.  \see SourceNodeOptions
/// for details.
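///
/// A minimal usage sketch (illustrative): `table` is assumed to be an
/// in-memory std::shared_ptr<Table>.
///
/// \code{.cpp}
/// // Emit the table in slices of at most 64Ki rows.
/// TableSourceNodeOptions options(table, /*max_batch_size=*/1 << 16);
/// Declaration source{"table_source", std::move(options)};
/// \endcode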
class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
 public:
  static constexpr int64_t kDefaultMaxBatchSize = 1 << 20;

  /// Create an instance from values
  TableSourceNodeOptions(std::shared_ptr<Table> table,
                         int64_t max_batch_size = kDefaultMaxBatchSize)
      : table(std::move(table)), max_batch_size(max_batch_size) {}

  /// \brief a table which acts as the data source
  std::shared_ptr<Table> table;
  /// \brief size of batches to emit from this node
  ///
  /// If the table is larger, the node will emit multiple batches from the
  /// table so they can be processed in parallel.
  int64_t max_batch_size;
};

/// \brief define a lazily resolved Arrow table.
///
/// The table uniquely identified by the names can typically be resolved at the time when
/// the plan is to be consumed.
///
/// This node is for serialization purposes only and can never be executed.
class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)
      : names(std::move(names)), schema(std::move(schema)) {}

  /// \brief the names to put in the serialized plan
  std::vector<std::string> names;
  /// \brief the output schema of the table
  std::shared_ptr<Schema> schema;
};

/// \brief a source node which feeds data from a synchronous iterator of batches
///
/// ItMaker is a maker of an iterator of tabular data.
///
/// The node can be configured to use an I/O executor.  If set then each time the
/// iterator is polled a new I/O thread task will be created to do the polling.  This
/// allows a blocking iterator to stay off the CPU thread pool.
template <typename ItMaker>
class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance that will create a new task on io_executor for each iteration
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          arrow::internal::Executor* io_executor)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(io_executor),
        requires_io(true) {}

  /// Create an instance that will either iterate synchronously or use the default I/O
  /// executor
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          bool requires_io = false)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(NULLPTR),
        requires_io(requires_io) {}

  /// \brief The schema of the record batches from the iterator
  std::shared_ptr<Schema> schema;

  /// \brief A maker of an iterator which acts as the data source
  ItMaker it_maker;

  /// \brief The executor to use for scanning the iterator
  ///
  /// Defaults to the default I/O executor.  Only used if requires_io is true.
  /// If requires_io is false then this MUST be nullptr.
  arrow::internal::Executor* io_executor;

  /// \brief If true then items will be fetched from the iterator on a dedicated I/O
  /// thread to keep I/O off the CPU thread
  bool requires_io;
};

/// a source node that reads from a RecordBatchReader
///
/// Each iteration of the RecordBatchReader will be run on a new thread task created
/// on the I/O thread pool.
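///
/// A minimal usage sketch (illustrative): `reader` is assumed to be an open
/// RecordBatchReader; iteration happens on the default I/O executor.
///
/// \code{.cpp}
/// RecordBatchReaderSourceNodeOptions options(reader);
/// Declaration source{"record_batch_reader_source", std::move(options)};
/// \endcode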
class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
                                     arrow::internal::Executor* io_executor = NULLPTR)
      : reader(std::move(reader)), io_executor(io_executor) {}

  /// \brief The RecordBatchReader which acts as the data source
  std::shared_ptr<RecordBatchReader> reader;

  /// \brief The executor to use for the reader
  ///
  /// Defaults to the default I/O executor.
  arrow::internal::Executor* io_executor;
};

/// a source node that reads from an iterator of array vectors
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;

/// \brief An extended Source node which accepts a schema and array-vectors
class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions
    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// a source node that reads from an iterator of ExecBatch
using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;

/// \brief An extended Source node which accepts a schema and exec-batches
class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
 public:
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches,
                             ::arrow::internal::Executor* io_executor);
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches, bool requires_io = false);
};

using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;

/// a source node that reads from an iterator of RecordBatch
class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
/// this node.  Any rows for which filter_expression does not evaluate to `true` will be
/// excluded from the batches emitted by this node.
///
/// This node will emit empty batches if all rows are excluded.  This is done
/// to avoid gaps in the ordering.
class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit FilterNodeOptions(Expression filter_expression)
      : filter_expression(std::move(filter_expression)) {}

  /// \brief the expression to filter batches
  ///
  /// The return type of this expression must be boolean
  Expression filter_expression;
};

/// \brief a node which selects a specified subset of rows from the input
class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions {
 public:
  static constexpr std::string_view kName = "fetch";
  /// \brief create an instance from values
  FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {}
  /// \brief the number of rows to skip
  int64_t offset;
  /// \brief the number of rows to keep (not counting skipped rows)
  int64_t count;
};

/// \brief a node which executes expressions on input batches, producing batches
/// of the same length with new columns
///
/// Each expression will be evaluated against each batch which is pushed to
/// this node to produce a corresponding output column.
///
/// If names are not provided, the string representations of exprs will be used.
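///
/// A minimal usage sketch (illustrative): keep column "x" and add a derived
/// column computed with the "add" compute function.
///
/// \code{.cpp}
/// ProjectNodeOptions options(
///     {compute::field_ref("x"),
///      compute::call("add", {compute::field_ref("x"), compute::literal(1)})},
///     {"x", "x_plus_1"});
/// \endcode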
class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit ProjectNodeOptions(std::vector<Expression> expressions,
                              std::vector<std::string> names = {})
      : expressions(std::move(expressions)), names(std::move(names)) {}

  /// \brief the expressions to run on the batches
  ///
  /// The output will have one column for each expression.  If you wish to keep any of
  /// the columns from the input then you should create a simple field_ref expression
  /// for that column.
  std::vector<Expression> expressions;
  /// \brief the names of the output columns
  ///
  /// If this is not specified then the result of calling ToString on the expression will
  /// be used instead
  ///
  /// This list should either be empty or have the same length as `expressions`
  std::vector<std::string> names;
};

/// \brief a node which aggregates input batches and calculates summary statistics
///
/// The node can summarize the entire input or it can group the input with grouping keys
/// and segment keys.
///
/// By default, the aggregate node is a pipeline breaker.  It must accumulate all input
/// before any output is produced.  Segment keys are a performance optimization.  If
/// you know your input is already partitioned by one or more columns then you can
/// specify these as segment keys.  At each change in the segment keys the node will
/// emit values for all data seen so far.
///
/// Segment keys are currently limited to single-threaded mode.
///
/// Both keys and segment-keys determine the group.  However, segment-keys are also used
/// for determining grouping segments, which should be large, and allow streaming a
/// partial aggregation result after processing each segment.  One common use-case for
/// segment-keys is ordered aggregation, in which the segment-key attribute specifies a
/// column with non-decreasing values or a lexicographically-ordered set of such columns.
///
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function.  If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
///
/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
/// described above, applies.
///
/// The keys and segment_keys vectors must be disjoint.
///
/// If no measures are provided then you will simply get the list of unique keys.
///
/// This node outputs segment keys first, followed by regular keys, followed by one
/// column for each aggregate.
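///
/// A minimal usage sketch (illustrative): a grouped sum, where "sales" and
/// "region" are assumed to be columns of the input.
///
/// \code{.cpp}
/// // One hash aggregate ("hash_sum" of "sales", output as "sales_sum"),
/// // grouped by the key "region".
/// AggregateNodeOptions options(
///     /*aggregates=*/{{"hash_sum", nullptr, "sales", "sales_sum"}},
///     /*keys=*/{"region"});
/// \endcode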
class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
                                std::vector<FieldRef> keys = {},
                                std::vector<FieldRef> segment_keys = {})
      : aggregates(std::move(aggregates)),
        keys(std::move(keys)),
        segment_keys(std::move(segment_keys)) {}

  // aggregations which will be applied to the targeted fields
  std::vector<Aggregate> aggregates;
  // keys by which aggregations will be grouped (optional)
  std::vector<FieldRef> keys;
  // keys by which aggregations will be segmented (optional)
  std::vector<FieldRef> segment_keys;
};

/// \brief a default value at which backpressure will be applied
constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
/// \brief a default value at which backpressure will be removed
constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;  // 256MiB

/// \brief an interface that can be queried for backpressure statistics
class ARROW_ACERO_EXPORT BackpressureMonitor {
 public:
  virtual ~BackpressureMonitor() = default;
  /// \brief fetches the number of bytes currently queued up
  virtual uint64_t bytes_in_use() = 0;
  /// \brief checks to see if backpressure is currently applied
  virtual bool is_paused() = 0;
};

/// \brief Options to control backpressure behavior
struct ARROW_ACERO_EXPORT BackpressureOptions {
  /// \brief Create default options that perform no backpressure
  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
  /// \brief Create options that will perform backpressure
  ///
  /// \param resume_if_below The producer should resume producing if the backpressure
  ///                        queue has fewer than resume_if_below bytes
  /// \param pause_if_above The producer should pause producing if the backpressure
  ///                       queue has more than pause_if_above bytes
  BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above)
      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}

  /// \brief create an instance using default values for backpressure limits
  static BackpressureOptions DefaultBackpressure() {
    return BackpressureOptions(kDefaultBackpressureLowBytes,
                               kDefaultBackpressureHighBytes);
  }

  /// \brief helper method to determine if backpressure is enabled
  /// \return true if pause_if_above is greater than zero, false otherwise
  bool should_apply_backpressure() const { return pause_if_above > 0; }

  /// \brief the number of bytes at which the producer should resume producing
  uint64_t resume_if_below;
  /// \brief the number of bytes at which the producer should pause producing
  ///
  /// If this is zero then backpressure will be disabled
  uint64_t pause_if_above;
};

/// \brief a sink node which collects results in a queue
///
/// Emitted batches will only be ordered if there is a meaningful ordering
/// and sequence_output is not set to false.
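///
/// A minimal usage sketch (illustrative): drain the plan through the
/// generator that the node fills in when it is added to the plan.
///
/// \code{.cpp}
/// std::function<Future<std::optional<ExecBatch>>()> generator;
/// SinkNodeOptions options(&generator);
/// // ... add the "sink" node, start the plan, then pull batches:
/// while (true) {
///   auto maybe_batch = generator().result();  // blocks until a batch is ready
///   if (!maybe_batch.ok() || !maybe_batch->has_value()) break;
///   // use **maybe_batch
/// }
/// \endcode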
class ARROW_ACERO_EXPORT SinkNodeOptions : public ExecNodeOptions {
 public:
  explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
                           std::shared_ptr<Schema>* schema,
                           BackpressureOptions backpressure = {},
                           BackpressureMonitor** backpressure_monitor = NULLPTR,
                           std::optional<bool> sequence_output = std::nullopt)
      : generator(generator),
        schema(schema),
        backpressure(backpressure),
        backpressure_monitor(backpressure_monitor),
        sequence_output(sequence_output) {}

  explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
                           BackpressureOptions backpressure = {},
                           BackpressureMonitor** backpressure_monitor = NULLPTR,
                           std::optional<bool> sequence_output = std::nullopt)
      : generator(generator),
        schema(NULLPTR),
        backpressure(std::move(backpressure)),
        backpressure_monitor(backpressure_monitor),
        sequence_output(sequence_output) {}

  /// \brief A pointer to a generator of batches.
  ///
  /// This will be set when the node is added to the plan and should be used to consume
  /// data from the plan.  If this function is not called frequently enough then the sink
  /// node will start to accumulate data and may apply backpressure.
  std::function<Future<std::optional<ExecBatch>>()>* generator;
  /// \brief A pointer which will be set to the schema of the generated batches
  ///
  /// This is optional, if nullptr is passed in then it will be ignored.
  /// This will be set when the node is added to the plan, before StartProducing is
  /// called.
  std::shared_ptr<Schema>* schema;
  /// \brief Options to control when to apply backpressure
  ///
  /// This is optional, the default is to never apply backpressure.  If the plan is not
  /// consumed quickly enough the system may eventually run out of memory.
  BackpressureOptions backpressure;
  /// \brief A pointer to a backpressure monitor
  ///
  /// This will be set when the node is added to the plan.  This can be used to inspect
  /// the amount of data currently queued in the sink node.  This is an optional utility
  /// and backpressure can be applied even if this is not used.
  BackpressureMonitor** backpressure_monitor;
  /// \brief Controls whether batches should be emitted immediately or sequenced in order
  ///
  /// \see QueryOptions for more details
  std::optional<bool> sequence_output;
};

/// \brief Control used by a SinkNodeConsumer to pause & resume
///
/// Callers should ensure that they do not call Pause and Resume simultaneously and they
/// should sequence things so that a call to Pause() is always followed by an eventual
/// call to Resume()
class ARROW_ACERO_EXPORT BackpressureControl {
 public:
  virtual ~BackpressureControl() = default;
  /// \brief Ask the input to pause
  ///
  /// This is best effort, batches may continue to arrive
  /// Must eventually be followed by a call to Resume() or deadlock will occur
  virtual void Pause() = 0;
  /// \brief Ask the input to resume
  virtual void Resume() = 0;
};

/// \brief a sink node that consumes the data as part of the plan using callbacks
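///
/// A minimal usage sketch (illustrative): a consumer that counts rows as they
/// stream through the plan.
///
/// \code{.cpp}
/// class RowCountingConsumer : public SinkNodeConsumer {
///  public:
///   Status Init(const std::shared_ptr<Schema>& schema,
///               BackpressureControl* backpressure_control, ExecPlan* plan) override {
///     return Status::OK();
///   }
///   Status Consume(ExecBatch batch) override {
///     row_count_ += batch.length;
///     return Status::OK();
///   }
///   Future<> Finish() override { return Future<>::MakeFinished(); }
///   std::atomic<int64_t> row_count_{0};
/// };
/// \endcode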
class ARROW_ACERO_EXPORT SinkNodeConsumer {
 public:
  virtual ~SinkNodeConsumer() = default;
  /// \brief Prepare any consumer state
  ///
  /// This will be run once the schema is finalized as the plan is starting and
  /// before any calls to Consume.  A common use is to save off the schema so that
  /// batches can be interpreted.
  virtual Status Init(const std::shared_ptr<Schema>& schema,
                      BackpressureControl* backpressure_control, ExecPlan* plan) = 0;
  /// \brief Consume a batch of data
  virtual Status Consume(ExecBatch batch) = 0;
  /// \brief Signal to the consumer that the last batch has been delivered
  ///
  /// The returned future should only finish when all outstanding tasks have completed
  ///
  /// If the plan is ended early or aborts due to an error then this will not be
  /// called.
  virtual Future<> Finish() = 0;
};

/// \brief Add a sink node which consumes data within the exec plan run
class ARROW_ACERO_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions {
 public:
  explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer,
                                    std::vector<std::string> names = {},
                                    std::optional<bool> sequence_output = std::nullopt)
      : consumer(std::move(consumer)),
        names(std::move(names)),
        sequence_output(sequence_output) {}

  std::shared_ptr<SinkNodeConsumer> consumer;
  /// \brief Names to rename the sink's schema fields to
  ///
  /// If specified then names must be provided for all fields.  Currently, only a flat
  /// schema is supported (see GH-31875).
  ///
  /// If not specified then names will be generated based on the source data.
  std::vector<std::string> names;
  /// \brief Controls whether batches should be emitted immediately or sequenced in order
  ///
  /// \see QueryOptions for more details
  std::optional<bool> sequence_output;
};

/// \brief Make a node which sorts rows passed through it
///
/// All batches pushed to this node will be accumulated and then sorted by the given
/// fields.  The sorted batches will then be forwarded to the generator in sorted order.
class ARROW_ACERO_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions {
 public:
  /// \brief create an instance from values
  explicit OrderBySinkNodeOptions(
      SortOptions sort_options,
      std::function<Future<std::optional<ExecBatch>>()>* generator)
      : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {}

  /// \brief options describing which columns and direction to sort
  SortOptions sort_options;
};

/// \brief Apply a new ordering to data
///
/// Currently this node works by accumulating all data, sorting, and then emitting
/// the new data with an updated batch index.
///
/// Larger-than-memory sort is not currently supported.
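///
/// A minimal usage sketch (illustrative): sort ascending by a column "ts".
///
/// \code{.cpp}
/// Ordering ordering({compute::SortKey("ts", compute::SortOrder::Ascending)});
/// OrderByNodeOptions options(std::move(ordering));
/// \endcode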
class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions {
 public:
  static constexpr std::string_view kName = "order_by";
  explicit OrderByNodeOptions(Ordering ordering) : ordering(std::move(ordering)) {}

  /// \brief The new ordering to apply to outgoing data
  Ordering ordering;
};

enum class JoinType {
  LEFT_SEMI,
  RIGHT_SEMI,
  LEFT_ANTI,
  RIGHT_ANTI,
  INNER,
  LEFT_OUTER,
  RIGHT_OUTER,
  FULL_OUTER
};

std::string ToString(JoinType t);

enum class JoinKeyCmp { EQ, IS };

/// \brief a node which implements a join operation using a hash table
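///
/// A minimal usage sketch (illustrative): an inner join that keeps all
/// columns; `left` and `right` are assumed to be upstream Declarations.
///
/// \code{.cpp}
/// HashJoinNodeOptions options({"l_key"}, {"r_key"});
/// Declaration join{"hashjoin", {left, right}, std::move(options)};
/// \endcode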
class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
 public:
  static constexpr const char* default_output_suffix_for_left = "";
  static constexpr const char* default_output_suffix_for_right = "";

  /// \brief create an instance from values that outputs all columns
  HashJoinNodeOptions(
      JoinType in_join_type, std::vector<FieldRef> in_left_keys,
      std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
      std::string output_suffix_for_left = default_output_suffix_for_left,
      std::string output_suffix_for_right = default_output_suffix_for_right,
      bool disable_bloom_filter = false)
      : join_type(in_join_type),
        left_keys(std::move(in_left_keys)),
        right_keys(std::move(in_right_keys)),
        output_all(true),
        output_suffix_for_left(std::move(output_suffix_for_left)),
        output_suffix_for_right(std::move(output_suffix_for_right)),
        filter(std::move(filter)),
        disable_bloom_filter(disable_bloom_filter) {
    this->key_cmp.resize(this->left_keys.size());
    for (size_t i = 0; i < this->left_keys.size(); ++i) {
      this->key_cmp[i] = JoinKeyCmp::EQ;
    }
  }

  /// \brief create an instance from keys
  ///
  /// This will create an inner join that outputs all columns and has no post join filter
  ///
  /// `in_left_keys` should have the same length and types as `in_right_keys`
  ///
  /// @param in_left_keys the keys in the left input
  /// @param in_right_keys the keys in the right input
  HashJoinNodeOptions(std::vector<FieldRef> in_left_keys,
                      std::vector<FieldRef> in_right_keys)
      : left_keys(std::move(in_left_keys)), right_keys(std::move(in_right_keys)) {
    this->join_type = JoinType::INNER;
    this->output_all = true;
    this->output_suffix_for_left = default_output_suffix_for_left;
    this->output_suffix_for_right = default_output_suffix_for_right;
    this->key_cmp.resize(this->left_keys.size());
    for (size_t i = 0; i < this->left_keys.size(); ++i) {
      this->key_cmp[i] = JoinKeyCmp::EQ;
    }
    this->filter = literal(true);
  }

  /// \brief create an instance from values using JoinKeyCmp::EQ for all comparisons
  HashJoinNodeOptions(
      JoinType join_type, std::vector<FieldRef> left_keys,
      std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
      std::vector<FieldRef> right_output, Expression filter = literal(true),
      std::string output_suffix_for_left = default_output_suffix_for_left,
      std::string output_suffix_for_right = default_output_suffix_for_right,
      bool disable_bloom_filter = false)
      : join_type(join_type),
        left_keys(std::move(left_keys)),
        right_keys(std::move(right_keys)),
        output_all(false),
        left_output(std::move(left_output)),
        right_output(std::move(right_output)),
        output_suffix_for_left(std::move(output_suffix_for_left)),
        output_suffix_for_right(std::move(output_suffix_for_right)),
        filter(std::move(filter)),
        disable_bloom_filter(disable_bloom_filter) {
    this->key_cmp.resize(this->left_keys.size());
    for (size_t i = 0; i < this->left_keys.size(); ++i) {
      this->key_cmp[i] = JoinKeyCmp::EQ;
    }
  }

  /// \brief create an instance from values
  HashJoinNodeOptions(
      JoinType join_type, std::vector<FieldRef> left_keys,
      std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
      std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
      Expression filter = literal(true),
      std::string output_suffix_for_left = default_output_suffix_for_left,
      std::string output_suffix_for_right = default_output_suffix_for_right,
      bool disable_bloom_filter = false)
      : join_type(join_type),
        left_keys(std::move(left_keys)),
        right_keys(std::move(right_keys)),
        output_all(false),
        left_output(std::move(left_output)),
        right_output(std::move(right_output)),
        key_cmp(std::move(key_cmp)),
        output_suffix_for_left(std::move(output_suffix_for_left)),
        output_suffix_for_right(std::move(output_suffix_for_right)),
        filter(std::move(filter)),
        disable_bloom_filter(disable_bloom_filter) {}

  HashJoinNodeOptions() = default;

  // type of join (inner, left, semi...)
  JoinType join_type = JoinType::INNER;
  // key fields from left input
  std::vector<FieldRef> left_keys;
  // key fields from right input
  std::vector<FieldRef> right_keys;
  // if set all valid fields from both left and right input will be output
  // (and field ref vectors for output fields will be ignored)
  bool output_all = false;
  // output fields passed from left input
  std::vector<FieldRef> left_output;
  // output fields passed from right input
  std::vector<FieldRef> right_output;
  // key comparison function (determines whether a null key is equal to another
  // null key or not)
  std::vector<JoinKeyCmp> key_cmp;
  // suffix added to names of output fields coming from left input (used to distinguish,
  // if necessary, between fields of the same name in left and right input and can be
  // left empty if there are no name collisions)
  std::string output_suffix_for_left;
  // suffix added to names of output fields coming from right input
  std::string output_suffix_for_right;
  // residual filter which is applied to matching rows.  Rows that do not match
  // the filter are not included.  The filter is applied against the
  // concatenated input schema (left fields then right fields) and can reference
  // fields that are not included in the output.
  Expression filter = literal(true);
  // whether or not to disable Bloom filters in this join
  bool disable_bloom_filter = false;
};

/// \brief a node which implements the asof join operation
///
/// Note, this API is experimental and will change in the future
///
/// This node takes one left table and any number of right tables, and asof joins them
/// together.  Batches produced by each input must be ordered by the "on" key.
/// This node will output one row for each row in the left table.
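///
/// A minimal usage sketch (illustrative): join each left row with a matching
/// right row (same "id", "ts" within the tolerance window described on
/// `tolerance` below).
///
/// \code{.cpp}
/// AsofJoinNodeOptions::Keys left_keys{/*on_key=*/"ts", /*by_key=*/{"id"}};
/// AsofJoinNodeOptions::Keys right_keys{/*on_key=*/"ts", /*by_key=*/{"id"}};
/// AsofJoinNodeOptions options({left_keys, right_keys}, /*tolerance=*/5);
/// \endcode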
class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Keys for one input table of the AsofJoin operation
  ///
  /// The keys must be consistent across the input tables:
  /// Each "on" key must refer to a field of the same type and units across the tables.
  /// Each "by" key must refer to a list of fields of the same types across the tables.
  struct Keys {
    /// \brief "on" key for the join.
    ///
    /// The input table must be sorted by the "on" key.  Must be a single field of a
    /// common type.  Inexact match is used on the "on" key, i.e., a row is considered a
    /// match iff left_on - tolerance <= right_on <= left_on.
    /// Currently, the "on" key must be of an integer, date, or timestamp type.
    FieldRef on_key;
    /// \brief "by" key for the join.
    ///
    /// Each input table must have each field of the "by" key.  Exact equality is used
    /// for each field of the "by" key.
    /// Currently, each field of the "by" key must be of an integer, date, timestamp, or
    /// base-binary type.
    std::vector<FieldRef> by_key;
  };

  AsofJoinNodeOptions(std::vector<Keys> input_keys, int64_t tolerance)
      : input_keys(std::move(input_keys)), tolerance(tolerance) {}

  /// \brief AsofJoin keys per input table.  At least two keys must be given.  The first
  /// key corresponds to a left table and all other keys correspond to right tables for
  /// the as-of-join.
  ///
  /// \see `Keys` for details.
  std::vector<Keys> input_keys;
  /// \brief Tolerance for inexact "on" key matching.  A right row is considered a match
  /// with the left row if `right.on - left.on <= tolerance`.  The `tolerance` may be:
  /// - negative, in which case a past-as-of-join occurs;
  /// - or positive, in which case a future-as-of-join occurs;
  /// - or zero, in which case an exact-as-of-join occurs.
  ///
  /// The tolerance is interpreted in the same units as the "on" key.
  int64_t tolerance;
};

/// \brief a node which selects top_k/bottom_k rows passed through it
///
/// All batches pushed to this node will be accumulated and then selected by the given
/// fields.  The sorted batches will then be forwarded to the generator in sorted order.
class ARROW_ACERO_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
 public:
  explicit SelectKSinkNodeOptions(
      SelectKOptions select_k_options,
      std::function<Future<std::optional<ExecBatch>>()>* generator)
      : SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}

  /// SelectK options
  SelectKOptions select_k_options;
};

/// \brief a sink node which accumulates all output into a table
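///
/// A minimal usage sketch (illustrative): collect all output into
/// `result_table` once the plan finishes.
///
/// \code{.cpp}
/// std::shared_ptr<Table> result_table;
/// TableSinkNodeOptions options(&result_table);
/// // ... add a "table_sink" node with these options, run the plan to
/// // completion, then read result_table.
/// \endcode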
class ARROW_ACERO_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
                                std::optional<bool> sequence_output = std::nullopt)
      : output_table(output_table), sequence_output(sequence_output) {}

  /// \brief an "out parameter" specifying the table that will be created
  ///
  /// This must not be null and must remain valid for the entirety of the plan
  /// execution.  After the plan has completed this will be set to point to the result
  /// table.
  std::shared_ptr<Table>* output_table;
  /// \brief Controls whether batches should be emitted immediately or sequenced in order
  ///
  /// \see QueryOptions for more details
  std::optional<bool> sequence_output;
  /// \brief Custom names to use for the columns.
  ///
  /// If specified then names must be provided for all fields.  Currently, only a flat
  /// schema is supported (see GH-31875).
  ///
  /// If not specified then names will be generated based on the source data.
  std::vector<std::string> names;
};

/// \brief a row template that describes one row that will be generated for each input
/// row
struct ARROW_ACERO_EXPORT PivotLongerRowTemplate {
  PivotLongerRowTemplate(std::vector<std::string> feature_values,
                         std::vector<std::optional<FieldRef>> measurement_values)
      : feature_values(std::move(feature_values)),
        measurement_values(std::move(measurement_values)) {}
  /// A (typically unique) set of feature values for the template, usually derived from a
  /// column name
  ///
  /// These will be used to populate the feature columns
  std::vector<std::string> feature_values;
  /// The fields containing the measurements to use for this row
  ///
  /// These will be used to populate the measurement columns.  If nullopt then nulls
  /// will be inserted for the given value.
  std::vector<std::optional<FieldRef>> measurement_values;
};

/// \brief Reshape a table by turning some columns into additional rows
///
/// This operation is sometimes also referred to as UNPIVOT
///
/// This is typically done when there are multiple observations in each row in order to
/// transform to a table containing a single observation per row.
///
/// For example:
///
/// | time | left_temp | right_temp |
/// | ---- | --------- | ---------- |
/// | 1    | 10        | 20         |
/// | 2    | 15        | 18         |
///
/// The above table contains two observations per row.  There is an implicit feature
/// "location" (left vs right) and a measurement "temp".  What we really want is:
///
/// | time | location | temp |
/// | ---- | -------- | ---- |
/// | 1    | left     | 10   |
/// | 1    | right    | 20   |
/// | 2    | left     | 15   |
/// | 2    | right    | 18   |
///
/// For a more complex example consider:
///
/// | time | ax1 | ay1 | bx1 | ay2 |
/// | ---- | --- | --- | --- | --- |
/// | 0    | 1   | 2   | 3   | 4   |
///
/// We can pretend a vs b and x vs y are features while 1 and 2 are two different
/// kinds of measurements.  We thus want to pivot to
///
/// | time | a/b | x/y | f1   | f2   |
/// | ---- | --- | --- | ---- | ---- |
/// | 0    | a   | x   | 1    | null |
/// | 0    | a   | y   | 2    | 4    |
/// | 0    | b   | x   | 3    | null |
///
/// To do this we create a row template for each combination of features.  One should
/// be able to do this purely by looking at the column names.  For example, given the
/// above columns "ax1", "ay1", "bx1", and "ay2" we know we have three feature
/// combinations (a, x), (a, y), and (b, x).  Similarly, we know we have two possible
/// measurements, "1" and "2".
///
/// For each combination of features we create a row template.  In each row template we
/// describe the combination and then list which columns to use for the measurements.
/// If a measurement doesn't exist for a given combination then we use nullopt.
///
/// So, for our above example, we have:
///
/// (a, x): names={"a", "x"}, values={"ax1", nullopt}
/// (a, y): names={"a", "y"}, values={"ay1", "ay2"}
/// (b, x): names={"b", "x"}, values={"bx1", nullopt}
///
/// Finishing it off we name our new columns:
///   feature_field_names={"a/b", "x/y"}
///   measurement_field_names={"f1", "f2"}
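///
/// A minimal usage sketch (illustrative): the options corresponding to the
/// example above, spelled out as row templates.
///
/// \code{.cpp}
/// PivotLongerNodeOptions options;
/// options.row_templates = {
///     {{"a", "x"}, {FieldRef("ax1"), std::nullopt}},
///     {{"a", "y"}, {FieldRef("ay1"), FieldRef("ay2")}},
///     {{"b", "x"}, {FieldRef("bx1"), std::nullopt}}};
/// options.feature_field_names = {"a/b", "x/y"};
/// options.measurement_field_names = {"f1", "f2"};
/// \endcode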
class ARROW_ACERO_EXPORT PivotLongerNodeOptions : public ExecNodeOptions {
 public:
  static constexpr std::string_view kName = "pivot_longer";
  /// One or more row templates to create new output rows
  ///
  /// Normally there are at least two row templates.  The output # of rows
  /// will be the input # of rows * the number of row templates
  std::vector<PivotLongerRowTemplate> row_templates;
  /// The names of the columns which describe the new features
  std::vector<std::string> feature_field_names;
  /// The names of the columns which represent the measurements
  std::vector<std::string> measurement_field_names;
};

/// @}

}  // namespace acero
}  // namespace arrow