Comments (12)
I'm having the same issue.
from airflow-provider-grafana-loki.
can you please share some more details:
- airflow version
- airflow grafana provider version
- the loki log handler configuration
- are you getting the error for all the tasks?
- there is a known issue with fetching logs for older task retries. see if you can fetch the log for the latest task try.
- are the task logs visible in grafana?
- describe your airflow setup. are you using airflow standalone mode?
- can you try reproducing the issue with an example dag?
can you please share some more logs around the loki query?
- v2.6.2
- 0.0.2
- Host 'loki' is a resolvable container name in docker (same docker network)
- Yes
- Have tried, the same thing for all
- I also don't see a Loki label within the Grafana explorer
- Using the docker-compose setup https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
- Example dag I have tried
crm-elastig-dag.py
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from includes.test import hello
#from grafana_loki_provider.hooks.loki import LokiHook
#loki_hook = LokiHook(loki_conn_id="loki")
#import logging
#logging.info("HELLO")
import logging
logger = logging.getLogger(__name__)
logger.info("This is a log message")
# This seems to work, but it's a manual configuration
"""
params = {
    "query": "{foo=\"bar2\"}",
    "limit": 5000,
    "direction": "forward",
}
resp = loki_hook.query_range(params)
print(resp)
"""
args = {
    'owner': 'Marko',
    'start_date': days_ago(1)
}
dag = DAG(
    dag_id='crm-elastic-dag',
    default_args=args,
    schedule_interval='@daily'
)
with dag:
    hello_world = PythonOperator(
        task_id='hello',
        python_callable=hello,
        # provide_context=True
    )
test.py
def hello():
    print('Hello!')
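A side note on the DAG file above: the module-level `logger.info(...)` call runs whenever the file is parsed, not as part of the task callable, so it may not show up where the task output does. A minimal sketch of logging from inside the callable instead, assuming the task handler is attached to the `airflow.task` logger (the logger name used in the settings module shared later in this thread). The message text and return value are made up for illustration.

```python
import logging

# "airflow.task" is the logger the task handler is wired to in the
# logging settings shared later in this thread.
task_logger = logging.getLogger("airflow.task")


def hello():
    # Logged while the task is executing, so the record passes through
    # whatever handlers are attached to "airflow.task".
    task_logger.info("Hello from inside the task")
    print("Hello!")
    return "done"
```

Outside Airflow this runs as a plain function; whether the record reaches Loki still depends on the handler configuration discussed in the rest of the thread.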
Logs from execution
Logs from graph:
*** Found local files:
*** * /opt/airflow/logs/dag_id=crm-elastic-dag/run_id=scheduled__2023-07-23T00:00:00+00:00/task_id=hello/attempt=1.log
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): hello> on 2023-07-23 00:00:00+00:00
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:57} INFO - Started process 64 to run task
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'scheduled__2023-07-23T00:00:00+00:00', '--job-id', '78', '--raw', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py', '--cfg-path', '/tmp/tmpxhij1w1e']
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:85} INFO - Job 78: Subtask hello
[2023-07-24, 21:38:33 UTC] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [running]> on host 2862783298b4
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='Marko Smej' AIRFLOW_CTX_DAG_ID='crm-elastic-dag' AIRFLOW_CTX_TASK_ID='hello' AIRFLOW_CTX_EXECUTION_DATE='2023-07-23T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-23T00:00:00+00:00'
[2023-07-24, 21:38:33 UTC] {logging_mixin.py:149} INFO - Hello!
[2023-07-24, 21:38:33 UTC] {python.py:183} INFO - Done. Returned value was: None
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=crm-elastic-dag, task_id=hello, execution_date=20230723T000000, start_date=20230724T213833, end_date=20230724T213833
[2023-07-24, 21:38:33 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-07-24, 21:38:33 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
i am not able to reproduce the issue using the docker-compose deployment you shared.
it will help, if you could run the below debugging code inside the webserver container.
import airflow
from grafana_loki_provider.hooks.loki import LokiHook
from grafana_loki_provider.log.loki_task_handler import LokiTaskHandler
from datetime import datetime as dt, timedelta
hook = LokiHook(loki_conn_id='loki')
c = hook.get_conn()
print("base_url: {}".format(hook.base_url))
endpoint = hook.v1_base_endpoint.format(method='push')
url = hook.url_from_endpoint(endpoint)
print("url: {}".format(url))
log_handler = LokiTaskHandler(base_log_folder = "", name='test')
log_handler.hook = hook
log_handler.labels = {"name":"test_loki_log"}
log_handler.extras = {}
r = log_handler.loki_write(['test_loki_line_1', 'test_loki_line_2'])
print("write response")
print(r)
print("wrote log to loki")
print('going to read log from loki')
params = {
    "query": '{name="test_loki_log"}',
    "limit": 1000,
    "direction": "forward",
}
print("params: {}".format(params))
data = hook.query_range(params)
print(data)
you can do it by using the below command. please replace AIRFLOW_WEBSERVER_CONTAINER_NAME with the actual container name (note the `&&`, so that python only starts after curl has finished downloading the script):
docker exec AIRFLOW_WEBSERVER_CONTAINER_NAME /bin/bash -c "curl -k -s https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py && python /tmp/debug_loki.py"
Here are the logs from it:
curl -k -s https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py & python /tmp/debug_loki.py
[1] 16801
[2023-08-04T10:10:24.856+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
base_url: http://loki:3100
url: http://loki:3100/loki/api/v1/push
[2023-08-04T10:10:24.861+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
write response
None
wrote log to loki
going to read log from loki
params: {'query': '{name="test_loki_log"}', 'limit': 1000, 'direction': 'forward'}
[2023-08-04T10:10:24.873+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
{'status': 'success', 'data': {'resultType': 'streams', 'result': [{'stream': {'name': 'test_loki_log'}, 'values': [['1691143824858186496', '{"line": "test_loki_line_1"}'], ['1691143824858215936', '{"line": "test_loki_line_2"}']]}], 'stats': {'summary': {'bytesProcessedPerSecond': 2028, 'linesProcessedPerSecond': 72, 'totalBytesProcessed': 56, 'totalLinesProcessed': 2, 'execTime': 0.027612, 'queueTime': 0.083636, 'subqueries': 0, 'totalEntriesReturned': 2, 'splits': 3, 'shards': 48}, 'querier': {'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 0, 'headChunkLines': 0, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'ingester': {'totalReached': 48, 'totalChunksMatched': 1, 'totalBatches': 1, 'totalLinesSent': 2, 'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 56, 'headChunkLines': 2, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'cache': {'chunk': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'index': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'result': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}}}}}
[1]+ Done curl -k -s https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py
This seems to have created the loki name successfully
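The response printed above is hard to read as a single dict. A small sketch for pulling just the log lines out of it, assuming the shape shown in that output (`data.result[].values[]` pairs of a nanosecond timestamp string and a JSON string with a `line` key). The helper name `extract_lines` is hypothetical and not part of the provider.

```python
import json


def extract_lines(response):
    """Return sorted (timestamp_ns, line) pairs from a query_range-style dict."""
    lines = []
    for stream in response.get("data", {}).get("result", []):
        for ts, raw in stream.get("values", []):
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                payload = raw
            # Fall back to the raw string when the entry is not a {"line": ...} object.
            line = payload.get("line", raw) if isinstance(payload, dict) else raw
            lines.append((int(ts), line))
    return sorted(lines)


# Trimmed copy of the response shown above (stats omitted).
sample = {
    "status": "success",
    "data": {
        "resultType": "streams",
        "result": [
            {
                "stream": {"name": "test_loki_log"},
                "values": [
                    ["1691143824858186496", '{"line": "test_loki_line_1"}'],
                    ["1691143824858215936", '{"line": "test_loki_line_2"}'],
                ],
            }
        ],
    },
}

for ts, line in extract_lines(sample):
    print(ts, line)
```

In the debugging script this could be called as `extract_lines(data)` in place of `print(data)`.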
@markoftw i don't see any issue there. you were able to push to loki and also read from it.
maybe the issue is with the logger configuration.
can you share the airflow_local_settings.py or the equivalent module where you are configuring the LokiTaskHandler? it will be better if you share the whole module.
also the output of the following:
airflow config list --section logging
Please make sure it does not contain any confidential info before sharing it here.
Sure, here they are:
airflow_local_settings.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit
from airflow.configuration import conf
from airflow.exceptions import AirflowException
LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()
LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")
LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)
COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")
COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")
COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")
DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")
BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")
PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)
# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3
# All of these handlers inherited from FileTaskHandler and providing any value rather than None
# would raise deprecation warning.
FILENAME_TEMPLATE: str | None = None
PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}
# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])
    # Manually create log directory for processor_manager handler as RotatingFileHandler
    # will only create file but not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
##################
# Remote logging #
##################
REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # HDFS path should start with "hdfs://"
    # just to help Airflow select correct handler
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
        HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith('loki'):
        # Built-in generics are used here because Dict and Union are not imported in this module.
        LOKI_HANDLER: dict[str, dict[str, str | bool | None]] = {
            'task': {
                'class': 'grafana_loki_provider.log.loki_task_handler.LokiTaskHandler',
                'formatter': 'airflow',
                'name': "airflow_task",
                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
                'filename_template': FILENAME_TEMPLATE
            },
        }
        DEFAULT_LOGGING_CONFIG['handlers'].update(LOKI_HANDLER)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
airflow config list --section logging
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
delete_local_logs = False
google_key_path =
remote_base_log_folder = loki
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
colored_console_log = True
colored_log_format = [%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
simple_log_format = %(asctime)s %(levelname)s - %(message)s
dag_processor_log_target = file
dag_processor_log_format = [%(asctime)s] [SOURCE:DAG_PROCESSOR] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
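The settings module above is a full copy of Airflow's stock file with one extra Loki branch. An alternative layout, shown here only as a sketch and not as what this setup or the provider requires, is to leave the stock settings alone and replace just the `task` handler in a small module of your own. The helper name `with_loki_task_handler` is hypothetical, and the stand-in `base` dict below is an assumption so the snippet runs without Airflow; in a real `log_config.py` it would be `airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG`.

```python
import copy

# Handler class path as it appears in the settings module above.
LOKI_HANDLER_CLASS = "grafana_loki_provider.log.loki_task_handler.LokiTaskHandler"


def with_loki_task_handler(base_config, base_log_folder):
    """Return a copy of a dictConfig-style mapping whose "task" handler is the Loki one."""
    config = copy.deepcopy(base_config)
    config.setdefault("handlers", {})["task"] = {
        "class": LOKI_HANDLER_CLASS,
        "formatter": "airflow",
        "name": "airflow_task",
        "base_log_folder": base_log_folder,
        "filename_template": None,
    }
    return config


# Stand-in for Airflow's default config (assumption, trimmed for illustration).
base = {
    "version": 1,
    "handlers": {"task": {"class": "airflow.utils.log.file_task_handler.FileTaskHandler"}},
    "loggers": {"airflow.task": {"handlers": ["task"]}},
}

patched = with_loki_task_handler(base, "/opt/airflow/logs")
print(patched["handlers"]["task"]["class"])
```

The deep copy keeps the original mapping untouched, so the same base can be reused or compared against when debugging.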
@markoftw , i tried your dag and logger config. am still not able to reproduce the issue.
in your case, the "Using connection ID 'loki' for task execution." line is missing in the task logs. it looks like the loki connection is not being used. it is most likely a configuration issue. for comparison, this is what i get:
[2023-08-05T12:18:08.135+0530] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello manual__2023-08-05T06:48:06.821339+00:00 [queued]> on host PF3Y5KBZ.
[2023-08-05T12:18:08.797+0530] {base.py:73} INFO - Using connection ID 'loki' for task execution.
[2023-08-05T12:18:08.811+0530] {dagrun.py:630} INFO - Marking run <DagRun crm-elastic-dag @ 2023-08-05 06:48:06.821339+00:00: manual__2023-08-05T06:48:06.821339+00:00, state:running, queued_at: 2023-08-05 06:48:06.849665+00:00. externally triggered: True> successful
i think, it would be good if we could connect on slack and troubleshoot the issue.
otherwise, make sure the webserver/worker and scheduler are all having the same configuration and you are not overriding the log handlers in any of your dag code.
from inside the worker container you can try executing the below code:
import airflow
import logging
logger = logging.root.manager.loggerDict['airflow.task']
print(logger)
loki_handler = logger.handlers[0]
print(loki_handler)
print(loki_handler.hook)
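A slightly more defensive variant of the check above, as a sketch: it lists the class of every handler attached to a logger instead of indexing `handlers[0]`, which would raise an IndexError if no handler were attached. The function name `describe_handlers` is made up for illustration. Inside the container, `import airflow` should still come first so that Airflow's logging configuration is loaded.

```python
import logging


def describe_handlers(logger_name="airflow.task"):
    """Return the dotted class path of each handler attached to the named logger."""
    logger = logging.getLogger(logger_name)
    return [f"{type(h).__module__}.{type(h).__qualname__}" for h in logger.handlers]


# Prints an empty list when nothing is attached (for example outside Airflow).
print(describe_handlers())
```

If the Loki handler is active on the worker, the printed list would be expected to contain the `LokiTaskHandler` class path from the settings module; a `FileTaskHandler` entry instead would point at the configuration not being picked up in that container.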
@snjypl I attempted to utilize the "airflow-provider-grafana-loki" provider to send logs to Loki. However, the logs do not appear in the Airflow UI. Is it necessary for the Loki connection to have read access, implying that the Airflow UI retrieves logs directly from Loki?
@ragavanks airflow webserver reads the logs from Loki. it will be better if you could open a new issue with details. eg: log configuration, airflow task log, airflow webserver log etc.
you can also try the debugging steps mentioned in this issue and see if it helps.
The issue was solved.
It was a misconfiguration in the docker-compose.yml for the worker container.
The worker and webserver containers (possibly others if required, depending on your needs) need to have the correct airflow.cfg bound to their volumes.
e.g.
volumes:
  - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
  - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
  - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
  - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  - ${AIRFLOW_PROJ_DIR:-.}/airflow.cfg:/opt/airflow/airflow.cfg
Verify this in each container: airflow config list --section logging should show the following settings:
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
...
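The per-container check described above can be scripted. A minimal sketch, assuming the output of `airflow config list --section logging` has been captured as text (for example via `docker exec`); the helper name `check_logging_section` and the `EXPECTED` mapping are hypothetical, with the expected values taken from the settings listed above.

```python
import configparser

# Settings the fix in this thread relies on.
EXPECTED = {
    "remote_logging": "True",
    "remote_log_conn_id": "loki",
    "logging_config_class": "log_config.DEFAULT_LOGGING_CONFIG",
}


def check_logging_section(text, expected=EXPECTED):
    """Return {key: actual_value} for every expected setting that does not match."""
    # interpolation=None because log formats contain literal %(...)s sequences.
    parser = configparser.ConfigParser(interpolation=None, delimiters=("=",))
    parser.read_string(text)
    section = parser["logging"]
    return {k: section.get(k) for k, v in expected.items() if section.get(k) != v}


sample = "\n".join(
    [
        "[logging]",
        "base_log_folder = /opt/airflow/logs",
        "remote_logging = True",
        "remote_log_conn_id = loki",
        "logging_config_class = log_config.DEFAULT_LOGGING_CONFIG",
    ]
)
print(check_logging_section(sample))
```

An empty dict means the container has the expected settings; running the same check against the worker, webserver and scheduler output makes a mismatch like the one in this issue easy to spot.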
Thanks to @snjypl!