问题导读:
1.filebeat是什么公司推出的采集器?与Kafka有没有联系?
2.filebeat的工作原理是什么?和logstash该如何搭配?
3.如何进行安装配置Filebeat?
4.搭建好平台后,该怎么优化性能?
目录:
- 有关Filebeat和Logstash的介绍
- 安装配置
- 字段解析
- 实践:基于 Logstash + Filbeat 的数据采集
- 使用 K8s(Kubernetes)自动编排容器场景中的数据采集
- Filebeat性能优化
- 总结
有关Filebeat和Logstash的介绍
什么是Filebeat?
Filebeat是一个轻量级的采集器,用于转发和采集日志数据。 Filebeat作为代理安装在服务器上,监视那些指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或Logstash进行数据处理。最后由Logstash发送到Kafka或者ES。
Filebeat的构成和优点
filebeat 是基于原先 logstash-forwarder 的源码改造出来的。换句话说:filebeat 就是新版的 logstash-forwarder,也会是 Elastic Stack 在 shipper 端的第一选择。
Filebeat是Beat成员之一,基于Go语言,无任何依赖,并且比logstash更加轻量,非常适合安装在生产机器上,不会带来过高的资源占用,轻量意味着简单,所以Filebeat并没有集成和logstash一样的正则处理功能,而是将收集的日志原样上报。
Logstash是什么?
Logstash 是由 Elastic 公司推出的一款开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送指定的存储库中。
有关logstash详细信息请参考https://www.sensorsdata.cn/manual/logstash_filebeat.html
Logstash的优点
Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。Logstash利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。 Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
 利用 Grok 从非结构化数据中派生出结构
 从 IP 地址破译出地理坐标
 将 PII 数据匿名化,完全排除敏感字段
 简化整体处理,不受数据源、格式或架构的影响
