# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import functools
import os
import pathlib
import subprocess
import sys
import time
import urllib.request

import pytest
import hypothesis as h

from ..conftest import groups, defaults
from pyarrow import set_timezone_db_path
from pyarrow.util import find_free_port


# setup hypothesis profiles
h.settings.register_profile('ci', max_examples=1000)
h.settings.register_profile('dev', max_examples=50)
h.settings.register_profile('debug', max_examples=10,
                            verbosity=h.Verbosity.verbose)

# load default hypothesis profile, either set HYPOTHESIS_PROFILE environment
# variable or pass --hypothesis-profile option to pytest, to see the generated
# examples try:
# pytest pyarrow -sv --enable-hypothesis --hypothesis-profile=debug
h.settings.load_profile(os.environ.get('HYPOTHESIS_PROFILE', 'dev'))

# Set this at the beginning before the AWS SDK was loaded to avoid reading in
# user configuration values.
os.environ['AWS_CONFIG_FILE'] = "/dev/null"


if sys.platform == 'win32':
    # Only point Arrow at a timezone database when PYARROW_TZDATA_PATH is
    # set to a non-empty value.
    tzdata_set_path = os.environ.get('PYARROW_TZDATA_PATH', None)
    if tzdata_set_path:
        set_timezone_db_path(tzdata_set_path)


def pytest_addoption(parser):
    """Register ``--enable-<group>`` and ``--disable-<group>`` options.

    One pair of flags is added for every entry in ``groups``.  The default
    of ``--enable-<group>`` is taken from the ``PYARROW_TEST_<GROUP>``
    environment variable when it is set to a non-empty value, otherwise
    from ``defaults[group]``.
    """
    # Create options to selectively enable test groups
    def bool_env(name, default=None):
        """Parse the environment variable ``name.upper()`` as a boolean.

        Returns ``default`` when the variable is missing or empty and
        raises ``ValueError`` for unrecognised values.
        """
        value = os.environ.get(name.upper())
        if not value:  # missing or empty
            return default
        value = value.lower()
        if value in {'1', 'true', 'on', 'yes', 'y'}:
            return True
        elif value in {'0', 'false', 'off', 'no', 'n'}:
            return False
        else:
            raise ValueError('{}={} is not parsable as boolean'
                             .format(name.upper(), value))

    for group in groups:
        default = bool_env('PYARROW_TEST_{}'.format(group), defaults[group])
        parser.addoption('--enable-{}'.format(group),
                         action='store_true', default=default,
                         help=('Enable the {} test group'.format(group)))
        parser.addoption('--disable-{}'.format(group),
                         action='store_true', default=False,
                         help=('Disable the {} test group'.format(group)))


class PyArrowConfig:
    """Per-session record of which test groups are enabled."""

    def __init__(self):
        # Maps group name -> bool; filled in by ``pytest_configure``.
        self.is_enabled = {}

    def apply_mark(self, mark):
        """Skip the current test if ``mark`` names a disabled test group.

        Marks whose name is not in ``groups`` are ignored.
        """
        group = mark.name
        if group in groups:
            self.requires(group)

    def requires(self, group):
        """Skip the current test (via ``pytest.skip``) unless ``group`` is
        enabled."""
        if not self.is_enabled[group]:
            pytest.skip('{} NOT enabled'.format(group))


def pytest_configure(config):
    """Attach a ``PyArrowConfig`` to ``config`` and register group markers.

    A group is enabled when its ``--enable-<group>`` option is truthy and
    its ``--disable-<group>`` option is not set.
    """
    # Apply command-line options to initialize PyArrow-specific config object
    config.pyarrow = PyArrowConfig()

    for mark in groups:
        config.addinivalue_line(
            "markers", mark,
        )

        enable_flag = '--enable-{}'.format(mark)
        disable_flag = '--disable-{}'.format(mark)

        is_enabled = (config.getoption(enable_flag) and not
                      config.getoption(disable_flag))
        config.pyarrow.is_enabled[mark] = is_enabled


def pytest_runtest_setup(item):
    """Skip ``item`` if any of its markers names a disabled test group."""
    # Apply test markers to skip tests selectively
    for mark in item.iter_markers():
        item.config.pyarrow.apply_mark(mark)


@pytest.fixture
def tempdir(tmpdir):
    """Return pytest's ``tmpdir`` as a ``pathlib.Path``."""
    # convert pytest's LocalPath to pathlib.Path
    return pathlib.Path(tmpdir.strpath)


@pytest.fixture(scope='session')
def base_datadir():
    """Return the ``data`` directory located next to this file."""
    return pathlib.Path(__file__).parent / 'data'


@pytest.fixture(autouse=True)
def disable_aws_metadata(monkeypatch):
    """Stop the AWS SDK from trying to contact the EC2 metadata server.

    Otherwise, this causes a 5 second delay in tests that exercise the
    S3 filesystem.
    """
    monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true")


# TODO(kszucs): move the following fixtures to test_fs.py once the previous
# parquet dataset implementation and hdfs implementation are removed.

@pytest.fixture(scope='session')
def hdfs_connection():
    """Return ``(host, port, user)`` read from the ``ARROW_HDFS_TEST_*``
    environment variables, with defaults ``('default', 0, 'hdfs')``."""
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default')
    port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
    user = os.environ.get('ARROW_HDFS_TEST_USER', 'hdfs')
    return host, port, user


@pytest.fixture(scope='session')
def s3_connection():
    """Return ``(host, port, access_key, secret_key)`` for the test S3
    server, using the loopback address and a port from
    ``find_free_port()``."""
    host, port = '127.0.0.1', find_free_port()
    access_key, secret_key = 'arrow', 'apachearrow'
    return host, port, access_key, secret_key


