# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++
# cython: language_level = 3

from cython.operator cimport dereference as deref

from collections import namedtuple
from collections.abc import Mapping

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                          RecordBatchReader, ensure_type,
                          maybe_unbox_memory_pool, get_input_stream,
                          get_writer, native_transcoding_input_stream,
                          pyarrow_unwrap_batch, pyarrow_unwrap_schema,
                          pyarrow_unwrap_table, pyarrow_wrap_schema,
                          pyarrow_wrap_table, pyarrow_wrap_data_type,
                          pyarrow_unwrap_data_type, Table, RecordBatch,
                          StopToken, _CRecordBatchWriter)
from pyarrow.lib import frombytes, tobytes, SignalStopHandler


cdef unsigned char _single_char(s) except 0:
    """
    Return the code point of the one-character string `s` as an
    unsigned char.

    Raises ValueError if the code point is 0 or greater than 127.
    0 doubles as the error return value (``except 0``), which is why
    it is rejected as a valid character.
    """
    val = ord(s)
    if val == 0 or val > 127:
        raise ValueError("Expecting an ASCII character")
    return val


_InvalidRow = namedtuple(
    "_InvalidRow", ("expected_columns", "actual_columns", "number", "text"),
    module=__name__)


class InvalidRow(_InvalidRow):
    """
    Description of an invalid row in a CSV file.

    Parameters
    ----------
    expected_columns : int
        The expected number of columns in the row.
    actual_columns : int
        The actual number of columns in the row.
    number : int or None
        The physical row number if known, otherwise None.
    text : str
        The contents of the row.
    """
    __slots__ = ()


cdef CInvalidRowResult _handle_invalid_row(
        handler, const CCSVInvalidRow& c_row) except CInvalidRowResult_Error:
    """
    Call the Python `handler` with an InvalidRow built from `c_row` and
    translate its return value ('error' or 'skip') into the matching
    CInvalidRowResult value.

    Raises ValueError if the handler returns anything else.
    """
    # A negative row number means undetermined (because of parallel reading)
    row_number = c_row.number if c_row.number >= 0 else None
    row = InvalidRow(c_row.expected_columns, c_row.actual_columns,
                     row_number, frombytes(<c_string> c_row.text))
    result = handler(row)
    if result == 'error':
        return CInvalidRowResult_Error
    elif result == 'skip':
        return CInvalidRowResult_Skip
    else:
        raise ValueError("Invalid return value for invalid row handler: "
                         f"expected 'error' or 'skip', got {result!r}")


