Comments (12)

PApostol commented on July 18, 2024

I'm having the same issue.


snjypl commented on July 18, 2024

@markoftw @PApostol

can you please share some more details (one way to gather items 1–3 is sketched after this list):

  1. airflow version
  2. airflow grafana provider version
  3. the loki log handler configuration
  4. are you getting the error for all the tasks?
  5. there is a known issue with fetching logs for older task retries. see if you can fetch the log for the latest task try.
  6. are the task logs visible in grafana?
  7. describe your airflow setup. are you using airflow standalone mode?
  8. can you try reproducing the issue with an example dag?
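
For reference, a minimal sketch of one way to collect items 1–3 from a Python shell inside an Airflow container. The distribution name airflow-provider-grafana-loki is an assumption here, and the airflow.task logger lookup mirrors the debugging snippet shared later in this thread.

import logging
from importlib.metadata import version

import airflow

print(airflow.__version__)                                        # item 1: Airflow version
print(version("airflow-provider-grafana-loki"))                   # item 2: provider version (assumed distribution name)
print(logging.root.manager.loggerDict["airflow.task"].handlers)   # item 3: the task log handler actually in use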


snjypl commented on July 18, 2024

can you please share some more logs around the loki query?


markoftw commented on July 18, 2024
  1. v2.6.2
  2. 0.0.2
  3. Host 'loki' is a resolvable container name in docker (same docker network) [screenshot]
  4. Yes
  5. Have tried; the same thing happens for all tries
  6. I also don't see a Loki label within the Grafana explorer
  7. Using the docker-compose setup https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
  8. Example dag I have tried

crm-elastig-dag.py

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from includes.test import hello


#from grafana_loki_provider.hooks.loki import LokiHook
#loki_hook = LokiHook(loki_conn_id="loki")
#import logging
#logging.info("HELLO")

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")


# This seems to work, but it's a manual configuration
"""
params = {
    "query": "{foo=\"bar2\"}",
    "limit":5000,
    "direction": "forward",
}

resp = loki_hook.query_range(params)

print(resp)
"""


args = {
    'owner': 'Marko',
    'start_date': days_ago(1)
}

dag = DAG(
    dag_id='crm-elastic-dag',
    default_args=args,
    schedule_interval='@daily'
)

with dag:
    hello_world = PythonOperator(
        task_id='hello',
        python_callable=hello,
        # provide_context=True
    )

test.py

def hello():
    print('Hello!')

Logs from execution:

[screenshot]

Logs from graph:

*** Found local files:
***   * /opt/airflow/logs/dag_id=crm-elastic-dag/run_id=scheduled__2023-07-23T00:00:00+00:00/task_id=hello/attempt=1.log
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): hello> on 2023-07-23 00:00:00+00:00
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:57} INFO - Started process 64 to run task
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'scheduled__2023-07-23T00:00:00+00:00', '--job-id', '78', '--raw', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py', '--cfg-path', '/tmp/tmpxhij1w1e']
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:85} INFO - Job 78: Subtask hello
[2023-07-24, 21:38:33 UTC] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [running]> on host 2862783298b4
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='Marko Smej' AIRFLOW_CTX_DAG_ID='crm-elastic-dag' AIRFLOW_CTX_TASK_ID='hello' AIRFLOW_CTX_EXECUTION_DATE='2023-07-23T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-23T00:00:00+00:00'
[2023-07-24, 21:38:33 UTC] {logging_mixin.py:149} INFO - Hello!
[2023-07-24, 21:38:33 UTC] {python.py:183} INFO - Done. Returned value was: None
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=crm-elastic-dag, task_id=hello, execution_date=20230723T000000, start_date=20230724T213833, end_date=20230724T213833
[2023-07-24, 21:38:33 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-07-24, 21:38:33 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

Query when opening logs:
[screenshot]


snjypl commented on July 18, 2024

@markoftw

i am not able to reproduce the issue using the docker-compose deployment you shared.

it would help if you could run the below debugging code inside the webserver container.

import airflow
from grafana_loki_provider.hooks.loki import LokiHook
from grafana_loki_provider.log.loki_task_handler import LokiTaskHandler

# build the hook from the 'loki' Airflow connection and resolve the push endpoint
hook = LokiHook(loki_conn_id='loki')
c = hook.get_conn()

print("base_url: {}".format(hook.base_url))
endpoint = hook.v1_base_endpoint.format(method='push')
url = hook.url_from_endpoint(endpoint)
print("url: {}".format(url))

# push two test lines through the task handler under the label name="test_loki_log"
log_handler = LokiTaskHandler(base_log_folder="", name='test')

log_handler.hook = hook

log_handler.labels = {"name": "test_loki_log"}
log_handler.extras = {}

r = log_handler.loki_write(['test_loki_line_1', 'test_loki_line_2'])

print("write response")
print(r)
print("wrote log to loki")
print('going to read log from loki')

# read the same lines back with a range query on that label
params = {
    "query": '{name="test_loki_log"}',
    "limit": 1000,
    "direction": "forward",
}
print("params: {}".format(params))

data = hook.query_range(params)
print(data)

you can do it with the below command; please replace AIRFLOW_WEBSERVER_CONTAINER_NAME with the actual container name:

docker exec AIRFLOW_WEBSERVER_CONTAINER_NAME   /bin/bash  -c "curl -k -s  https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py  & python /tmp/debug_loki.py"


markoftw commented on July 18, 2024

Here are the logs from it:

curl -k -s  https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py  & python /tmp/debug_loki.py
[1] 16801
[2023-08-04T10:10:24.856+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
base_url: http://loki:3100
url: http://loki:3100/loki/api/v1/push
[2023-08-04T10:10:24.861+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
write response
None
wrote log to loki
going to read log from loki
params: {'query': '{name="test_loki_log"}', 'limit': 1000, 'direction': 'forward'}
[2023-08-04T10:10:24.873+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
{'status': 'success', 'data': {'resultType': 'streams', 'result': [{'stream': {'name': 'test_loki_log'}, 'values': [['1691143824858186496', '{"line": "test_loki_line_1"}'], ['1691143824858215936', '{"line": "test_loki_line_2"}']]}], 'stats': {'summary': {'bytesProcessedPerSecond': 2028, 'linesProcessedPerSecond': 72, 'totalBytesProcessed': 56, 'totalLinesProcessed': 2, 'execTime': 0.027612, 'queueTime': 0.083636, 'subqueries': 0, 'totalEntriesReturned': 2, 'splits': 3, 'shards': 48}, 'querier': {'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 0, 'headChunkLines': 0, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'ingester': {'totalReached': 48, 'totalChunksMatched': 1, 'totalBatches': 1, 'totalLinesSent': 2, 'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 56, 'headChunkLines': 2, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'cache': {'chunk': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'index': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'result': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}}}}}
[1]+  Done                    curl -k -s https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py

This seems to have created the Loki label (name="test_loki_log") successfully:

[screenshot]


snjypl commented on July 18, 2024

@markoftw i don't see any issue there. you were able to push to loki and also read from it.
maybe the issue is with the logger configuration.

can you share the airflow_local_settings.py or equivalent where you are configuring the LokiTaskHandler? it would be better if you share the whole module.

also the output of the following:
airflow config list --section logging

Please make sure it does not contain any confidential info before sharing it here.


markoftw commented on July 18, 2024

Sure, here they are:

airflow_local_settings.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
  "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
  "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3
# All of these handlers inherited from FileTaskHandler and providing any value rather than None
# would raise deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
  "version": 1,
  "disable_existing_loggers": False,
  "formatters": {
      "airflow": {
          "format": LOG_FORMAT,
          "class": LOG_FORMATTER_CLASS,
      },
      "airflow_coloured": {
          "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
          "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
      },
      "source_processor": {
          "format": DAG_PROCESSOR_LOG_FORMAT,
          "class": LOG_FORMATTER_CLASS,
      },
  },
  "filters": {
      "mask_secrets": {
          "()": "airflow.utils.log.secrets_masker.SecretsMasker",
      },
  },
  "handlers": {
      "console": {
          "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
          "formatter": "airflow_coloured",
          "stream": "sys.stdout",
          "filters": ["mask_secrets"],
      },
      "task": {
          "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
          "formatter": "airflow",
          "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
          "filters": ["mask_secrets"],
      },
      "processor": {
          "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
          "formatter": "airflow",
          "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
          "filename_template": PROCESSOR_FILENAME_TEMPLATE,
          "filters": ["mask_secrets"],
      },
      "processor_to_stdout": {
          "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
          "formatter": "source_processor",
          "stream": "sys.stdout",
          "filters": ["mask_secrets"],
      },
  },
  "loggers": {
      "airflow.processor": {
          "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
          "level": LOG_LEVEL,
          # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
          "propagate": True,
      },
      "airflow.task": {
          "handlers": ["task"],
          "level": LOG_LEVEL,
          # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
          "propagate": True,
          "filters": ["mask_secrets"],
      },
      "flask_appbuilder": {
          "handlers": ["console"],
          "level": FAB_LOG_LEVEL,
          "propagate": True,
      },
  },
  "root": {
      "handlers": ["console"],
      "level": LOG_LEVEL,
      "filters": ["mask_secrets"],
  },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
  new_loggers = {
      logger_name.strip(): {
          "handlers": ["console"],
          "level": LOG_LEVEL,
          "propagate": True,
      }
      for logger_name in EXTRA_LOGGER_NAMES.split(",")
  }
  DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
  "handlers": {
      "processor_manager": {
          "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
          "formatter": "airflow",
          "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
          "mode": "a",
          "maxBytes": 104857600,  # 100MB
          "backupCount": 5,
      }
  },
  "loggers": {
      "airflow.processor_manager": {
          "handlers": ["processor_manager"],
          "level": LOG_LEVEL,
          "propagate": False,
      }
  },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
  DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
  DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

  # Manually create log directory for processor_manager handler as RotatingFileHandler
  # will only create file but not the directory.
  processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
      "processor_manager"
  ]
  directory: str = os.path.dirname(processor_manager_handler_config["filename"])
  Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

  ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

  # Storage bucket URL for remote logging
  # S3 buckets should start with "s3://"
  # Cloudwatch log groups should start with "cloudwatch://"
  # GCS buckets should start with "gs://"
  # WASB buckets should start with "wasb"
  # HDFS path should start with "hdfs://"
  # just to help Airflow select correct handler
  REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
  REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

  if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
      S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
      url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
      CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "log_group_arn": url_parts.netloc + url_parts.path,
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
      key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
      GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
              "gcp_key_path": key_path,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
      WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
          "task": {
              "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
              "wasb_container": "airflow-logs",
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
      key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
      # stackdriver:///airflow-tasks => airflow-tasks
      log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
      STACKDRIVER_REMOTE_HANDLERS = {
          "task": {
              "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
              "formatter": "airflow",
              "name": log_name,
              "gcp_key_path": key_path,
          }
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
      OSS_REMOTE_HANDLERS = {
          "task": {
              "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
              "formatter": "airflow",
              "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
              "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }
      DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
      HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }
      DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
  elif ELASTICSEARCH_HOST:
      ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
      ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
      ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
      ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
      ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
      ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
      ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

      ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
          "task": {
              "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "filename_template": FILENAME_TEMPLATE,
              "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
              "host": ELASTICSEARCH_HOST,
              "frontend": ELASTICSEARCH_FRONTEND,
              "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
              "json_format": ELASTICSEARCH_JSON_FORMAT,
              "json_fields": ELASTICSEARCH_JSON_FIELDS,
              "host_field": ELASTICSEARCH_HOST_FIELD,
              "offset_field": ELASTICSEARCH_OFFSET_FIELD,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("loki"):
      LOKI_HANDLER: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "grafana_loki_provider.log.loki_task_handler.LokiTaskHandler",
              "formatter": "airflow",
              "name": "airflow_task",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(LOKI_HANDLER)
  else:
      raise AirflowException(
          "Incorrect remote log configuration. Please check the configuration of option 'host' in "
          "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
          "'remote_base_log_folder' option in the 'logging' section."
      )
  DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)

airflow config list --section logging

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
delete_local_logs = False
google_key_path =
remote_base_log_folder = loki
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
colored_console_log = True
colored_log_format = [%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
simple_log_format = %(asctime)s %(levelname)s - %(message)s
dag_processor_log_target = file
dag_processor_log_format = [%(asctime)s] [SOURCE:DAG_PROCESSOR] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664


snjypl commented on July 18, 2024

@markoftw, i tried your dag and logger config. i am still not able to reproduce the issue.

in your case, the "Using connection ID 'loki' for task execution." line is missing from the task logs. it looks like the loki connection is not being used, so it is most likely a configuration issue.


[2023-08-05T12:18:08.135+0530] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello manual__2023-08-05T06:48:06.821339+00:00 [queued]> on host PF3Y5KBZ.
[2023-08-05T12:18:08.797+0530] {base.py:73} INFO - Using connection ID 'loki' for task execution.
[2023-08-05T12:18:08.811+0530] {dagrun.py:630} INFO - Marking run <DagRun crm-elastic-dag @ 2023-08-05 06:48:06.821339+00:00: manual__2023-08-05T06:48:06.821339+00:00, state:running, queued_at: 2023-08-05 06:48:06.849665+00:00. externally triggered: True> successful

i think it would be good if we could connect on slack and troubleshoot the issue.

otherwise, make sure the webserver, worker and scheduler all have the same configuration and that you are not overriding the log handlers in any of your dag code.

from inside the worker container you can try executing the below code:

import airflow
import logging

# look up the task logger that Airflow configures and inspect its first handler;
# with the Loki logging config loaded this should be a LokiTaskHandler
# (the default config uses FileTaskHandler here)
logger = logging.root.manager.loggerDict['airflow.task']
print(logger)
loki_handler = logger.handlers[0]
print(loki_handler)
print(loki_handler.hook)


ragavanks commented on July 18, 2024

@snjypl I attempted to utilize the "airflow-provider-grafana-loki" provider to send logs to Loki. However, the logs do not appear in the Airflow UI. Is it necessary for the Loki connection to have read access, implying that the Airflow UI retrieves logs directly from Loki?


snjypl commented on July 18, 2024

@ragavanks the airflow webserver reads the logs from Loki. it would be better if you could open a new issue with details, e.g. log configuration, airflow task log, airflow webserver log, etc.

you can also try the debugging steps mentioned in this issue and see if it helps.
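
As a rough sketch of that read path, using only the hook calls already shown earlier in this thread: the label value airflow_task matches the handler name set in the logging config above, while the real handler presumably narrows the query down per task and attempt.

from grafana_loki_provider.hooks.loki import LokiHook

# the webserver side reads task logs back from Loki; this mimics it with a plain range query
hook = LokiHook(loki_conn_id="loki")

params = {
    "query": '{name="airflow_task"}',   # assumed label from the logging config above
    "limit": 1000,
    "direction": "forward",
}

print(hook.query_range(params))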


markoftw commented on July 18, 2024

The issue was solved.

It was a misconfiguration in the docker-compose.yml for the worker container.
The worker and webserver containers (and possibly others, depending on your setup) need to have the correct airflow.cfg bound to their volumes, e.g.:

    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
      - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
      - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
      - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
      - ${AIRFLOW_PROJ_DIR:-.}/airflow.cfg:/opt/airflow/airflow.cfg

Verify this in each container with airflow config list --section logging; it should show the following settings:

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
...
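
Equivalently, the same values can be checked from a Python shell inside each container; a minimal sketch using the conf API already used in airflow_local_settings.py above:

from airflow.configuration import conf

# these should agree across the webserver, worker and scheduler containers
print(conf.getboolean("logging", "remote_logging"))    # expect: True
print(conf.get("logging", "remote_log_conn_id"))       # expect: loki
print(conf.get("logging", "logging_config_class"))     # expect: log_config.DEFAULT_LOGGING_CONFIG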

Thanks to @snjypl!
