分享

Apache Pig如何与Apache Solr集成(二)


问题导读


1.把生成的Lucene索引,拷贝至本地磁盘有哪些缺点?
2.Solr与ElasticSearch和Lucene相比,有什么优点?








上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看上篇,熟悉下具体的流程。
在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点:

(一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响

(二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。

(三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。


鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢?

(1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。

(2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。

(3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。



下面,散仙就具体说下如何使用Pig和Solr集成?

(1)依旧访问这个地址下载源码压缩包。
(2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。
(3)使用ant重新打包成jar
(4)在pig里,注册相关依赖的jar包,并使用索引存储



注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:


SolrOutputFormat函数
  1. package com.pig.support.solr;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.ScheduledExecutorService;
  7. import java.util.concurrent.TimeUnit;
  8. import org.apache.hadoop.io.Writable;
  9. import org.apache.hadoop.mapreduce.JobContext;
  10. import org.apache.hadoop.mapreduce.OutputCommitter;
  11. import org.apache.hadoop.mapreduce.RecordWriter;
  12. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.solr.client.solrj.SolrServer;
  15. import org.apache.solr.client.solrj.SolrServerException;
  16. import org.apache.solr.client.solrj.impl.CloudSolrServer;
  17. import org.apache.solr.client.solrj.impl.HttpSolrServer;
  18. import org.apache.solr.common.SolrInputDocument;
  19. /**
  20. * @author qindongliang
  21. * 支持SOlr的SolrOutputFormat
  22. * 如果你想了解,或学习更多这方面的
  23. *
  24. * */
  25. public class SolrOutputFormat extends
  26. FileOutputFormat<Writable, SolrInputDocument> {
  27. final String address;
  28. final String collection;
  29. public SolrOutputFormat(String address, String collection) {
  30. this.address = address;
  31. this.collection = collection;
  32. }
  33. @Override
  34. public RecordWriter<Writable, SolrInputDocument> getRecordWriter(
  35. TaskAttemptContext ctx) throws IOException, InterruptedException {
  36. return new SolrRecordWriter(ctx, address, collection);
  37. }
  38. @Override
  39. public synchronized OutputCommitter getOutputCommitter(
  40. TaskAttemptContext arg0) throws IOException {
  41. return new OutputCommitter(){
  42. @Override
  43. public void abortTask(TaskAttemptContext ctx) throws IOException {
  44. }
  45. @Override
  46. public void commitTask(TaskAttemptContext ctx) throws IOException {
  47. }
  48. @Override
  49. public boolean needsTaskCommit(TaskAttemptContext arg0)
  50. throws IOException {
  51. return true;
  52. }
  53. @Override
  54. public void setupJob(JobContext ctx) throws IOException {
  55. }
  56. @Override
  57. public void setupTask(TaskAttemptContext ctx) throws IOException {
  58. }
  59. };
  60. }
  61. /**
  62. * Write out the LuceneIndex to a local temporary location.<br/>
  63. * On commit/close the index is copied to the hdfs output directory.<br/>
  64. *
  65. */
  66. static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {
  67. /**Solr的地址*/
  68. SolrServer server;
  69. /**批处理提交的数量**/
  70. int batch = 5000;
  71. TaskAttemptContext ctx;
  72. List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);
  73. ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  74. /**
  75. * Opens and forces connect to CloudSolrServer
  76. *
  77. * @param address
  78. */
  79. public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {
  80. try {
  81. this.ctx = ctx;
  82. server = new HttpSolrServer(address);
  83. exec.scheduleWithFixedDelay(new Runnable(){
  84. public void run(){
  85. ctx.progress();
  86. }
  87. }, 1000, 1000, TimeUnit.MILLISECONDS);
  88. } catch (Exception e) {
  89. RuntimeException exc = new RuntimeException(e.toString(), e);
  90. exc.setStackTrace(e.getStackTrace());
  91. throw exc;
  92. }
  93. }
  94. /**
  95. * On close we commit
  96. */
  97. @Override
  98. public void close(final TaskAttemptContext ctx) throws IOException,
  99. InterruptedException {
  100. try {
  101. if (docs.size() > 0) {
  102. server.add(docs);
  103. docs.clear();
  104. }
  105. server.commit();
  106. } catch (SolrServerException e) {
  107. RuntimeException exc = new RuntimeException(e.toString(), e);
  108. exc.setStackTrace(e.getStackTrace());
  109. throw exc;
  110. } finally {
  111. server.shutdown();
  112. exec.shutdownNow();
  113. }
  114. }
  115. /**
  116. * We add the indexed documents without commit
  117. */
  118. @Override
  119. public void write(Writable key, SolrInputDocument doc)
  120. throws IOException, InterruptedException {
  121. try {
  122. docs.add(doc);
  123. if (docs.size() >= batch) {
  124. server.add(docs);
  125. docs.clear();
  126. }
  127. } catch (SolrServerException e) {
  128. RuntimeException exc = new RuntimeException(e.toString(), e);
  129. exc.setStackTrace(e.getStackTrace());
  130. throw exc;
  131. }
  132. }
  133. }
  134. }
复制代码




SolrStore函数
  1. package com.pig.support.solr;
  2. import java.io.IOException;
  3. import java.util.Properties;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.Writable;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.OutputFormat;
  8. import org.apache.hadoop.mapreduce.RecordWriter;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.pig.ResourceSchema;
  11. import org.apache.pig.ResourceSchema.ResourceFieldSchema;
  12. import org.apache.pig.ResourceStatistics;
  13. import org.apache.pig.StoreFunc;
  14. import org.apache.pig.StoreMetadata;
  15. import org.apache.pig.data.Tuple;
  16. import org.apache.pig.impl.util.UDFContext;
  17. import org.apache.pig.impl.util.Utils;
  18. import org.apache.solr.common.SolrInputDocument;
  19. /**
  20. *
  21. * Create a lucene index
  22. *
  23. */
  24. public class SolrStore extends StoreFunc implements StoreMetadata {
  25.         private static final String SCHEMA_SIGNATURE = "solr.output.schema";
  26.         ResourceSchema schema;
  27.         String udfSignature;
  28.         RecordWriter<Writable, SolrInputDocument> writer;
  29.         String address;
  30.         String collection;
  31.        
  32.         public SolrStore(String address, String collection) {
  33.                 this.address = address;
  34.                 this.collection = collection;
  35.         }
  36.         public void storeStatistics(ResourceStatistics stats, String location,
  37.                         Job job) throws IOException {
  38.         }
  39.         public void storeSchema(ResourceSchema schema, String location, Job job)
  40.                         throws IOException {
  41.         }
  42.         @Override
  43.         public void checkSchema(ResourceSchema s) throws IOException {
  44.                 UDFContext udfc = UDFContext.getUDFContext();
  45.                 Properties p = udfc.getUDFProperties(this.getClass(),
  46.                                 new String[] { udfSignature });
  47.                 p.setProperty(SCHEMA_SIGNATURE, s.toString());
  48.         }
  49.         public OutputFormat<Writable, SolrInputDocument> getOutputFormat()
  50.                         throws IOException {
  51.                 // not be used
  52.                 return new SolrOutputFormat(address, collection);
  53.         }
  54.         /**
  55.          * Not used
  56.          */
  57.         @Override
  58.         public void setStoreLocation(String location, Job job) throws IOException {
  59.                 FileOutputFormat.setOutputPath(job, new Path(location));
  60.         }
  61.         @Override
  62.         public void setStoreFuncUDFContextSignature(String signature) {
  63.                 this.udfSignature = signature;
  64.         }
  65.         @SuppressWarnings({ "unchecked", "rawtypes" })
  66.         @Override
  67.         public void prepareToWrite(RecordWriter writer) throws IOException {
  68.                 this.writer = writer;
  69.                 UDFContext udc = UDFContext.getUDFContext();
  70.                 String schemaStr = udc.getUDFProperties(this.getClass(),
  71.                                 new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);
  72.                 if (schemaStr == null) {
  73.                         throw new RuntimeException("Could not find udf signature");
  74.                 }
  75.                 schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));
  76.         }
  77.         /**
  78.          * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
  79.          * @param input
  80.          * @return
  81.          */
  82.         private static String stripNonCharCodepoints(String input) {
  83.                 StringBuilder retval = new StringBuilder(input.length());
  84.                 char ch;
  85.                 for (int i = 0; i < input.length(); i++) {
  86.                         ch = input.charAt(i);
  87.                         // Strip all non-characters
  88.                         // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
  89.                         // and non-printable control characters except tabulator, new line
  90.                         // and carriage return
  91.                         if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step
  92.                                                                                         // 0x10000
  93.                                         ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
  94.                                         (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
  95.                                         (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {
  96.                                 retval.append(ch);
  97.                         }
  98.                 }
  99.                 return retval.toString();
  100.         }
  101.         @Override
  102.         public void putNext(Tuple t) throws IOException {
  103.                 final SolrInputDocument doc = new SolrInputDocument();
  104.                 final ResourceFieldSchema[] fields = schema.getFields();
  105.                 int docfields = 0;
  106.                 for (int i = 0; i < fields.length; i++) {
  107.                         final Object value = t.get(i);
  108.                         if (value != null) {
  109.                                 docfields++;
  110.                                 doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));
  111.                         }
  112.                 }
  113.                 try {
  114.                         if (docfields > 0)
  115.                                 writer.write(null, doc);
  116.                 } catch (InterruptedException e) {
  117.                         Thread.currentThread().interrupt();
  118.                         return;
  119.                 }
  120.         }
  121. }
复制代码


Pig脚本如下:

  1. --注册依赖文件的jar包
  2. REGISTER ./dependfiles/tools.jar;
  3. --注册solr相关的jar包
  4. REGISTER  ./solrdependfiles/pigudf.jar;
  5. REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;
  6. REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;
  7. REGISTER  ./solrdependfiles/httpclient-4.3.1.jar
  8. REGISTER  ./solrdependfiles/httpcore-4.3.jar
  9. REGISTER  ./solrdependfiles/httpmime-4.3.1.jar
  10. REGISTER  ./solrdependfiles/noggit-0.5.jar
  11. --加载HDFS数据,并定义scheaml
  12. a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);
  13. --存储到solr中,并提供solr的ip地址和端口号
  14. store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');
  15. ~                                                                                                                                                            
  16. ~                                                                     
  17. ~                              
复制代码


配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下:


2.jpg



成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等!

同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待!



公众号:我是攻城师(woshigcs)
转自:散仙

已有(1)人评论

跳转到指定楼层
ainubis 发表于 2015-3-29 14:22:21
好东西,多xie楼主分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条