cdef class ReadOptions(_Weakrefable):
    """
    Options for reading CSV files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading
    block_size : int, optional
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        Minimum valid value for block size is 1
    skip_rows : int, optional (default 0)
        The number of rows to skip before the column names (if any)
        and the CSV data.
    skip_rows_after_names : int, optional (default 0)
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
    column_names : list, optional
        The column names of the target table. If empty, fall back on
        `autogenerate_column_names`.
    autogenerate_column_names : bool, optional (default False)
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
    encoding : str, optional (default 'utf8')
        The character encoding of the CSV data. Columns that cannot
        decode using this encoding can still be read as Binary.

    Examples
    --------

    Defining an example data:

    >>> import io
    >>> s = "1,2,3\\nFlamingo,2,2022-03-01\\nHorse,4,2022-03-02\\nBrittle stars,5,2022-03-03\\nCentipede,100,2022-03-04"
    >>> print(s)
    1,2,3
    Flamingo,2,2022-03-01
    Horse,4,2022-03-02
    Brittle stars,5,2022-03-03
    Centipede,100,2022-03-04

    Ignore the first numbered row and substitute it with defined
    or autogenerated column names:

    >>> from pyarrow import csv
    >>> read_options = csv.ReadOptions(
    ...                column_names=["animals", "n_legs", "entry"],
    ...                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    >>> read_options = csv.ReadOptions(autogenerate_column_names=True,
    ...                                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    f0: string
    f1: int64
    f2: date32[day]
    ----
    f0: [["Flamingo","Horse","Brittle stars","Centipede"]]
    f1: [[2,4,5,100]]
    f2: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    Remove the first 2 rows of the data:

    >>> read_options = csv.ReadOptions(skip_rows_after_names=2)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    1: string
    2: int64
    3: date32[day]
    ----
    1: [["Brittle stars","Centipede"]]
    2: [[5,100]]
    3: [[2022-03-03,2022-03-04]]
    """

    # Avoid mistakenly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling, initialize storage here
    def __cinit__(self, *args, **kwargs):
        self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))

    def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
                 skip_rows_after_names=None, column_names=None,
                 autogenerate_column_names=None, encoding='utf8'):
        # Only override the defaults installed by __cinit__ for the
        # arguments that were explicitly given.
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size
        if skip_rows is not None:
            self.skip_rows = skip_rows
        if skip_rows_after_names is not None:
            self.skip_rows_after_names = skip_rows_after_names
        if column_names is not None:
            self.column_names = column_names
        if autogenerate_column_names is not None:
            self.autogenerate_column_names = autogenerate_column_names
        # Python-specific option
        self.encoding = encoding

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return deref(self.options).use_threads

    @use_threads.setter
    def use_threads(self, value):
        deref(self.options).use_threads = value

    @property
    def block_size(self):
        """
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        """
        return deref(self.options).block_size

    @block_size.setter
    def block_size(self, value):
        deref(self.options).block_size = value

    @property
    def skip_rows(self):
        """
        The number of rows to skip before the column names (if any)
        and the CSV data.
        See `skip_rows_after_names` for interaction description
        """
        return deref(self.options).skip_rows

    @skip_rows.setter
    def skip_rows(self, value):
        deref(self.options).skip_rows = value

    @property
    def skip_rows_after_names(self):
        """
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
        """
        return deref(self.options).skip_rows_after_names

    @skip_rows_after_names.setter
    def skip_rows_after_names(self, value):
        deref(self.options).skip_rows_after_names = value

    @property
    def column_names(self):
        """
        The column names of the target table. If empty, fall back on
        `autogenerate_column_names`.
        """
        return [frombytes(s) for s in deref(self.options).column_names]

    @column_names.setter
    def column_names(self, value):
        deref(self.options).column_names.clear()
        for item in value:
            deref(self.options).column_names.push_back(tobytes(item))

    @property
    def autogenerate_column_names(self):
        """
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
        """
        return deref(self.options).autogenerate_column_names

    @autogenerate_column_names.setter
    def autogenerate_column_names(self, value):
        deref(self.options).autogenerate_column_names = value

    def validate(self):
        """
        Run the underlying C++ ``Validate()`` on these options and pass
        the resulting status to ``check_status``.
        """
        check_status(deref(self.options).Validate())

    def equals(self, ReadOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ReadOptions

        Returns
        -------
        bool
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size and
            self.skip_rows == other.skip_rows and
            self.skip_rows_after_names == other.skip_rows_after_names and
            self.column_names == other.column_names and
            self.autogenerate_column_names ==
            other.autogenerate_column_names and
            self.encoding == other.encoding
        )

    @staticmethod
    cdef ReadOptions wrap(CCSVReadOptions options):
        """
        Build a ReadOptions object holding a copy of the given C++ options.
        """
        out = ReadOptions()
        out.options.reset(new CCSVReadOptions(move(options)))
        out.encoding = 'utf8'  # No way to know this
        return out

    def __getstate__(self):
        return (self.use_threads, self.block_size, self.skip_rows,
                self.column_names, self.autogenerate_column_names,
                self.encoding, self.skip_rows_after_names)

    def __setstate__(self, state):
        # Field order must match __getstate__
        (self.use_threads, self.block_size, self.skip_rows,
         self.column_names, self.autogenerate_column_names,
         self.encoding, self.skip_rows_after_names) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # `other` is not a ReadOptions
            return False


cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing CSV files.

    Parameters
    ----------
    delimiter : 1-character string, optional (default ',')
        The character delimiting individual cells in the CSV data.
    quote_char : 1-character string or False, optional (default '"')
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
    double_quote : bool, optional (default True)
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
    escape_char : 1-character string or False, optional (default False)
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
    newlines_in_values : bool, optional (default False)
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
    ignore_empty_lines : bool, optional (default True)
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
    invalid_row_handler : callable, optional (default None)
        If not None, this object is called for each CSV row that fails
        parsing (because of a mismatching number of columns).
        It should accept a single InvalidRow argument and return either
        "skip" or "error" depending on the desired outcome.

    Examples
    --------

    Defining an example file from bytes object:

    >>> import io
    >>> s = (
    ...     "animals;n_legs;entry\\n"
    ...     "Flamingo;2;2022-03-01\\n"
    ...     "# Comment here:\\n"
    ...     "Horse;4;2022-03-02\\n"
    ...     "Brittle stars;5;2022-03-03\\n"
    ...     "Centipede;100;2022-03-04"
    ... )
    >>> print(s)
    animals;n_legs;entry
    Flamingo;2;2022-03-01
    # Comment here:
    Horse;4;2022-03-02
    Brittle stars;5;2022-03-03
    Centipede;100;2022-03-04
    >>> source = io.BytesIO(s.encode())

    Read the data from a file skipping rows with comments
    and defining the delimiter:

    >>> from pyarrow import csv
    >>> def skip_comment(row):
    ...     if row.text.startswith("# "):
    ...         return 'skip'
    ...     else:
    ...         return 'error'
    ...
    >>> parse_options = csv.ParseOptions(delimiter=";", invalid_row_handler=skip_comment)
    >>> csv.read_csv(source, parse_options=parse_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
    """
    __slots__ = ()

    def __cinit__(self, *args, **kwargs):
        self._invalid_row_handler = None
        self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults()))

    def __init__(self, *, delimiter=None, quote_char=None, double_quote=None,
                 escape_char=None, newlines_in_values=None,
                 ignore_empty_lines=None, invalid_row_handler=None):
        # Only override the defaults installed by __cinit__ for the
        # arguments that were explicitly given.
        if delimiter is not None:
            self.delimiter = delimiter
        if quote_char is not None:
            self.quote_char = quote_char
        if double_quote is not None:
            self.double_quote = double_quote
        if escape_char is not None:
            self.escape_char = escape_char
        if newlines_in_values is not None:
            self.newlines_in_values = newlines_in_values
        if ignore_empty_lines is not None:
            self.ignore_empty_lines = ignore_empty_lines
        if invalid_row_handler is not None:
            self.invalid_row_handler = invalid_row_handler

    @property
    def delimiter(self):
        """
        The character delimiting individual cells in the CSV data.
        """
        return chr(deref(self.options).delimiter)

    @delimiter.setter
    def delimiter(self, value):
        deref(self.options).delimiter = _single_char(value)

    @property
    def quote_char(self):
        """
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
        """
        if deref(self.options).quoting:
            return chr(deref(self.options).quote_char)
        else:
            return False

    @quote_char.setter
    def quote_char(self, value):
        if value is False:
            deref(self.options).quoting = False
        else:
            deref(self.options).quote_char = _single_char(value)
            deref(self.options).quoting = True

    @property
    def double_quote(self):
        """
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
        """
        return deref(self.options).double_quote

    @double_quote.setter
    def double_quote(self, value):
        deref(self.options).double_quote = value

    @property
    def escape_char(self):
        """
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
        """
        if deref(self.options).escaping:
            return chr(deref(self.options).escape_char)
        else:
            return False

    @escape_char.setter
    def escape_char(self, value):
        if value is False:
            deref(self.options).escaping = False
        else:
            deref(self.options).escape_char = _single_char(value)
            deref(self.options).escaping = True

    @property
    def newlines_in_values(self):
        """
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
        """
        return deref(self.options).newlines_in_values

    @newlines_in_values.setter
    def newlines_in_values(self, value):
        deref(self.options).newlines_in_values = value

    @property
    def ignore_empty_lines(self):
        """
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
        """
        return deref(self.options).ignore_empty_lines

    @ignore_empty_lines.setter
    def ignore_empty_lines(self, value):
        deref(self.options).ignore_empty_lines = value

    @property
    def invalid_row_handler(self):
        """
        Optional handler for invalid rows.

        If not None, this object is called for each CSV row that fails
        parsing (because of a mismatching number of columns).
        It should accept a single InvalidRow argument and return either
        "skip" or "error" depending on the desired outcome.
        """
        return self._invalid_row_handler

    @invalid_row_handler.setter
    def invalid_row_handler(self, value):
        if value is not None and not callable(value):
            raise TypeError("Expected callable or None, "
                            f"got instance of {type(value)!r}")
        # Keep the Python object so the getter, equals() and pickling can
        # return it; the C++ options get a wrapper around the same callable.
        self._invalid_row_handler = value
        deref(self.options).invalid_row_handler = MakeInvalidRowHandler(
            <function[PyInvalidRowCallback]> &_handle_invalid_row, value)

    def validate(self):
        """
        Run the underlying C++ ``Validate()`` on these options and pass
        the resulting status to ``check_status``.
        """
        check_status(deref(self.options).Validate())

    def equals(self, ParseOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ParseOptions

        Returns
        -------
        bool
        """
        return (
            self.delimiter == other.delimiter and
            self.quote_char == other.quote_char and
            self.double_quote == other.double_quote and
            self.escape_char == other.escape_char and
            self.newlines_in_values == other.newlines_in_values and
            self.ignore_empty_lines == other.ignore_empty_lines and
            self._invalid_row_handler == other._invalid_row_handler
        )

    @staticmethod
    cdef ParseOptions wrap(CCSVParseOptions options):
        """
        Build a ParseOptions object holding a copy of the given C++ options.
        """
        out = ParseOptions()
        out.options.reset(new CCSVParseOptions(move(options)))
        return out

    def __getstate__(self):
        return (self.delimiter, self.quote_char, self.double_quote,
                self.escape_char, self.newlines_in_values,
                self.ignore_empty_lines, self.invalid_row_handler)

    def __setstate__(self, state):
        # Field order must match __getstate__
        (self.delimiter, self.quote_char, self.double_quote,
         self.escape_char, self.newlines_in_values,
         self.ignore_empty_lines, self.invalid_row_handler) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # `other` is not a ParseOptions
            return False


cdef class _ISO8601(_Weakrefable):
    """
    A special object indicating ISO-8601 parsing.
    """
    __slots__ = ()

    def __str__(self):
        return 'ISO8601'

    def __eq__(self, other):
        # All instances compare equal to each other
        return isinstance(other, _ISO8601)


ISO8601 = _ISO8601()


cdef class ConvertOptions(_Weakrefable):
    """
    Options for converting CSV data.

    Parameters
    ----------
    check_utf8 : bool, optional (default True)
        Whether to check UTF8 validity of string columns.
    column_types : pyarrow.Schema or dict, optional
        Explicitly map column names to column types. Passing this argument
        disables type inference on the defined columns.
    null_values : list, optional
        A sequence of strings that denote nulls in the data
        (defaults are appropriate in most cases). Note that by default,
        string columns are not checked for null values. To enable
        null checking for those, specify ``strings_can_be_null=True``.
    true_values : list, optional
        A sequence of strings that denote true booleans in the data
        (defaults are appropriate in most cases).
    false_values : list, optional
        A sequence of strings that denote false booleans in the data
        (defaults are appropriate in most cases).
    decimal_point : 1-character string, optional (default '.')
        The character used as decimal point in floating-point and decimal
        data.
    strings_can_be_null : bool, optional (default False)
        Whether string / binary columns can have null values.
        If true, then strings in null_values are considered null for
        string columns.
        If false, then all strings are valid string values.
    quoted_strings_can_be_null : bool, optional (default True)
        Whether quoted values can be null.
        If true, then strings in "null_values" are also considered null
        when they appear quoted in the CSV file. Otherwise, quoted values
        are never considered null.
    include_columns : list, optional
        The names of columns to include in the Table.
        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
    include_missing_columns : bool, optional (default False)
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a column of nulls (whose type is selected using
        `column_types`, or null by default).
        This option is ignored if `include_columns` is empty.
    auto_dict_encode : bool, optional (default False)
        Whether to try to automatically dict-encode string / binary data.
        If true, then when type inference detects a string or binary column,
        it is dict-encoded up to `auto_dict_max_cardinality` distinct values
        (per chunk), after which it switches to regular encoding.
        This setting is ignored for non-inferred columns (those in
        `column_types`).
    auto_dict_max_cardinality : int, optional
        The maximum dictionary cardinality for `auto_dict_encode`.
        This value is per chunk.
    timestamp_parsers : list, optional
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given). By default, a fast built-in
        ISO-8601 parser is used.

    Examples
    --------

    Defining an example data:

    >>> import io
    >>> s = (
    ...     "animals,n_legs,entry,fast\\n"
    ...     "Flamingo,2,01/03/2022,Yes\\n"
    ...     "Horse,4,02/03/2022,Yes\\n"
    ...     "Brittle stars,5,03/03/2022,No\\n"
    ...     "Centipede,100,04/03/2022,No\\n"
    ...     ",6,05/03/2022,"
    ... )
    >>> print(s)
    animals,n_legs,entry,fast
    Flamingo,2,01/03/2022,Yes
    Horse,4,02/03/2022,Yes
    Brittle stars,5,03/03/2022,No
    Centipede,100,04/03/2022,No
    ,6,05/03/2022,

    Change the type of a column:

    >>> import pyarrow as pa
    >>> from pyarrow import csv
    >>> convert_options = csv.ConvertOptions(column_types={"n_legs": pa.float64()})
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: double
    entry: string
    fast: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    entry: [["01/03/2022","02/03/2022","03/03/2022","04/03/2022","05/03/2022"]]
    fast: [["Yes","Yes","No","No",""]]

    Define a date parsing format to get a timestamp type column
    (in case dates are not in ISO format and not converted by default):

    >>> convert_options = csv.ConvertOptions(
    ...                   timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: timestamp[s]
    fast: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]]
    fast: [["Yes","Yes","No","No",""]]

    Specify a subset of columns to be read:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals", "n_legs"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]

    List additional column to be included as a null typed column:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals", "n_legs", "location"],
    ...                   include_missing_columns=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    location: null
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    location: [5 nulls]

    Define columns as dictionary type (by default only the
    string/binary columns are dictionary encoded):

    >>> convert_options = csv.ConvertOptions(
    ...                   timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"],
    ...                   auto_dict_encode=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: dictionary
    n_legs: int64
    entry: timestamp[s]
    fast: dictionary
    ----
    animals: [  -- dictionary:
    ["Flamingo","Horse","Brittle stars","Centipede",""]  -- indices:
    [0,1,2,3,4]]
    n_legs: [[2,4,5,100,6]]
    entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]]
    fast: [  -- dictionary:
    ["Yes","No",""]  -- indices:
    [0,0,1,1,2]]

    Set upper limit for the number of categories. If the number of
    categories exceeds the limit, the conversion to dictionary will
    not happen:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals"],
    ...                   auto_dict_encode=True,
    ...                   auto_dict_max_cardinality=2)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]

    Set empty strings to missing values:

    >>> convert_options = csv.ConvertOptions(include_columns=["animals", "n_legs"],
    ...                   strings_can_be_null=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",null]]
    n_legs: [[2,4,5,100,6]]

    Define values to be True and False when converting a column
    into a bool type:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["fast"],
    ...                   false_values=["No"],
    ...                   true_values=["Yes"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    fast: bool
    ----
    fast: [[true,true,false,false,null]]
    """

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __cinit__(self, *args, **kwargs):
        self.options.reset(
            new CCSVConvertOptions(CCSVConvertOptions.Defaults()))

    def __init__(self, *, check_utf8=None, column_types=None, null_values=None,
                 true_values=None, false_values=None, decimal_point=None,
                 strings_can_be_null=None, quoted_strings_can_be_null=None,
                 include_columns=None, include_missing_columns=None,
                 auto_dict_encode=None, auto_dict_max_cardinality=None,
                 timestamp_parsers=None):
        # Only override the defaults installed by __cinit__ for the
        # arguments that were explicitly given.
        if check_utf8 is not None:
            self.check_utf8 = check_utf8
        if column_types is not None:
            self.column_types = column_types
        if null_values is not None:
            self.null_values = null_values
        if true_values is not None:
            self.true_values = true_values
        if false_values is not None:
            self.false_values = false_values
        if decimal_point is not None:
            self.decimal_point = decimal_point
        if strings_can_be_null is not None:
            self.strings_can_be_null = strings_can_be_null
        if quoted_strings_can_be_null is not None:
            self.quoted_strings_can_be_null = quoted_strings_can_be_null
        if include_columns is not None:
            self.include_columns = include_columns
        if include_missing_columns is not None:
            self.include_missing_columns = include_missing_columns
        if auto_dict_encode is not None:
            self.auto_dict_encode = auto_dict_encode
        if auto_dict_max_cardinality is not None:
            self.auto_dict_max_cardinality = auto_dict_max_cardinality
        if timestamp_parsers is not None:
            self.timestamp_parsers = timestamp_parsers

    @property
    def check_utf8(self):
        """
        Whether to check UTF8 validity of string columns.
        """
        return deref(self.options).check_utf8

    @check_utf8.setter
    def check_utf8(self, value):
        deref(self.options).check_utf8 = value

    @property
    def strings_can_be_null(self):
        """
        Whether string / binary columns can have null values.
        """
        return deref(self.options).strings_can_be_null

    @strings_can_be_null.setter
    def strings_can_be_null(self, value):
        deref(self.options).strings_can_be_null = value

    @property
    def quoted_strings_can_be_null(self):
        """
        Whether quoted values can be null.
        """
        return deref(self.options).quoted_strings_can_be_null

    @quoted_strings_can_be_null.setter
    def quoted_strings_can_be_null(self, value):
        deref(self.options).quoted_strings_can_be_null = value

    @property
    def column_types(self):
        """
        Explicitly map column names to column types.
        """
        d = {frombytes(item.first): pyarrow_wrap_data_type(item.second)
             for item in deref(self.options).column_types}
        return d

    @column_types.setter
    def column_types(self, value):
        cdef:
            shared_ptr[CDataType] typ

        # Accept a mapping, or any iterable yielding Field objects
        # or (name, type) pairs.
        if isinstance(value, Mapping):
            value = value.items()

        deref(self.options).column_types.clear()
        for item in value:
            if isinstance(item, Field):
                k = item.name
                v = item.type
            else:
                k, v = item
            typ = pyarrow_unwrap_data_type(ensure_type(v))
            assert typ != NULL
            deref(self.options).column_types[tobytes(k)] = typ

    @property
    def null_values(self):
        """
        A sequence of strings that denote nulls in the data.
        """
        return [frombytes(x) for x in deref(self.options).null_values]

    @null_values.setter
    def null_values(self, value):
        deref(self.options).null_values = [tobytes(x) for x in value]

    @property
    def true_values(self):
        """
        A sequence of strings that denote true booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).true_values]

    @true_values.setter
    def true_values(self, value):
        deref(self.options).true_values = [tobytes(x) for x in value]

    @property
    def false_values(self):
        """
        A sequence of strings that denote false booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).false_values]

    @false_values.setter
    def false_values(self, value):
        deref(self.options).false_values = [tobytes(x) for x in value]

    @property
    def decimal_point(self):
        """
        The character used as decimal point in floating-point and decimal
        data.
        """
        return chr(deref(self.options).decimal_point)

    @decimal_point.setter
    def decimal_point(self, value):
        deref(self.options).decimal_point = _single_char(value)

    @property
    def auto_dict_encode(self):
        """
        Whether to try to automatically dict-encode string / binary data.
        """
        return deref(self.options).auto_dict_encode

    @auto_dict_encode.setter
    def auto_dict_encode(self, value):
        deref(self.options).auto_dict_encode = value

    @property
    def auto_dict_max_cardinality(self):
        """
        The maximum dictionary cardinality for `auto_dict_encode`.

        This value is per chunk.
        """
        return deref(self.options).auto_dict_max_cardinality

    @auto_dict_max_cardinality.setter
    def auto_dict_max_cardinality(self, value):
        deref(self.options).auto_dict_max_cardinality = value

    @property
    def include_columns(self):
        """
        The names of columns to include in the Table.

        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
        """
        return [frombytes(s) for s in deref(self.options).include_columns]

    @include_columns.setter
    def include_columns(self, value):
        deref(self.options).include_columns.clear()
        for item in value:
            deref(self.options).include_columns.push_back(tobytes(item))

    @property
    def include_missing_columns(self):
        """
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a null column (whose type is selected using `column_types`,
        or null by default).
        This option is ignored if `include_columns` is empty.
        """
        return deref(self.options).include_missing_columns

    @include_missing_columns.setter
    def include_missing_columns(self, value):
        deref(self.options).include_missing_columns = value

    @property
    def timestamp_parsers(self):
        """
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given). By default, a fast built-in
        ISO-8601 parser is used.
        """
        cdef:
            shared_ptr[CTimestampParser] c_parser
            c_string kind

        parsers = []
        for c_parser in deref(self.options).timestamp_parsers:
            kind = deref(c_parser).kind()
            if kind == b'strptime':
                parsers.append(frombytes(deref(c_parser).format()))
            else:
                assert kind == b'iso8601'
                parsers.append(ISO8601)

        return parsers

    @timestamp_parsers.setter
    def timestamp_parsers(self, value):
        cdef:
            vector[shared_ptr[CTimestampParser]] c_parsers

        # Build the whole vector first so the options are left untouched
        # if an element has an unsupported type.
        for v in value:
            if isinstance(v, str):
                c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v)))
            elif v == ISO8601:
                c_parsers.push_back(CTimestampParser.MakeISO8601())
            else:
                raise TypeError("Expected list of str or ISO8601 objects")

        deref(self.options).timestamp_parsers = move(c_parsers)

    @staticmethod
    cdef ConvertOptions wrap(CCSVConvertOptions options):
        """
        Build a ConvertOptions object holding a copy of the given
        C++ options.
        """
        out = ConvertOptions()
        out.options.reset(new CCSVConvertOptions(move(options)))
        return out

    def validate(self):
        """
        Run the underlying C++ ``Validate()`` on these options and pass
        the resulting status to ``check_status``.
        """
        check_status(deref(self.options).Validate())

    def equals(self, ConvertOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ConvertOptions

        Returns
        -------
        bool
        """
        return (
            self.check_utf8 == other.check_utf8 and
            self.column_types == other.column_types and
            self.null_values == other.null_values and
            self.true_values == other.true_values and
            self.false_values == other.false_values and
            self.decimal_point == other.decimal_point and
            self.timestamp_parsers == other.timestamp_parsers and
            self.strings_can_be_null == other.strings_can_be_null and
            self.quoted_strings_can_be_null ==
            other.quoted_strings_can_be_null and
            self.auto_dict_encode == other.auto_dict_encode and
            self.auto_dict_max_cardinality ==
            other.auto_dict_max_cardinality and
            self.include_columns == other.include_columns and
            self.include_missing_columns == other.include_missing_columns
        )

    def __getstate__(self):
        return (self.check_utf8, self.column_types, self.null_values,
                self.true_values, self.false_values, self.decimal_point,
                self.timestamp_parsers, self.strings_can_be_null,
                self.quoted_strings_can_be_null, self.auto_dict_encode,
                self.auto_dict_max_cardinality, self.include_columns,
                self.include_missing_columns)

    def __setstate__(self, state):
        # Field order must match __getstate__
        (self.check_utf8, self.column_types, self.null_values,
         self.true_values, self.false_values, self.decimal_point,
         self.timestamp_parsers, self.strings_can_be_null,
         self.quoted_strings_can_be_null, self.auto_dict_encode,
         self.auto_dict_max_cardinality, self.include_columns,
         self.include_missing_columns) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # `other` is not a ConvertOptions
            return False


cdef _get_reader(input_file, ReadOptions read_options,
                 shared_ptr[CInputStream]* out):
    """
    Open `input_file` as an input stream stored into `out`.

    When `read_options` is given, the stream is wrapped so that data is
    transcoded from `read_options.encoding` to 'utf8'.
    """
    use_memory_map = False
    get_input_stream(input_file, use_memory_map, out)
    if read_options is not None:
        out[0] = native_transcoding_input_stream(out[0],
                                                 read_options.encoding,
                                                 'utf8')


cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
    """
    Store into `out` the C++ options of `read_options`, or the C++
    defaults if `read_options` is None.
    """
    if read_options is None:
        out[0] = CCSVReadOptions.Defaults()
    else:
        out[0] = deref(read_options.options)


cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out):
    """
    Store into `out` the C++ options of `parse_options`, or the C++
    defaults if `parse_options` is None.
    """
    if parse_options is None:
        out[0] = CCSVParseOptions.Defaults()
    else:
        out[0] = deref(parse_options.options)


cdef _get_convert_options(ConvertOptions convert_options,
                          CCSVConvertOptions* out):
    """
    Store into `out` the C++ options of `convert_options`, or the C++
    defaults if `convert_options` is None.
    """
    if convert_options is None:
        out[0] = CCSVConvertOptions.Defaults()
    else:
        out[0] = deref(convert_options.options)


cdef class CSVStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a CSV file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        Schema schema

    def __init__(self):
        raise TypeError(
            f"Do not call {self.__class__.__name__}'s constructor directly, "
            "use pyarrow.csv.open_csv() instead.")

    # Note about cancellation: we cannot create a SignalStopHandler
    # by default here, as several CSVStreamingReader instances may be
    # created (including by the same thread). Handling cancellation
    # would require having the user pass the SignalStopHandler.
    # (in addition to solving ARROW-11853)

    cdef _open(self, shared_ptr[CInputStream] stream,
               CCSVReadOptions c_read_options,
               CCSVParseOptions c_parse_options,
               CCSVConvertOptions c_convert_options,
               MemoryPool memory_pool):
        """
        Create the underlying C++ streaming reader on `stream` with the
        given options and record its schema in `self.schema`.
        """
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        # NOTE(review): confirm that the result of Make() is assignable to
        # `self.reader` (declared outside this file) without an explicit
        # cast.
        with nogil:
            self.reader = GetResultValue(
                CCSVStreamingReader.Make(
                    io_context, stream,
                    move(c_read_options), move(c_parse_options),
                    move(c_convert_options)))
            c_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(c_schema)


def read_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Read a Table from a stream of CSV data.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options : pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.Table`
        Contents of the CSV file as an in-memory table.

    Examples
    --------

    Defining an example file from bytes object:

    >>> import io
    >>> s = (
    ...     "animals,n_legs,entry\\n"
    ...     "Flamingo,2,2022-03-01\\n"
    ...     "Horse,4,2022-03-02\\n"
    ...     "Brittle stars,5,2022-03-03\\n"
    ...     "Centipede,100,2022-03-04"
    ... )
    >>> print(s)
    animals,n_legs,entry
    Flamingo,2,2022-03-01
    Horse,4,2022-03-02
    Brittle stars,5,2022-03-03
    Centipede,100,2022-03-04
    >>> source = io.BytesIO(s.encode())

    Reading from the file

    >>> from pyarrow import csv
    >>> csv.read_csv(source)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CIOContext io_context
        SharedPtrNoGIL[CCSVReader] reader
        shared_ptr[CTable] table

    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    with SignalStopHandler() as stop_handler:
        # The typed cast gives access to the C-level `stop_token` member of
        # the StopToken object exposed by the handler.
        io_context = CIOContext(
            maybe_unbox_memory_pool(memory_pool),
            (<StopToken> stop_handler.stop_token).stop_token)
        reader = GetResultValue(CCSVReader.Make(
            io_context, stream,
            c_read_options, c_parse_options, c_convert_options))

        # Release the GIL while the table is read
        with nogil:
            table = GetResultValue(reader.get().Read())

    return pyarrow_wrap_table(table)


def open_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Open a streaming reader of CSV data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
read_options : pyarrow.csv.ReadOptions, optional Options for the CSV reader (see pyarrow.csv.ReadOptions constructor for defaults) parse_options : pyarrow.csv.ParseOptions, optional Options for the CSV parser (see pyarrow.csv.ParseOptions constructor for defaults) convert_options : pyarrow.csv.ConvertOptions, optional Options for converting CSV data (see pyarrow.csv.ConvertOptions constructor for defaults) memory_pool : MemoryPool, optional Pool to allocate Table memory from Returns ------- :class:`pyarrow.csv.CSVStreamingReader` """ cdef: shared_ptr[CInputStream] stream CCSVReadOptions c_read_options CCSVParseOptions c_parse_options CCSVConvertOptions c_convert_options CSVStreamingReader reader _get_reader(input_file, read_options, &stream) _get_read_options(read_options, &c_read_options) _get_parse_options(parse_options, &c_parse_options) _get_convert_options(convert_options, &c_convert_options) reader = CSVStreamingReader.__new__(CSVStreamingReader) reader._open(stream, move(c_read_options), move(c_parse_options), move(c_convert_options), memory_pool) return reader def _raise_invalid_function_option(value, description, *, exception_class=ValueError): raise exception_class(f"\"{value}\" is not a valid {description}") cdef CQuotingStyle unwrap_quoting_style(quoting_style) except *: if quoting_style == "needed": return CQuotingStyle_Needed elif quoting_style == "all_valid": return CQuotingStyle_AllValid elif quoting_style == "none": return CQuotingStyle_None _raise_invalid_function_option(quoting_style, "quoting style") cdef wrap_quoting_style(quoting_style): if quoting_style == CQuotingStyle_Needed: return 'needed' elif quoting_style == CQuotingStyle_AllValid: return 'all_valid' elif quoting_style == CQuotingStyle_None: return 'none' cdef class WriteOptions(_Weakrefable): """ Options for writing CSV files. 
Parameters ---------- include_header : bool, optional (default True) Whether to write an initial header line with column names batch_size : int, optional (default 1024) How many rows to process together when converting and writing CSV data delimiter : 1-character string, optional (default ",") The character delimiting individual cells in the CSV data. quoting_style : str, optional (default "needed") Whether to quote values, and if so, which quoting style to use. The following values are accepted: - "needed" (default): only enclose values in quotes when needed. - "all_valid": enclose all valid values in quotes; nulls are not quoted. - "none": do not enclose any values in quotes; values containing special characters (such as quotes, cell delimiters or line endings) will raise an error. """ # Avoid mistakingly creating attributes __slots__ = () def __init__(self, *, include_header=None, batch_size=None, delimiter=None, quoting_style=None): self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults())) if include_header is not None: self.include_header = include_header if batch_size is not None: self.batch_size = batch_size if delimiter is not None: self.delimiter = delimiter if quoting_style is not None: self.quoting_style = quoting_style @property def include_header(self): """ Whether to write an initial header line with column names. """ return deref(self.options).include_header @include_header.setter def include_header(self, value): deref(self.options).include_header = value @property def batch_size(self): """ How many rows to process together when converting and writing CSV data. """ return deref(self.options).batch_size @batch_size.setter def batch_size(self, value): deref(self.options).batch_size = value @property def delimiter(self): """ The character delimiting individual cells in the CSV data. 
""" return chr(deref(self.options).delimiter) @delimiter.setter def delimiter(self, value): deref(self.options).delimiter = _single_char(value) @property def quoting_style(self): """ Whether to quote values, and if so, which quoting style to use. The following values are accepted: - "needed" (default): only enclose values in quotes when needed. - "all_valid": enclose all valid values in quotes; nulls are not quoted. - "none": do not enclose any values in quotes; values containing special characters (such as quotes, cell delimiters or line endings) will raise an error. """ return wrap_quoting_style(deref(self.options).quoting_style) @quoting_style.setter def quoting_style(self, value): deref(self.options).quoting_style = unwrap_quoting_style(value) @staticmethod cdef WriteOptions wrap(CCSVWriteOptions options): out = WriteOptions() out.options.reset(new CCSVWriteOptions(move(options))) return out def validate(self): check_status(self.options.get().Validate()) cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): if write_options is None: out[0] = CCSVWriteOptions.Defaults() else: out[0] = deref(write_options.options) def write_csv(data, output_file, write_options=None, MemoryPool memory_pool=None): """ Write record batch or table to a CSV file. Parameters ---------- data : pyarrow.RecordBatch or pyarrow.Table The data to write. output_file : string, path, pyarrow.NativeFile, or file-like object The location where to write the CSV data. write_options : pyarrow.csv.WriteOptions Options to configure writing the CSV data. memory_pool : MemoryPool, optional Pool for temporary allocations. Examples -------- >>> import pyarrow as pa >>> from pyarrow import csv >>> legs = pa.array([2, 4, 5, 100]) >>> animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"]) >>> entry_date = pa.array(["01/03/2022", "02/03/2022", ... "03/03/2022", "04/03/2022"]) >>> table = pa.table([animals, legs, entry_date], ... 
names=["animals", "n_legs", "entry"]) >>> csv.write_csv(table, "animals.csv") >>> write_options = csv.WriteOptions(include_header=False) >>> csv.write_csv(table, "animals.csv", write_options=write_options) >>> write_options = csv.WriteOptions(delimiter=";") >>> csv.write_csv(table, "animals.csv", write_options=write_options) """ cdef: shared_ptr[COutputStream] stream CCSVWriteOptions c_write_options CMemoryPool* c_memory_pool CRecordBatch* batch CTable* table _get_write_options(write_options, &c_write_options) get_writer(output_file, &stream) c_memory_pool = maybe_unbox_memory_pool(memory_pool) c_write_options.io_context = CIOContext(c_memory_pool) if isinstance(data, RecordBatch): batch = pyarrow_unwrap_batch(data).get() with nogil: check_status(WriteCSV(deref(batch), c_write_options, stream.get())) elif isinstance(data, Table): table = pyarrow_unwrap_table(data).get() with nogil: check_status(WriteCSV(deref(table), c_write_options, stream.get())) else: raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'") cdef class CSVWriter(_CRecordBatchWriter): """ Writer to create a CSV file. Parameters ---------- sink : str, path, pyarrow.OutputStream or file-like object The location where to write the CSV data. schema : pyarrow.Schema The schema of the data to be written. write_options : pyarrow.csv.WriteOptions Options to configure writing the CSV data. memory_pool : MemoryPool, optional Pool for temporary allocations. """ def __init__(self, sink, Schema schema, *, WriteOptions write_options=None, MemoryPool memory_pool=None): cdef: shared_ptr[COutputStream] c_stream shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) CCSVWriteOptions c_write_options CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool) _get_write_options(write_options, &c_write_options) c_write_options.io_context = CIOContext(c_memory_pool) get_writer(sink, &c_stream) with nogil: self.writer = GetResultValue(MakeCSVWriter( c_stream, c_schema, c_write_options))