def retry(attempts=3, delay=1.0, max_delay=None, backoff=1):
    """
    Retry decorator

    The decorated callable is invoked up to ``attempts`` times.  Any
    ``Exception`` triggers another attempt after a wait; the exception
    raised by the final attempt is propagated immediately, without a
    trailing wait.

    Parameters
    ----------
    attempts : int, default 3
        The number of attempts. Must be at least 1.
    delay : float, default 1
        Initial delay in seconds. The delay is multiplied by ``backoff``
        before every wait, including the first one.
    max_delay : float, optional
        The max delay between attempts.
    backoff : float, default 1
        The multiplier to delay after each attempt.

    Raises
    ------
    ValueError
        If ``attempts`` is smaller than 1.
    """
    if attempts < 1:
        # Previously this surfaced as an UnboundLocalError on the first call
        # of the decorated function; fail early with a clear message instead.
        raise ValueError('attempts must be at least 1')

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            curr_delay = delay

            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        # Out of attempts: re-raise right away rather than
                        # sleeping once more before giving up.
                        raise
                    curr_delay *= backoff
                    if max_delay:
                        curr_delay = min(curr_delay, max_delay)
                    time.sleep(curr_delay)

        return wrapper
    return decorate


@pytest.fixture(scope='session')
def s3_server(s3_connection, tmpdir_factory):
    """Start a ``minio`` server subprocess for the test session.

    Yields a dict with the keys ``'connection'``, ``'process'`` and
    ``'tempdir'``.  The tests are skipped when the ``minio`` command
    cannot be executed.  The process is killed on teardown.
    """
    @retry(attempts=5, delay=1, backoff=2)
    def minio_server_health_check(address):
        resp = urllib.request.urlopen(f"http://{address}/minio/health/live")
        assert resp.getcode() == 200

    tmpdir = tmpdir_factory.getbasetemp()
    host, port, access_key, secret_key = s3_connection

    address = '{}:{}'.format(host, port)
    env = os.environ.copy()
    env.update({
        'MINIO_ACCESS_KEY': access_key,
        'MINIO_SECRET_KEY': secret_key
    })

    args = ['minio', '--compat', 'server', '--quiet', '--address',
            address, tmpdir]
    proc = None
    try:
        proc = subprocess.Popen(args, env=env)
    except OSError:
        pytest.skip('`minio` command cannot be located')
    else:
        # Wait for the server to startup before yielding
        minio_server_health_check(address)

        yield {
            'connection': s3_connection,
            'process': proc,
            'tempdir': tmpdir
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(scope='session')
def gcs_server():
    """Start a ``storage-testbench`` subprocess for the test session.

    Yields a dict with the keys ``'connection'`` (``('localhost', port)``)
    and ``'process'``.  The tests are skipped when the command cannot be
    executed or has already exited right after launch.  The process is
    killed on teardown.
    """
    port = find_free_port()
    env = os.environ.copy()
    exe = 'storage-testbench'
    args = [exe, '--port', str(port)]
    proc = None
    try:
        # start server
        proc = subprocess.Popen(args, env=env)
        # Make sure the server is alive.
        if proc.poll() is not None:
            pytest.skip(f"Command {args} did not start server successfully!")
    except OSError as e:
        pytest.skip(f"Command {args} failed to execute: {e}")
    else:
        yield {
            'connection': ('localhost', port),
            'process': proc,
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(scope='session')
def azure_server(tmpdir_factory):
    """Start an ``azurite-blob`` subprocess for the test session.

    Yields a dict with the keys ``'connection'``
    (``(host, port, account_name, account_key)``), ``'process'`` and
    ``'tempdir'``.  The tests are skipped when the command cannot be
    executed or has already exited right after launch.  The process is
    killed on teardown.
    """
    port = find_free_port()
    env = os.environ.copy()
    tmpdir = tmpdir_factory.getbasetemp()
    # We only need blob service emulator, not queue or table.
    args = ['azurite-blob', "--location", tmpdir, "--blobPort", str(port)]
    # For old Azurite. We can't install the latest Azurite with old
    # Node.js on old Ubuntu.
    args += ["--skipApiVersionCheck"]
    proc = None
    try:
        proc = subprocess.Popen(args, env=env)
        # Make sure the server is alive.
        if proc.poll() is not None:
            pytest.skip(f"Command {args} did not start server successfully!")
    except (ModuleNotFoundError, OSError) as e:
        pytest.skip(f"Command {args} failed to execute: {e}")
    else:
        yield {
            # Use the standard azurite account_name and account_key.
            # https://learn.microsoft.com/en-us/azure/storage/common/storage-use-emulator#authorize-with-shared-key-credentials
            'connection': ('127.0.0.1', port, 'devstoreaccount1',
                           'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2'
                           'UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=='),
            'process': proc,
            'tempdir': tmpdir,
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(
    params=[
        'builtin_pickle',
        'cloudpickle'
    ],
    scope='session'
)
def pickle_module(request):
    """Return the pickle implementation named by the current fixture
    parameter (``builtin_pickle`` or ``cloudpickle``)."""
    return request.getfixturevalue(request.param)


@pytest.fixture(scope='session')
def builtin_pickle():
    """Return the standard library ``pickle`` module."""
    import pickle
    return pickle


@pytest.fixture(scope='session')
def cloudpickle():
    """Return the ``cloudpickle`` module, skipping if it is not importable.

    ``HIGHEST_PROTOCOL`` is set to ``DEFAULT_PROTOCOL`` when the module
    does not define it itself.
    """
    cp = pytest.importorskip('cloudpickle')
    if 'HIGHEST_PROTOCOL' not in cp.__dict__:
        cp.HIGHEST_PROTOCOL = cp.DEFAULT_PROTOCOL
    return cp