// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include "arrow/type.h" // for DataType, FieldRef, Field and Schema namespace arrow { using internal::checked_cast; namespace acero { // Identifiers for all different row schemas that are used in a join // enum class HashJoinProjection : int { INPUT = 0, KEY = 1, PAYLOAD = 2, FILTER = 3, OUTPUT = 4 }; struct SchemaProjectionMap { static constexpr int kMissingField = -1; int num_cols; const int* source_to_base; const int* base_to_target; inline int get(int i) const { assert(i >= 0 && i < num_cols); assert(source_to_base[i] != kMissingField); return base_to_target[source_to_base[i]]; } }; /// Helper class for managing different projections of the same row schema. /// Used to efficiently map any field in one projection to a corresponding field in /// another projection. /// Materialized mappings are generated lazily at the time of the first access. /// Thread-safe apart from initialization. template class SchemaProjectionMaps { public: static constexpr int kMissingField = -1; Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema, const std::vector& projection_handles, const std::vector*>& projections) { assert(projection_handles.size() == projections.size()); ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema)); for (size_t i = 0; i < projections.size(); ++i) { ARROW_RETURN_NOT_OK( RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema)); } RegisterEnd(); return Status::OK(); } int num_cols(ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); return static_cast(schemas_[id].second.data_types.size()); } bool is_empty(ProjectionIdEnum schema_handle) const { return num_cols(schema_handle) == 0; } const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); return schemas_[id].second.field_names[field_id]; } const std::shared_ptr& data_type(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); return schemas_[id].second.data_types[field_id]; } const std::vector>& data_types( ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); return schemas_[id].second.data_types; } SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const { int id_from = schema_id(from); int id_to = schema_id(to); SchemaProjectionMap result; result.num_cols = num_cols(from); result.source_to_base = mappings_[id_from].data(); result.base_to_target = inverse_mappings_[id_to].data(); return result; } protected: struct FieldInfos { std::vector field_paths; std::vector field_names; std::vector> data_types; }; Status RegisterSchema(ProjectionIdEnum handle, const Schema& schema) { FieldInfos out_fields; const FieldVector& in_fields = schema.fields(); out_fields.field_paths.resize(in_fields.size()); out_fields.field_names.resize(in_fields.size()); out_fields.data_types.resize(in_fields.size()); for (size_t i = 0; i < in_fields.size(); ++i) { const std::string& name = in_fields[i]->name(); const std::shared_ptr& type = in_fields[i]->type(); out_fields.field_paths[i] = static_cast(i); out_fields.field_names[i] = name; out_fields.data_types[i] = type; } schemas_.push_back(std::make_pair(handle, out_fields)); return Status::OK(); } Status RegisterProjectedSchema(ProjectionIdEnum handle, const std::vector& selected_fields, const Schema& full_schema) { FieldInfos out_fields; const FieldVector& in_fields = full_schema.fields(); out_fields.field_paths.resize(selected_fields.size()); out_fields.field_names.resize(selected_fields.size()); out_fields.data_types.resize(selected_fields.size()); for (size_t i = 0; i < selected_fields.size(); ++i) { // All fields must be found in schema without ambiguity ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); const std::string& name = in_fields[match[0]]->name(); const std::shared_ptr& type = in_fields[match[0]]->type(); out_fields.field_paths[i] = match[0]; out_fields.field_names[i] = name; out_fields.data_types[i] = type; } schemas_.push_back(std::make_pair(handle, out_fields)); return Status::OK(); } void RegisterEnd() { size_t size = schemas_.size(); mappings_.resize(size); inverse_mappings_.resize(size); int id_base = 0; for (size_t i = 0; i < size; ++i) { GenerateMapForProjection(static_cast(i), id_base); } } int schema_id(ProjectionIdEnum schema_handle) const { for (size_t i = 0; i < schemas_.size(); ++i) { if (schemas_[i].first == schema_handle) { return static_cast(i); } } // We should never get here assert(false); return -1; } void GenerateMapForProjection(int id_proj, int id_base) { int num_cols_proj = static_cast(schemas_[id_proj].second.data_types.size()); int num_cols_base = static_cast(schemas_[id_base].second.data_types.size()); std::vector& mapping = mappings_[id_proj]; std::vector& inverse_mapping = inverse_mappings_[id_proj]; mapping.resize(num_cols_proj); inverse_mapping.resize(num_cols_base); if (id_proj == id_base) { for (int i = 0; i < num_cols_base; ++i) { mapping[i] = inverse_mapping[i] = i; } } else { const FieldInfos& fields_proj = schemas_[id_proj].second; const FieldInfos& fields_base = schemas_[id_base].second; for (int i = 0; i < num_cols_base; ++i) { inverse_mapping[i] = SchemaProjectionMap::kMissingField; } for (int i = 0; i < num_cols_proj; ++i) { int field_id = SchemaProjectionMap::kMissingField; for (int j = 0; j < num_cols_base; ++j) { if (fields_proj.field_paths[i] == fields_base.field_paths[j]) { field_id = j; // If there are multiple matches for the same input field, // it will be mapped to the first match. break; } } assert(field_id != SchemaProjectionMap::kMissingField); mapping[i] = field_id; inverse_mapping[field_id] = i; } } } // vector used as a mapping from ProjectionIdEnum to fields std::vector> schemas_; std::vector> mappings_; std::vector> inverse_mappings_; }; using HashJoinProjectionMaps = SchemaProjectionMaps; } // namespace acero } // namespace arrow