问题导读:
1、一个完整的集中式日志系统,包含几个主要特点?
2、如何添加 Kafka 集群 broker list?
3、如何编写pulsar-kafka 脚本文件?
4、如何编写pulsar-daemon 脚本文件?
关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
当前已有众多国内外大型互联网和传统行业公司采用 Apache Pulsar,案例分布在人工智能、金融、电信运营商、直播与短视频、物联网、零售与电子商务、在线教育等多个行业,如美国有线电视网络巨头 Comcast、Yahoo!、腾讯、中国电信、中国移动、BIGO、VIPKID 等。
背景介绍
Apache Pulsar 作为一个云原生的分布式消息系统,包括 Zookeeper、bookie、broker、functions-worker、proxy 等多个组件,并且所有的组件以分布式的方式部署在多台主机上,因此每个组件的日志文件也就分散在多台主机上,当组件出现问题时,由于日志比较分散,想要检查各个服务是否有报错信息,需要挨个服务去排查,比较麻烦,通常我们的做法是直接对日志文件进行 grep、awk 等命令就可以获得想要的信息。但是,随着应用和服务体量的增加,支撑的节点也随之增加,那么传统方法暴露出很多问题,比如:效率低下、日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询等等。所以我们希望通过对日志进行聚合、监控,能够快速的找到 Pulsar 各个服务的报错信息,快速的排查,使得运维更加具有目的性、针对性和直接性。
为了解决日志检索的问题,我们团队考虑使用集中式日志收集系统,将 Pulsar 所有节点上的日志统一收集,管理,访问。
一个完整的集中式日志系统,需要包含以下几个主要特点:
•收集-能够采集多种来源的日志数据;•传输-能够稳定的把日志数据传输到中央系统;•存储-如何存储日志数据;•分析-可以支持 UI 分析;•警告-能够提供错误报告,监控机制.
ELK 提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用,是目前主流的一种日志系统。我们公司拥有自研的大数据管理平台,通过大数据管理平台部署和管理 ELK,并且在生产系统中已经使用ELK 为多个业务系统提供了支撑服务。 ELK 是三个开源软件的缩写,分别表示:Elasticsearch、Logstash、Kibana , 它们都是开源软件,最新版本已经改名为 Elastic Stack,并新增了 Beats项目,其中包括 FileBeat,它是一个轻量级的日志收集处理工具 (Agent),Filebeat 占用资源少,适合于在各个服务器上搜集日志后传输给 Logstash。
在上图中可以看到,如果 Pulsar 使用这种日志采集模式存在两个问题:
•部署了 Pulsar 服务的主机必须部署一套 Filebeat 服务;
•Pulsar 服务的日志必须以文件的方式落一次磁盘,占用了主机磁盘的 IO。
为此,我们考虑 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索,Log4j2 默认支持将日志发送到 Kafka 的功能,使用 Kafka 自带的 Log4j2Appender,在 Log4j2 配置文件中进行相应的配置,即可完成将 Log4j2 产生的日志实时发送至 Kafka 中。
如下图所示:
实施过程
下面以 Pulsar 2.6.2 版本为例,介绍 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索的解决方案的详细的实施过程。
一、准备工作
首先需要确定的是在 Kibana 中用于检索日志的字段,可以对这些字段聚合、多维度查询,然后,Elasticsearch 根据检索字段进行分词,并创建索引。
如上图所示:我们将对 Pulsar 的日志建立了 8 个检索字段,分别是:集群名、主机名、主机IP、组件名、日志内容、系统时间、日志级别、集群实例。
二、实施过程
说明:为了保证 Pulsar 原生的配置文件和脚本文件的结构不被破坏,我们通过添加新的配置文件和脚本文件来实现此方案。
1. 添加配置文件
在 {PULSAR_HOME}/conf 目录中添加以下两个配置文件:
1)logenv.sh
该文件是将Pulsar 组件启动时需要的 JVM 选项以配置的方式传递到 Pulsar 服务的 Java 进程中,内容示例如下:
KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1
复制代码
以上这些字段的意义分别是:
•KAFKA_CLUSTER:Kafka broker list 地址;•PULSAR_CLUSTER:Pulsar 的集群名称;•PULSAR_TOPIC:Kafka 中用于接入 Pulsar 服务日志的 Topic;•HOST_IP:Pulsar 主机的 IP;•PULSAR_MODULE_INSTANCE_ID:Pulsar 服务的实例标识,一个主机上可能会部署多个 Pulsar 集群,集群间通过实例标识来区分。
2)log4j2-kafka.yaml
该配置文件是从 log4j2.yaml 复制而来,在 log4j2.yaml 的基础上添加以下修改: (说明:下图中左侧为 log4j2.yaml,右侧为 log4j2-kafka.yaml。)
•添加 Kafka 集群 broker list,并定义 log4j2 写到 Kafka 中的消息记录格式,一条消息中的 8 个检索字段以空格分割,Elasticsearch 以空格作为分割符对 8 个检索字段进行分词。
•log4j2-kafka.yaml 配置文件的完整内容如下:
复制代码
注意事项:
•日志接入必须异步,绝对不能影响服务性能;•响应要求比较高的系统接入第三方系统,必须依赖解耦,此处的 Failover Appender 就是解耦对 Kafka 的依赖,当 Kafka Crash 时,日志触发 Failover,写本地即可;•log4j2 Failover appender retryIntervalSeconds 的默认值是 1 分钟,是通过异常来切换的,所以可以适量加大间隔,比如上面的 10分钟;•Kafka appender ignoreExceptions 必须设置为 false,否则无法触发 Failover;•这里有个比较大的坑是 max.block.ms Property,KafkaClient 包里默认值是 60000ms,当 Kafka 宕机时,尝试写 Kafka 需要 1 分钟才能返回 Exception,之后才会触发 Failover,当请求量大时,log4j2 队列很快就会打满,之后写日志就 Blocking,严重影响到主服务响应。所以要设置足够短,队列长度足够长。
2、添加脚本文件
在 {PULSAR_HOME}/bin 目录中添加以下两个脚本文件:
1)pulsar-kafka
该脚本文件是从 pulsar 脚本文件复制而来,在 pulsar 脚本文件的基础上添加如下修改: (说明:下图中左侧为 pulsar,右侧为 pulsar-kafka。)
•pulsar-kafka 脚本文件的完整内容如下:
复制代码
2)pulsar-daemon-kafka
该脚本文件是从 pulsar-daemon 脚本文件复制而来,在 pulsar-daemon 脚本文件的基础上添加如下修改: (说明:下图中左侧为 pulsar-daemon,右侧为 pulsar-daemon-kafka。)
•pulsar-daemon-kafka 脚本文件的完整内容如下:
复制代码
3、添加 Kafka Producer 依赖的 jar
在 pulsar 集群的所有节点上的 {PULSAR_HOME}/lib 目录中添加以下 3 个 jar:
connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar 复制代码
4、启动Pulsar 服务
1.为了确保 Pulsar 服务的日志能够正确的写入 Kafka,先通过 bin/pulsar-kafka 前台启动,在没有异常的情况下,再通过 bin/pulsar-daemon-kafka 命令后台启动。2.以启动 broker 为例,执行以下命令:
bin/pulsar-daemon-kafka start broker
复制代码
3. 通过 ps 命令查看 broker 进程如下:
在上图可以看到,我们通过 logenv.sh 配置的 OPTS 都已经传递到 broker 进程中,log4j2-kafka.yaml 中的 sys 标签便可以通过这些属性值实例化一个 Kafka Producer,broker 进程的日志便会通过 Kafka Producer 发送到 Kafka broker 中。
5、测试 Pulsar 日志是否成功写入 Kafka broker
启动一个Kafka Consumer ,订阅log4j2 发送消息的Topic,读取到的消息内容如下,多个检索字段之间以空格分开:
pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
6、日志检索
打开 kibana 页面,根据分词的字段进行检索,检索条件如下:
cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"
在上图中可以看到某个时间段内的日志检索结果,并且可以根据需要,在检索结果中添加 Available fields。这样,开发或运维人员可以通过 kibana 从多个维度快速有效的分析 Pulsar 服务异常的原因。至此,就是 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索的一套完整的解决方案。
总结
目前,分布式、微服务化是比较流行的技术方向,在生产系统中,随着业务的不断发展, 应用和服务体量的快速扩张,从单体/垂直架构转移到分布式/微服务架构是自然而然的选择,它主要表现在降低复杂度、容错、独立部署、水平伸缩等方面。但同时也面临着新的挑战,如问题排查的效率,运维监控的便捷性等。本文以 Apache Pulsar 为例,分享 Java 进程如何使用 Log4j2+Kafka+ELK 实现分布式、微服务化的日志的快速检索,达到服务治理的效果。
作者:ApachePulsar
来源:https://mp.weixin.qq.com/s/igFlFvEYz1pXDiscWdwW2g
最新经典文章,欢迎关注公众号