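Can anyone discuss how to sink data from Flume into Solr?
My data goes from an application into Kafka, and Flume then collects it and delivers it to Solr. I am stuck on the configuration: I don't really understand how morphlines.conf should be written. My data format is: xxx|xxx|xxx|xxxx|xxxxxxxx|xxxxx|xxxxx|xxxxx|xxx
This is my Flume configuration file (based on http://www.aboutyun.com/thread-14925-1-1.html):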
# Channel this sink reads from
agent.sinks.solrSink.channel = memoryChannel
# Sink type: MorphlineSolrSink
agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
# Path to the morphline configuration file. If no directory is given, the morphline file is read from the same directory as this flume.conf.
agent.sinks.solrSink.morphlineFile = morphlines.conf
agent.sinks.solrSink.morphlineId = morphline1
# Maximum number of events taken from the channel per Flume transaction
agent.sinks.solrSink.batchSize = 100
# Maximum duration of a Flume transaction, in ms. The transaction commits when either batchSize or batchDurationMillis is reached, whichever comes first.
agent.sinks.solrSink.batchDurationMillis = 1000
There are actually examples for all of this.
The general pattern is:
Part 1: configure SOLR_LOCATOR first,
as shown below:
SOLR_LOCATOR : {
# Name of solr collection
collection : collection1
# ZooKeeper ensemble
zkHost : "127.0.0.1:2181/solr"
}
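If you know Solr, this part is not hard.
Part 2: specify one or more morphlines.
Each morphline defines an ETL transformation chain and consists of one or more (potentially nested) commands. A morphline is a way to consume records, turn them into a stream of records, and pipe that stream through a set of configurable transformations on its way to a target application such as Solr.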
morphlines : [
{
# Name of this morphline (morphline1 here); it identifies the morphline when the file defines several
id : morphline1
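# Import all morphline commands in these Java packages and their subpackages.
# Other commands that may be present on the classpath are not visible to this morphline.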
importCommands : ["com.cloudera.**", "org.apache.solr.**"]
commands : [
{
# Parse an Avro container file and emit a record for each Avro object
readAvroContainer {
# Optionally, require the input to match one of these MIME types:
# supportedMimeTypes :
# Optionally, use a custom Avro schema in JSON format inline:
# readerSchemaString : """<json can go here>"""
# Optionally, use a custom Avro schema file in JSON format:
# readerSchemaFile : /path/to/syslog.avsc
}
}
{
# The remaining comments are the original English ones; if anything is unclear to the OP, we can keep discussing
# Consume the output record of the previous command and pipe another
# record downstream.
#
# extractAvroPaths is a command that uses zero or more Avro path
# expressions to extract values from an Avro object. Each expression
# consists of a record output field name (on the left side of the
# colon ':') as well as zero or more path steps (on the right hand
# side), each path step separated by a '/' slash. Avro arrays are
# traversed with the '[]' notation.
#
# The result of a path expression is a list of objects, each of which
# is added to the given record output field.
#
# The path language supports all Avro concepts, including nested
# structures, records, arrays, maps, unions, etc, as well as a flatten
# option that collects the primitives in a subtree into a flat list.
extractAvroPaths {
flatten : false
paths : {
id : /id
username : /user_screen_name
created_at : /created_at
text : /text
}
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# convert timestamp field to native Solr timestamp format
# e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
{
convertTimestamp {
field : created_at
inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
inputTimezone : America/Los_Angeles
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
outputTimezone : UTC
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# Command that deletes record fields that are unknown to Solr
# schema.xml.
#
# Recall that Solr throws an exception on any attempt to load a document
# that contains a field that isn't specified in schema.xml.
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator : ${SOLR_LOCATOR}
}
}
# log the record at DEBUG level to SLF4J
{ logDebug { format : "output record: {}", args : ["@{}"] } }
# load the record into a Solr server or MapReduce Reducer
{
loadSolr {
solrLocator : ${SOLR_LOCATOR}
}
}
]
}
]
Take the simpler example below:
Step 1: name the morphline morphline1.
Step 2: import the command packages.
Step 3: list the commands to run.
morphlines : [
{
id : morphline1
# Import all morphline commands in these java packages and their subpackages.
# Other commands that may be present on the classpath are not visible to this morphline.
importCommands : ["org.kitesdk.**"]
commands : [
{
readLine {
charset : UTF-8
}
}
]
}
]
The OP can start from the simple one.
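For the pipe-delimited format in the question, a morphline along the following lines might be a reasonable starting point. This is only a sketch under assumptions: the column names field1 to field9 are hypothetical placeholders for the nine values in each line and would have to match fields defined in the Solr schema; the collection and zkHost values are simply copied from the SOLR_LOCATOR example above; and the generateUUID step assumes the schema's unique key is called id and is not already present in the data.

```
SOLR_LOCATOR : {
  # Values copied from the example above; adjust to your cluster
  collection : collection1
  zkHost : "127.0.0.1:2181/solr"
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      {
        # Split each line of the event body on '|' and name the columns.
        # field1..field9 are hypothetical names, not from the original post.
        readCSV {
          separator : "|"
          columns : [field1, field2, field3, field4, field5, field6, field7, field8, field9]
          trim : true
          charset : UTF-8
        }
      }
      {
        # Assumption: the Solr unique key is 'id' and the data does not supply one
        generateUUID {
          field : id
        }
      }
      {
        # Drop any record fields that the Solr schema does not know about
        sanitizeUnknownSolrFields {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
      # Log the record at DEBUG level so the parsed fields can be checked
      { logDebug { format : "output record: {}", args : ["@{}"] } }
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
```

The id here matches agent.sinks.solrSink.morphlineId in the Flume configuration. While testing, it can help to comment out the loadSolr step and watch the logDebug output first, to confirm that each line is split into the expected nine fields.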
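yaojiank posted on 2017-3-30 19:11
There are actually examples for all of this.
The general pattern is:
So how do I configure a Flume sink to Solr in a vanilla (non-vendor) environment? Is morphline required?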