elastic官网 https://www.elastic.co/cn/products/logstash
Filebeat的工作原理
启动Filebeat时,它会启动一个或多个输入,这些输入将查找您为日志数据指定的位置。 对于Filebeat找到的每个日志,Filebeat启动一个采集器。 每个采集器为新内容读取单个日志,并将新日志数据发送到libbeat,libbeat聚合事件并将聚合数据发送到Filebeat配置的输出。工作原理图如下:
如上图所示,Filebeat由两个主要组件构成: prospector 和 harvesters。这两类组件一起协同完成Filebeat的工作,从指定文件中把数据读取出来,然后发送事件数据到配置的output中。
Harvesters负责进行单个文件的内容收集,在运行过程中,每一个Harvester会对一个文件逐行进行内容读取,并且把读写到的内容发送到配置的output中。
Prospector负责管理Harvsters,并且找到所有需要进行读取的数据源。如果input type配置的是log类型,Prospector将会去配置度路径下查找所有能匹配上的文件,然后为每一个文件创建一个Harvster。
安装配置
在开始使用和设置你的Filebeat之前,建议安装并配置以下相关的产品:
Elasticsearch :用于储存数据和建立索引
Kibana :图形化界面
Logstash(可选):向Elasticsearch写入数据
安装好Filebeat后,再配置Filebeat,有关安装的详细步骤和代码可以访问
https://www.ctolib.com/docs/sfil ... -cn/beats/file.html
只有配置好filebeat,才能使用Logstash,然后再在Elasticsearch中读取索引模板以及设置kibana饼图,最后启动Filebeat。需要注意的是,在windows内安装时,可能需要额外授予执行权限,
命令为:[mw_shl_code=shell,true]PowerShell.exe -ExecutionPolicy RemoteSigned -File .\install-service-filebeat.ps1.[/mw_shl_code]
filebeat配置
这里介绍有关配置Filebeat的详细操作,请编辑配置文件。 默认配置文件名为filebeat.yml。 文件的位置因平台而异。 如果是rpm或deb安装的,配置文件在/usr/share/filebeat /filebeat.yml;如果是docker安装,配置文件在/usr/share/filebeat/filebeat.yml;如果是mac或win系统,则在你的解压文件里面。并且里面还有一个名为filebeat.reference.yml的文件展示了全部的不推荐使用的配置选项。
有关filebeat.yml文件的示例可以点击:
https://blog.csdn.net/bunny0101/article/details/79878370
如果你想直接将日志输出到Elasticsearch(不经过Logstash),在Filebeat的配置文件中设置Elasticsearch的ip地址和端口以便能连接到:
[mw_shl_code=shell,true]output.elasticsearch:
hosts: ["192.168.1.42:9200"]
[/mw_shl_code]
如果你打算使用kibana的可视化服务,需要这样配置:
[mw_shl_code=shell,true]setup.kibana:
host: "localhost:5601"
[/mw_shl_code]
这里host表示Kibana运行的主机名(hostname),例如localhost:5601。
如果你想在端口号后面指定路径,你需要在后面加上路径名:http://localhost:5601/path.
如果你想安全使用Elasticsearch和Kibana,你需要在配置文件里面指定安全凭据,然后再启动Filebeat,例如:
[mw_shl_code=shell,true]
output.elasticsearch:
hosts: ["myEShost:9200"]
username: "elastic"
password: "elastic"
setup.kibana:
host: "mykibanahost:5601"
username: "elastic"
password: "elastic"[/mw_shl_code]
Kibana的username和password的设置是可选的,如果不设置Kibana的安全凭据,Filebeat将使用Elasticsearch的username和password进行输出。
也可以参考https://www.bookstack.cn/read/ELKstack-guide-cn/beats-file.md里 filebeat 在 input 段的配置。
字段解析
Filebeat 发送的日志,会包含以下字段,这些字段各自的意思如下:
- beat.hostname beat 运行的主机名
- beat.name shipper 配置段设置的 name,如果没设置,等于 beat.hostname
- @timestamp 读取到该行内容的时间
- type 通过 document_type 设定的内容
- input_type 来自 “log” 还是 “stdin”
- source 具体的文件名全路径
- offset 该行日志的起始偏移量
- message 日志内容
- fields 添加的其他固定字段都存在这个对象里面
实践:基于 Logstash + Filbeat 的数据采集
当有关的配置和安装完成后,我们可以开始实践了,很多数据采集都可以在基于Logstash和Filebeat的场景上进行。本文将不再介绍有关Logstash的有关下载安装步骤,如想了解请点击 https://www.sensorsdata.cn/manual/logstash_filebeat.html。
各种数据采集总体过程:后端 SDK 生成数据文件 => Filebeat 读取文件 => Logstash Beat input => Logstash sensors_analytic output => 神策分析 。
结构图:
如果您生产日志的后端应用直接部署在服务器上,本节内容将介绍如何使用 Filebeat + Logstash 采集产生的日志数据。该场景下也可使用 LogAgent 完成日志的收集工作。
实践:使用 K8s(Kubernetes)自动编排容器场景中的数据采集
Logstash 部署
如果您已经在使用 Logstash 做一些其他的日志收集工作请参考 2.3.1 Logstash Pipeline 配置 。为避免容器意外关闭导致丢失数据,请设法保存缓冲区内的数据。
注册一份 Logstash 配置文件,使用 Filebeat 作为输入,sensors_analytics 作为输入,并指定运行配置。
- 配置文件 logstash-conf.yaml 参考示例:
[mw_shl_code=shell,true]apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-set
labels:
sa-app: logstash
data:
logstash.yml: |-
http.host: 0.0.0.0
pipeline.workers: 1
queue.type: persisted
# 用于限制缓冲队列的大小默认值为 1024MB ,该数值在设置时应该小于 Pod 使用的存储卷大小。
queue.max_bytes: 900mb
queue.drain: true
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-pipe-conf
labels:
sa-app: logstash
data:
logstash.conf: |-
input {
beats {
port => "5044"
}
}
output {
sensors_analytics {
url => "http://10.42.34.189:8106/sa?project=default"
}
}[/mw_shl_code]
为了不丢失数据,使用了基于硬盘的数据缓冲队列 (queue.type: persisted),所以需要在容器外保存 Logstash 的进度信息,这样在重启 Logstash 的时候可以继续完成发送。
建议通过 StatefulSet 的方式进行部署从而保存 Logstash 的状态。
首先,创建一个 StorageClass 用于生成保存进度的 PV ,设置手动回收,下面以 NFS 为例。
logstash-sc.yaml 的参考示例如下:
[mw_shl_code=shell,true]apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: logstash-nfs-storage
provisioner: nfs-provisioner
reclaimPolicy: Retain
[/mw_shl_code]
然后,创建一个 StatefulSet 应用 logstash-nfs-storage ,通过 Headless Service 来为每个 Logstash Pod 提供网络访问方式。
logstash-sts.yaml 参考示例如下:
[mw_shl_code=shell,true]apiVersion: v1
kind: Service
metadata:
name: logstash
labels:
app: logstash
spec:
ports:
- port: 5044
name: beat-in
clusterIP: None
selector:
app: logstash
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: logstash
spec:
# 使用上面的 Headless Service
serviceName: "logstash"
selector:
matchLabels:
app: logstash
replicas: 3
template:
metadata:
labels:
app: logstash
spec:
containers:
- name: logstash
image: sensorsdata/logstash:7.2.0
ports:
- containerPort: 5044
name: beat-in
volumeMounts:
- name: logstash-pipe-conf
mountPath: /usr/share/logstash/pipeline/logstash.conf
subPath: logstash.conf
- name: logstash-set
mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
# 容器中 /usr/share/logstash/data 目录下保存着缓冲队列 ,与进度信息。
- name: ldata
mountPath: /usr/share/logstash/data
volumes:
- name: logstash-pipe-conf
configMap:
name: logstash-pipe-conf
- name: logstash-set
configMap:
name: logstash-set
volumeClaimTemplates: # Logstash 进度数据使用的 PVC 模板
- metadata:
name: ldata
spec:
accessModes: [ "ReadWriteOnce" ]
# 使用的存储类名称,需要提前创建。
storageClassName: "logstash-nfs-storage"
resources:
requests:
# 大小要高于缓冲队列的最大长度限制
storage: 1Gi
[/mw_shl_code]
StatefulSet 创建完成后 Pod name 的生成规则为 StatsfulSet.name - Pod 序号。
上面的配置文件会生成 logstash-0、logstash-1,logstash-2 这样命名的 Pod。 Pod 副本也是按照序号 0 到 N-1 的顺序依次进行创建的,在删除时是按照序号 N-1 到 0 依次删除。
Headless Service 为控制的每个 Pod 副本创建了一个 DNS 域名,完整的域名规则为:(pod name).(headless server name).namespace.svc.cluster.local,因此 Filebeat 是通过域名来寻找 Logstash 的,而不是 IP 。当使用默认的 namespace 时可省略 namespace.svc.cluster.local 。
StatefulSet 根据 volumeClaimTemplates,为每个 Pod 创建一个 PVC,PVC 的命名前缀为:namespace-volumeMounts.name - volumeClaimTemplates.name - pod_name ,删除一个 Pod 副本不会删除 PVC ,在重启后新的 Pod 会复用之前 PVC 中的进度继续完成发送。
创建完成后检查一下运行的情况:
[mw_shl_code=shell,true]kubectl get pods -l app=logstash
NAME READY STATUS RESTARTS AGE
logstash-0 1/1 Running 0 3h56m
logstash-1 1/1 Running 0 3h56m
logstash-2 1/1 Running 0 3h56m[/mw_shl_code]
查看一下数据卷的创建情况:
[mw_shl_code=shell,true]kubectl get pvc -l app=logstash
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
ldata-logstash-0 Bound pvc-c1833d35-d2ee-49a5-ae16-3a9d3227ebe5 1Gi RWO logstash-nfs-storage 3h56m
ldata-logstash-1 Bound pvc-9aa4b50c-45f7-4b64-9e4d-056838906675 1Gi RWO logstash-nfs-storage 3h56m
ldata-logstash-2 Bound pvc-95bcdbf0-e84d-4068-9967-3c69c731311b 1Gi RWO logstash-nfs-storage 3h56m[/mw_shl_code]
检查一下集群内部的 DNS 创建情况:
[mw_shl_code=shell,true]
for i in 0 1; do kubectl exec logstash-$i -- sh -c 'hostname'; done
logstash-0
logstash-1
logstash-2
kubectl run -i --tty --image busybox:1.28.3 dns-test --restart=Never --rm /bin/sh
nslookup logstash-0.logstash
Server: 10.96.0.10
Address 1: 10.96.0.10 kube-dns.kube-system.svc.cluster.local
Name: logstash-0.logstash
Address 1: 10.244.7.54 logstash-0.logstash.default.svc.cluster.local
nslookup logstash-1.logstash
Server: 10.96.0.10
Address 1: 10.96.0.10 kube-dns.kube-system.svc.cluster.local
Name: logstash-1.logstash
Address 1: 10.244.5.150 logstash-1.logstash.default.svc.cluster.local
nslookup logstash-2.logstash
Server: 10.96.0.10
Address 1: 10.96.0.10 kube-dns.kube-system.svc.cluster.local
Name: logstash-2.logstash
Address 1: 10.244.34.177 logstash-2.logstash.default.svc.cluster.local[/mw_shl_code]
Logstash 的扩容/缩容
StatefulSet 的更新策略也是顺序的。
将之前设置的 StatefulSet 容量从 3 变成 5 。
[mw_shl_code=shell,true]kubectl scale sts web --replicas=5
kubectl get pods -l app=logstash
NAME READY STATUS RESTARTS AGE
logstash-0 1/1 Running 0 6h1m
logstash-1 1/1 Running 0 6h1m
logstash-2 1/1 Running 0 6h1m
logstash-3 1/1 Running 0 1h3m
logstash-4 1/1 Running 0 1h3m
[/mw_shl_code]
新增的 Pod 在原有的基础上序号递增。
将 StatefulSet 容量从 5 变回 3 。
kubectl scale sts web --replicas=3
之前新增的 PVC 不会被删除,当下次达到该容量时会继续复用。不用担心有 Filebeat 会向被删除的 Logstash 发送数据, Filebeat 会自行寻找另一个运行正常的 Logstash。 由于设置了 queue.drain: true 所以撤除的 Logstash 在关闭前会将缓冲区内的数据发送完毕。
Filebeat性能优化
在实现上面的一些数据采集的时候,我们可以发现依旧有某些性能不够让人满意,目前比较主流的日志采集系统ELK(ES+Logstash+Kibana),EFK(ES+Fluentd+Kibana)等。由于Logstash出现较早,大多数日志文件搜集采用了Logstash。但由于Logstash是JRuby实现的,性能开销较大,因此我们的日志搜集采用的Filebeat,然后发送到Logstash进行数据处理(例如:解析json,正则解析文件名称等),最后由Logstash发送到Kafka或者ES。这种方式虽然减轻了每个节点的处理压力,但部署Logstash的节点性能开销依旧很大,而且经常出现Filebeat无法发送数据到Logstash的情况。所以,这里提出了一种策略,可以提高客户端的性能。
抛弃Logstas
由于Logstash性能开销较大,为了提高客户端的日志采集性能,又减少数据传输环节和部署复杂度,并更充分地将 Go 语言的性能优势利用于日志解析,于是决定在 Filebeat 上通过开发插件的方式,实现针对公司日志格式规范的解析,直接作为 Logstash 的替代品。
开发自己的Processor
开发平台是基于Kubernetes的,因此我们需要解析每一条日志的source,从日志文件名称中获取Kubernetes资源名称,以确定该条日志的发往Topic。解析文件名称需要用到正则匹配,但由于正则性能开销较大,如果每一条日志都用正则解析名称将会带来比较大的性能开销,因此我们决定采用缓存来解决这一问题。即每个文件只解析一次名称,存放到一个Map变量中,如果已经解析过的文件名称则不再解析。这样大大提高了Filebeat的吞吐量。
性能优化
Filebeat配置文件如下,其中kubernetes_metadata是自己开发的Processor。
配置代码可以参考:https://my.oschina.net/u/2612999/blog/1518876?p=2
测试环境:
性能测试工具使用https://github.com/urso/ljtest
火焰图生成使用uber的go-torch https://github.com/uber/go-torch
CPU通过runtime.GOMAXPROCS(1)限制使用一个核
Filebeat 开发是基于 5.5.1 版本,Go 版本是 1.8.3。测试中Filebeat使用runtime.GOMAXPROCS(1)限制只使用一个核。第一版性能数据可以达到每秒一万一千多条的速度,100万条数据总时间需83.5秒。
生成的CPU火焰图如下
从火焰图中可以看出 CPU 时间占用最多的主要有两块。一块是 Output 处理部分,写文件。另一块就比较奇怪了,是 common.MapStr.Clone() 方法,居然占了 34.3% 的 CPU 时间。其中Errorf 占据了21%的CPU时间。看下代码:
[mw_shl_code=java,true]func toMapStr(v interface{}) (MapStr, error) {
switch v.(type) {
case MapStr:
return v.(MapStr), nil
case map[string]interface{}:
m := v.(map[string]interface{})
return MapStr(m), nil
default:
return nil, errors.Errorf("expected map but type is %T", v)
}
}[/mw_shl_code]
errors.Errorf生成error对象占据了大块时间,把这一块判断逻辑放到MapStr.Clone()中就可以避免产生error,到此你是不是该有些思考?go的error虽然是很好的设计,但不能滥用,不能滥用,不能滥用!否则你可能会为此付出惨痛的代价。
优化后的平均速度达到了每秒一万八千多条!处理100万条的总时间才五十多秒。
处理速度竟然提高了50%多,没想到几行代码的优化,吞吐量竟然能提高这么多,惊不惊喜,意不意外。 再看下修改后的火焰图
发现MapStr.Clone() 的性能消耗几乎可以忽略不计了。
进一步优化:
我们的日志都是Docker产生的,使用 JSON 格式,而 Filebeat 使用 Go 自带的 encoding/json 包是基于反射实现的,性能有一定问题。 既然我们的日志格式是固定的,解析出来的字段也是固定的,这时就可以基于固定的日志结构体做 JSON 的序列化,而不必用低效率的反射来实现。Go 有多个针对给定结构体做 JSON 序列化 / 反序列化的第三方包,这里使用的是 easyjson:https://github.com/mailru/easyjson。
由于解析的日志格式是固定的,所以提前定义好日志的结构体,然后使用easyjson解析。 由于测试是在同一台机器上使用相同数据进行的,将日志输出到文件对测试结果影响不大。处理速度性能提升到每秒两万条左右,100万条的总时间一共49秒。
但这样修改后就会使decode_json_fields 这个processor只能处理特定的日志格式,适用范围会有所降低。所以json解析这块暂时没有修改。
总结
本文介绍了详细有关filebeat和logstash的信息,以及如何安装配置他们,很多不够详细明白的地方读者可以参考文章中的链接去阅读。日志处理一直是系统运维中比较重要的环节,无论是传统的运维方式还是基于Kubernetes(或者Mesos,Swarm等)的新型云平台日志搜集都格外重要。无论选用哪种方式搜集日志,都有可能遇到性能瓶颈,但一小段代码的改善就可能完全解决了你的问题,路漫漫其修远兮,优化永无止境。
|