问题导读:
1.如何安装ELK(Elasticsearch,logstash,kibana),ELK能用来干什么?
2.如何收集日志文件,并一次性查询导出千百万条数据?
1. ELK安装和实战
Elk是Elastic search, Logstash和Kibana三者的简称。
通常,日志被分散的储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的syslog,将所有服务器上的日志收集汇总。 集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。 - 开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成。官方网站:https://www.elastic.co/productsElasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
- Logstash是一个完全开源的工具,他可以对你的日志进行收集、过滤,并将其存储供以后使用(如,搜索)。
- Kibana 也是一个开源和免费的工具,它Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。
画了一个ELK工作的原理图:
1.1 ELK平台搭建系统环境System: Centos release 6.7 (Final) ElasticSearch: 2.1.0 Logstash: 2.1.1 Kibana: 4.3.0 Java: openjdk version "1.8.0_65" 注:由于Logstash的运行依赖于Java环境, 而Logstash 1.5以上版本不低于java 1.7,因此推荐使用最新版本的Java。因为我们只需要Java的运行环境,所以可以只安装JRE,不过这里我依然使用JDK,请自行搜索安装。 1.2 ElasticSearch配置ElasticSearch: [mw_shl_code=shell,true]tar -zxvf elasticsearch-2.1.0.tar.gz
cd elasticsearch-2.1.0[/mw_shl_code] 安装Head插件(Optional): [mw_shl_code=applescript,true]./bin/plugin install mobz/elasticsearch-head[/mw_shl_code] 然后编辑ES的配置文件: [mw_shl_code=shell,true]vi config/elasticsearch.yml[/mw_shl_code] 修改以下配置项: [mw_shl_code=shell,true]cluster.name=es_cluster
node.name=node0
path.data=/tmp/elasticsearch/data
path.logs=/tmp/elasticsearch/logs
#当前hostname或IP,我这里是centos2
network.host=centos2
network.port=9200[/mw_shl_code] 然后启动ES: [mw_shl_code=shell,true]./bin/elasticsearch[/mw_shl_code]
1.3 LogstashLogstash的功能如下: 其实它就是一个收集器而已,我们需要为它指定Input和Output(当然Input和Output可以为多个)。 配置Logstash: [mw_shl_code=shell,true]tar -zxvf logstash-2.1.1.tar.gz
cd logstash-2.1.1[/mw_shl_code] 编写配置文件(名字和位置可以随意) [mw_shl_code=shell,true]mkdir config
vi config/log4j_to_es.conf[/mw_shl_code] 输入以下内容: 因此使用agent来启动它(使用-f指定配置文件): [mw_shl_code=shell,true]./bin/logstash agent -f config/log4j_to_es.conf[/mw_shl_code] 可以用head插件查看
可以看到,除了基础的message字段是我们的日志内容,Logstash还为我们增加了许多字段。
上面使用了ES的Head插件观察了ES集群的状态和数据,但这只是个简单的用于跟ES交互的页面而已,并不能生成报表或者图表什么的,接下来使用Kibana来执行搜索并生成图表。
1.4 Kibana配置Kibana: [mw_shl_code=shell,true]tar -zxvf kibana-4.3.0-linux-x86.tar.gz
cd kibana-4.3.0-linux-x86
vi config/kibana.yml[/mw_shl_code] 修改以下几项(由于是单机版的,因此host的值也可以使用localhost来代替,这里仅仅作为演示): [mw_shl_code=shell,true]server.port: 5601
server.host: “centos2”
elasticsearch.url: http://centos2:9200
kibana.index: “.kibana”[/mw_shl_code] 启动kibana: [mw_shl_code=shell,true]./bin/kibana[/mw_shl_code] 用浏览器打开该地址: 为了后续使用Kibana,需要配置至少一个Index名字或者Pattern,它用于在分析时确定ES中的Index。这里我输入之前配置的Index名字applog,Kibana会自动加载该Index下doc的field,并自动选择合适的field用于图标中的时间字段: 点击Create后,可以看到左侧增加了配置的Index名字: 接下来切换到Discover标签上,注意右上角是查询的时间范围,如果没有查找到数据,那么你就可能需要调整这个时间范围了,这里我选择Today:
接下来就能看到ES中的数据了:
点击右边的保存按钮,保存该查询为search_all_logs。接下来去Visualize页面,点击新建一个柱状图(Vertical Bar Chart),然后选择刚刚保存的查询search_all_logs,之后,Kibana将生成类似于下图的柱状图(只有10条日志,而且是在同一时间段的,比较丑,但足可以说明问题了: 如果有较多数据,我们可以根据业务需求和关注点在Dashboard页面添加多个图表:柱形图,折线图,地图,饼图等等。当然,我们可以设置更新频率,让图表自动更新: 如果设置的时间间隔够短,就很趋近于实时分析了。 到这里,ELK平台部署和基本的测试已完成。 2. Elasticsearch与logstash结合,收集日志文件,并查询导出千百万条数据 ES本身的限制,size大小为10000,也就是说一次最多只能查10000条,若想查1W以上,就需要用到分页查询,或者scroll查询。 2.1【分页查询】: 当Elasticsearch响应请求时,它必须确定docs的顺序,排列响应结果。如果请求的页数较少(假设每页10个docs), Elasticsearch不会有什么问题,但是如果页数较大时,比如请求第100页,Elasticsearch不得不取出第1页到第100页的所有docs,再去除第1页到第99页的docs,得到第100页的docs。
2.2【scroll查询】: 相对于from和size的分页来说,使用scroll可以模拟一个传统数据的游标,记录当前读取的文档信息位置。这个分页的用法,不是为了实时查询数据,而是为了一次性查询大量的数据(甚至是全部的数据)。
因为这个scroll相当于维护了一份当前索引段的快照信息,这个快照信息是你执行这个scroll查询时的快照。在这个查询后的任何新索引进来的数据,都不会在这个快照中查询到。但是它相对于from和size,不是查询所有数据然后剔除不要的部分,而是记录一个读取的位置,保证下一次快速继续读取。
from+size方式以及scroll方式优缺点对比:
1)对于from+size方式:当结果足够大的时候,会大大加大内存和CPU的消耗。但,该方式使用非常方便。
2)对于scroll方式: 当结果足够大的时候, scroll 性能更佳。但是不灵活和 scroll_id 难管理问题存在。
2.3 实战:查询opt/logs/目录下所有以sql结尾的日志文件 2.3.1 logstash配置 config/query.conf [mw_shl_code=shell,true]input{
file{
path =>"opt/logs/*.sql"
type =>"logjson"
start_position => "beginning"
sincedb_path =>"/dev/null"
}
}
filter{
json{
#将默认中的message内容转换成json内容,并删除message域
source => "message"
remove_field =>"message"
}
}
output{
elasticsearch{
hosts =>["172.16.117.93:9200"]
index =>"query"
document_type=> "%{type}"
flush_size=>20000
idle_flush_time =>10
}
}[/mw_shl_code]
2.3.2 java代码,连接操作es [mw_shl_code=java,true]public class ESsearch{
static Essearch ts=new ESearch();
static Client client =ts.TransportClientContect();
//返回client对象
public Client TransportClientContect(){
Settings settings =Setting.settingsBuilder().put("cluster.name","topic").build();
Client client=null;
try{
client =TransportClient.builder().setting(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("172.16.117.93"),9300));
}catch(){}
return client;
}
}
}
class ESmain{
static ESearch ts=new ESearch();
static Client client=ts.TransportClientContect();
public static void main(String[] args) throws IOException{
Excel excel =new Excel();
Instant startClock=Instant.now();
System.out.println("开始时间"+startClock);
String index="logstash-sql---3p";
String type="loghson";
String value;
List<String> stringList=new ArrayList<String>();
//scroll模式启动 每次50000
SearchResponse scrollResponse=client.prepareSearch(index)
.setSearchType(SearchType.SCAN).setSize(10000)
.setQuery(QueryBuilders.matchAllQuery())
.setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("q1","q1v"))//查询条件1
.operator(prg.elasticsearch.index.query.MatchQueryBuilder.Operator.AND)
.must(QueryBuilders.matchQuery("q2","q2v"))//查询条件2
setScroll(TimeVakue.timeValueMinutes(1))
.execute().actionGet();
int count =(int)scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0;sum<count;i++){
scrollResponse=client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(Timevalue.timeValueMinutes(8))
.execute().actionGet();
sum+=scrollResponse.getHits().hits().length;
for(SearchHit hit:srollResponse.getHits()){
value=hits.getSource.get("param").toString();
stringList.add(value);
}
File destFile=new File("./output/a"+i+".xls"); //把文件导出到output目录下
try{
excel.createStringExcelFile(stringList,destFile);
}catch(){}
StringList.clear();
System.out.println("总数:"+count+"已查到:"+sum);
}
}
}[/mw_shl_code] 这样查询到的数据就全部导出到excel中了,这里需要说明一点07版的excel只能存100W条数据,之前版本只能存取6W3 左右的数据。
|