大数据之hadoop YARN 应用日志收集 Flume/Logstash 集成

大数据阿木 发布于 2025-07-11 9 次阅读


YARN 应用日志收集:Flume 与 Logstash 集成实践

随着大数据时代的到来,企业对于数据处理和分析的需求日益增长。Hadoop 作为大数据处理框架,已经成为业界的事实标准。在 Hadoop 生态系统中,YARN(Yet Another Resource Negotiator)作为资源管理器,负责管理集群资源,并分配给不同的应用程序。YARN 应用的日志收集对于监控和分析应用性能至关重要。本文将介绍如何使用 Flume 和 Logstash 集成,实现 YARN 应用日志的收集。

Flume 简介

Flume 是一个分布式、可靠且可扩展的日志收集系统,用于有效地收集、聚合和移动大量日志数据。它支持多种数据源和目的地,包括文件、HDFS、HBase、Kafka 等。Flume 的核心组件包括:

- Agent:Flume 的基本工作单元,负责数据收集、处理和传输。

- Source:数据源,负责从各种数据源读取数据。

- Channel:数据缓冲区,用于在 Source 和 Sink 之间暂存数据。

- Sink:数据目的地,负责将数据写入到目标存储系统。

Logstash 简介

Logstash 是一个开源的数据处理管道,用于收集、转换和传输数据。它可以将来自各种数据源的数据转换为统一的格式,并传输到目标系统,如 Elasticsearch、HDFS、数据库等。Logstash 的核心组件包括:

- Pipelines:数据处理的流程,包括输入、过滤器、输出。

- Inputs:数据源,如文件、JMS、TCP 等。

- Filters:数据转换,如正则表达式、JSON 解析等。

- Outputs:数据目的地,如 Elasticsearch、数据库、文件等。

YARN 应用日志收集方案

1. 数据源

YARN 应用的日志通常存储在 Hadoop 集群的 HDFS 或本地文件系统中。我们可以使用 Flume 的 `exec` Source 来定期执行命令,获取 YARN 应用的日志文件。

2. 数据处理

在 Flume Agent 中,我们可以使用 `Taildir` Source 来实时读取日志文件,并使用 `HDFS` Sink 将数据写入 HDFS。

3. 数据传输

使用 Logstash 将 HDFS 中的数据传输到 Elasticsearch,以便进行搜索和分析。

实现步骤

1. 安装 Flume 和 Logstash

bash

安装 Flume


sudo apt-get install flume

安装 Logstash


sudo apt-get install logstash


2. 配置 Flume Agent

创建一个名为 `yarn_log_collector.conf` 的 Flume 配置文件,内容如下:

conf

定义 Agent


agent.sources = yarn_source


agent.sinks = hdfs_sink


agent.channels = memory_channel

定义 Source


agent.sources.yarn_source.type = Taildir


agent.sources.yarn_source.filepath = /path/to/yarn/logs


agent.sources.yarn_source.positionfile = /path/to/yarn/logs/.flume_position_file


agent.sources.yarn_source.startPosition = begin

定义 Channel


agent.channels.memory_channel.type = memory


agent.channels.memory_channel.capacity = 1000


agent.channels.memory_channel.transactionCapacity = 100

定义 Sink


agent.sinks.hdfs_sink.type = HDFS


agent.sinks.hdfs_sink.hdfs.path = /user/hadoop/yarn/logs


agent.sinks.hdfs_sink.hdfs.rollInterval = 3600


agent.sinks.hdfs_sink.hdfs.rollSize = 0


agent.sinks.hdfs_sink.hdfs.rollCount = 0


agent.sinks.hdfs_sink.hdfs.filePrefix = yarn_log_


agent.sinks.hdfs_sink.hdfs.round = true


agent.sinks.hdfs_sink.hdfs.roundValue = 3600


agent.sinks.hdfs_sink.hdfs.roundUnit = minute


3. 配置 Logstash Pipeline

创建一个名为 `logstash.conf` 的 Logstash 配置文件,内容如下:

conf

input {


file {


path => "/user/hadoop/yarn/logs/.log"


start_position => "begin"


sincedb_path => "/dev/null"


}


}

filter {


mutate {


add_field => ["message", "%{message}"]


}


}

output {


elasticsearch {


hosts => ["localhost:9200"]


index => "yarn_logs"


document_type => "log"


}


}


4. 启动 Flume 和 Logstash

bash

启动 Flume Agent


flume-ng agent -n yarn_log_collector -c /path/to/flume/conf -f /path/to/flume/conf/yarn_log_collector.conf

启动 Logstash


logstash -f /path/to/logstash/conf/logstash.conf


总结

本文介绍了如何使用 Flume 和 Logstash 集成,实现 YARN 应用日志的收集。通过 Flume,我们可以实时收集 YARN 应用的日志,并将其写入 HDFS。然后,使用 Logstash 将数据传输到 Elasticsearch,以便进行搜索和分析。这种方案可以帮助企业更好地监控和分析 YARN 应用的性能,提高大数据处理效率。