Actually there are examples for all of this. As a general pattern, the file has two parts.
Part 1: configure the SOLR_LOCATOR up front, like this:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1
  # ZooKeeper ensemble
  zkHost : "127.0.0.1:2181/solr"
}
If you already know Solr, none of this is hard.
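On a real cluster the zkHost usually lists the full ZooKeeper ensemble rather than a single local node. A minimal sketch (the host names are placeholders, and the /solr chroot must match your deployment):

SOLR_LOCATOR : {
  collection : collection1
  zkHost : "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/solr"
}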
Part 2: specify one or more morphlines.
Each one defines an ETL transformation chain. A morphline consists of one or more (potentially nested) commands. A morphline is a way to consume records, turn them into a stream of records, and pipe that stream through a set of configurable transformations on the way to a target application such as Solr.
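For context, this is how an application typically drives such a file: it compiles the config by id, then pushes records through the resulting command chain. Below is a minimal driver sketch against the Kite Morphlines API (note this answer's config uses the older com.cloudera.** package imports; the open-source successor lives under org.kitesdk.morphline, and the file names below are placeholders). Since the chain ends in loadSolr, actually running it requires a reachable Solr/ZooKeeper at the configured zkHost.

import java.io.File;
import java.io.FileInputStream;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Notifications;

public class MorphlineDriver {
  public static void main(String[] args) throws Exception {
    // Compile the morphline file, selecting the morphline named "morphline1"
    MorphlineContext context = new MorphlineContext.Builder().build();
    Command morphline = new Compiler().compile(
        new File("morphline.conf"), "morphline1", context, null);

    // Feed one record through the chain; readAvroContainer reads the
    // Avro container from the record's attachment body stream
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, new FileInputStream("sample.avro"));
    Notifications.notifyStartSession(morphline);
    boolean success = morphline.process(record);
    if (!success) {
      System.out.println("Morphline failed to process record: " + record);
    }
    Notifications.notifyShutdown(morphline);
  }
}

With that in mind, here is the full morphlines definition: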
morphlines : [
  {
    # If there are several morphlines in this file, this name identifies this one
    id : morphline1

    # Import all morphline commands in these Java packages and subpackages.
    # Other commands that may exist on the classpath are not visible to this file.
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]

    commands : [
      {
        # Parse the Avro container file and emit a record for each Avro object
        readAvroContainer {
          # Optional, require the input to match one of these MIME types:
          # supportedMimeTypes : [avro/binary]

          # Optional, use a custom Avro schema in JSON format inline:
          # readerSchemaString : """<json can go here>"""

          # Optional, use a custom Avro schema file in JSON format:
          # readerSchemaFile : /path/to/syslog.avsc
        }
      }
      {
        # The comments below are from the original English docs; if anything
        # is unclear, feel free to continue the discussion in this thread.
        #
        # Consume the output record of the previous command and pipe another
        # record downstream.
        #
        # extractAvroPaths is a command that uses zero or more Avro path
        # expressions to extract values from an Avro object. Each expression
        # consists of a record output field name (on the left side of the
        # colon ':') as well as zero or more path steps (on the right hand
        # side), each path step separated by a '/' slash. Avro arrays are
        # traversed with the '[]' notation.
        #
        # The result of a path expression is a list of objects, each of which
        # is added to the given record output field.
        #
        # The path language supports all Avro concepts, including nested
        # structures, records, arrays, maps, unions, etc, as well as a flatten
        # option that collects the primitives in a subtree into a flat list.
        extractAvroPaths {
          flatten : false
          paths : {
            id : /id
            username : /user_screen_name
            created_at : /created_at
            text : /text
          }
        }
      }
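      # For example (hypothetical field names, not from the schema above), a
      # nested path using the '[]' array notation might look like:
      #   user_mentions : /entities/user_mentions[]/screen_name
      # which would collect each screen_name in that array into the
      # user_mentions output field.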
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }
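      # Note: the inputFormats above are tried in order until one of them
      # parses the field value, so put the most specific pattern first.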
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # Command that deletes record fields that are unknown to Solr
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document
      # that contains a field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }
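      # In logDebug, "{}" in the format string is an SLF4J-style placeholder
      # filled from args; "@{}" expands to the entire record, while something
      # like "@{created_at}" would expand to the value(s) of that field.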
      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
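Once the file is saved (say as morphline.conf), the consuming application is pointed at it by file path and morphline id. As a sketch, here is how it might be wired into Flume's MorphlineSolrSink (property names are from the Flume Solr sink; the agent/sink/channel names and file path are placeholders to adapt):

agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
agent.sinks.solrSink.morphlineId = morphline1

The morphlineId must match the id field defined above; if the file contains only one morphline, the id can typically be omitted.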