// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // This API is EXPERIMENTAL. #pragma once #include #include #include #include #include #include #include "arrow/compute/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/async_generator_fwd.h" #include "arrow/util/future.h" #include "arrow/util/macros.h" #include "arrow/util/mutex.h" namespace arrow { namespace internal { class Executor; } // namespace internal namespace dataset { using RecordBatchGenerator = std::function>()>; /// \brief Description of a column to scan struct ARROW_DS_EXPORT FragmentSelectionColumn { /// \brief The path to the column to load FieldPath path; /// \brief The type of the column in the dataset schema /// /// A format may choose to ignore this field completely. For example, when /// reading from IPC the reader can just return the column in the data type /// that is stored on disk. There is no point in doing anything special. /// /// However, some formats may be capable of casting on the fly. For example, /// when reading from CSV, if we know the target type of the column, we can /// convert from string to the target type as we read. DataType* requested_type; }; /// \brief A list of columns that should be loaded from a fragment /// /// The paths in this selection should be referring to the fragment schema. This class /// contains a virtual destructor as it is expected evolution strategies will need to /// extend this to add any information needed to later evolve the batches. /// /// For example, in the basic evolution strategy, we keep track of which columns /// were missing from the file so that we can fill those in with null when evolving. class ARROW_DS_EXPORT FragmentSelection { public: explicit FragmentSelection(std::vector columns) : columns_(std::move(columns)) {} virtual ~FragmentSelection() = default; /// The columns that should be loaded from the fragment const std::vector& columns() const { return columns_; } private: std::vector columns_; }; /// \brief Instructions for scanning a particular fragment /// /// The fragment scan request is derived from ScanV2Options. The main /// difference is that the scan options are based on the dataset schema /// while the fragment request is based on the fragment schema. struct ARROW_DS_EXPORT FragmentScanRequest { /// \brief A row filter /// /// The filter expression should be written against the fragment schema. /// /// \see ScanV2Options for details on how this filter should be applied compute::Expression filter = compute::literal(true); /// \brief The columns to scan /// /// These indices refer to the fragment schema /// /// Note: This is NOT a simple list of top-level column indices. /// For more details \see ScanV2Options /// /// If possible a fragment should only read from disk the data needed /// to satisfy these columns. If a format cannot partially read a nested /// column (e.g. JSON) then it must apply the column selection (in memory) /// before returning the scanned batch. std::shared_ptr fragment_selection; /// \brief Options specific to the format being scanned const FragmentScanOptions* format_scan_options; }; /// \brief An iterator-like object that can yield batches created from a fragment class ARROW_DS_EXPORT FragmentScanner { public: /// This instance will only be destroyed after all ongoing scan futures /// have been completed. /// /// This means any callbacks created as part of the scan can safely /// capture `this` virtual ~FragmentScanner() = default; /// \brief Scan a batch of data from the file /// \param batch_number The index of the batch to read virtual Future> ScanBatch(int batch_number) = 0; /// \brief Calculate an estimate of how many data bytes the given batch will represent /// /// "Data bytes" should be the total size of all the buffers once the data has been /// decoded into the Arrow format. virtual int64_t EstimatedDataBytes(int batch_number) = 0; /// \brief The number of batches in the fragment to scan virtual int NumBatches() = 0; }; /// \brief Information learned about a fragment through inspection /// /// This information can be used to figure out which fields need /// to be read from a file and how the data read in should be evolved /// to match the dataset schema. /// /// For example, from a CSV file we can inspect and learn the column /// names and use those column names to determine which columns to load /// from the CSV file. struct ARROW_DS_EXPORT InspectedFragment { explicit InspectedFragment(std::vector column_names) : column_names(std::move(column_names)) {} std::vector column_names; }; /// \brief A granular piece of a Dataset, such as an individual file. /// /// A Fragment can be read/scanned separately from other fragments. It yields a /// collection of RecordBatches when scanned /// /// Note that Fragments have well defined physical schemas which are reconciled by /// the Datasets which contain them; these physical schemas may differ from a parent /// Dataset's schema and the physical schemas of sibling Fragments. class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { public: /// \brief An expression that represents no known partition information static const compute::Expression kNoPartitionInformation; /// \brief Return the physical schema of the Fragment. /// /// The physical schema is also called the writer schema. /// This method is blocking and may suffer from high latency filesystem. /// The schema is cached after being read once, or may be specified at construction. Result> ReadPhysicalSchema(); /// An asynchronous version of Scan virtual Result ScanBatchesAsync( const std::shared_ptr& options) = 0; /// \brief Inspect a fragment to learn basic information /// /// This will be called before a scan and a fragment should attach whatever /// information will be needed to figure out an evolution strategy. This information /// will then be passed to the call to BeginScan virtual Future> InspectFragment( const FragmentScanOptions* format_options, compute::ExecContext* exec_context); /// \brief Start a scan operation virtual Future> BeginScan( const FragmentScanRequest& request, const InspectedFragment& inspected_fragment, const FragmentScanOptions* format_options, compute::ExecContext* exec_context); /// \brief Count the number of rows in this fragment matching the filter using metadata /// only. That is, this method may perform I/O, but will not load data. /// /// If this is not possible, resolve with an empty optional. The fragment can perform /// I/O (e.g. to read metadata) before it deciding whether it can satisfy the request. virtual Future> CountRows( compute::Expression predicate, const std::shared_ptr& options); virtual std::string type_name() const = 0; virtual std::string ToString() const { return type_name(); } /// \brief An expression which evaluates to true for all data viewed by this /// Fragment. const compute::Expression& partition_expression() const { return partition_expression_; } virtual ~Fragment() = default; protected: Fragment() = default; explicit Fragment(compute::Expression partition_expression, std::shared_ptr physical_schema); virtual Result> ReadPhysicalSchemaImpl() = 0; util::Mutex physical_schema_mutex_; compute::Expression partition_expression_ = compute::literal(true); std::shared_ptr physical_schema_; }; /// \brief Per-scan options for fragment(s) in a dataset. /// /// These options are not intrinsic to the format or fragment itself, but do affect /// the results of a scan. These are options which make sense to change between /// repeated reads of the same dataset, such as format-specific conversion options /// (that do not affect the schema). /// /// \ingroup dataset-scanning class ARROW_DS_EXPORT FragmentScanOptions { public: virtual std::string type_name() const = 0; virtual std::string ToString() const { return type_name(); } virtual ~FragmentScanOptions() = default; }; /// \defgroup dataset-implementations Concrete implementations /// /// @{ /// \brief A trivial Fragment that yields ScanTask out of a fixed set of /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: class Scanner; InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, compute::Expression = compute::literal(true)); explicit InMemoryFragment(RecordBatchVector record_batches, compute::Expression = compute::literal(true)); Result ScanBatchesAsync( const std::shared_ptr& options) override; Future> CountRows( compute::Expression predicate, const std::shared_ptr& options) override; Future> InspectFragment( const FragmentScanOptions* format_options, compute::ExecContext* exec_context) override; Future> BeginScan( const FragmentScanRequest& request, const InspectedFragment& inspected_fragment, const FragmentScanOptions* format_options, compute::ExecContext* exec_context) override; std::string type_name() const override { return "in-memory"; } protected: Result> ReadPhysicalSchemaImpl() override; RecordBatchVector record_batches_; }; /// @} using FragmentGenerator = AsyncGenerator>; /// \brief Rules for converting the dataset schema to and from fragment schemas class ARROW_DS_EXPORT FragmentEvolutionStrategy { public: /// This instance will only be destroyed when all scan operations for the /// fragment have completed. virtual ~FragmentEvolutionStrategy() = default; /// \brief A guarantee that applies to all batches of this fragment /// /// For example, if a fragment is missing one of the fields in the dataset /// schema then a typical evolution strategy is to set that field to null. /// /// So if the column at index 3 is missing then the guarantee is /// FieldRef(3) == null /// /// Individual field guarantees should be AND'd together and returned /// as a single expression. virtual Result GetGuarantee( const std::vector& dataset_schema_selection) const = 0; /// \brief Return a fragment schema selection given a dataset schema selection /// /// For example, if the user wants fields 2 & 4 of the dataset schema and /// in this fragment the field 2 is missing and the field 4 is at index 1 then /// this should return {1} virtual Result> DevolveSelection( const std::vector& dataset_schema_selection) const = 0; /// \brief Return a filter expression bound to the fragment schema given /// a filter expression bound to the dataset schema /// /// The dataset scan filter will first be simplified by the guarantee returned /// by GetGuarantee. This means an evolution that only handles dropping or casting /// fields doesn't need to do anything here except return the given filter. /// /// On the other hand, an evolution that is doing some kind of aliasing will likely /// need to convert field references in the filter to the aliased field references /// where appropriate. virtual Result DevolveFilter( const compute::Expression& filter) const = 0; /// \brief Convert a batch from the fragment schema to the dataset schema /// /// Typically this involves casting columns from the data type stored on disk /// to the data type of the dataset schema. For example, this fragment might /// have columns stored as int32 and the dataset schema might have int64 for /// the column. In this case we should cast the column from int32 to int64. /// /// Note: A fragment may perform this cast as the data is read from disk. In /// that case a cast might not be needed. virtual Result EvolveBatch( const std::shared_ptr& batch, const std::vector& dataset_selection, const FragmentSelection& selection) const = 0; /// \brief Return a string description of this strategy virtual std::string ToString() const = 0; }; /// \brief Lookup to create a FragmentEvolutionStrategy for a given fragment class ARROW_DS_EXPORT DatasetEvolutionStrategy { public: virtual ~DatasetEvolutionStrategy() = default; /// \brief Create a strategy for evolving from the given fragment /// to the schema of the given dataset virtual std::unique_ptr GetStrategy( const Dataset& dataset, const Fragment& fragment, const InspectedFragment& inspected_fragment) = 0; /// \brief Return a string description of this strategy virtual std::string ToString() const = 0; }; ARROW_DS_EXPORT std::unique_ptr MakeBasicDatasetEvolutionStrategy(); /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a /// directory. A Dataset has a schema to which Fragments must align during a /// scan operation. This is analogous to Avro's reader and writer schema. class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: /// \brief Begin to build a new Scan operation against this Dataset Result> NewScan(); /// \brief GetFragments returns an iterator of Fragments given a predicate. Result GetFragments(compute::Expression predicate); Result GetFragments(); /// \brief Async versions of `GetFragments`. Result GetFragmentsAsync(compute::Expression predicate); Result GetFragmentsAsync(); const std::shared_ptr& schema() const { return schema_; } /// \brief An expression which evaluates to true for all data viewed by this Dataset. /// May be null, which indicates no information is available. const compute::Expression& partition_expression() const { return partition_expression_; } /// \brief The name identifying the kind of Dataset virtual std::string type_name() const = 0; /// \brief Return a copy of this Dataset with a different schema. /// /// The copy will view the same Fragments. If the new schema is not compatible with the /// original dataset's schema then an error will be raised. virtual Result> ReplaceSchema( std::shared_ptr schema) const = 0; /// \brief Rules used by this dataset to handle schema evolution DatasetEvolutionStrategy* evolution_strategy() { return evolution_strategy_.get(); } virtual ~Dataset() = default; protected: explicit Dataset(std::shared_ptr schema) : schema_(std::move(schema)) {} Dataset(std::shared_ptr schema, compute::Expression partition_expression); virtual Result GetFragmentsImpl(compute::Expression predicate) = 0; /// \brief Default non-virtual implementation method for the base /// `GetFragmentsAsyncImpl` method, which creates a fragment generator for /// the dataset, possibly filtering results with a predicate (forwarding to /// the synchronous `GetFragmentsImpl` method and moving the computations /// to the background, using the IO thread pool). /// /// Currently, `executor` is always the same as `internal::GetCPUThreadPool()`, /// which means the results from the underlying fragment generator will be /// transferred to the default CPU thread pool. The generator itself is /// offloaded to run on the default IO thread pool. virtual Result GetFragmentsAsyncImpl( compute::Expression predicate, arrow::internal::Executor* executor); std::shared_ptr schema_; compute::Expression partition_expression_ = compute::literal(true); std::unique_ptr evolution_strategy_ = MakeBasicDatasetEvolutionStrategy(); }; /// \addtogroup dataset-implementations /// /// @{ /// \brief A Source which yields fragments wrapping a stream of record batches. /// /// The record batches must match the schema provided to the source at construction. class ARROW_DS_EXPORT InMemoryDataset : public Dataset { public: class RecordBatchGenerator { public: virtual ~RecordBatchGenerator() = default; virtual RecordBatchIterator Get() const = 0; }; /// Construct a dataset from a schema and a factory of record batch iterators. InMemoryDataset(std::shared_ptr schema, std::shared_ptr get_batches) : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {} /// Convenience constructor taking a fixed list of batches InMemoryDataset(std::shared_ptr schema, RecordBatchVector batches); /// Convenience constructor taking a Table explicit InMemoryDataset(std::shared_ptr table); std::string type_name() const override { return "in-memory"; } Result> ReplaceSchema( std::shared_ptr schema) const override; protected: Result GetFragmentsImpl(compute::Expression predicate) override; std::shared_ptr get_batches_; }; /// \brief A Dataset wrapping child Datasets. class ARROW_DS_EXPORT UnionDataset : public Dataset { public: /// \brief Construct a UnionDataset wrapping child Datasets. /// /// \param[in] schema the schema of the resulting dataset. /// \param[in] children one or more child Datasets. Their schemas must be identical to /// schema. static Result> Make(std::shared_ptr schema, DatasetVector children); const DatasetVector& children() const { return children_; } std::string type_name() const override { return "union"; } Result> ReplaceSchema( std::shared_ptr schema) const override; protected: Result GetFragmentsImpl(compute::Expression predicate) override; explicit UnionDataset(std::shared_ptr schema, DatasetVector children) : Dataset(std::move(schema)), children_(std::move(children)) {} DatasetVector children_; friend class UnionDatasetFactory; }; /// @} } // namespace dataset } // namespace arrow