astronomer / airflow-provider-great-expectations
Great Expectations Airflow operator
Home Page: http://greatexpectations.io
License: Apache License 2.0
I could not check if this has to do with Airflow 2.4.x, but I'm getting the following error when using GreatExpectationsOperator with CheckpointConfig on great-expectations>=0.14:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Exception in thread Thread-1:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 209, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - _ = self[name]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 151, in commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return self._get_schema_validated_updated_commented_map()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 125, in _get_schema_validated_updated_commented_map
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - commented_map: CommentedMap = copy.deepcopy(self._commented_map)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 212, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - _ = self[f"_{name}"]
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 73, in __getitem__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return getattr(self, item)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - AttributeError: 'CheckpointConfig' object has no attribute '_commented_map'
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING -
During handling of the above exception, another exception occurred:
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - Traceback (most recent call last):
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/threading.py", line 980, in _bootstrap_inner
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - self.run()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/threading.py", line 917, in run
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - self._target(*self._args, **self._kwargs)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/listener.py", line 114, in on_running
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - **get_custom_facets(task, dagrun.external_trigger, task_instance_copy)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 265, in get_custom_facets
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - "airflow_version": AirflowVersionRunFacet.from_task(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/facets.py", line 34, in from_task
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - to_json_encodable(task),
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 101, in to_json_encodable
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return json.loads(json.dumps(task.__dict__, default=_task_encoder))
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return cls(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - chunks = self.iterencode(o, _one_shot=True)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return _iterencode(o, 0)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/openlineage/airflow/utils.py", line 99, in _task_encoder
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return str(obj)
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2883, in __str__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - return self.__repr__()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2862, in __repr__
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - json_dict: dict = self.to_json_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2850, in to_json_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - dict_obj: dict = self.to_dict()
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 140, in to_dict
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - for key in self.property_names(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 227, in property_names
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - assert_valid_keys(keys=exclude_keys, purpose="exclusion")
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - File "/usr/local/lib/python3.9/site-packages/great_expectations/types/__init__.py", line 214, in assert_valid_keys
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - raise ValueError(
[2022-10-07, 19:23:04 UTC] {logging_mixin.py:117} WARNING - ValueError: Property "commented_map", marked for exclusion on object "<class 'great_expectations.data_context.types.base.CheckpointConfig'>", does not exist.
I don't see this error with >=0.15, but with 0.14.x, I see this additional error:
[2022-10-07, 19:46:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 117, in instantiate_class_from_config
class_instance = class_(**config_with_defaults)
TypeError: __init__() got an unexpected keyword argument 'site_names'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 142, in execute
self.checkpoint = instantiate_class_from_config(
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 121, in instantiate_class_from_config
class_name, format_dict_for_error_message(config_with_defaults)
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in format_dict_for_error_message
return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 165, in <genexpr>
return "\n\t".join("\t\t".join((str(key), str(dict_[key]))) for key in dict_)
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/config_peer.py", line 80, in __repr__
return str(self.get_config())
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2130, in __str__
return self.__repr__()
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2109, in __repr__
json_dict: dict = self.to_sanitized_json_dict()
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2098, in to_sanitized_json_dict
serializeable_dict = self.to_json_dict()
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/types/base.py", line 2088, in to_json_dict
serializeable_dict: dict = convert_to_json_serializable(data=dict_obj)
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/util.py", line 256, in convert_to_json_serializable
raise TypeError(
TypeError: <great_expectations.data_context.types.base.DatasourceConfig object at 0x7f4009f096a0> is of type DatasourceConfig which cannot be serialized.
Current environment:
apache-airflow==2.4.1
airflow-provider-great-expectations==0.1.5
great-expectations>=0.14
I confirmed that the same DAG works with great-expectations==0.13.49 (this requires pinning jinja2>=3.0.0,<3.1.0).
Schema should be added to template fields in order to use the connection Jinja templating provided by Airflow.
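A minimal sketch of what that could look like, assuming the operator stores its schema argument as an instance attribute (the subclass name here is illustrative, not part of the provider):

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

class TemplatedGXOperator(GreatExpectationsOperator):
    # Extend the parent operator's template_fields so that "schema" is rendered
    # by Airflow's Jinja engine, e.g. schema="{{ conn.my_snowflake.schema }}".
    template_fields = tuple(GreatExpectationsOperator.template_fields) + ("schema",)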
Parallel execution of the GX provider in Airflow randomly fails.
Here is the log:
*** * /mnt/airdrive/airflow/logs/dag_id=xxxx_pipeline_dag_v4/run_id=scheduled__2024-04-07T00:00:00+00:00/task_id=gx_validate_xxx_col_std_dev/attempt=1.log
[2024-04-08, 09:51:48 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 09:51:49 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_std_dev> on 2024-04-07 00:00:00+00:00
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:60} INFO - Started process 1111645 to run task
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxxx_pipeline_dag_v4', 'gx_validate_xxx_col_std_dev', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '622', '--raw', '--subdir', 'DAGS_FOLDER/xxxxxxxx/xxxxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpn6gnlugt']
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:88} INFO - Job 622: Subtask gx_validate_xxx_col_std_dev
[2024-04-08, 09:51:49 IST] {task_command.py:423} INFO - Running <TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [running]> on host xxxx-datapipeline-***-xxxxx.xxxxdatapipelin.xxxxxxxx.com
[2024-04-08, 09:51:49 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxxx_exa_xxxx_col_std_dev' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00'
[2024-04-08, 09:51:49 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 09:51:49 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 09:51:49 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
self.data_context = ge.data_context.FileDataContext(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
self._scaffold_project()
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 91, in _scaffold_project
if self.is_project_scaffolded(self._context_root_directory):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 513, in is_project_scaffolded
and cls.config_variables_yml_exist(ge_dir)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 261, in config_variables_yml_exist
config_var_path = config.get("config_variables_file_path")
^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'get'
Another random failure log:
:
:
[2024-04-08, 22:21:00 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 22:21:00 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_not_null> on 2024-04-08 16:50:34.740105+00:00
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:60} INFO - Started process 1539585 to run task
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxx_pipeline_dag_v4', 'gx_validate_xxx_col_not_null', 'manual__2024-04-08T16:50:34.740105+00:00', '--job-id', '633', '--raw', '--subdir', 'DAGS_FOLDER/xxx_dags/xxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpdgy8dl6u']
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:88} INFO - Job 633: Subtask gx_validate_xxx_col_not_null
[2024-04-08, 22:21:00 IST] {task_command.py:423} INFO - Running <TaskInstance: xxx_pipeline_dag_v4.gx_validate_xxx_col_not_null manual__2024-04-08T16:50:34.740105+00:00 [running]> on host xxx-datapipeline-***-private.sub03080733021.xxx.com
[2024-04-08, 22:21:00 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxx_col_not_null' AIRFLOW_CTX_EXECUTION_DATE='2024-04-08T16:50:34.740105+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-08T16:50:34.740105+00:00'
[2024-04-08, 22:21:00 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 22:21:00 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - /home/xxx/workspace/***/gx/great_expectations.yml
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - ordereddict([('config_version', 3.0), ('datasources', ordereddict([('xxx_CLEANSED_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))])), ('dynamic_pandas_asset_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))]))])), ('config_variables_file_path', 'uncommitted/config_variables.yml'), ('plugins_directory', 'plugins/'), ('stores', ordereddict([('expectations_store', ordereddict([('class_name', 'ExpectationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'expectations/')]))])), ('validations_store', ordereddict([('class_name', 'ValidationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/validations/')]))])), ('evaluation_parameter_store', ordereddict([('class_name', 'EvaluationParameterStore')])), ('checkpoint_store', ordereddict([('class_name', 'CheckpointStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'checkpoints/')]))])), ('profiler_store', ordereddict([('class_name', 'ProfilerStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'profilers/')]))]))])), ('expectations_store_name', 'expectations_store'), ('validations_store_name', 'validations_store'), ('evaluation_parameter_store_name', 'evaluation_parameter_store'), ('checkpoint_store_name', 'checkpoint_store'), ('data_docs_sites', ordereddict([('local_site', ordereddict([('class_name', 'SiteBuilder'), ('show_how_to_buttons', True), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/data_docs/local_site/')])), ('site_index_builder', ordereddict([('class_name', 'DefaultSiteIndexBuilder')]))]))])), ('anonymous_usage_statistics', ordereddict([('data_context_id', 'dba4d0fa-ce75-444b-94e5-623ad64aecd1'), ('enabled', True)])), ('fluent_datasources', ordereddict([('filesystem_source_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('xxx_CLEANSED', ordereddict([('type', 'csv'), ('filepath_or_buffer', 'data/xxx_CLEANSED.csv')]))]))])), ('dynamic_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('dynamic_pandas_asset', ordereddict([('type', 'dataframe'), ('batch_metadata', ordereddict())]))]))]))])), ('notebooks', None), ('include_rendered_content', ordereddict([('globally', False), ('expectation_suite', False), ('expectation_validation_result', False)]))])
[2024-04-08, 22:21:00 IST] {base.py:1716} ERROR - Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {base.py:145} ERROR - Encountered errors during loading config. See ValidationError for more details.
[2024-04-08, 22:21:00 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
self.data_context = ge.data_context.FileDataContext(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 66, in __init__
self._project_config = self._init_project_config(project_config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 111, in _init_project_config
project_config = FileDataContext._load_file_backed_project_config(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 213, in _load_file_backed_project_config
return DataContextConfig.from_commented_map(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 139, in from_commented_map
config: Union[dict, BYC] = schema_instance.load(commented_map)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 723, in load
return self._do_load(
^^^^^^^^^^^^^^
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 909, in _do_load
self.handle_error(exc, data, many=many, partial=partial)
File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 1717, in handle_error
raise gx_exceptions.InvalidDataContextConfigError(
great_expectations.exceptions.exceptions.InvalidDataContextConfigError: Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=xxx_pipeline_dag_v4, task_id=gx_validate_xxx_col_not_null, execution_date=20240408T165034, start_date=20240408T165100, end_date=20240408T165100
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 633 for task gx_validate_xxx_col_not_null (Error while processing DataContextConfig: _schema; 1539585)
[2024-04-08, 22:21:00 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-08, 22:21:01 IST] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
Hi team, we are working on integrating GX with a Snowflake datasource into our data validation system via GreatExpectationsOperator.
We are planning to run some expectation validations against a Snowflake table named test_sf_table. However, we are getting KeyError: 'data_asset_name test_sf_table is not recognized.' when running our DAG. We have tried both upper and lower case, with and without schema, such as data_asset_name: <schema_name>.<table_name>.
Does anyone know what the issue could be? Or is there any configuration issue in my data_context_config or checkpoint_config? Any help would be greatly appreciated!
Detailed info:
We are using airflow-provider-great-expectations==0.2.0.
datasource_config:
sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'
sf_datasource_config = {
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"execution_engine": {
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": sf_url,
},
"data_connectors": {
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
"default_inferred_data_connector_name": {
"class_name": "InferredAssetSqlDataConnector",
"include_schema_name": True,
"included_tables": f"{schema}.test_sf_table".lower()
},
},
}
data_context_config:
base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
**{
"config_version": 3.0,
"datasources": {
"my_snowflake_datasource": sf_datasource_config
},
"stores": {
"expectations_store": {
"class_name": "ExpectationsStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(ge_root_dir, "expectations"),
},
},
"validations_store": {
"class_name": "ValidationsStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(
ge_root_dir, "uncommitted", "validations"
),
},
},
"evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
"checkpoint_store": {
"class_name": "CheckpointStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"suppress_store_backend_id": True,
"base_directory": os.path.join(ge_root_dir, "checkpoints"),
},
},
},
"expectations_store_name": "expectations_store",
"validations_store_name": "validations_store",
"evaluation_parameter_store_name": "evaluation_parameter_store",
"checkpoint_store_name": "checkpoint_store",
"data_docs_sites": {
"local_site": {
"class_name": "SiteBuilder",
"show_how_to_buttons": True,
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(
ge_root_dir, "uncommitted", "data_docs", "local_site"
),
},
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
}
},
"anonymous_usage_statistics": {
"data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
"enabled": True,
},
"notebooks": None,
"concurrency": {"enabled": False},
}
)
checkpoint_config:
snowflake_checkpoint_config = CheckpointConfig(
**{
"name": "test_sf_checkpoint",
"config_version": 1.0,
"template_name": None,
"module_name": "great_expectations.checkpoint",
"class_name": "Checkpoint",
"run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
"expectation_suite_name": "sf_test.demo",
"action_list": [
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "store_evaluation_params",
"action": {"class_name": "StoreEvaluationParametersAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction", "site_names": []},
},
],
"evaluation_parameters": {},
"runtime_configuration": {},
"validations": [
{
"batch_request": {
"datasource_name": "my_snowflake_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "test_sf_table".lower(),
"data_connector_query": {"index": -1},
},
}
],
"profilers": [],
"ge_cloud_id": None,
"expectation_suite_ge_cloud_id": None,
}
)
operator:
ge_snowflake_validation = GreatExpectationsOperator(
task_id="test_snowflake_validation",
data_context_config=snowflake_data_context_config,
checkpoint_config=snowflake_checkpoint_config
)
Hi,
We have a dependency conflict when using the provider package.
Starting from Airflow 2.3.3, the Airflow constraints file requires SQLAlchemy 1.4.27, while the Great Expectations provider requires a version earlier than 1.4.10.
Can someone please advise? Is this requirement planned to be changed?
Thanks a lot!
Describe the bug
When running a great_expectations suite on Google Cloud Composer using BigQuery as a datasource, I'm getting the following error:
[2021-07-23 17:40:24,866] {xcom.py:237} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-07-23 17:40:24,866] {taskinstance.py:1457} ERROR - Object of type ValidationOperatorResult is not JSON serializable
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1320, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1896, in xcom_push
XCom.set(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 88, in set
value = XCom.serialize_value(value)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 235, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ValidationOperatorResult is not JSON serializable
To Reproduce
Steps to reproduce the behavior:
Expected behavior
It should run and push the result to XComs
Environment (please complete the following information):
Additional context
If I set do_xcom_push to False, it works, but then I can't fetch the results after execution.
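For reference, the pickle-based route the error message hints at is a single Airflow config switch (a sketch only; enabling pickled XComs has its own security trade-offs):

# airflow.cfg: enable pickled XComs so non-JSON-serializable results can be pushed
[core]
enable_xcom_pickling = True

# or the equivalent environment variable:
# AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True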
Is your feature request related to a problem? Please describe.
The GreatExpectationsOperator for Airflow currently instantiates a DataContext in the __init__() method, which gets run every time the task is parsed (which is frequently), slowing down the DAG it is a part of.
Describe the solution you'd like
Move instantiation of the DataContext and Checkpoint to the execute() method of the operator.
I will open a PR on this shortly, feel free to assign to me.
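A rough sketch of the idea, with illustrative names rather than the provider's actual code:

import great_expectations as ge
from airflow.models import BaseOperator

class GreatExpectationsOperatorSketch(BaseOperator):
    """Illustrative only; not the provider's actual implementation."""

    def __init__(self, *, data_context_root_dir, checkpoint_name, **kwargs):
        super().__init__(**kwargs)
        # __init__ runs every time the DAG file is parsed, so keep it cheap:
        # store plain configuration instead of building a DataContext here.
        self.data_context_root_dir = data_context_root_dir
        self.checkpoint_name = checkpoint_name

    def execute(self, context):
        # The heavy objects are created only when the task actually executes.
        data_context = ge.data_context.DataContext(self.data_context_root_dir)
        return data_context.run_checkpoint(checkpoint_name=self.checkpoint_name)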
Hi @denimalpaca @kaxil, thanks again for approving the PR and maintaining this operator. We are actively spiking and trying to onboard the great_expectations Airflow operator into our DQ system, and we are happy to keep contributing to this fantastic library. We are wondering whether there are any rules for new version releases. For example, how often do we release a new version/tag? How is the version number kept aligned with the great_expectations Python library?
Thanks,
@talagluck as I wrote on the Slack channel,
I'm trying to trigger a Slack notification with the GreatExpectationsOperator.
It is necessary to use the validation_operator_name to trigger such a notification, and this is not possible with the current configuration:
https://github.com/great-expectations/airflow-provider-great-expectations/blob/e92a613db94aa872957fa8626f6d5e71690fe421/great_expectations_provider/operators/great_expectations.py#L130
Dialect Used: Snowflake
Expectation Used : expect_column_values_to_be_in_set
Example of expectation:
"expectations": [
{
"expectation_type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": "column_A",
"value_set": {
"$PARAMETER": "valid_ids"
}
},
"meta": {
"id": "test_unknown_ids"
}
}
],
Problem:
If the size of the list passed through evaluation parameters (not loading params from a DB, but from a Python list) to this expectation exceeds 100, it breaks the underlying JSON.
Error Message:
sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type json
LINE 1: ...rendered_task_instance_fields SET rendered_fields='{"run_nam...
^
DETAIL: Token "Infinity" is invalid.
CONTEXT: JSON data, line 1: ...6e5", 150641421, "e6096a21", "e6093099", Infinity...
Question:
Is it a bug that the expectation crashes on a list size of more than 100, or is it desired behaviour?
I have verified that it only crashes when I pass a value_set of 100+ items through evaluation params; the expectation works fine if I add the 100+ item list directly into the expectation JSON. So I think it must be something related to the operator's handling of big JSON objects.
We still have a TODO in the example DAG for in-code contexts. I actually implemented them in another repo when testing out Astronomer - we should migrate the 3 examples at the bottom over to this example DAG, test them to make sure they run too, package, and release. That's lines 131 and below in this example: https://github.com/superconductive/astro-ge/blob/main/dags/example_great_expectations_dag.py
We need to run a GE checkpoint from Airflow.
Checkpoint is based on SQL query.
SQL query must get values for its parameters from Airflow - e.g. a datamart should be checked for DQ for particular date and region after that date and region were refreshed from another Airflow task.
Part of checkpoint.yml looks like:
validations:
- batch_request:
datasource_name: snowflake
data_connector_name: default_runtime_data_connector_name
data_asset_name: db1.table1
runtime_parameters:
query: "SELECT *
from db1.table1
WHERE fld1 > $DATE_PARAM_FROM_AIRFLOW and fld2 = $REGION_PARAM_FROM_AIRFLOW
"
How to do it properly with GreatExpectationsOperator?
It looks like it can't pass just the parameters, while using query_to_validate or checkpoint_config will break unit tests (you will need Airflow to test your checkpoint!).
Workaround: use environment variables.
Thanks!
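To make the environment-variable workaround concrete, here is a hedged sketch. It assumes, as the note above implies, that GX substitutes $DATE_PARAM_FROM_AIRFLOW and $REGION_PARAM_FROM_AIRFLOW from the environment when it loads the checkpoint; the subclass and the query_params argument are invented for this illustration and are not provider features:

import os
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

class ParameterizedGXOperator(GreatExpectationsOperator):
    """Illustrative subclass: exports templated parameters as environment variables."""

    # query_params is a name invented for this sketch, not a provider argument.
    template_fields = tuple(GreatExpectationsOperator.template_fields) + ("query_params",)

    def __init__(self, *, query_params=None, **kwargs):
        super().__init__(**kwargs)
        self.query_params = query_params or {}

    def execute(self, context):
        # By the time execute() runs, Airflow has rendered the Jinja templates,
        # so the values can be exported for GX's config variable substitution.
        for name, value in self.query_params.items():
            os.environ[name] = str(value)
        return super().execute(context)

# usage (values are rendered by Airflow before execute):
# ParameterizedGXOperator(
#     task_id="dq_check",
#     checkpoint_name="my_checkpoint",
#     data_context_root_dir="/path/to/great_expectations",
#     query_params={"DATE_PARAM_FROM_AIRFLOW": "{{ ds }}", "REGION_PARAM_FROM_AIRFLOW": "EU"},
# )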
Currently, if there is a '.' in the asset name, it is assumed to be schema.table. It should be possible to pass database.schema.table.
Hello,
Use case: the database name is different from the schema names, and multiple schemas are available.
Error: connection failed for database login.
InvalidSchemaName: schema "main" does not exist
When I add checkpoint_kwargs to my code, it errors. But if I delete checkpoint_kwargs, the task runs successfully.
This is my code:
GreatExpectationsOperator(
task_id="task_1234",
data_context_root_dir="/opt/airflow/plugins/gx/",
conn_id="postgres",
schema="public",
data_asset_name="table",
query_to_validate=query_sql,
expectation_suite_name="sample_suite",
checkpoint_kwargs={
"action_list":[],
},
return_json_dict=True,
dag=dag,
)
ERROR:
File "/home/airflow/.local/lib/python3.12/site-packages/great_expectations/checkpoint/checkpoint.py", line 282, in run
raise gx_exceptions.CheckpointError(
great_expectations.exceptions.exceptions.CheckpointError: Checkpoint "table.sample_suite.chk" must be called with a validator or contain either a batch_request or validations.
Example log line: [2021-04-19 14:52:15,487] {taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: {dag_name}.{task_name} 2021-04-18 07:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
I have a DAG with ~40 tasks. I recently added Great Expectations to it. As soon as I introduced this to my DAG, weird things occurred: my tasks did not properly queue and were marked failed before even going into the RUNNING state. As soon as I remove from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator, my DAG runs successfully without experiencing any of these side effects.
Expected behavior: using the GreatExpectationsOperator has no unintended consequences.
Astronomer Cloud and locally in docker-compose
Executor: Kubernetes Executor
Airflow: 1.10.12
Docker Image: quay.io/astronomer/ap-airflow:1.10.12-alpine3.10-onbuild
When specifying the schema in the connection (instead of as an operator parameter), it is ignored when building the datasource.
Hi guys,
Currently I get an error when I want to use the official Airflow operator for Great Expectations.
My packages:
airflow-provider-great-expectations==0.1.4
great-expectations==0.15.17
apache-airflow==2.1.4
---> [2022-08-05 12:49:47,094] {util.py:153} CRITICAL - Error The module: great_expectations.data_context.store does not contain the class: ProfilerStore.
Can you please help?
Hey there! I'm one of the core devs from the GX team and wanted to reach out to highlight a test failure we've been seeing in our CI - tests/operators/test_great_expectations.py::test_great_expectations_operator__make_connection_string_snowflake_pkey
As part of our CI, we clone this repository, install dependencies, and run the full suite. I've done some debugging and have come to the conclusion that this is a result of the new 4.3.1 release; testing on 4.3.0 passes all tests.
Here's where it fails:
# test_great_expectations_operator__make_connection_string_snowflake_pkey
> assert operator.make_connection_configuration() == test_conn_conf
tests/operators/test_great_expectations.py:911:
Here's the specific part of the snowflake package that fails:
elif private_key_file:
private_key_file_path = Path(private_key_file)
if not private_key_file_path.is_file() or private_key_file_path.stat().st_size == 0:
> raise ValueError("The private_key_file path points to an empty or invalid file.")
E ValueError: The private_key_file path points to an empty or invalid file.
/opt/hostedtoolcache/Python/3.8.17/x64/lib/python3.8/site-packages/airflow/providers/snowflake/hooks/snowflake.py:253: ValueError
Hello!
I'm trying to execute Great Expectations in my Airflow instance to run some validations on my BigQuery tables. There's not much content about this on the internet, and I even found some references to an old GreatExpectationsBigQueryOperator.
I've set up my connection on Airflow as a Google Cloud connection and it works for other things (such as querying data), but on the GreatExpectationsOperator I get the error below:
ERROR - Failed to execute job 118 for task gx_validate_products (Conn type: google_cloud_platform is not supported.; 425)
Here's my code:
gx_validate_products = GreatExpectationsOperator(
task_id="gx_validate_products",
conn_id="gcp_connection",
data_context_root_dir="great_expectations",
data_asset_name="sample_ecommerce.products",
expectation_suite_name="raw_products",
return_json_dict=True,
)
Hi :)
I've been exploring how to use the "build the datasource from a provided conn_id" feature and ran into a small issue:
Our dev (and prod) environment is set up with Snowflake tables located in different schemas, something like:
dev_db
- schema_1
-- table_1
-- table_2
- schema_2
-- table_3
-- table_4
etc
There is currently only one Snowflake connection in the dev environment, which has its schema field left empty. Of course, I (or rather the deployment admin) could create one additional Snowflake connection per schema, but this does not seem ideal or scalable.
Because of this I've been trying to find a way to change the schema of the datasource created when passing in a conn_id, but I could not find a way to do so, e.g. via a data_context_config with the intended schema.
When trying to leave the schema in the Airflow connection blank and passing the schema name (TAMARAFINGERLIN) with the table name (CLUSTERS) to data_asset_name like this:
t1 = GreatExpectationsOperator(
task_id="t1",
data_asset_name="TAMARAFINGERLIN.CLUSTERS",
conn_id="{{conn.galaxy_snowflake_etl}}",
data_context_root_dir=ge_root_dir,
expectation_suite_name="CLUSTERS",
)
the following error happens:
[2022-12-01T19:11:34.849+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
handed_over = Error.hand_to_other_handler(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
cursor.errorhandler(connection, cursor, error_class, error_value)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
raise error_class(
snowflake.connector.errors.ProgrammingError: 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 521, in execute
result = self.checkpoint.run(batch_request=self.batch_request)
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 194, in run
self._run_validation(
File "/usr/local/lib/python3.9/site-packages/great_expectations/checkpoint/checkpoint.py", line 339, in _run_validation
validator: Validator = self.data_context.get_validator(
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1481, in get_validator
self.get_batch_list(
File "/usr/local/lib/python3.9/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 304, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 1666, in get_batch_list
return datasource.get_batch_list_from_batch_request(batch_request=batch_request)
File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 205, in get_batch_list_from_batch_request
) = data_connector.get_batch_data_and_metadata(
File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/data_connector/data_connector.py", line 116, in get_batch_data_and_metadata
batch_data, batch_markers = self._execution_engine.get_batch_data_and_markers(
File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 1243, in get_batch_data_and_markers
batch_data = SqlAlchemyBatchData(
File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 161, in __init__
self._create_temporary_table(
File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_batch_data.py", line 295, in _create_temporary_table
self._engine.execute(stmt)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1274, in execute
return self._exec_driver_sql(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1578, in _exec_driver_sql
ret = self._execute_context(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 803, in execute
Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 275, in errorhandler_wrapper
handed_over = Error.hand_to_other_handler(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 330, in hand_to_other_handler
cursor.errorhandler(connection, cursor, error_class, error_value)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 209, in default_errorhandler
raise error_class(
sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090106 (22000): 01a8aedf-0506-3740-0000-6821196f772e: Cannot perform CREATE TEMPTABLE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
[SQL: CREATE OR REPLACE TEMPORARY TABLE ge_temp_7a4f3e73 AS SELECT *
FROM "TAMARAFINGERLIN.CLUSTERS"
WHERE true]
(Background on this error at: https://sqlalche.me/e/14/f405)
[2022-12-01T19:11:34.888+0000] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=GE_TEST, task_id=t1, execution_date=20221201T191130, start_date=20221201T191131, end_date=20221201T191134
I was wondering if there could be a way to override some parameters of the connection provided via the conn_id. This would allow me to map the operator over sets of schema names, table names, and expectation suite names without needing admin access to the deployment or manually creating a lot of connections. :)
cc @denimalpaca
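One hedged workaround, assuming the provider version in use supports the schema parameter mentioned elsewhere in this tracker, is to keep the unqualified table name in data_asset_name and pass the schema separately, which could then be mapped over schema/table pairs:

t1 = GreatExpectationsOperator(
    task_id="t1",
    schema="TAMARAFINGERLIN",            # schema passed to the operator instead of via the connection
    data_asset_name="CLUSTERS",          # unqualified table name
    conn_id="{{conn.galaxy_snowflake_etl}}",
    data_context_root_dir=ge_root_dir,
    expectation_suite_name="CLUSTERS",
)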
Hi!
Is it possible to run:
1) An EXPERIMENTAL expectation (from the great_expectations_experimental library) from Airflow?
Example: expect_queried_column_values_to_exist_in_second_table_column
A simple import in the DAG does not help:
from great_expectations_experimental.expectations.expect_queried_column_values_to_exist_in_second_table_column import ExpectQueriedColumnValuesToExistInSecondTableColumn
expect_queried_column_values_to_exist_in_second_table_column(**{'batch_id': '0120cd462e58ed32be35bc92c0ae', 'template_dict': {'condition': '1=1', 'first_table_column': 'PROV_ID', 'second_table_column': 'PROV_ID', 'second_table_full_name': 'LINC'}})
2) A custom expectation from the great_expectations/plugins/expectations folder?
Could it be run from Airflow? How?
https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/how_to_use_custom_expectations/
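Not an authoritative answer, but the pattern I would expect to work (hedged, since the report above says a plain import did not help): the experimental expectation class registers itself with GX when its module is imported, so the import has to happen in the process that actually runs the checkpoint, and the expectation suite then refers to it by name rather than it being called directly. A sketch under those assumptions, with illustrative paths and names:

# Importing the class registers the expectation with GX in this process;
# it is never called directly, the expectation suite refers to it by name.
from great_expectations_experimental.expectations.expect_queried_column_values_to_exist_in_second_table_column import (  # noqa: F401
    ExpectQueriedColumnValuesToExistInSecondTableColumn,
)
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate = GreatExpectationsOperator(
    task_id="validate",
    data_context_root_dir="/path/to/great_expectations",  # illustrative
    checkpoint_name="my_checkpoint",  # checkpoint whose suite uses the expectation by name
)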
Right now, the self.data_context object is initialized within the execute method of the Airflow BaseOperator.
This is done in:
However, this makes it impossible to interact with the data context before or after the execution.
If this self.data_context were initialized in the __init__() method, the user could interact with this object in the pre_execute() or post_execute() methods of the Airflow BaseOperator.
A possible use case, for example, is to add ExpectationSuites at runtime using an InMemoryStoreBackend expectation store:
from great_expectations.core.expectation_configuration import ExpectationConfiguration

def pre_execute(self, context: Any):
    """
    Create and add an expectation suite to the in-memory DataContext.
    """
    # suite_name is assumed to be defined elsewhere (e.g. an operator attribute).
    suite = self.data_context.create_expectation_suite(
        expectation_suite_name=suite_name, overwrite_existing=True
    )
    # Add expectations
    # Here we'll add a simple expectation as an example
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_table_row_count_to_be_between",
            kwargs={"min_value": 1, "max_value": 1000000},
        )
    )
    # Save the suite to the DataContext's in-memory expectations store
    self.data_context.save_expectation_suite(suite)
GE version: 0.15.0, 0.15.1
I am calling the GreatExpectationsOperator from one of my Airflow DAGs, where I am passing a checkpoint that has a runtime configuration as below:
runtime_configuration: {"result_format": {"result_format": "SUMMARY", "include_unexpected_rows": True}}
Since the unexpected rows are also being pulled, the validation result fails to be stored on local disk. My action list configuration in the checkpoint is as below:
action_list:
There is no problem when include_unexpected_rows is toggled to False.
The error trace is as below:
[2022-04-21 14:23:59,760] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:00,136] {validator.py:1646} INFO - 1 expectation(s) included in expectation_suite.
[2022-04-21 14:24:00,477] {logging_mixin.py:109} WARNING -
Calculating Metrics: 0%| | 0/12 [00:00<?, ?it/s]
[2022-04-21 14:24:00,604] {cursor.py:696} INFO - query: [SHOW /* sqlalchemy:_get_schema_primary_keys */ PRIMARY KEYS IN SCHEMA sample_db.pub...]
[2022-04-21 14:24:02,266] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:02,370] {cursor.py:696} INFO - query: [SELECT /* sqlalchemy:_get_schema_columns */ ic.table_name, ic.column_name, ic.da...]
[2022-04-21 14:24:03,873] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:09,438] {logging_mixin.py:109} WARNING -
Calculating Metrics: 17%|#6 | 2/12 [00:08<00:44, 4.43s/it]
[2022-04-21 14:24:09,465] {cursor.py:696} INFO - query: [SELECT count(*) AS "table.row_count" FROM ge_temp_284c970f]
[2022-04-21 14:24:09,967] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:10,178] {logging_mixin.py:109} WARNING -
Calculating Metrics: 33%|###3 | 4/12 [00:09<00:16, 2.04s/it]
[2022-04-21 14:24:10,237] {cursor.py:696} INFO - query: [SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegme...]
[2022-04-21 14:24:10,782] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:11,241] {cursor.py:696} INFO - query: [SELECT c_nationkey AS unexpected_values FROM ge_temp_284c970f WHERE c_nationkey ...]
[2022-04-21 14:24:11,770] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:13,527] {logging_mixin.py:109} WARNING -
Calculating Metrics: 83%|########3 | 10/12 [00:12<00:01, 1.03it/s]
[2022-04-21 14:24:13,641] {cursor.py:696} INFO - query: [SELECT sum(CASE WHEN (c_nationkey IS NULL) THEN 1 ELSE 0 END) AS "column_values....]
[2022-04-21 14:24:14,463] {cursor.py:720} INFO - query execution done
[2022-04-21 14:24:15,786] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.00s/it]
[2022-04-21 14:24:15,909] {logging_mixin.py:109} WARNING -
Calculating Metrics: 100%|##########| 12/12 [00:15<00:00, 1.28s/it]
[2022-04-21 14:24:15,912] {logging_mixin.py:109} WARNING -
[2022-04-21 14:24:16,255] {validation_operators.py:465} ERROR - Error running action with name store_validation_result
Traceback (most recent call last):
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in _invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:16,838] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/root/.local/lib/python3.6/site-packages/great_expectations_provider/operators/great_expectations.py", line 160, in execute
result = self.checkpoint.run()
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 287, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 167, in run
validation_dict=validation_dict,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/checkpoint.py", line 367, in _run_validation
**operator_run_kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/async_executor.py", line 100, in submit
return AsyncResult(value=fn(*args, **kwargs))
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in run
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 466, in _run_actions
raise e
File "/root/.local/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 453, in _run_actions
checkpoint_identifier=checkpoint_identifier,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 78, in run
**kwargs,
File "/root/.local/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 830, in _run
expectation_suite_id=expectation_suite_ge_cloud_id,
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 161, in set
self.key_to_tuple(key), self.serialize(key, value), **kwargs
File "/root/.local/lib/python3.6/site-packages/great_expectations/data_context/store/validations_store.py", line 168, in serialize
value, indent=2, sort_keys=True
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 584, in dumps
serialized = self.dump(obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 560, in dump
result = self._serialize(processed_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 524, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 309, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 697, in
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/fields.py", line 564, in _serialize
return schema.dump(nested_obj, many=many)
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 555, in dump
PRE_DUMP, obj, many=many, original_data=obj
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1075, in _invoke_dump_processors
tag, pass_many=False, data=data, many=many, original_data=original_data
File "/root/.local/lib/python3.6/site-packages/great_expectations/marshmallow__shade/schema.py", line 1231, in invoke_processors
data = processor(data, many=many, **kwargs)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/expectation_validation_result.py", line 253, in convert_result_to_serializable
data.result = convert_to_json_serializable(data.result)
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 161, in convert_to_json_serializable
new_dict[str(key)] = convert_to_json_serializable(data[key])
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 168, in convert_to_json_serializable
new_list.append(convert_to_json_serializable(val))
File "/root/.local/lib/python3.6/site-packages/great_expectations/core/util.py", line 257, in convert_to_json_serializable
f"{str(data)} is of type {type(data).name} which cannot be serialized."
TypeError: (891097, 'Customer#0007', 'eRL', 21, '31-843-843', Decimal('50.04'), 'FUTURE', 'some junk string for testing') is of type RowProxy which cannot be serialized.
[2022-04-21 14:24:17,362] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=ge_test_01, task_id=validation_customer_table, execution_date=20220421T142326, start_date=20220421T142338, end_date=20220421T142417
[2022-04-21 14:24:22,341] {local_task_job.py:151} INFO - Task exited with return code 1
[2022-04-21 14:24:23,283] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
I think calling the BashOperator to run the checkpoint could be a workaround, but we want to avoid the BashOperator.
Thanks in advance!
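One possible mitigation, not a confirmed fix: if the unserializable RowProxy values are raw rows embedded in the validation result, trimming the result payload with result_format may avoid the serialization step that fails. The parameter names below follow the current operator (checkpoint_kwargs and runtime_configuration); older provider versions such as the one in this trace may not expose them, and the checkpoint name and root dir are placeholders.
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Hedged sketch: ask the checkpoint for a slimmer result payload so raw database
# rows are not embedded in the validation result that gets serialized.
validate_customer_table = GreatExpectationsOperator(
    task_id="validation_customer_table",
    data_context_root_dir="/usr/local/airflow/great_expectations",
    checkpoint_name="customer_checkpoint",
    checkpoint_kwargs={"runtime_configuration": {"result_format": "BOOLEAN_ONLY"}},
)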
GX has the option to store metadata in stores: https://docs.greatexpectations.io/docs/reference/learn/terms/store/
Currently the GXO does not have a way to pull these credentials from an Airflow connection.
Hello team,
I'm receiving the below error when I try to use GreatExpectationsBigQueryOperator
[2021-05-20 00:23:39,025] {validation_operators.py:405} ERROR - Error running action with name update_data_docs
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/validation_operators/validation_operators.py", line 392, in _run_actions
payload=batch_actions_results
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 62, in run
**kwargs
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/checkpoint/actions.py", line 939, in _run
validation_result_suite_identifier.expectation_suite_identifier
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/core/usage_statistics/usage_statistics.py", line 262, in usage_statistics_wrapped_method
result = func(*args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/data_context.py", line 2304, in build_data_docs
resource_identifiers, build_index=build_index
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 293, in build
site_section_builder.build(resource_identifiers=resource_identifiers)
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/render/renderer/site_builder.py", line 388, in build
source_store_keys = self.source_store.list_keys()
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in list_keys
return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 142, in <listcomp>
return [self.tuple_to_key(key) for key in keys_without_store_backend_id]
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/store/store.py", line 110, in tuple_to_key
return self._key_class.from_tuple(tuple_)
File "/opt/python3.6/lib/python3.6/site-packages/great_expectations/data_context/types/resource_identifiers.py", line 164, in from_tuple
RunIdentifier.from_tuple((tuple_[-3], tuple_[-2]))
IndexError: tuple index out of range
Here is my airflow task:
validate_output = GreatExpectationsBigQueryOperator(
task_id='validate_output',
gcp_project='<project_id>',
#data_context=DC,
gcs_bucket='<my_bucket_id>',
gcs_datadocs_prefix='dags/playground/test_great_expectations/great_expectations',
gcs_expectations_prefix='dags/playground/test_great_expectations/great_expectations/expectations',
gcs_validations_prefix='dags/playground/test_great_expectations/great_expectations',
expectation_suite_name='test_new_suite',
bq_dataset_name='data_platform_staging',
email_to='[email protected]',
send_alert_email=False,
bigquery_conn_id='bigquery-greatexpectations',
table='project_id.test_dataset.test_table',
dag=dag
)
I'm using the latest great-expectations & airflow-provider-great-expectations dependencies.
I'm using version 1.10.14+composer.
Thanks in advance!
I am currently using the following version:
airflow-provider-great-expectations==0.2.0
I am trying to run the Great Expectations operator, passing the Snowflake connection ID and executing the query.
My code throws the following error trace:
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-01-30T10:18:21.654701+00:00
[2023-01-30, 10:18:36 UTC] {great_expectations.py:470} INFO - Running validation with Great Expectations...
[2023-01-30, 10:18:36 UTC] {great_expectations.py:472} INFO - Instantiating Data Context...
[2023-01-30, 10:18:36 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:36 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 474, in execute
self.build_runtime_datasources()
File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 368, in build_runtime_datasources
self.build_runtime_sql_datasource_config_from_conn_id()
File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 304, in build_runtime_sql_datasource_config_from_conn_id
"connection_string": self.make_connection_string(),
File "/usr/local/lib/python3.8/site-packages/great_expectations_provider/operators/great_expectations.py", line 251, in make_connection_string
uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{self.conn.schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}" # noqa
KeyError: 'extra__snowflake__account'
On logging the connection in Airflow using the code below:
import logging
from airflow.decorators import task
from airflow.hooks.base import BaseHook
task_logger = logging.getLogger("airflow.task")
SNOWFLAKE_CONN_ID = "snowflake_conn"
@task
def test_conn():
    conn = BaseHook.get_connection(SNOWFLAKE_CONN_ID)
    # The stray %s below is why "%s" appears literally in the logged output.
    task_logger.info(f"connection info {conn.extra_dejson} %s")
Below is the information logged:
[2023-01-30, 10:18:28 UTC] {base.py:71} INFO - Using connection ID 'snowflake_conn' for task execution.
[2023-01-30, 10:18:28 UTC] {test_gx_snowflake.py:265} INFO - connection info {'account': 'rtb82372.us-east-1', 'insecure_mode': False, 'database': 'SFSALES_SFC_SAMPLES_VA3_SAMPLE_DATA', 'warehouse': 'XLARGEWH'} %s
[2023-01-30, 10:18:28 UTC] {python.py:177} INFO - Done. Returned value was: None
Reference issue at airflow side:
apache/airflow#26764
Hi,
Would you consider adding a Trino connection to the operator?
Apparently there is a TODO, but I don't know if it's on your foreseeable roadmap.
Thanks
Describe the bug
Hi,
I have deployed Great Expectations with Google Cloud Composer. I've created an Airflow DAG to insert data into a table from another table and then validate that the data is correct. I then generated the suite file using the scaffold example and added all of the GE files to a GCP bucket. But I'm receiving this error: DataContextError: No validation operator 'action_list_operator' was found in your project. Please verify this in your great_expectations.yml. I'm sure that action_list_operator exists in my great_expectations.yml file.
Error message:
DataContextError: No validation operator 'action_list_operator' was found in your project. Please verify this in your great_expectations.yml
How to reproduce:
Install pybigquery, great_expectations, and airflow-provider-great-expectations on the Composer environment.
Environment Versions:
Airflow Version: 1.10.9+composer
Great Expectations Version: 0.13.11
I've also attached my DAG and great_expectations.yml files.
Workaround:
One workaround is that I've created my own custom operator using this code and added the action_list_operator part manually inside the create_data_context_config function. But now I'm receiving another error: IndexError: tuple index out of range
Thanks in advance!
Hi!
I noticed that, when only using the Airflow UI fields to set a connection, the Extra JSON that gets rendered has different keys than what the GXO is looking for.
Extra field automatically rendered from connection UI fields:
{"account": "myacc", "warehouse": "HUMANS", "database": "DWH_DEV", "region": "us-east-1", "role": "myrole", "insecure_mode": false}
The GXO is looking for extra__snowflake__account, extra__snowflake__region, etc. I think this might be how the keys used to be rendered? When I set the connection with these keys, adding extra__snowflake__ in front of the parameters, everything works as expected.
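A sketch of that workaround, assuming you define the connection programmatically rather than through the UI: build it with the prefixed extra keys the operator currently looks up (all credential values below are placeholders; the conn_id matches the one in the trace further down).
import json
from airflow.models.connection import Connection

# Placeholder values; the point is only the extra__snowflake__* key names.
snowflake_conn = Connection(
    conn_id="galaxy_snowflake_etl",
    conn_type="snowflake",
    login="my_user",
    password="my_password",
    schema="PUBLIC",
    extra=json.dumps({
        "extra__snowflake__account": "myacc",
        "extra__snowflake__region": "us-east-1",
        "extra__snowflake__database": "DWH_DEV",
        "extra__snowflake__warehouse": "HUMANS",
        "extra__snowflake__role": "myrole",
    }),
)
# The URI form can be exported as an AIRFLOW_CONN_GALAXY_SNOWFLAKE_ETL environment variable.
print(snowflake_conn.get_uri())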
Error trace:
[2023-01-05, 13:37:40 UTC] {great_expectations.py:454} INFO - Instantiating Data Context...
[2023-01-05, 13:37:40 UTC] {base.py:73} INFO - Using connection ID 'galaxy_snowflake_etl' for task execution.
[2023-01-05, 13:37:40 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 456, in execute
self.build_runtime_datasources()
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 366, in build_runtime_datasources
self.datasource = self.build_configured_sql_datasource_config_from_conn_id()
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 259, in build_configured_sql_datasource_config_from_conn_id
conn_str = self.make_connection_string()
File "/usr/local/lib/python3.9/site-packages/great_expectations_provider/operators/great_expectations.py", line 246, in make_connection_string
uri_string = f"snowflake://{self.conn.login}:{self.conn.password}@{self.conn.extra_dejson['extra__snowflake__account']}.{self.conn.extra_dejson['extra__snowflake__region']}/{self.conn.extra_dejson['extra__snowflake__database']}/{schema}?warehouse={self.conn.extra_dejson['extra__snowflake__warehouse']}&role={self.conn.extra_dejson['extra__snowflake__role']}" # noqa
KeyError: 'extra__snowflake__account'
Currently the GX provider assumes a Snowflake region will always be provided.
Some Snowflake regions (i.e. us-west-2) do not require (or allow) a region in the connection string. Additionally, some regions have multiple deployments, and it becomes necessary to specify both region and deployment. This is best accomplished by putting the fully qualified name in the account field (i.e. account.deployment.region.cloud). For this to work, the region field must be left empty.
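A minimal sketch of what a fix could look like, assuming the connection-string builder receives the connection's parsed extras: only append the region segment to the host when a region is actually provided, so a fully qualified account can be used on its own.
def snowflake_host(extras: dict) -> str:
    # Key names mirror the extra__snowflake__* keys used elsewhere in this thread.
    account = extras["extra__snowflake__account"]
    region = extras.get("extra__snowflake__region")
    return f"{account}.{region}" if region else account

# Region-less, fully qualified account: no extra segment is appended.
print(snowflake_host({"extra__snowflake__account": "myacc.deployment.us-west-2.aws"}))
# Classic account + region pair.
print(snowflake_host({"extra__snowflake__account": "myacc", "extra__snowflake__region": "us-east-1"}))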
Is:
Somebody removed the template_fields declaration for the operator, which basically makes the operator useless. Please fix that ASAP and cover it with unit tests to prevent such a situation from happening in the future.
I have several GreatExpectationsOperator tasks running concurrently. I would often get an error letting me know that the great_expectations.yml file could not be parsed. Inspecting the file, I noticed it would typically end with a truncated fragment such as ult: false, which corresponded with the parsing error shown in the Airflow logs page for the task.
I suspect a race condition happens when the great_expectations.yml file is generated, because the datasource section needs to be updated for each job. After changing the GreatExpectationsOperator tasks to run one at a time, the great_expectations.yml file has not been corrupted any longer.
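A sketch of one way to enforce the "one at a time" behaviour without rewiring task dependencies, assuming a single-slot Airflow pool (here named ge_serial, created beforehand via the UI or CLI) and placeholder checkpoint and root-dir values:
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# All GreatExpectationsOperator tasks assigned to this single-slot pool run
# serially, so only one task regenerates great_expectations.yml at a time.
validate_orders = GreatExpectationsOperator(
    task_id="validate_orders",
    data_context_root_dir="/usr/local/airflow/great_expectations",
    checkpoint_name="orders_checkpoint",
    pool="ge_serial",
)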
In Airflow, there is a feature which allows for external links to be present as buttons on the Task Instance modal in the Airflow UI called an operator extra link. This feature is really useful to directly navigate from the Airflow UI to an external site which may be for monitoring a third-party execution that Airflow just triggered, handy documentation, etc.
IMO linking to Data Docs directly from a GreatExpectationsOperator
task would be really beneficial to users and increase the holistic visibility that's available at their fingertips.
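A rough sketch of what such an extra link could look like, assuming a fixed Data Docs URL (a real implementation would presumably derive the URL from the task's validation results). The get_link signature below is the Airflow 2.2+ form; older versions pass an execution date instead of ti_key.
from airflow.models.baseoperator import BaseOperatorLink
from airflow.models.taskinstance import TaskInstanceKey

class DataDocsLink(BaseOperatorLink):
    """Adds a 'Data Docs' button to the task instance modal in the Airflow UI."""

    name = "Data Docs"

    def get_link(self, operator, *, ti_key: TaskInstanceKey) -> str:
        # Placeholder: point at the site's index page; a real link would target
        # the validation result page for this specific run.
        return "https://example.com/data_docs/index.html"
Links like this are typically registered through the operator's operator_extra_links attribute or via an Airflow plugin.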
So apply_defaults throws an error if any args or kwargs which are explicitly mentioned in the function signature are empty (i.e. None). Since GreatExpectationsBigQueryOperator names expectation_suite_name as a positional arg (not variable, i.e. hidden in *args), it becomes mandatory in the function call.
The problem is when I want to use checkpoints, which hit this check in the GreatExpectationsOperator:
# Check that only the correct args to validate are passed
# this doesn't cover the case where only one of expectation_suite_name or batch_kwargs is specified
# along with one of the others, but I'm ok with just giving precedence to the correct one
if sum(bool(x) for x in [(expectation_suite_name and batch_kwargs), assets_to_validate, checkpoint_name]) != 1:
raise ValueError("Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate, \
or checkpoint_name is required to run validation.")
As a result, the GreatExpectationsBigQueryOperator's apply_defaults() call mandates that I pass expectation_suite_name, but the GreatExpectationsOperator requires me to name only checkpoint_name.
Without expectation_suite_name:
airflow.exceptions.AirflowException: Argument ['expectation_suite_name'] is required
With expectation_suite_name:
Traceback (most recent call last):
File "dags/gfk/test_ge.py", line 54, in <module>
dag=dag,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations_bigquery.py", line 150, in __init__
**kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 85, in __init__
or checkpoint_name is required to run validation.")
ValueError: Exactly one of expectation_suite_name + batch_kwargs, assets_to_validate, or checkpoint_name is required to run validation.
I wonder why it fails though, since the check seems to allow for one of the values (batch_kwargs or expectation_suite_name) to be set, but it does not work. I will investigate it a bit further tomorrow.
In order to access the expectation_suite_name of the parent class, maybe just put it back into the variable kwargs, call super(), and access it via self? Or remove apply_defaults?
airflow==1.10.12
airflow-provider-great-expectations==0.0.4
great-expectations==0.13.19
Python 3.7.10
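A minimal, self-contained sketch of the suggestion above (not the provider's actual code): leave expectation_suite_name out of the subclass signature so it stays inside **kwargs and only the parent's mutual-exclusion check applies.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ParentValidationOperator(BaseOperator):
    @apply_defaults
    def __init__(self, expectation_suite_name=None, checkpoint_name=None, **kwargs):
        super().__init__(**kwargs)
        if sum(bool(x) for x in [expectation_suite_name, checkpoint_name]) != 1:
            raise ValueError("Exactly one of expectation_suite_name or checkpoint_name is required.")
        self.expectation_suite_name = expectation_suite_name
        self.checkpoint_name = checkpoint_name

class ChildBigQueryValidationOperator(ParentValidationOperator):
    # expectation_suite_name is deliberately not named here, so apply_defaults never
    # treats it as required; callers can pass either it or checkpoint_name via kwargs.
    @apply_defaults
    def __init__(self, gcp_project, **kwargs):
        super().__init__(**kwargs)
        self.gcp_project = gcp_project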
When using the GreatExpectationsOperator with an Athena conn_id, I received the following from PyAthena: Unable to locate credentials.
I would like to know the best way to provide credentials, as this page: GX docs - connect to data does not specify how to set this up with just an Airflow connection.
Should I be using credentials: in the great_expectations.yml file, with secrets injected from config_variables.yml? Is there any way to connect with only the conn_id via an Airflow Connection?
I'm using the ODBC Driver 18 for SQL Server and have it listed as an extra under "Driver:" in my Airflow connection. This would be correct if the source code for the method make_connection_configuration in great_expectations.py didn't specify that the extra should be a lower-case "driver". As soon as I noticed this detail and changed it in my connection, the GXOperator works perfectly. To prevent this problem from occurring, the code should be changed to allow variations in the capitalisation of the "driver" extra.
...
elif conn_type == "mssql":
odbc_connector = "mssql+pyodbc"
ms_driver = self.conn.extra_dejson.get("driver") or "ODBC Driver 17 for SQL Server"
driver = f"?driver={ms_driver}"
database_name = self.conn.schema or "master"
...
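A small sketch of the suggested change, assuming the method has access to the connection's parsed extras (extra_dejson): normalise the keys before looking up "driver", so "Driver" entered via the UI works too.
def get_mssql_driver(extra_dejson: dict) -> str:
    # Case-insensitive lookup of the "driver" extra; falls back to the default
    # the current code uses.
    extras = {key.lower(): value for key, value in extra_dejson.items()}
    return extras.get("driver") or "ODBC Driver 17 for SQL Server"

print(get_mssql_driver({"Driver": "ODBC Driver 18 for SQL Server"}))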
It would be nice to be able to log the specific expectations that failed when the operator fails. I am not sure this should be internalized, so perhaps it makes sense to allow setting a callback function on failure instead?
I have something as naive as this in mind:
def generate_list_failed_expectations(r) -> str:
    """Note - only for validation results
    Input:
        r (great_expectations.validation_operators.types.validation_operator_result.ValidationOperatorResult)
    Returns: str
    """
    from itertools import groupby

    lines = ["Validation with Great Expectations failed."]
    for batch, result in r["run_results"].items():
        lines.append(f"{batch}")
        # Collect the expectation configs of the results that did not succeed.
        failed = [
            res.expectation_config
            for res in result["validation_result"].results
            if not res.success
        ]
        # groupby only merges adjacent items, so failures are assumed to be ordered by column.
        for col, group_failed in groupby(failed, key=lambda k: k["kwargs"]["column"]):
            lines.append(f" {col}")
            for test in group_failed:
                lines.append(f' - {test["expectation_type"]}')
    # A failure callback could raise AirflowException with this message to fail the task loudly.
    return "\n".join(lines)
I'm wondering if we can add dataframe_to_validate as a templated field?
template_fields = (
"run_name",
"conn_id",
"data_context_root_dir",
"checkpoint_name",
"checkpoint_kwargs",
"query_to_validate",
)
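Until the field is templated upstream, one possible stopgap (a sketch, not the provider's code) is a thin subclass that extends the tuple shown above:
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

class TemplatedGXOperator(GreatExpectationsOperator):
    # Reuse the operator's existing template_fields and add dataframe_to_validate.
    # Whether templating a DataFrame-typed argument is actually useful depends on
    # how the value is produced (e.g. from XCom); this only shows the mechanics.
    template_fields = tuple(GreatExpectationsOperator.template_fields) + ("dataframe_to_validate",)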
Describe the bug:
A DAG containing the great_expectations operator in an Astronomer Airflow environment doesn't load.
Tried downgrading to protobuf==3.2.1, but we have several Google packages that require the latest (4.2.x) protobuf version.
Steps to reproduce the behavior:
Restart the Astronomer Airflow environment
Expected behavior:
The DAG is successfully loaded
Environment (please complete the following information):
Operating System: Linux (docker on vm)
Great Expectations Version: 0.15.46
Additional context:
Broken DAG: [/usr/local/airflow/dags/great_expectations/great_expectations_athena.py] Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/google/cloud/secretmanager_v1/proto/resources_pb2.py", line 57, in <module>
_descriptor.EnumValueDescriptor(
File "/usr/local/lib/python3.9/site-packages/google/protobuf/descriptor.py", line 755, in __new__
_message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
1. Downgrade the protobuf package to 3.20.x or lower.
2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).
More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
Use Airflow's proper S3 hooks, rather than the default configuration, while running GE.
(I have code, will PR asap.)
This is most likely an issue with Airflow itself (we use 1.10.9), but we didn't have any similar issue with it before: on clearing the GE operator, the whole system goes kaboom with this log:
Node: airflow-airflow-web-6f4fbbfc58-cnx4j
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 261, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 290, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/utils.py", line 337, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 1296, in clear
include_upstream=upstream)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in sub_dag
for t in regex_match + also_include}
File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1211, in <dictcomp>
for t in regex_match + also_include}
File "/usr/local/lib/python3.7/copy.py", line 161, in deepcopy
y = copier(memo)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 681, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.7/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread.lock objects
The GreatExpectationsBigQueryOperator makes a get_connection request in the __init__ method of the class. This is problematic for a few reasons: the Airflow scheduler repeatedly parses DAG files, and every parse runs the __init__ method of your class. If the __init__ method makes requests to the database or over the network, it will place an unnecessary burden on the database.
The general pattern to follow is to have __init__ store the conn_id string and build the hook via get_connection at runtime.
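A generic sketch of that pattern (not the provider's actual code; the hook import path is the Airflow 2 one, Airflow 1.10 uses airflow.hooks.base_hook):
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator

class ExampleValidationOperator(BaseOperator):
    def __init__(self, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        # Parse time: only keep the string, no metadata-database or network calls.
        self.conn_id = conn_id

    def execute(self, context):
        # Run time: resolve the connection inside the worker, when it is needed.
        conn = BaseHook.get_connection(self.conn_id)
        self.log.info("Validating against %s", conn.host)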
In great_expectations==0.14.4 the Checkpoint class does not have a module_name class attribute (https://github.com/great-expectations/great_expectations/blob/0.14.4/great_expectations/checkpoint/checkpoint.py#L74). However, when the Great Expectations Operator initializes a Checkpoint (https://github.com/great-expectations/airflow-provider-great-expectations/blob/main/great_expectations_provider/operators/great_expectations.py#L139), it passes all the attributes of checkpoint_config.to_json_dict(), and since CheckpointConfig has a module_name attribute this causes an error when initializing the Checkpoint.
Stack Trace
File "/usr/local/airflow/.local/lib/python3.7/site-packages/great_expectations_provider/operators/great_expectations.py", line 139, in __init__
data_context=self.data_context, **self.checkpoint_config.to_json_dict()
TypeError: __init__() got an unexpected keyword argument 'module_name'
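A hedged sketch of one possible workaround at the call site: drop the config keys that Checkpoint.__init__ does not accept before instantiating it. module_name is the key reported above; class_name is an assumption about a similar key and may not be needed.
from great_expectations.checkpoint import Checkpoint

def build_checkpoint(data_context, checkpoint_config):
    # checkpoint_config is assumed to be a CheckpointConfig instance.
    kwargs = checkpoint_config.to_json_dict()
    kwargs.pop("module_name", None)   # reported to break Checkpoint.__init__ in 0.14.4
    kwargs.pop("class_name", None)    # assumption: likewise not an __init__ argument
    return Checkpoint(data_context=data_context, **kwargs)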
There is even a TODO comment in the operator code about adding support for an Athena connection.
Can you please add support for this?
Hi
I am getting the following error in the logs when trying to run my expectations via airflow.
great_expectations.exceptions.exceptions.DataContextError: expectation_suite accounts not found
import os
import airflow
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
default_args = {
'dir': '/project/great_expectations',
'start_date': airflow.utils.dates.days_ago(1),
'profiles_dir': '/home/airflow/.dbt/',
'catchup': False
}
with DAG(dag_id='accounts_great_expectations', default_args=default_args, schedule_interval='@daily') as dag:
accounts = GreatExpectationsOperator(
task_id='accounts',
expectation_suite_name='accounts',
data_context_root_dir='/project/great_expectations',
batch_kwargs={
'table': 'raw_accounts',
'datasource': 'fusioncell_local'
},
dag=dag
)
accounts_great_expectations
It can find the great_expectations.yml file fine though, which is in /project/great_expectations, and my expectations are in a directory one level below that called expectations.
Thanks
With the new 0.2.1 release of the Great Expectations Operator we've been getting the below error. We reverted to 0.2.0 and the error went away.
Error:
[2022-11-17, 01:58:38 UTC] {{abstractoperator.py:175}} ERROR - Failed to resolve template field 'data_context_root_dir'
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 173, in resolve_template_files
setattr(self, field, env.loader.get_source(env, content)[0]) # type: ignore
File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 218, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /usr/src/app/great_expectations
Code:
ge_data_unit_test_op = GreatExpectationsOperator(
task_id="test_package",
data_context_root_dir=ge_root_dir,
checkpoint_name=test_package,
checkpoint_kwargs={
"batch_request": {
"datasource_name": TARGET_PROJECT_ID,
"data_asset_name": data_asset_name,
}
},
)
When using Great Expectations with datasources from great_expectations.yml, you can use the create_temp_table: False setting for SqlAlchemyExecutionEngine to prevent SQLAlchemy from issuing SQL like this:
CREATE OR REPLACE TEMPORARY TABLE gx_temp_d5435 AS select ...
Question:
How do you do the same when using airflow-provider-great-expectations with an Airflow conn_id (which overrides the datasources from great_expectations.yml)?
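I am not aware of a supported knob for this on the operator; one heavily hedged idea is to subclass it and patch the datasource config it builds from the conn_id. The method name is taken from the tracebacks in this thread, and the config shape (an execution_engine mapping) is an assumption, so treat this as a sketch rather than working code.
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

class NoTempTableGXOperator(GreatExpectationsOperator):
    def build_configured_sql_datasource_config_from_conn_id(self):
        datasource = super().build_configured_sql_datasource_config_from_conn_id()
        # Assumption: the returned config exposes an execution_engine mapping,
        # mirroring the create_temp_table: False setting from great_expectations.yml.
        datasource["execution_engine"]["create_temp_table"] = False
        return datasource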
GreatExpectationsOperator fails when running validations against Athena, but works correctly against Redshift.
Environment: docker-compose airflow
airflow==2.2.3
airflow-provider-great-expectations==0.1.3
great-expectations==0.14.7
Both datasources use the SqlAlchemyEngine and are configured identically; only the connection string / credentials change.
"exception_info": {
"exception_traceback": "Traceback (most recent call last):\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 397, in resolve_metrics\n new_resolved = self.resolve_metric_bundle(metric_fn_bundle)\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 815, in resolve_metric_bundle\n domain_kwargs=domain_kwargs,\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py\", line 505, in get_domain_records\n selectable = selectable.columns().subquery()\nAttributeError: 'TextAsFrom' object has no attribute 'subquery'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 845, in resolve_validation_graph\n runtime_configuration=runtime_configuration,\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/validator/validator.py\", line 1749, in _resolve_metrics\n runtime_configuration=runtime_configuration,\n File \"/home/***/.local/lib/python3.7/site-packages/great_expectations/execution_engine/execution_engine.py\", line 401, in resolve_metrics\n message=str(e), failed_metrics=[x[0] for x in metric_fn_bundle]\ngreat_expectations.exceptions.exceptions.MetricResolutionError: 'TextAsFrom' object has no attribute 'subquery'\n",
"exception_message": "'TextAsFrom' object has no attribute 'subquery'",
"raised_exception": true
}
While Redshift, with the exact same configuration, runs OK.
The RuntimeBatchRequest query needs to be wrapped in parentheses ( ) to work: "query": f"(select * from db.table limit 100)"
I also tried building the RuntimeBatchRequest in code and pushing it as a parameter for the checkpoint. This creates EXACTLY the same error as Athena.
DAG gist file for reference (modified / deleted confidential information): dag_poc.py
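For reference, a sketch of the parenthesised query mentioned above expressed as a RuntimeBatchRequest (datasource, data connector, and asset names are placeholders):
from great_expectations.core.batch import RuntimeBatchRequest

batch_request = RuntimeBatchRequest(
    datasource_name="my_athena_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="customer_sample",
    # Per the note above, the query is wrapped in parentheses so the SQLAlchemy
    # execution engine can treat it as a subquery.
    runtime_parameters={"query": "(select * from db.table limit 100)"},
    batch_identifiers={"default_identifier_name": "athena_poc"},
)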