
airflow-provider-great-expectations's Issues

`great-expectations>=0.14` causes `commented_map` missing error

I could not confirm whether this is related to Airflow 2.4.x, but I'm getting this error when using GreatExpectationsOperator with a CheckpointConfig and great-expectations>=0.14:

[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Exception in thread Thread-1:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 209, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     _ = self[name]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 151, in commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return self._get_schema_validated_updated_commented_map()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 125, in _get_schema_validated_updated_commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     commented_map: CommentedMap = copy.deepcopy(self._commented_map)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - 
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 212, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     _ = self[f"_{name}"]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - 
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/threading.py", line 980, in _bootstrap_inner
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     self.run()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/threading.py", line 917, in run
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     self._target(*self._args, **self._kwargs)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/listener.py", line 114, in on_running
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     **get_custom_facets(task, dagrun.external_trigger, task_instance_copy)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 265, in get_custom_facets
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     "airflow_version": AirflowVersionRunFacet.from_task(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/facets.py", line 34, in from_task
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     to_json_encodable(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 101, in to_json_encodable
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return json.loads(json.dumps(task.__dict__, default=_task_encoder))
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return cls(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     chunks = self.iterencode(o, _one_shot=True)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return _iterencode(o, 0)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 99, in _task_encoder
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return str(obj)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2883, in __str__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     return self.__repr__()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2862, in __repr__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     json_dict: dict = self.to_json_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2850, in to_json_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     dict_obj: dict = self.to_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 140, in to_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     for key in self.property_names(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 227, in property_names
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     assert_valid_keys(keys=exclude_keys, purpose="exclusion")
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -   File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 214, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -     raise ValueError(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - ValueError: Property "commented_map", marked for exclusion on object "<class 'great_expectations.data_context.types.base.CheckpointConfig'>", does not exist.

I don't see this error with >=0.15, but with 0.14.x, I see this additional error:

[2022-10-07, 19:46:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 117, in instantiate_class_from_config
    class_instance = class_(**config_with_defaults)
TypeError: __init__() got an unexpected keyword argument 'site_names'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 142, in execute
    self.checkpoint = instantiate_class_from_config(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 121, in instantiate_class_from_config
    class_name, format_dict_for_error_message(config_with_defaults)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in format_dict_for_error_message
    return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in <genexpr>
    return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/config_peer.py", line 80, in __repr__
    return str(self.get_config())
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2130, in __str__
    return self.__repr__()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2109, in __repr__
    json_dict: dict = self.to_sanitized_json_dict()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2098, in to_sanitized_json_dict
    serializeable_dict = self.to_json_dict()
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2088, in to_json_dict
    serializeable_dict: dict = convert_to_json_serializable(data=dict_obj)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
    new_dict[str(key)] = convert_to_json_serializable(data[key])
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
    new_dict[str(key)] = convert_to_json_serializable(data[key])
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 256, in convert_to_json_serializable
    raise TypeError(
TypeError: <great_expectations.data_context.types.base.DatasourceConfig object at 0x7f4009f096a0> is of type DatasourceConfig which cannot be serialized.

Current environment:
apache-airflow==2.4.1
airflow-provider-great-expectations==0.1.5
great-expectations>=0.14

I confirmed that the same DAG works with great-expectations==0.13.49 (this requires pinning jinja2>=3.0.0,<3.1.0).
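For reference, the working environment described above can be reproduced with pins along these lines (a sketch based only on the versions mentioned in this report):

pip install "apache-airflow==2.4.1" "airflow-provider-great-expectations==0.1.5" \
    "great-expectations==0.13.49" "jinja2>=3.0.0,<3.1.0"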

Parallel Execution of GX in Airflow randomly fails. In serial execution always passes

Parallel Execution of GX provider in Airflow randomly fails.

Here is the log:

***   * /mnt/airdrive/airflow/logs/dag_id=xxxx_pipeline_dag_v4/run_id=scheduled__2024-04-07T00:00:00+00:00/task_id=gx_validate_xxx_col_std_dev/attempt=1.log
[2024-04-08, 09:51:48 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 09:51:49 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_std_dev> on 2024-04-07 00:00:00+00:00
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:60} INFO - Started process 1111645 to run task
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxxx_pipeline_dag_v4', 'gx_validate_xxx_col_std_dev', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '622', '--raw', '--subdir', 'DAGS_FOLDER/xxxxxxxx/xxxxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpn6gnlugt']
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:88} INFO - Job 622: Subtask gx_validate_xxx_col_std_dev
[2024-04-08, 09:51:49 IST] {task_command.py:423} INFO - Running <TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [running]> on host xxxx-datapipeline-***-xxxxx.xxxxdatapipelin.xxxxxxxx.com
[2024-04-08, 09:51:49 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxxx_exa_xxxx_col_std_dev' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00'
[2024-04-08, 09:51:49 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 09:51:49 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 09:51:49 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
    self._scaffold_project()
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 91, in _scaffold_project
    if self.is_project_scaffolded(self._context_root_directory):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 513, in is_project_scaffolded
    and cls.config_variables_yml_exist(ge_dir)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 261, in config_variables_yml_exist
    config_var_path = config.get("config_variables_file_path")
                      ^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'get'

Another random failure log:

:
:
[2024-04-08, 22:21:00 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 22:21:00 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_not_null> on 2024-04-08 16:50:34.740105+00:00
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:60} INFO - Started process 1539585 to run task
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxx_pipeline_dag_v4', 'gx_validate_xxx_col_not_null', 'manual__2024-04-08T16:50:34.740105+00:00', '--job-id', '633', '--raw', '--subdir', 'DAGS_FOLDER/xxx_dags/xxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpdgy8dl6u']
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:88} INFO - Job 633: Subtask gx_validate_xxx_col_not_null
[2024-04-08, 22:21:00 IST] {task_command.py:423} INFO - Running <TaskInstance: xxx_pipeline_dag_v4.gx_validate_xxx_col_not_null manual__2024-04-08T16:50:34.740105+00:00 [running]> on host xxx-datapipeline-***-private.sub03080733021.xxx.com
[2024-04-08, 22:21:00 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxx_col_not_null' AIRFLOW_CTX_EXECUTION_DATE='2024-04-08T16:50:34.740105+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-08T16:50:34.740105+00:00'
[2024-04-08, 22:21:00 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 22:21:00 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - /home/xxx/workspace/***/gx/great_expectations.yml
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - ordereddict([('config_version', 3.0), ('datasources', ordereddict([('xxx_CLEANSED_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))])), ('dynamic_pandas_asset_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))]))])), ('config_variables_file_path', 'uncommitted/config_variables.yml'), ('plugins_directory', 'plugins/'), ('stores', ordereddict([('expectations_store', ordereddict([('class_name', 'ExpectationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'expectations/')]))])), ('validations_store', ordereddict([('class_name', 'ValidationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/validations/')]))])), ('evaluation_parameter_store', ordereddict([('class_name', 'EvaluationParameterStore')])), ('checkpoint_store', ordereddict([('class_name', 'CheckpointStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'checkpoints/')]))])), ('profiler_store', ordereddict([('class_name', 'ProfilerStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'profilers/')]))]))])), ('expectations_store_name', 'expectations_store'), ('validations_store_name', 'validations_store'), ('evaluation_parameter_store_name', 'evaluation_parameter_store'), ('checkpoint_store_name', 'checkpoint_store'), ('data_docs_sites', ordereddict([('local_site', ordereddict([('class_name', 'SiteBuilder'), ('show_how_to_buttons', True), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/data_docs/local_site/')])), ('site_index_builder', ordereddict([('class_name', 'DefaultSiteIndexBuilder')]))]))])), ('anonymous_usage_statistics', ordereddict([('data_context_id', 'dba4d0fa-ce75-444b-94e5-623ad64aecd1'), ('enabled', True)])), ('fluent_datasources', ordereddict([('filesystem_source_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('xxx_CLEANSED', ordereddict([('type', 'csv'), ('filepath_or_buffer', 'data/xxx_CLEANSED.csv')]))]))])), ('dynamic_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('dynamic_pandas_asset', ordereddict([('type', 'dataframe'), ('batch_metadata', ordereddict())]))]))]))])), ('notebooks', None), ('include_rendered_content', ordereddict([('globally', False), ('expectation_suite', False), ('expectation_validation_result', False)]))])
[2024-04-08, 22:21:00 IST] {base.py:1716} ERROR - Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {base.py:145} ERROR - Encountered errors during loading config.  See ValidationError for more details.
[2024-04-08, 22:21:00 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 66, in __init__
    self._project_config = self._init_project_config(project_config)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 111, in _init_project_config
    project_config = FileDataContext._load_file_backed_project_config(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 213, in _load_file_backed_project_config
    return DataContextConfig.from_commented_map(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 139, in from_commented_map
    config: Union[dict, BYC] = schema_instance.load(commented_map)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 723, in load
    return self._do_load(
           ^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 909, in _do_load
    self.handle_error(exc, data, many=many, partial=partial)
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 1717, in handle_error
    raise gx_exceptions.InvalidDataContextConfigError(
great_expectations.exceptions.exceptions.InvalidDataContextConfigError: Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=xxx_pipeline_dag_v4, task_id=gx_validate_xxx_col_not_null, execution_date=20240408T165034, start_date=20240408T165100, end_date=20240408T165100
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 633 for task gx_validate_xxx_col_not_null (Error while processing DataContextConfig: _schema; 1539585)
[2024-04-08, 22:21:00 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-08, 22:21:01 IST] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
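Since serial execution always passes, one possible mitigation (a sketch, assuming the random failures come from several GreatExpectationsOperator tasks concurrently initializing a FileDataContext against the same project directory, which the logs above suggest but do not prove) is to force the GX tasks through a single-slot Airflow pool so they never run at the same time. The pool name and the GX-specific arguments below are placeholders; pool is a standard BaseOperator argument, and the pool itself must be created first, e.g. with airflow pools set gx_serial 1 "serialize GX validations".

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

gx_validate_col_std_dev = GreatExpectationsOperator(
    task_id="gx_validate_xxx_col_std_dev",
    data_context_root_dir="/home/xxx/workspace/gx",  # placeholder project path
    data_asset_name="xxx_CLEANSED",                  # placeholder asset
    expectation_suite_name="xxx_suite",              # placeholder suite
    pool="gx_serial",  # single-slot pool: GX tasks run one at a time
)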

data_asset_name is not recognized in airflow-provider-great-expectations==0.2.0

Hi team, we are working on integrating GX with a Snowflake datasource into our data validation system via the GreatExpectationsOperator.

We are planning to run expectation validations against a Snowflake table named test_sf_table. However, we are getting KeyError: 'data_asset_name test_sf_table is not recognized.' when running our DAG. We have tried both upper and lower case, with and without the schema, such as data_asset_name: <schema_name>.<table_name>.

Does anyone know what the issue could be? Or is there a configuration issue in my data_context_config or checkpoint_config? Any help would be greatly appreciated.

Detailed Info:
we are using
airflow-provider-great-expectations==0.2.0

datasource_config:

sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'

sf_datasource_config = {
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "class_name": "SqlAlchemyExecutionEngine",
            "connection_string": sf_url,
        },
        "data_connectors": {
            "default_runtime_data_connector_name": {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": ["default_identifier_name"],
            },
            "default_inferred_data_connector_name": {
                "class_name": "InferredAssetSqlDataConnector",
                "include_schema_name": True,
                "included_tables": f"{schema}.test_sf_table".lower()
            },
        },
    }

data_context_config:

base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
    **{
        "config_version": 3.0,
        "datasources": {
            "my_snowflake_datasource": sf_datasource_config
        },
        "stores": {
            "expectations_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(ge_root_dir, "expectations"),
                },
            },
            "validations_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "validations"
                    ),
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "suppress_store_backend_id": True,
                    "base_directory": os.path.join(ge_root_dir, "checkpoints"),
                },
            },
        },
        "expectations_store_name": "expectations_store",
        "validations_store_name": "validations_store",
        "evaluation_parameter_store_name": "evaluation_parameter_store",
        "checkpoint_store_name": "checkpoint_store",
        "data_docs_sites": {
            "local_site": {
                "class_name": "SiteBuilder",
                "show_how_to_buttons": True,
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "data_docs", "local_site"
                    ),
                },
                "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            }
        },
        "anonymous_usage_statistics": {
            "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
            "enabled": True,
        },
        "notebooks": None,
        "concurrency": {"enabled": False},
    }
)

checkpoint_config:

snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_inferred_data_connector_name",
                    "data_asset_name": "test_sf_table".lower(),
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)

operator:

ge_snowflake_validation = GreatExpectationsOperator(
    task_id="test_snowflake_validation",
    data_context_config=snowflake_data_context_config,
    checkpoint_config=snowflake_checkpoint_config
)

SQL Alchemy dependencies issue

Hi,
We have a dependency conflict when using the provider package.

Starting from Airflow 2.3.3, the Airflow constraints file requires SQLAlchemy 1.4.27, while the Great Expectations provider requires a version earlier than 1.4.10.

Can someone please advise? Is this requirement planned to be changed?
Thanks a lot!

Error running GreatExpectationsOperator

Describe the bug
When running a Great Expectations suite on Google Cloud Composer using BigQuery as a datasource, I'm getting the following error:

[2021-07-23 17:40:24,866] {xcom.py:237} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-07-23 17:40:24,866] {taskinstance.py:1457} ERROR - Object of type ValidationOperatorResult is not JSON serializable
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1320, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1896, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 88, in set
    value = XCom.serialize_value(value)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 235, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ValidationOperatorResult is not JSON serializable

To Reproduce
Steps to reproduce the behavior:

  1. Dag File
  2. GE config file
  3. GE suite

Expected behavior
It should run and push the result to XComs

Environment (please complete the following information):

  • Cloud Composer Version: 1.17.0
  • Airflow Version: 2.0.2
  • Great Expectations Version: 0.13.22

Additional context
If I set do_xcom_push to False it works, but then I can't fetch the results after execution.
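The XCom error message itself points at one workaround: enabling pickle support for XCom so non-JSON-serializable results can still be pushed. On Airflow 2.0.x this is a core config flag (a sketch; note that newer Airflow versions deprecate XCom pickling, and pickling large validation results has size and security implications):

# airflow.cfg
[core]
enable_xcom_pickling = True

# or equivalently via environment variable
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True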

GreatExpectationsOperator Performance Issue

Is your feature request related to a problem? Please describe.
The GreatExpectationsOperator for Airflow currently instantiates a DataContext in the __init__() method, which gets run every time the task is parsed (which is frequently), slowing down the DAG it is a part of.

Describe the solution you'd like
Move instantiation of DataContext and Checkpoint to the execute() method of the Operator.

I will open a PR on this shortly, feel free to assign to me.
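A minimal sketch of the proposed pattern, with hypothetical names and a simplified signature (not the provider's actual implementation), just to illustrate moving heavy object construction out of __init__(), which runs on every DAG parse, into execute(), which runs only when the task executes:

from airflow.models import BaseOperator
from great_expectations.data_context import BaseDataContext


class SketchGreatExpectationsOperator(BaseOperator):
    def __init__(self, *, data_context_config, checkpoint_name, **kwargs):
        super().__init__(**kwargs)
        # Only cheap assignments here: __init__ runs every time the DAG file is parsed.
        self.data_context_config = data_context_config
        self.checkpoint_name = checkpoint_name
        self.data_context = None

    def execute(self, context):
        # Heavy objects are built only at task run time.
        self.data_context = BaseDataContext(project_config=self.data_context_config)
        checkpoint = self.data_context.get_checkpoint(name=self.checkpoint_name)
        # The result would still need to be JSON-serializable if pushed to XCom.
        return checkpoint.run()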

Discussion: Version Release Dates

Hi @denimalpaca @kaxil, thanks again for approving the PR and maintaining this operator. We are actively spiking and trying to onboard the great_expectations Airflow operator into our DQ system, and we are happy to keep contributing to this fantastic library. We are wondering whether there are any rules for new version releases. For example, how often is a new version/tag released? How is the version number kept aligned with the great_expectations Python library?

Thanks,

Triggering LegacyCheckpoint without 'validation_operator_name' from GreatExpectationsOperator

@talagluck as I wrote on the Slack channel,
I'm trying to trigger a Slack notification with the GreatExpectationsOperator.
It is necessary to use validation_operator_name to trigger such a notification, and that is not possible with the current configuration:
https://github.com/great-expectations/airflow-provider-great-expectations/blob/e92a613db94aa872957fa8626f6d5e71690fe421/great_expectations_provider/operators/great_expectations.py#L130
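For context, on non-legacy Checkpoints the usual way to get a Slack notification is a SlackNotificationAction entry in the checkpoint's action_list rather than a validation operator. A sketch of such an entry (the webhook URL is a placeholder; parameters follow the standard GE Slack action):

slack_action_list = [
    {
        "name": "store_validation_result",
        "action": {"class_name": "StoreValidationResultAction"},
    },
    {
        "name": "send_slack_notification_on_validation_result",
        "action": {
            "class_name": "SlackNotificationAction",
            "slack_webhook": "https://hooks.slack.com/services/XXX/YYY/ZZZ",  # placeholder
            "notify_on": "all",  # "all", "failure", or "success"
            "renderer": {
                "module_name": "great_expectations.render.renderer.slack_renderer",
                "class_name": "SlackRenderer",
            },
        },
    },
]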

Passing value_set through Evaluation Parameters crashes if list size is more than 100

Dialect Used: Snowflake
Expectation Used : expect_column_values_to_be_in_set

Example of expectation:

"expectations": [
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "column_A",
        "value_set": {
          "$PARAMETER": "valid_ids"
        }
      },
      "meta": {
        "id": "test_unknown_ids"
      }
    }
  ],

Problem:
If the size of the list passed to this expectation through evaluation parameters (not loading the params from a db, but from a Python list) exceeds 100, it breaks the underlying JSON.

Error Message:

sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type json
LINE 1: ...rendered_task_instance_fields SET rendered_fields='{"run_nam...
                                                             ^
DETAIL:  Token "Infinity" is invalid.
CONTEXT:  JSON data, line 1: ...6e5", 150641421, "e6096a21", "e6093099", Infinity...

Question:
Is it a bug that the expectation crashes on a list size of more than 100, or is it desired behaviour?
I have verified that it only crashes when I pass a 100+ item value_set through evaluation params; the expectation works fine if I add the 100+ item list directly into the expectation JSON. So I think it must be something related to how the operator handles big JSON objects.

Feature Request: pass parameters from Airflow to GE Checkpoint

We need to run a GE checkpoint from Airflow.
The checkpoint is based on a SQL query.
The SQL query must get values for its parameters from Airflow - e.g. a datamart should be checked for DQ for a particular date and region after that date and region were refreshed by another Airflow task.

Part of checkpoint.yml looks like:

validations:
  - batch_request:
      datasource_name: snowflake
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: db1.table1
      runtime_parameters:
        query: "SELECT *
          FROM db1.table1
          WHERE fld1 > $DATE_PARAM_FROM_AIRFLOW and fld2 = $REGION_PARAM_FROM_AIRFLOW"

How do we do this properly with GreatExpectationsOperator?

It looks like the operator can't pass just the parameters, while using query_to_validate or checkpoint_config would break our unit tests (you would need Airflow to test your checkpoint!).

Workaround: use environment variables.

Thanks!
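One alternative to the environment-variable workaround, sketched here on the assumption that query_to_validate is rendered as an Airflow template field (not verified against the provider source), is to template the query directly with Airflow macros; the connection id, suite name, and upstream task id below are placeholders:

gx_check_datamart = GreatExpectationsOperator(
    task_id="gx_check_datamart",
    conn_id="snowflake",                      # placeholder connection
    data_asset_name="db1.table1",
    expectation_suite_name="datamart_suite",  # placeholder suite
    query_to_validate=(
        "SELECT * FROM db1.table1 "
        "WHERE fld1 > '{{ ds }}' "
        "AND fld2 = '{{ ti.xcom_pull(task_ids='refresh_datamart') }}'"
    ),
)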

GreatExpectationsOperator is overriding database name with schema

Hello,

Use case: the database name is different from the schema names, and multiple schemas are available.

  1. Connecting to the Redshift database named main (using conn_id, with the database name set in the Airflow UI) fails when schema="login" or data_asset_name="schema.table_name" is passed as a parameter to GreatExpectationsOperator.

Error: connection failed for database login.

  2. If the schema is not passed as a parameter or via data_asset_name, then I see

InvalidSchemaName: schema "main" does not exist


Can't use `checkpoint_kwargs` with `conn_id`

When I add checkpoint_kwargs to my code it errors, but if I delete checkpoint_kwargs the task runs successfully.
This is my code:

GreatExpectationsOperator(
        task_id="task_1234",
        data_context_root_dir="/opt/airflow/plugins/gx/",
        conn_id="postgres",
        schema="public",
        data_asset_name="table",
        query_to_validate=query_sql,
        expectation_suite_name="sample_suite",
        checkpoint_kwargs={
            "action_list":[],
        },
        return_json_dict=True,
        dag=dag,
    )

ERROR:

File "/home/airflow/.local/lib/python3.12/site-packages/great_expectations/checkpoint/checkpoint.py", line 282, in run
    raise gx_exceptions.CheckpointError(
great_expectations.exceptions.exceptions.CheckpointError: Checkpoint "table.sample_suite.chk" must be called with a validator or contain either a batch_request or validations.

Import and instrumenting the GreatExpectationsOperator has unintended side effects causing Tasks to go into weird states

Bug

Example log line: [2021-04-19 14:52:15,487] {taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: {dag_name}.{task_name} 2021-04-18 07:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?

I have a DAG with ~40 tasks. I recently added Great Expectations to it. As soon as I introduced this to my DAG, weird things occurred: my tasks did not properly queue and were marked failed before even going into the RUNNING state. As soon as I remove `from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator`, my DAG runs successfully without experiencing any of these side effects.

Expected Behaviour

Using the GreatExpectationsOperator has no unintended consequences

Environment

Astronomer Cloud and locally in docker-compose
Executor: Kubernetes Executor
Airflow: 1.10.12
Docker Image: quay.io/astronomer/ap-airflow:1.10.12-alpine3.10-onbuild

Airflow deployment - Using GreatExpectationsOperator: `great_expectations.data_context.store` does not contain the class: `ProfilerStore`

Hi guys,
I'm currently getting an error when I try to use the official Airflow operator for Great Expectations.

My packages:
airflow-provider-great-expectations==0.1.4
great-expectations==0.15.17
apache-airflow==2.1.4


---> [2022-08-05 12:49:47,094] {util.py:153} CRITICAL - Error The module: great_expectations.data_context.store does not contain the class: ProfilerStore.

Can you please help?

Test failure due to `apache-airflow-providers-snowflake` 4.3.1

Hey there! I'm one of the core devs from the GX team and wanted to reach out to highlight a test failure we've been seeing in our CI - tests/operators/test_great_expectations.py::test_great_expectations_operator__make_connection_string_snowflake_pkey

As part of our CI, we clone this repository, install dependencies, and run the full suite. I've done some debugging and have come to the conclusion that this is a result of the new 4.3.1 release; testing on 4.3.0 passes all tests.

Here's where it fails:

# test_great_expectations_operator__make_connection_string_snowflake_pkey
>       assert operator.make_connection_configuration() == test_conn_conf

tests/operators/test_great_expectations.py:911: 

Here's the specific part of the snowflake package that fails:

        elif private_key_file:
            private_key_file_path = Path(private_key_file)
            if not private_key_file_path.is_file() or private_key_file_path.stat().st_size == 0:
>               raise ValueError("The private_key_file path points to an empty or invalid file.")
E               ValueError: The private_key_file path points to an empty or invalid file.

/opt/hostedtoolcache/Python/3.8.17/x64/lib/python3.8/site-packages/airflow/providers/snowflake/hooks/snowflake.py:253: ValueError

Cannot run validation on BigQuery

Hello!

I'm trying to execute Great Expectations in my airflow instance to run some validations in my BigQuery tables. There's not much content about this on the internet, and I even found some references to an old GreatExpectationsBigQueryOperator.

I've set up my connection in Airflow as a Google Cloud connection and it works for other things (such as querying data), but with the GreatExpectationsOperator I get the error below:

ERROR - Failed to execute job 118 for task gx_validate_products (Conn type: google_cloud_platform is not supported.; 425)

Here's my code:

 gx_validate_products = GreatExpectationsOperator(
        task_id="gx_validate_products",
        conn_id="gcp_connection",
        data_context_root_dir="great_expectations",
        data_asset_name="sample_ecommerce.products",
        expectation_suite_name="raw_products",
        return_json_dict=True,
    )
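One workaround that is sometimes suggested when the operator cannot build a connection string from the conn_id type is to configure the BigQuery datasource inside the GX project itself (in great_expectations.yml) and have the operator run an existing checkpoint instead of passing conn_id. A sketch, assuming the provider version in use supports checkpoint_name and that a checkpoint named products_checkpoint (hypothetical) is already defined in the project:

gx_validate_products = GreatExpectationsOperator(
    task_id="gx_validate_products",
    data_context_root_dir="great_expectations",
    checkpoint_name="products_checkpoint",  # hypothetical checkpoint in the GX project
    return_json_dict=True,
)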

Feature request: Way to pass in schema name to overwrite the schema pulled from the conn_id

Hi :)

I've been exploring how to use the feature that builds the datasource from a provided conn_id and ran into a small issue:
Our dev (and prod) environment is set up with Snowflake tables located in different schemas, something like:

dev_db
- schema_1
-- table_1
-- table_2
- schema_2
-- table_3
-- table_4
etc

There is currently only one Snowflake connection in the dev environment, which has its schema field left empty. Of course I (or rather the deployment admin) could create one additional Snowflake connection per schema, but this does not seem ideal or scalable.

Because of this I've been trying to find a way to change the schema of the datasource created when passing in a conn_id, but I could not find a way to do so, e.g. via a data_context_config with the intended schema.

When I leave the schema in the Airflow connection blank and pass the schema name (TAMARAFINGERLIN) together with the table name (CLUSTERS) in data_asset_name, like this:

t1 = GreatExpectationsOperator(
        task_id="t1",
        data_asset_name="TAMARAFINGERLIN.CLUSTERS",
        conn_id="{{conn.galaxy_snowflake_etl}}",
        data_context_root_dir=ge_root_dir,
        expectation_suite_name="CLUSTERS",
    )

the following error happens:

[2022-12-01T19:11:34.849+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 521, in execute
    result = self.checkpoint.run(batch_request=self.batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 194, in run
    self._run_validation(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 339, in _run_validation
    validator: Validator = self.data_context.get_validator(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1481, in get_validator
    self.get_batch_list(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1666, in get_batch_list
    return datasource.get_batch_list_from_batch_request(batch_request=batch_request)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 205, in get_batch_list_from_batch_request
    ) = data_connector.get_batch_data_and_metadata(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/data_connector/data_connector.py", line 116, in get_batch_data_and_metadata
    batch_data, batch_markers = self._execution_engine.get_batch_data_and_markers(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 1243, in get_batch_data_and_markers
    batch_data = SqlAlchemyBatchData(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 161, in __init__
    self._create_temporary_table(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 295, in _create_temporary_table
    self._engine.execute(stmt)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1274, in execute
    return self._exec_driver_sql(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1578, in _exec_driver_sql
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
    raise error_class(
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL: CREATE OR REPLACE TEMPORARY TABLE ge_temp_7a4f3e73 AS SELECT * 
FROM "TAMARAFINGERLIN.CLUSTERS" 
WHERE true]
(Background on this error at: https://sqlalche.me/e/14/f405)
[2022-12-01T19:11:34.888+0000] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=GE_TEST, task_id=t1, execution_date=20221201T191130, start_date=20221201T191131, end_date=20221201T191134

I was wondering if there could be a way to override some parameters of the connection provided via the conn_id. This would allow me to map the operator over sets of schema names, table names, and expectation suite names without having to have admin access to the deployment or manually create a lot of connections. :)

cc @denimalpaca

Feature Request: run EXPERIMENTAL expectation (from great_expectations_experimental library) from Airflow?

Hi!
Is it possible to run:
1) an EXPERIMENTAL expectation (from the great_expectations_experimental library) from Airflow?

Example: expect_queried_column_values_to_exist_in_second_table_column

Simply importing it in the DAG does not help:

from great_expectations_experimental.expectations.expect_queried_column_values_to_exist_in_second_table_column import ExpectQueriedColumnValuesToExistInSecondTableColumn

  • After the DAG run, I get this text in Data Docs instead of the expectation result:

expect_queried_column_values_to_exist_in_second_table_column(**{'batch_id': '0120cd462e58ed32be35bc92c0ae', 'template_dict': {'condition': '1=1', 'first_table_column': 'PROV_ID', 'second_table_column': 'PROV_ID', 'second_table_full_name': 'LINC'}})

2) A custom expectation from the great_expectations/plugins/expectations folder?
Could it be run from Airflow? How?
https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/how_to_use_custom_expectations/

Build data_context object in `__init__()` and not in `execute` method

Right now, the self.data_context object is initialized within the execute method of the airflow BaseOperator.

This is done in:

However, this makes it impossible to interact with the data context before or after execution.

If self.data_context were initialized in the __init__() method, the user could interact with this object in the pre_execute() or post_execute() methods of the Airflow BaseOperator.

A possible use case, for example, is adding ExpectationSuites at runtime using an InMemoryStoreBackend expectations store:

    def pre_execute(self, context: Any):
        """
        Create and add an expectation suite to the in-memory DataContext.
        """
        # suite_name is assumed to be available, e.g. as an operator attribute
        suite = self.data_context.create_expectation_suite(
            expectation_suite_name=suite_name, overwrite_existing=True
        )

        # Add expectations; here we'll add a simple expectation as an example
        # (requires: from great_expectations.core.expectation_configuration import ExpectationConfiguration)
        suite.add_expectation(
            ExpectationConfiguration(
                expectation_type="expect_table_row_count_to_be_between",
                kwargs={"min_value": 1, "max_value": 1000000},
            )
        )

        # Save the suite to the DataContext's in-memory expectations store
        self.data_context.save_expectation_suite(suite)

Error in action StoreValidationResultAction when checkpoint called via GreatExpectationsOperator with "include_unexpected_rows": True

GE version : 0.15.0, 0.15.1

I am calling the GreatExpectationsOperator from one of my Airflow DAGs, where I am passing a checkpoint which has the runtime configuration below:
runtime_configuration: {"result_format": {"result_format": "SUMMARY", "include_unexpected_rows": True}}

Since the unexpected rows are also being pulled, the validation result fails to be stored on local disk. My action_list configuration in the checkpoint is as below:
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

There is no problem when the include_unexpected_rows is toggled to False.

The error trace is below:
[2022-04-21 14:23:59,760] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:00,136] {validator.py:1646} INFO - 1 expectation(s) included in expectation_suite.
[2022-04-21 14:24:00,477] {logging_mixin.py:109} WARNING -
Calculating Metrics: 0%| | 0/12 [00:00<?, ?it/s]
[2022-04-21 14:24:00,604] {cursor.py:696} INFO - query: [SHOW /* sqlalchemy:_get_schema_primary_keys */ PRIMARY KEYS IN SCHEMA sample_db.pub...]
[2022-04-21 14:24:02,266] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:02,370] {cursor.py:696} INFO - query: [SELECT /* sqlalchemy:_get_schema_columns */ ic.table_name, ic.column_name, ic.da...]
[2022-04-21 14:24:03,873] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:09,438] {logging_mixin.py:109} WARNING -
Calculating Metrics: 17%|#6 | 2/12 [00:08<00:44, 4.43s/it]
[2022-04-21 14:24:09,465] {cursor.py:696} INFO - query: [SELECT count(*) AS "table.row_count" FROM ge_temp_284c970f]
[2022-04-21 14:24:09,967] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:10,178] {logging_mixin.py:109} WARNING -
Calculating Metrics: 33%|###3 | 4/12 [00:09<00:16, 2.04s/it]
[2022-04-21 14:24:10,237] {cursor.py:696} INFO - query: [SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegme...]
[2022-04-21 14:24:10,782] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:11,241] {cursor.py:696} INFO - query: [SELECT c_nationkey AS unexpected_values FROM ge_temp_284c970f WHERE c_nationkey ...]
[2022-04-21 14:24:11,770] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:13,527] {logging_mixin.py:109} WARNING -
Calculating Metrics: 83%|########3 | 10/12 [00:12<00:01, 1.03it/s]
[2022-04-21 14:24:13,641] {cursor.py:696} INFO - query: [SELECT sum(CASE WHEN (c_nationkey IS NULL) THEN 1 ELSE 0 END) AS "column_values....]
[2022-04-21 14:24:14,463] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:15,786] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.00s/it]
[2022-04-21 14:24:15,909] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.28s/it]
[2022-04-21 14:24:15,912] {logging_mixin.py:109} WARNING -
[2022-04-21 14:24:16,255] {validation_operators.py:465} ERROR - Error running action with name store_validation_result
Traceback (most recent call last):
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in _invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:16,838] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/root/.local/lib/python3.6/site-packages/great_expectations_provider/operators/great_expectations.py", line 160, in execute
result = self.checkpoint.run()
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 287, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 167, in run
validation_dict=validation_dict,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 367, in _run_validation
**operator_run_kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/async_executor.py", line 100, in submit
return AsyncResult(value=fn(*args, **kwargs))
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in run
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 466, in _run_actions
raise e
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:17,362] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=ge_test_01, task_id=validation_customer_table, execution_date=20220421T142326, start_date=20220421T142338, end_date=20220421T142417
[2022-04-21 14:24:22,341] {local_task_job.py:151} INFO - Task exited with return code 1
[2022-04-21 14:24:23,283] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

I think calling the BashOperator to run the checkpoint could be a workaround, but we would like to avoid using the BashOperator. A rough sketch of what that workaround would look like is below.
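For completeness, the BashOperator workaround would look roughly like this (Airflow 2-style import; the checkpoint name and project path are placeholders):

from airflow.operators.bash import BashOperator

# Rough sketch only: run the checkpoint through the Great Expectations CLI
# instead of the provider operator. Paths and names below are placeholders.
run_checkpoint_cli = BashOperator(
    task_id="validation_customer_table_cli",
    bash_command=(
        "cd /path/to/great_expectations_project && "
        "great_expectations checkpoint run my_checkpoint"
    ),
)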

Thanks in advance !!

GreatExpectationsBigQueryOperator - tuple index out of range error

Hello team,

I'm receiving the below error when I try to use GreatExpectationsBigQueryOperator

[2021-05-20 00:23:39,025] {validation_operators.py:405} ERROR - Error running action with name update_data_docs
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in _run_actions
    payload=batch_actions_results
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 62, in run
    **kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 939, in _run
    validation_result_suite_identifier.expectation_suite_identifier
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 262, in usage_statistics_wrapped_method
    result = func(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/data_context.py", line 2304, in build_data_docs
    resource_identifiers, build_index=build_index
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 293, in build
    site_section_builder.build(resource_identifiers=resource_identifiers)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 388, in build
    source_store_keys = self.source_store.list_keys()
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in list_keys
    return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in <listcomp>
    return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 110, in tuple_to_key
    return self._key_class.from_tuple(tuple_)
  File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/types/resource_identifiers.py", line 164, in from_tuple
    RunIdentifier.from_tuple((tuple_[-3], tuple_[-2]))
IndexError: tuple index out of range

Here is my airflow task:

validate_output = GreatExpectationsBigQueryOperator(
    task_id='validate_output',
    gcp_project='<project_id>',
    #data_context=DC,
    gcs_bucket='<my_bucket_id>',
    gcs_datadocs_prefix='dags/playground/test_great_expectations/great_expectations',
    gcs_expectations_prefix='dags/playground/test_great_expectations/great_expectations/expectations',
    gcs_validations_prefix='dags/playground/test_great_expectations/great_expectations',
    expectation_suite_name='test_new_suite',
    bq_dataset_name='data_platform_staging',
    email_to='[email protected]',
    send_alert_email=False,
    bigquery_conn_id='bigquery-greatexpectations',
    table='project_id.test_dataset.test_table',
    dag=dag
)

I'm using the latest great-expectations & airflow-provider-great-expectations dependencies.
I'm using Airflow version 1.10.14+composer.

Thanks in advance!

Issue in connecting the GX operator with Snowflake data

I am currently using the following version:
airflow-provider-great-expectations==0.2.0

I am trying to run the Great Expectations operator, pass it the Snowflake connection id, and execute a query.

My code throws the following error trace:

AIRFLOW_CTX_DAG_RUN_ID=manual__2023-01-30T10:18:21.654701+00:00
[2023-01-30, 10:18:36 UTC] {great_expectations.py:470} INFO - Running validation with Great Expectations...
[2023-01-30, 10:18:36 UTC] {great_expectations.py:472} INFO - Instantiating Data Context...
[2023-01-30, 10:18:36 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:36 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 474, in execute
    self.build_runtime_datasources()
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 368, in build_runtime_datasources
    self.build_runtime_sql_datasource_config_from_conn_id()
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 304, in build_runtime_sql_datasource_config_from_conn_id
    "connection_string": self.make_connection_string(),
  File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 251, in make_connection_string
    uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{self.conn.schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}"  # noqa
KeyError: 'extra__snowflake__account'

On logging the conn in Airflow using the below code:

@task
def test_conn():
    from airflow.hooks.base import BaseHook

    conn = BaseHook.get_connection(SNOWFLAKE_CONN_ID)
    task_logger.info(f"connection info {conn.extra_dejson} %s")
Below is the information logged:
[2023-01-30, 10:18:28 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:28 UTC] {test_gx_snowflake.py:265} INFO - connection info {'account': 'rtb82372.us-east-1', 'insecure_mode': False, 'database': 'SFSALES_SFC_SAMPLES_VA3_SAMPLE_DATA', 'warehouse': 'XLARGEWH'} %s
[2023-01-30, 10:18:28 UTC] {python.py:177} INFO - Done. Returned value was: None

Reference issue at airflow side:
apache/airflow#26764

Add Trino support

Hi,
Would you consider adding a Trino connection to the operator?

Apparently there is a TODO, but I don't know if it's on your foreseeable roadmap.

Thanks

GreatExpectationsBigQueryOperator is failing on Airflow

Describe the bug
Hi,
I have deployed Great Expectations with Google Cloud Composer. I've created an Airflow DAG to insert data into a table from another table and then validate that the data is correct. Then I generated a suite file using the scaffold example. I added all of the GE files to a GCP bucket. But I'm receiving this error: DataContextError: No validation operator 'action_list_operator' was found in your project. Please verify this in your great_expectations.yml. I'm sure that action_list_operator exists in my great_expectations.yml file.

Error message:
DataContextError: No validation operator 'action_list_operator' was found in your project. Please verify this in your great_expectations.yml.

How to reproduce:

  1. Installed pybigquery, great_expectations, airflow-provider-great-expectations on the composer environment.
  2. Followed this example with the same folder structure.

Environment Versions:
Airflow Version: 1.10.9+composer
Great Expectations Version: 0.13.11

I've also attached my DAG and great_expectations.yml files.

Workaround:
One workaround is that I created a custom operator using this code and added the action_list_operator part manually inside the create_data_context_config function. But then I receive another error: IndexError: tuple index out of range

Workaround Error:
(screenshot of the IndexError attached)

attached_files.zip

Thanks in advance!

Snowflake connection failure when only using the Airflow UI fields to set the connection

Hi!

I noticed that when only using the Airflow UI fields to set a connection, the rendered Extra JSON has different keys than what the GXO is looking for.

Extra field automatically rendered from connection UI fields:
{"account": "myacc", "warehouse": "HUMANS", "database": "DWH_DEV", "region": "us-east-1", "role": "myrole", "insecure_mode": false}

The GXO is looking for extra__snowflake__account, extra__snowflake__region, etc. I think this might have been how the keys used to be rendered? When I set the connection with these keys, adding extra__snowflake__ in front of the parameters, everything works as expected (a minimal example is sketched after the error trace below).

Error trace:

[2023-01-05, 13:37:40 UTC] {great_expectations.py:454} INFO - Instantiating Data Context...
[2023-01-05, 13:37:40 UTC] {base.py:73} INFO - Using connection ID 'galaxy_snowflake_etl' for task execution.
[2023-01-05, 13:37:40 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 456, in execute
    self.build_runtime_datasources()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 366, in build_runtime_datasources
    self.datasource = self.build_configured_sql_datasource_config_from_conn_id()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 259, in build_configured_sql_datasource_config_from_conn_id
    conn_str = self.make_connection_string()
  File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 246, in make_connection_string
    uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}"  # noqa
KeyError: 'extra__snowflake__account'
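For reference, a connection defined programmatically with the prefixed keys that these provider versions expect would look roughly like this (all values are placeholders, and the keys mirror the ones referenced in the traceback above):

import json

from airflow.models import Connection

snowflake_conn = Connection(
    conn_id="galaxy_snowflake_etl",
    conn_type="snowflake",
    login="my_user",         # placeholder
    password="my_password",  # placeholder
    schema="PUBLIC",         # placeholder
    extra=json.dumps(
        {
            "extra__snowflake__account": "myacc",
            "extra__snowflake__region": "us-east-1",
            "extra__snowflake__database": "DWH_DEV",
            "extra__snowflake__warehouse": "HUMANS",
            "extra__snowflake__role": "myrole",
        }
    ),
)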

Snowflake Region should be optional

Currently the GX provider assumes a Snowflake region will always be provided.

Some Snowflake regions (e.g. us-west-2) do not require (or allow) a region in the connection string. Additionally, some regions have multiple deployments, and it becomes necessary to specify both the region and the deployment. This is best accomplished by putting the fully-qualified name in the account field (i.e. account.deployment.region.cloud). For this to work the region field must be empty.

Missing `template_fields` in operator

Is:
(no `template_fields` declared on the operator)

Should be:
template_fields = ('checkpoint_name', 'batch_kwargs', 'assets_to_validate')

Somebody removed the template_fields declaration from the operator, which basically makes the operator useless for templated arguments. Please fix that ASAP and COVER it with unit tests to prevent such a situation from happening again. A stopgap subclass is sketched below.
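Until that happens, a possible stopgap (not an official fix) is to subclass the operator and re-declare the fields yourself:

from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)


class TemplatedGreatExpectationsOperator(GreatExpectationsOperator):
    # Re-declare the templated fields listed above until the upstream
    # declaration is restored.
    template_fields = ("checkpoint_name", "batch_kwargs", "assets_to_validate")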

Parallel GreatExpectationsOperator tasks corrupt great_expectations.yml

I have several GreatExpectationsOperator tasks running concurrently. I would often get an error letting me know that the great_expectations.yml file could not be parsed. Inspecting the file, I noticed it would typically end with a truncated fragment such as ult: false, which corresponded with the parsing error shown on the Airflow logs page for the task.

I suspect a race condition happens when the great_expectations.yml file is generated, because the datasource section has to be updated for each job. After changing the GreatExpectationsOperator tasks to run one at a time (see the sketch below), the great_expectations.yml file has not been corrupted again.
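A minimal sketch of the "one at a time" workaround, assuming an Airflow pool named great_expectations has been created with a single slot (checkpoint name and project path are placeholders):

from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

validate_orders = GreatExpectationsOperator(
    task_id="validate_orders",
    data_context_root_dir="/usr/local/airflow/great_expectations",
    checkpoint_name="orders_checkpoint",
    pool="great_expectations",  # single-slot pool serializes all GE tasks
)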

Add operator extra link for Data Docs in the `GreatExpectationsOperator`

In Airflow, there is a feature called an operator extra link which allows external links to be shown as buttons on the Task Instance modal in the Airflow UI. This feature is really useful for navigating directly from the Airflow UI to an external site, which may be used for monitoring a third-party execution that Airflow just triggered, handy documentation, etc.

IMO, linking to Data Docs directly from a GreatExpectationsOperator task would be really beneficial to users and would increase the holistic visibility available at their fingertips. A rough sketch of such a link is below.
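A minimal sketch of what such a link could look like, assuming Airflow 2.3+ (where get_link receives a ti_key) and a statically hosted Data Docs site (the URL is a placeholder):

from airflow.models.baseoperator import BaseOperatorLink


class DataDocsLink(BaseOperatorLink):
    """Extra link button that points at a hosted Data Docs site."""

    name = "Great Expectations Data Docs"

    def get_link(self, operator, *, ti_key=None):
        # A real implementation would derive this from the task's Data Docs
        # site configuration; a fixed URL is used here for illustration.
        return "https://my-data-docs.example.com/index.html"

The link would still need to be declared on the operator (via operator_extra_links) and registered through a plugin to appear in the UI.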

Remove `apply_defaults` from `GreatExpectationsBigQueryOperator` (or remove explicit kwargs)

So apply_defaults throws an error if any args or kwargs which are explicitly mentioned in the function signature are empty (i.e. None). Since GreatExpectationsBigQueryOperator names expectation_suite_name as a positional arg (not variadic, i.e. not hidden in *args), it becomes mandatory in the function call.

The problem is when I want to use checkpoints, which have this check in the GreatExpectationsOperator:

# Check that only the correct args to validate are passed
# this doesn't cover the case where only one of expectation_suite_name or batch_kwargs is specified
# along with one of the others, but I'm ok with just giving precedence to the correct one
if sum(bool(x) for x in [(expectation_suite_name and batch_kwargs), assets_to_validate, checkpoint_name]) != 1:
    raise ValueError("Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate, \
     or checkpoint_name is required to run validation.")

As a result, the apply_defaults() call in GreatExpectationsBigQueryOperator mandates that I provide expectation_suite_name, but the GreatExpectationsOperator requires me to name only the checkpoint_name.

Without expectation_suite_name:

airflow.exceptions.AirflowException: Argument ['expectation_suite_name'] is required

With expectation_suite_name:

Traceback (most recent call last):
  File "dags/gfk/test_ge.py", line 54, in <module>
    dag=dag,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations_bigquery.py", line 150, in __init__
    **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 85, in __init__
    or checkpoint_name is required to run validation.")
ValueError: Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate,              or checkpoint_name is required to run validation.

I wonder why it fails, though, since the check seems to allow for one of the values (batch_kwargs or expectation_suite_name) to be set, but it does not work. I will investigate it a bit further tomorrow.

In order to access the expectation_suite_name of the parent class, maybe just put it back into the variadic kwargs, call super(), and access it via self? Or remove apply_defaults?

Versions

  • airflow==1.10.12
  • airflow-provider-great-expectations==0.0.4
  • great-expectations==0.13.19
  • Python 3.7.10

Connection to Athena via `conn_id`

When using the GreatExpectationsOperator with an Athena conn_id, I received the following from PyAthena: Unable to locate credentials.

I would like to know the best way to provide credentials, as this page: GX docs - connect to data does not specify how to set this up with just an Airflow connection.
Should I be using credentials: in the great_expectations.yml file, with secrets injected from config_variables.yml? Is there any way to connect with only the conn_id via an Airflow Connection?

Case-sensitive code for ODBC Driver extras

I'm using the ODBC Driver 18 for SQL Server and have it listed as an extra under "Driver:" in my Airflow connection. This would be correct if the source code for the make_connection_configuration method in great_expectations.py didn't require the extra to be a lower-case "driver". As soon as I noticed this detail and changed it in my connection, the GX operator worked perfectly. To prevent this problem from occurring, the code should be changed to allow variations in the capitalization of the "driver" extra; a sketch follows the quoted source below.

...
elif conn_type == "mssql":
    odbc_connector = "mssql+pyodbc"
    ms_driver = self.conn.extra_dejson.get("driver") or "ODBC Driver 17 for SQL Server"
    driver = f"?driver={ms_driver}"
    database_name = self.conn.schema or "master"
...
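A small sketch of what a case-insensitive lookup could look like (the helper name is made up; only the lookup logic matters):

def get_odbc_driver(extra_dejson: dict) -> str:
    """Case-insensitive lookup of the ODBC driver name from connection extras."""
    extras = {key.lower(): value for key, value in extra_dejson.items()}
    return extras.get("driver") or "ODBC Driver 17 for SQL Server"


# Both spellings resolve to the same driver:
assert get_odbc_driver({"Driver": "ODBC Driver 18 for SQL Server"}) == get_odbc_driver(
    {"driver": "ODBC Driver 18 for SQL Server"}
)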

Expectations fail callback function

It would be nice to be able to log out the specific expectations that failed when the operator fails. I am not sure this should be internalized, so instead, perhaps it makes sense to allow setting a callback function on failure?

I have something as naive as this in mind:

from itertools import groupby

from airflow.exceptions import AirflowException


def generate_list_failed_expectations(r) -> None:
    """Note - only for validation results.

    Input:
        r (great_expectations.validation_operators.types.validation_operator_result.ValidationOperatorResult)
    Raises: AirflowException listing the failed expectations grouped by column
    """
    lines = ["Validation with Great Expectations failed."]

    for batch, result in r["run_results"].items():
        lines.append(f"{batch}")
        failed = [
            res.expectation_config
            for res in result["validation_result"].results
            if not res.success
        ]
        # groupby only groups consecutive items, so sort by column first
        failed = sorted(failed, key=lambda k: k["kwargs"]["column"])
        for col, group_failed in groupby(failed, key=lambda k: k["kwargs"]["column"]):
            lines.append(f"  {col}")
            for test in group_failed:
                lines.append(f'    - {test["expectation_type"]}')
    raise AirflowException("\n".join(lines))

collision with another protobuf version

Describe the bug:
A DAG containing the great_expectations operator in an Astronomer Airflow environment doesn't load.
Tried downgrading to protobuf==3.2.1, but we have several Google packages that require the latest (4.2.x) protobuf version.

Steps to reproduce the behavior:
Restart the Astronomer Airflow environment

Expected behavior:
The DAG is successfully loaded

Environment (please complete the following information):

Operating System: Linux (docker on vm)
Great Expectations Version: 0.15.46

Additional context:

Broken DAG: [/usr/local/airflow/dags/great_expectations/great_expectations_athena.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/google/cloud/secretmanager_v1/proto/resources_pb2.py", line 57, in <module>
    _descriptor.EnumValueDescriptor(
  File "/usr/local/lib/python3.9/site-packages/google/protobuf/descriptor.py", line 755, in __new__
    _message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
 1. Downgrade the protobuf package to 3.20.x or lower.
 2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).

More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates

TypeError: can't pickle _thread.lock objects when cleaning

This is most likely an issue with Airflow itself (we use 1.10.9), but we didn't have any similar issue with it before. On clearing the GE operator, the whole system goes kaboom with this log:

Node: airflow-airflow-web-6f4fbbfc58-cnx4j
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 261, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 337, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 1296, in clear
    include_upstream=upstream)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in sub_dag
    for t in regex_match + also_include}
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in <dictcomp>
    for t in regex_match + also_include}
  File "/usr/local/lib/python3.7/copy.py", line 161, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 681, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.lock objects

Remove request to database from init method

The GreatExpectationsBigQueryOperator makes a get_connection request in the init method of the class. This is problematic for a few reasons:

  • It causes a fatal DAG import error if the connection string is not defined.
  • It will consume unnecessary scheduler resources. Every time the Airflow scheduler parses a DAG, Airflow will execute whatever is contained in the init method of your class. If the init method makes requests to the database or over the network, it will place an unnecessary burden on the database.

The general pattern to follow is to have init store the conn_id string and build the hook via get_connection at runtime, as in the sketch below.
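A generic sketch of that pattern (illustrative names, Airflow 2-style imports):

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator


class ExampleConnAwareOperator(BaseOperator):
    def __init__(self, *, conn_id: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id  # only the string is stored at DAG-parse time

    def execute(self, context):
        # The database lookup happens at runtime, when the task executes.
        conn = BaseHook.get_connection(self.conn_id)
        self.log.info("Using connection %s", conn.conn_id)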

Error initializing a Checkpoint using CheckpointConfig

In great_expectations==0.14.4 the Checkpoint class does not have a module_name class attribute (https://github.com/great-expectations/great_expectations/blob/0.14.4/great_expectations/checkpoint/checkpoint.py#L74). However, when initializing a Checkpoint in the Great Expectations operator (https://github.com/great-expectations/airflow-provider-great-expectations/blob/main/great_expectations_provider/operators/great_expectations.py#L139), the Checkpoint is initialized with all the attributes of checkpoint_config.to_json_dict(), and since CheckpointConfig has a module_name attribute, this causes an error when initializing the Checkpoint (a possible workaround is sketched after the stack trace).

Stack Trace

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 139, in __init__
    data_context=self.data_context, **self.checkpoint_config.to_json_dict()
TypeError: __init__() got an unexpected keyword argument 'module_name'
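A hedged workaround sketch, reusing the names from the operator code referenced above, is to drop the config keys Checkpoint.__init__ does not accept before unpacking:

from great_expectations.checkpoint import Checkpoint


def build_checkpoint(data_context, checkpoint_config) -> Checkpoint:
    config_kwargs = checkpoint_config.to_json_dict()
    config_kwargs.pop("module_name", None)  # the key named in the traceback above
    config_kwargs.pop("class_name", None)   # assumption: also not an __init__ parameter
    return Checkpoint(data_context=data_context, **config_kwargs)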

Expectation suite not found

Hi

I am getting the following error in the logs when trying to run my expectations via airflow.

great_expectations.exceptions.exceptions.DataContextError: expectation_suite accounts not found

import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator


default_args = {
  'dir': '/project/great_expectations',
  'start_date': airflow.utils.dates.days_ago(1),
  'profiles_dir': '/home/airflow/.dbt/',
  'catchup': False
}


with DAG(dag_id='accounts_great_expectations', default_args=default_args, schedule_interval='@daily') as dag:
    accounts = GreatExpectationsOperator(
        task_id='accounts',
        expectation_suite_name='accounts',
        data_context_root_dir='/project/great_expectations',
        batch_kwargs={
            'table': 'raw_accounts',
            'datasource': 'fusioncell_local'
        },
        dag=dag
    )

  accounts_great_expectations

It can find the great_expectations.yml file fine, though; it is in /project/great_expectations, and my expectations are in a directory one level below that, called expectations??

Thanks

Operator won't work in 0.2.1

With the new 0.2.1 release of the Great Expectations Operator we've been getting the below error. We reverted to 0.2.0 and the error went away.

Error:

[2022-11-17, 01:58:38 UTC] {{abstractoperator.py:175}} ERROR - Failed to resolve template field 'data_context_root_dir'
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 173, in resolve_template_files
    setattr(self, field, env.loader.get_source(env, content)[0])  # type: ignore
  File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 218, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /usr/src/app/great_expectations

Code:

    ge_data_unit_test_op = GreatExpectationsOperator(
        task_id="test_package",
        data_context_root_dir=ge_root_dir,
        checkpoint_name=test_package,
        checkpoint_kwargs={
            "batch_request": {
                "datasource_name": TARGET_PROJECT_ID,
                "data_asset_name": data_asset_name,
            }
        },
    )

How to pass create_temp_table: False to SqlAlchemy?

When using Great Expectations with datasources from great_expectations.yml, you can use the

  create_temp_table: False

setting for the SqlAlchemyExecutionEngine to prevent SQLAlchemy from issuing SQL like this:

CREATE OR REPLACE TEMPORARY TABLE gx_temp_d5435 AS select ...

Question:
How do you do the same when using airflow-provider-great-expectations with an Airflow conn_id (which overrides the datasources from great_expectations.yml)?

`GreatExpectationsOperator` fails when running validations against `Athena` but works correctly against `Redshift`

Overview

GreatExpectationsOperator fails when running validations against Athena but works correctly against Redshift

Conditions

docker-compose airflow
airflow==2.2.3
airflow-provider-great-expectations==0.1.3
great-expectations==0.14.7

Procedure

  • Created a DAG that runs validations inside an AWS environment with SqlAlchemyEngine.
  • Defined both datasources identically, just changing the connection string / credentials.
  • Ran the same operator with the same configuration against both.

Problems found

  • The Data Docs link in the Slack notification is never shown.
    • Not even when modifying notify_with: ["all" | "local_site" | None]
      (screenshot of the Slack notification attached)
  • Athena validations are never executed correctly:
"exception_info": {
  "exception_traceback": "Traceback (most recent call last):\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 397, in resolve_metrics\n    new_resolved = self.resolve_metric_bundle(metric_fn_bundle)\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 815, in resolve_metric_bundle\n    domain_kwargs=domain_kwargs,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 505, in get_domain_records\n    selectable = selectable.columns().subquery()\nAttributeError: 'TextAsFrom' object has no attribute 'subquery'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 845, in resolve_validation_graph\n    runtime_configuration=runtime_configuration,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 1749, in _resolve_metrics\n    runtime_configuration=runtime_configuration,\n  File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 401, in resolve_metrics\n    message=str(e), failed_metrics=[x[0] for x in metric_fn_bundle]\ngreat_expectations.exceptions.exceptions.MetricResolutionError: 'TextAsFrom' object has no attribute 'subquery'\n",
  "exception_message": "'TextAsFrom' object has no attribute 'subquery'",
  "raised_exception": true
}

Meanwhile, Redshift with the exact same configuration runs OK.

Additional information for the issue

  • When we were developing this procedure with a Lambda, the Athena RuntimeBatchRequest query needed to be wrapped in parentheses ( ) to work:
"query": f"(select * from db.table limit 100)"
  • Before this last version of the DAG, we were creating a RuntimeBatchRequest in code and pushing it as a parameter to the checkpoint. That produced EXACTLY THE SAME ERROR as Athena does now.

File reference

DAG gist file for reference (modified / deleted confidential information): dag_poc.py
