skywalking监控airflow功能的设计文档
mufiye 内核新手

Skywalking监控airflow

相关的issue

相关的pull request:

一、前景提要

apache airflow是一个流行的workflow调度程序,目前skywalking还不支持监控airflow,但是这一块是被期待实现的。airflow通过statsd来暴露自己的metrics,而skywalking oap想要监控airflow的话,本质就是要拿到airflow的metric数据并且进行对数据进行再处理和展示。

二、Airflow Concepts

更多详细的介绍可以查看airflow官方文档

  • DAG:dag指的是有向无环图,它指定了任务之间的依赖关系,其决定了任务执行和运行重试的顺序。
  • Task: 任务是airflow的基本执行单元。Task通过DAG进行分配,然后它们之间存在上下游的依赖关系,这表示了它们的执行顺序。
  • Scheduler:它处理触发预定的工作流,并提交任务到执行程序运行。
  • Executor: 其处理正在运行的任务。如果是默认的airflow配置,其将运行调度程序中所有内容,但大多数适合生产的执行程序实际上将任务执行推给workers。执行器是运行任务实例的机制。

airflow-architecture

三、实现方案

3.1 数据获取

apache airflow定义的metrics使用的statsd的数据格式,而skywalking oap并不支持接收statsd格式的数据。不过openTelemetry collector的非官方版本支持接收statsd的数据格式,并且我们的skywalking oap支持接收opentelemetry的protocol,因此这一条数据接收链路就成功的建立了。

但是目前仍存在几个问题:

问题一:apache airflow定义的metrics没有tag这个概念

在skywalking oap接收metrics数据进行处理的过程中,主要是根据metrics的tag来进行过滤、聚合等操作,对于metric的name没有办法进行过滤和聚合;而airflow将所有的tag都放在了metric的name中,这使得skywalking oap拿到airflow传来的metric数据无法进行处理。由于修改meter analyzer过于复杂,我们将目光聚焦到了otel collector上面,我们希望能够在otel collector端将数据转化为skywalking oap能够处理的形式。

OpenTelemetry Collector能够接收各种形式的数据并将它们转化为各种形式的数据发送出去,其由三个模块组成,分别为collector、transformer和exporter,要对收集到的数据进行处理,我们需要查阅transformer的功能。我们尝试使用metrics transform processor, transform processor and attributes processor对收集到的数据进行处理,经过阅读文档和测试,我们发现通过set函数将metric name值设置为某个标签的值,之后对于标签的key使用replace_pattern函数剔除无用的字符,通过这种方式将metric name中的属性放入到tag中去,比如dag_id,task_id。同时对于sum和gauge类型的数据,combine函数可以自动将metric name中的一些属性变为标签。

问题二:对于聚合类型为delta的Sum metric,skywalking oap无法处理

在这里我解释一下Sum metric,更多信息也可以查阅promethus counter metric文档opentelemetry sum metric文档。简单来说,sum就是表示从某个时间点开始某个事件的累加和,在promethus的定义中,sum只增加不减少,而根据聚合类型,sum metric又可以分为delta类型和cumulative类型,delta类型就是每个发送来的指标表示这段时间的变化量,而cumulative类型就是表示累加的量。在skywalking oap中,之前是默认不处理delta类型的sum数据的,只处理cumulative类型,我为其做了适配,并将得到的sum数据转换为gauge,因为delta数据其实此时和gauge更像,表示这段时间的变化量。而更麻烦的问题发生了,在airflow中并不严格遵守promethus中counter数据只增加不减少的定义,其定义的dag_processing.processes表示的是当前在运行的解析dag的进程数量,如果我们将收到的每次metric定义为目前变化的解析dag进程数,未免过于奇怪,不过也没有更好的办法。

问题三: skywalking oap不适配opentelemetry Exponential Histogram类型metric

apache airflow一共会产生三种类型的metrics,分别是counter(也就是sum)、gauge(仪表盘数据)、timer。OpenTelemetry Collector接收到数据后会将其转换为otel支持的data,counter转化为sum,gauge转化为gauge,而对于timer数据有两种选择,分别是Exponential HistogramSummary,相比而言,histogram类型的数据能够提供更多的信息。而skywalking oap目前只适配histogram类型的metric,不支持较新的Exponential Histogram。从本质上来讲Exponential Histogram是对histogram的数据量进行了压缩,因为其使用传输底数和数组的index来表示boundary(做指数运算可以得到),我想这种数据形式相比于histogram更具有优势,未来可能会有更多的监控服务要使用到,因此对其进行了适配。

3.2 数据处理

Skywalking在收到metric数据后会将数据转换为自己定义的格式,关于skywalking metric的格式,可以参考该文档。转换之后,skywalking支持使用其定义的Meter Analysis Language来定义一系列的规则过滤和处理数据。下面介绍一些MAL常用的function:

Metric level function

在skywalking oap中,将指标的来源定义为service、instance、endpoint、process这几个级别。其中service表示为传入请求提供相同行为的一组工作负载;服务组中的每个工作负载都称为一个instance;endpoint表示服务中用于传入请求的路径,例如HTTP URI路径或gRPC服务类方法签名;process表示的是一个操作系统进程。该函数从metrics的label中提取出具体的level。

1
2
3
metric_1.service([svc_label1, svc_label2...], Layer)
metric_2.instance([svc_label1, svc_label2...], [ins_label1, ins_label2...], Layer)
metric_3.endpoint([svc_label1, svc_label2...], [ep_label1, ep_label2...])

Metric Filter

过滤metric用的,只有满足特定条件的metric才会被meter analysis language继续处理。

1
filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'

Tag filter

该filter可以根据metrics的tag来提取特定的metric

1
instance_trace_count.tagMatch("region", "us-west|asia-north").tagEqual("az", "az-1")

What’s more

更多信息可以参考官方文档

3.3 数据展示

得益于skywalking强大的booster-ui,我们能够通过编写json文件定制化我们的dashboard,我们可以使用各式各样的图标来展示我们的数据。

  • 本文标题:skywalking监控airflow功能的设计文档
  • 本文作者:mufiye
  • 创建时间:2023-03-03 14:56:45
  • 本文链接:http://mufiye.github.io/2023/03/03/skywalking监控airflow功能的设计文档/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论