Last edited by desehawk on 2017-10-11 17:20
Reading guide
1. What steps are needed to integrate Flume with Elasticsearch?
2. How do you import Kafka data into Elasticsearch?
3. What are the ways to import data from Kafka into Elasticsearch?
I. Integrating Flume with Elasticsearch
Flume configuration, es.conf:
[hadoop@h153 ~]$ cat apache-flume-1.6.0-cdh5.5.2-bin/conf/es.conf
[mw_shl_code=bash,true]
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.s1.positionFile = /home/hadoop/hui/taildir_position.json
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /home/hadoop/q1/test.*.log
a1.sources.s1.batchSize = 100
a1.sources.s1.backoffSleepIncrement = 1000
a1.sources.s1.maxBackoffSleep = 5000
a1.sources.s1.channels = c1
a1.sinks.k1.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink
a1.sinks.k1.batchSize=10000
a1.sinks.k1.hostNames=192.168.205.153:9300
a1.sinks.k1.indexType = flume_kafka
a1.sinks.k1.indexName=logstash
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
# SimpleIndexNameBuilder does not append a date to the index name
a1.sinks.k1.indexNameBuilder=org.apache.flume.sink.elasticsearch.SimpleIndexNameBuilder
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1[/mw_shl_code]
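One thing that may be worth checking in a config like the one above: the sink's batchSize (10000) is much larger than the channel's transactionCapacity (100). As far as I understand the memory channel, a sink cannot take more events in a single transaction than transactionCapacity allows, so this combination may throw a ChannelException once the channel holds more than 100 events. The following is only a small sketch of a sanity check; the function names and the fallback values of 100 are my own assumptions, not part of Flume.

```python
# Sketch: flag Flume batch sizes that exceed the memory channel's limits.
# The helper names and the fallback value 100 are assumptions for illustration.

SAMPLE = """
a1.sinks.k1.batchSize=10000
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
"""


def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_batch_sizes(props, agent="a1", sink="k1", channel="c1"):
    """Return a list of warning strings; an empty list means no problem found."""
    warnings = []
    capacity = int(props.get(f"{agent}.channels.{channel}.capacity", 100))
    tx_capacity = int(
        props.get(f"{agent}.channels.{channel}.transactionCapacity", 100)
    )
    sink_batch = int(props.get(f"{agent}.sinks.{sink}.batchSize", 100))
    if tx_capacity > capacity:
        warnings.append(
            f"transactionCapacity {tx_capacity} exceeds capacity {capacity}"
        )
    if sink_batch > tx_capacity:
        warnings.append(
            f"sink batchSize {sink_batch} exceeds transactionCapacity {tx_capacity}"
        )
    return warnings
```

Running check_batch_sizes(parse_properties(SAMPLE)) on the values from es.conf reports the sink batchSize. Lowering batchSize or raising transactionCapacity (and capacity along with it) would both clear the warning.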
Start Flume:
[hadoop@h153 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/es.conf -n a1 -Dflume.root.logger=INFO,console
Note: I initially used elasticsearch-2.2.0, which failed with this error:
[mw_shl_code=bash,true]
2017-09-27 03:26:10,849 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3a7f44c9 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]
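Cause: the Elasticsearch version is too new. The Elasticsearch sink shipped with Flume 1.6.0 calls a constructor, InetSocketTransportAddress(String, int), that the elasticsearch-2.2.0 jar does not provide, so the Flume jars and Elasticsearch are incompatible.
Solution: switch Elasticsearch to version 1.7.1.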
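The NoSuchMethodError above names the missing method with a JVM descriptor, `<init>(Ljava/lang/String;I)V`. In case that notation is unfamiliar, here is a small sketch that decodes such a descriptor into readable types. It only handles well-formed descriptors, and the function names are my own.

```python
# Sketch: decode a JVM method descriptor such as "(Ljava/lang/String;I)V".
# Only well-formed descriptors are handled; the helper names are illustrative.

PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float", "I": "int",
    "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _read_type(desc, i):
    """Read one type starting at index i; return (type name, next index)."""
    dims = 0
    while desc[i] == "[":          # each '[' adds one array dimension
        dims += 1
        i += 1
    if desc[i] == "L":             # object type: L<class name>;
        end = desc.index(";", i)
        name = desc[i + 1:end].replace("/", ".")
        i = end + 1
    else:                          # single-letter primitive type
        name = PRIMITIVES[desc[i]]
        i += 1
    return name + "[]" * dims, i


def decode_descriptor(desc):
    """Return (list of parameter types, return type) for a method descriptor."""
    if not desc.startswith("("):
        raise ValueError("descriptor must start with '('")
    params = []
    i = 1
    while desc[i] != ")":
        name, i = _read_type(desc, i)
        params.append(name)
    return_type, _ = _read_type(desc, i + 1)
    return params, return_type
```

For the descriptor in the stack trace this gives the parameters java.lang.String and int with return type void, i.e. the sink is looking for the constructor InetSocketTransportAddress(String, int).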
Generate simulated data:
[hadoop@h153 q1]$ echo "2015-10-30 16:05:00| 967837DE00026C8DB2E|127.0.0.1" >> test.1.log
[hadoop@h153 q1]$ echo "awfeeeeeeeeeeeeeeeeeeeeeeeee" >> test.1.log
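If you need more than a couple of lines, a short script can append test records in the same shape as the first echo above. This is only a sketch: the 19-character hex-like ID and the fixed IP are my guesses at the format based on that one sample line, and the function names are hypothetical.

```python
# Sketch: append simulated log lines shaped like
#   2015-10-30 16:05:00| 967837DE00026C8DB2E|127.0.0.1
# The ID length (19) and alphabet are assumptions taken from that sample.
import random
from datetime import datetime


def make_line(now=None, ip="127.0.0.1"):
    """Build one 'timestamp| id|ip' line; 'now' defaults to the current time."""
    now = now or datetime.now()
    trace_id = "".join(random.choice("0123456789ABCDEF") for _ in range(19))
    return f"{now:%Y-%m-%d %H:%M:%S}| {trace_id}|{ip}"


def append_lines(path, count):
    """Append 'count' simulated lines to the file at 'path'."""
    with open(path, "a", encoding="utf-8") as f:
        for _ in range(count):
            f.write(make_line() + "\n")
```

For example, append_lines("/home/hadoop/q1/test.1.log", 10) would add ten lines to a file matched by the filegroups pattern in the Flume config.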
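II. Importing Kafka data into Elasticsearch (I used elasticsearch-2.2.0 here)
I recently needed to build a log monitoring platform. Given the characteristics of our system, the requirement boils down to one sentence: import the data in Kafka into Elasticsearch. There are roughly the following ways to do that:
(1). Kafka -> logstash -> elasticsearch -> kibana (simple; you only need to start one agent process)
(2). Kafka -> kafka-connect-elasticsearch -> elasticsearch -> kibana (tightly coupled to Confluent, somewhat complex)
(3). Kafka -> elasticsearch-river-kafka-1.2.1-plugin -> elasticsearch -> kibana (the code has not been updated in a long time, and ongoing support is poor)
For installing and configuring the elasticsearch-river-kafka-1.2.1-plugin, see: http://hqiang.me/2015/08/%E5%B0% ... 87%B3elasticsearch/
Given the above, the project chose option (1) for storing Kafka data in Elasticsearch.
1. Topology
The project topology is as follows:
The overall flow of logs/messages is: Flume => kafka => logstash => elasticsearch => kibana
2. Log collection with Flume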
[hadoop@h153 ~]$ cat apache-flume-1.6.0-cdh5.5.2-bin/conf/kafka.conf
[mw_shl_code=bash,true]
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.s1.positionFile = /home/hadoop/hui/taildir_position.json
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /home/hadoop/q1/test.*.log
a1.sources.s1.batchSize = 100
a1.sources.s1.backoffSleepIncrement = 1000
a1.sources.s1.maxBackoffSleep = 5000
a1.sources.s1.channels = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = h153:9092
a1.sinks.k1.topic = hui
a1.sinks.k1.batch-size = 100
a1.sinks.k1.requiredAcks = -1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1[/mw_shl_code]
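3. Installing and configuring logstash
A. Download the logstash package: https://download.elastic.co/logs ... gstash-2.2.2.tar.gz
Tip: to download a different logstash version, there is no need to search around; just change the version number in the link above and open it in the browser. For logstash-2.4.0, for example, the URL is https://download.elastic.co/logs ... gstash-2.4.0.tar.gz. The same works for elasticsearch: https://download.elastic.co/elas ... search-2.2.2.tar.gz
B. Create kafka-logstash-es.conf in the logstash/conf directory. (My unpacked archive had no conf directory, so I created one.)
C. Configure kafka-logstash-es.conf as follows.
The logstash configuration syntax is: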
input {
... # Read data. logstash ships many input plugins, e.g. file, redis, syslog
}
filter{
... # Extract the fields you care about from irregular logs here. Common plugins: grok, mutate
}
output{
... # Write the processed data to file, elasticsearch, etc.
}
Example:
[mw_shl_code=bash,true]input {
kafka {
zk_connect => "h153:2181"
group_id => "elasticconsumer" # any name you like
topic_id => "hui" # the Kafka topic to import from
reset_beginning => false
consumer_threads => 2
decorate_events => true
codec => "json"
}
}
output{
elasticsearch {
hosts => "192.168.205.153"
index => "traceid" # not related to any JSON field in the Kafka messages; the index name must be lowercase. A dated form also works: index => "traceid-%{+YYYY-MM-dd}"
}
stdout {
codec => rubydebug # json also works
}
} [/mw_shl_code]
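To make the dated index form concrete: with index => "traceid-%{+YYYY-MM-dd}", logstash fills in the date from the event's @timestamp, which is kept in UTC, so each day gets its own index. The sketch below only mimics that naming rule in Python to show which index an event would land in; the function name is hypothetical and this is not logstash's own code.

```python
# Sketch: mimic the index name produced by "prefix-%{+YYYY-MM-dd}".
# Assumes the date comes from the event's @timestamp converted to UTC.
from datetime import datetime, timedelta, timezone


def index_name(prefix, event_time):
    """Return the daily index name for a timezone-aware event time."""
    if prefix != prefix.lower():
        raise ValueError("Elasticsearch index names must be lowercase")
    if event_time.tzinfo is None:
        raise ValueError("event_time must be timezone-aware")
    utc_time = event_time.astimezone(timezone.utc)
    return f"{prefix}-{utc_time:%Y-%m-%d}"
```

Note that an event logged at 02:00 on 2017-10-12 in UTC+8 is still 2017-10-11 in UTC, so under this rule it would go to traceid-2017-10-11, which can be surprising when browsing indices in Kibana.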
4. Start Flume, Kafka, and ES, then generate test data
[hadoop@h153 q1]$ echo "hello world" >> test.1.log
5. Run logstash. To run it in the background: nohup bin/logstash -f /home/hadoop/logstash-2.2.2/conf/kafka-logstash-es.conf &
For easier observation, I ran it directly in the console:
[hadoop@h153 logstash-2.2.2]$ bin/logstash -f kafka-logstash-es.conf
[mw_shl_code=bash,true]
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAuthCache).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Settings: Default pipeline workers: 1
Logstash startup completed
JSON parse failure. Falling back to plain-text {:error=>#<LogStash::Json::ParserError: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [B@10e07777; line: 1, column: 7]>, :data=>"hello world", :level=>:error}
{
"message" => "hello world",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2017-10-11T16:04:44.212Z",
"kafka" => {
"msg_size" => 11,
"topic" => "hui",
"consumer_group" => "elasticconsumer",
"partition" => 0,
"key" => nil
}
}[/mw_shl_code]
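The _jsonparsefailure tag in the output above appears because the kafka input uses codec => "json" while the test line "hello world" is plain text. One option is to switch the codec to "plain"; another is to write JSON lines into the log in the first place. Below is a sketch of the second option that turns the pipe-delimited sample line from Part I into a JSON object. The field names log_time, trace_id and client_ip are made up for illustration.

```python
# Sketch: convert 'timestamp| id|ip' lines into JSON so that a
# json codec can parse them. Field names are hypothetical.
import json


def to_json_event(line):
    """Return a JSON string for one log line.

    Lines that do not have exactly three '|'-separated parts are wrapped
    as {"message": line} so they still form valid JSON.
    """
    parts = [part.strip() for part in line.split("|")]
    if len(parts) != 3:
        return json.dumps({"message": line})
    log_time, trace_id, client_ip = parts
    return json.dumps(
        {"log_time": log_time, "trace_id": trace_id, "client_ip": client_ip}
    )
```

Appending the result of to_json_event(...) to test.1.log instead of the raw line should let logstash index the three fields separately rather than falling back to plain text.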
Note: at first, to check whether logstash could write to ES at all, I ran this test config directly:
input{
stdin{}
}
output{
elasticsearch{
hosts => "192.168.205.153"
}
stdout{codec => rubydebug}
}
It failed with the following error (logstash-2.2.2 with elasticsearch-2.2.2 or elasticsearch-2.2.0):
[mw_shl_code=bash,true]
The error reported is:
SSLConnectionSocketFactory not found in packages org.apache.http.client.methods, org.apache.http.client.entity, org.apache.http.client.config, org.apache.http.config, org.apache.http.conn.socket, org.apache.http.impl, org.apache.http.impl.client, org.apache.http.impl.conn, org.apache.http.impl.auth, org.apache.http.entity, org.apache.http.message, org.apache.http.params, org.apache.http.protocol, org.apache.http.auth, java.util.concurrent, org.apache.http.client.protocol, org.apache.http.conn.ssl, java.security.cert, java.security.spec, java.security, org.apache.http.client.utils; last error: cannot load Java class org.apache.http.client.utils.SSLConnectionSocketFactory[/mw_shl_code]
With logstash-2.4.0 and elasticsearch-2.3.3 the error was:
[mw_shl_code=bash,true]
Settings: Default pipeline workers: 1
Pipeline aborted due to error {:exception=>"NameError", :backtrace=>["file:/home/hadoop/logstash-2.4.0/vendor/jruby/lib/jruby.jar!/jruby/java/core_ext/module.rb:45:in `const_missing'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/manticore-0.6.0-java/lib/manticore/client.rb:587:in `ssl_socket_factory_from_options'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/manticore-0.6.0-java/lib/manticore/client.rb:394:in `pool_builder'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/manticore-0.6.0-java/lib/manticore/client.rb:402:in `pool'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/manticore-0.6.0-java/lib/manticore/client.rb:208:in `initialize'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/transport/http/manticore.rb:58:in `build_client'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/transport/http/manticore.rb:49:in `initialize'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/client.rb:118:in `initialize'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport.rb:26:in `new'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:129:in `build_client'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:20:in `initialize'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:44:in `build'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch.rb:134:in `build_client'", 
"/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:14:in `register'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/output_delegator.rb:75:in `register'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:181:in `start_workers'", "org/jruby/RubyArray.java:1613:in `each'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:181:in `start_workers'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:136:in `run'", "/home/hadoop/logstash-2.4.0/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/agent.rb:491:in `start_pipeline'"], :level=>:error}
stopping pipeline {:id=>"main"}[/mw_shl_code]
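(Later I restored a snapshot on the same virtual machine, reinstalled everything, and the error no longer appeared, so the environment at that time was probably at fault; I still do not know the exact cause. I then noticed that I had a single-node hbase-1.0.0-cdh5.5.2 installed at the time: removing it completely, or replacing it with the Apache build hbase-1.0.0-bin.tar.gz, made the error go away. Merely unpacking hbase-1.0.0-cdh5.5.2.tar.gz, without doing anything else, and then running bin/logstash -f kafka-logstash-es.conf brought the error back. Yet after another snapshot restore and reinstall, I found no direct link to hbase-1.0.0-cdh5.5.2.tar.gz, so I cannot say what state my environment was in to cause this.)
Addendum: (testing whether Kafka and logstash can exchange data)
Kafka into logstash: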
input{
kafka{
codec => "plain"
group_id => "logstash1"
auto_offset_reset => "smallest"
reset_beginning => true
topic_id => "hui"
zk_connect => "192.168.205.153:2181"
}
}
output {
stdout{
codec => rubydebug
}
}
logstash into Kafka:
input {
stdin{}
}
output {
kafka{
topic_id => "hui"
bootstrap_servers => "192.168.205.153:9092"
batch_size => 5
}
stdout{
codec => json
}
}
Source: CSDN, 小强签名设计
http://blog.csdn.net/m0_37739193/article/details/78204474