分享

ElasticSearch与hbase进行整合的一个搜索案例

本帖最后由 Oner 于 2017-1-5 16:56 编辑
问题导读:
1.  ElasticSearch和Hbase分别有有什么特点呢?
2.  如何安装一个ElasticSearch集群?
3.  如何导入索引?

4.  如何实现搜素?
5.  如何使用使用spring控制层处理?


首先祝大家2017新年快乐,我今天分享的是通过ElasticSearch与Hbase进行整合的一个搜索案例,这个案例涉及的技术面比较广,首先你得有JAVAEE的基础,要会SSM,而且还要会大数据中的hdfs、zookeeper、hbase以及ElasticSearch和kibana。环境部署在4台centos7上。主机名为node1-node4。这里假设你已经安装好了zookeeper、Hadoop、hbase和ElasticSearch还有kibana,我这里使用的是hadoop2.5.2,ElasticSearch用的你是2.2,kibana是4.4.1。我这里的环境是 hadoop是4台在node1-node4, zookeeper是3台再node1-node3,,ElasticSearch是3台在node1-node3,kibana是一台在node1上。该系统可以对亿万数据查询进行秒回,是一般的关系型数据库很难做到的。在IntelliJ IDEA 中进行代码编写。环境搭建我这里就不啰嗦,相信大家作为一名由经验的开发人员来说都是小事一桩。文末提供源码下载链接。

一、ElasticSearch和Hbase

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 Elasticsearch的性能是solr的50倍。

HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、
实时读写的分布式数据库
– 利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理
HBase中的海量数据,利用Zookeeper作为其分布式协同服务
– 主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)


二、需求分析&服务器环境设置


主要是做一个文章的搜索。有文章标题、作者、摘要、内容四个主要信息。效果图如下:这里样式我就没怎么设置了。。。。想要好看一点的可以自己加css。
20170101155427057.jpg

服务器:
在3台centos7中部署,主机名为node1-node3.安装好ElasticSearch并配置好集群,

1.     解压
2.     修改config/elasticsearch.yml    (注意要顶格写,冒号后面要加一个空格)
a)      Cluster.name: tf   (同一集群要一样)
b)      Node.name: node-1  (同一集群要不一样)
c)       Network.Host: 192.168.44.137  这里不能写127.0.0.1
3.     解压安装kibana
4.     再congfig目录下的kibana.yml中修改elasticsearch.url
5.     安装插件

Step 1: Install Marvel into Elasticsearch:
bin/plugin install license
bin/plugin install marvel-agent
Step 2: Install Marvel into Kibana
bin/kibana plugin --install elasticsearch/marvel/latest
Step 3: Start Elasticsearch and Kibana
bin/elasticsearch
bin/kibana


启动好elasticsearch集群后,
然后启动zookeeper、hdfs、hbase。zkService.sh start  、start-all.sh、start-hbase.sh。接下来就是剩下编码步骤了。
20170101161201098.jgp.png


三、编码开发


1、创建工程
首先在IntelliJ IDEA中新建一个maven工程,加入如下依赖。
  1. <dependencies>  
  2.         <dependency>  
  3.             <groupId>junit</groupId>  
  4.             <artifactId>junit</artifactId>  
  5.             <version>4.9</version>  
  6.         </dependency>  
  7.   
  8.   
  9.         <!-- spring 3.2 -->  
  10.         <dependency>  
  11.             <groupId>org.springframework</groupId>  
  12.             <artifactId>spring-context</artifactId>  
  13.             <version>3.2.0.RELEASE</version>  
  14.         </dependency>  
  15.         <dependency>  
  16.             <groupId>org.springframework</groupId>  
  17.             <artifactId>spring-orm</artifactId>  
  18.             <version>3.2.0.RELEASE</version>  
  19.         </dependency>  
  20.         <dependency>  
  21.             <groupId>org.springframework</groupId>  
  22.             <artifactId>spring-aspects</artifactId>  
  23.             <version>3.2.0.RELEASE</version>  
  24.         </dependency>  
  25.         <dependency>  
  26.             <groupId>org.springframework</groupId>  
  27.             <artifactId>spring-web</artifactId>  
  28.             <version>3.2.0.RELEASE</version>  
  29.         </dependency>  
  30.         <dependency>  
  31.             <groupId>org.springframework</groupId>  
  32.             <artifactId>spring-webmvc</artifactId>  
  33.             <version>3.2.0.RELEASE</version>  
  34.         </dependency>  
  35.         <dependency>  
  36.             <groupId>org.springframework</groupId>  
  37.             <artifactId>spring-test</artifactId>  
  38.             <version>3.2.0.RELEASE</version>  
  39.         </dependency>  
  40.   
  41.         <!-- JSTL -->  
  42.         <dependency>  
  43.             <groupId>jstl</groupId>  
  44.             <artifactId>jstl</artifactId>  
  45.             <version>1.2</version>  
  46.         </dependency>  
  47.         <dependency>  
  48.             <groupId>taglibs</groupId>  
  49.             <artifactId>standard</artifactId>  
  50.             <version>1.1.2</version>  
  51.         </dependency>  
  52.         <!-- slf4j -->  
  53.         <dependency>  
  54.             <groupId>org.slf4j</groupId>  
  55.             <artifactId>slf4j-api</artifactId>  
  56.             <version>1.7.10</version>  
  57.         </dependency>  
  58.         <dependency>  
  59.             <groupId>org.slf4j</groupId>  
  60.             <artifactId>slf4j-log4j12</artifactId>  
  61.             <version>1.7.10</version>  
  62.         </dependency>  
  63.   
  64.         <!-- elasticsearch -->  
  65.         <dependency>  
  66.             <groupId>org.elasticsearch</groupId>  
  67.             <artifactId>elasticsearch</artifactId>  
  68.             <version>2.2.0</version>  
  69.         </dependency>  
  70.   
  71.         <!-- habse -->  
  72.         <dependency>  
  73.             <groupId>org.apache.hbase</groupId>  
  74.             <artifactId>hbase-client</artifactId>  
  75.             <version>1.1.3</version>  
  76.             <exclusions>  
  77.                 <exclusion>  
  78.                     <groupId>com.google.guava</groupId>  
  79.                     <artifactId>guava</artifactId>  
  80.                 </exclusion>  
  81.             </exclusions>  
  82.         </dependency>  
  83.   
  84.   
  85.     </dependencies>  
复制代码

2、Dao层
  1. private Integer id;  
  2. private String title;  
  3.   
  4. private String describe;  
  5.   
  6. private String content;  
  7.   
  8. private String author;  
复制代码
实现其getter/setter方法。

3、数据准备
在桌面新建一个doc1.txt文档,用于把我们需要查询的数据写入到里面,这里我只准备了5条数据。中间用tab键隔开。
20170101161016542.jpg

4、在hbase中建立表。
表名师doc,列族是cf。
  1. public static void main(String[] args) throws Exception {
  2.       HbaseUtils hbase = new HbaseUtils();
  3.       //创建一张表
  4.         hbase.createTable("doc","cf");
  5. }
  6. /**
  7. * 创建一张表
  8. * @param tableName
  9. * @param column
  10. * @throws Exception
  11. */
  12. public void createTable(String tableName, String column) throws Exception {
  13.    if(admin.tableExists(TableName.valueOf(tableName))){
  14.       System.out.println(tableName+"表已经存在!");
  15.    }else{
  16.       HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
  17.       tableDesc.addFamily(new HColumnDescriptor(column.getBytes()));
  18.       admin.createTable(tableDesc);
  19.       System.out.println(tableName+"表创建成功!");
  20.    }
  21. }
复制代码


5、导入索引。

这一步的时候确保你的hdfs和hbase以及elasticsearch是处于开启状态。
  1. @Test  
  2.   public void createIndex() throws Exception {  
  3.       List<Doc> arrayList = new ArrayList<Doc>();  
  4.       File file = new File("C:\\Users\\asus\\Desktop\\doc1.txt");  
  5.       List<String> list = FileUtils.readLines(file,"UTF8");  
  6.       for(String line : list){  
  7.           Doc Doc = new Doc();  
  8.           String[] split = line.split("\t");  
  9.           System.out.print(split[0]);  
  10.           int parseInt = Integer.parseInt(split[0].trim());  
  11.           Doc.setId(parseInt);  
  12.           Doc.setTitle(split[1]);  
  13.           Doc.setAuthor(split[2]);  
  14.           Doc.setDescribe(split[3]);  
  15.           Doc.setContent(split[3]);  
  16.           arrayList.add(Doc);  
  17.       }  
  18.       HbaseUtils hbaseUtils = new HbaseUtils();  
  19.       for (Doc Doc : arrayList) {  
  20.           try {  
  21.               //把数据插入hbase  
  22.               hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_TITLE, Doc.getTitle());  
  23.               hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_AUTHOR, Doc.getAuthor());  
  24.               hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_DESCRIBE, Doc.getDescribe());  
  25.               hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_CONTENT, Doc.getContent());  
  26.               //把数据插入es  
  27.               Esutil.addIndex("tfjt","doc", Doc);  
  28.           } catch (Exception e) {  
  29.               e.printStackTrace();  
  30.           }  
  31.       }  
  32.   }  
复制代码

数据导入成功之后可以在服务器上通过命令查看一下:
curl -XGET http://node1:9200/tfjt/_search

20170101163005143.jpg

7、搜索。
在这里新建了一个工具类Esutil.java,主要用于处理搜索的。注意,我们默认的elasticsearch是9200端口的,这里数据传输用的是9300,不要写成9200了,然后就是集群名字为tf,也就是前面配置的集群名。还有就是主机名node1-node3,这里不能写ip地址,如果是本地测试的话,你需要在你的window下面配置hosts文件。
  1. public class Esutil {  
  2.     public static Client client = null;  
  3.   
  4.         /**
  5.          * 获取客户端
  6.          * @return
  7.          */  
  8.         public static  Client getClient() {  
  9.             if(client!=null){  
  10.                 return client;  
  11.             }  
  12.             Settings settings = Settings.settingsBuilder().put("cluster.name", "tf").build();  
  13.             try {  
  14.                 client = TransportClient.builder().settings(settings).build()  
  15.                         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node1"), 9300))  
  16.                         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node2"), 9300))  
  17.                         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node3"), 9300));  
  18.             } catch (UnknownHostException e) {  
  19.                 e.printStackTrace();  
  20.             }  
  21.             return client;  
  22.         }  
  23.       
  24.       
  25.       
  26.       
  27.     public static String addIndex(String index,String type,Doc Doc){  
  28.         HashMap<String, Object> hashMap = new HashMap<String, Object>();  
  29.         hashMap.put("id", Doc.getId());  
  30.         hashMap.put("title", Doc.getTitle());  
  31.         hashMap.put("describe", Doc.getDescribe());  
  32.         hashMap.put("author", Doc.getAuthor());  
  33.          
  34.         IndexResponse response = getClient().prepareIndex(index, type).setSource(hashMap).execute().actionGet();  
  35.         return response.getId();  
  36.     }  
  37.       
  38.       
  39.     public static Map<String, Object> search(String key,String index,String type,int start,int row){  
  40.         SearchRequestBuilder builder = getClient().prepareSearch(index);  
  41.         builder.setTypes(type);  
  42.         builder.setFrom(start);  
  43.         builder.setSize(row);  
  44.         //设置高亮字段名称  
  45.         builder.addHighlightedField("title");  
  46.         builder.addHighlightedField("describe");  
  47.         //设置高亮前缀  
  48.         builder.setHighlighterPreTags("<font color='red' >");  
  49.         //设置高亮后缀  
  50.         builder.setHighlighterPostTags("</font>");  
  51.         builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);  
  52.         if(StringUtils.isNotBlank(key)){  
  53. //          builder.setQuery(QueryBuilders.termQuery("title",key));  
  54.             builder.setQuery(QueryBuilders.multiMatchQuery(key, "title","describe"));  
  55.         }  
  56.         builder.setExplain(true);  
  57.         SearchResponse searchResponse = builder.get();  
  58.          
  59.         SearchHits hits = searchResponse.getHits();  
  60.         long total = hits.getTotalHits();  
  61.         Map<String, Object> map = new HashMap<String,Object>();  
  62.         SearchHit[] hits2 = hits.getHits();  
  63.         map.put("count", total);  
  64.         List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();  
  65.         for (SearchHit searchHit : hits2) {  
  66.             Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();  
  67.             HighlightField highlightField = highlightFields.get("title");  
  68.             Map<String, Object> source = searchHit.getSource();  
  69.             if(highlightField!=null){  
  70.                 Text[] fragments = highlightField.fragments();  
  71.                 String name = "";  
  72.                 for (Text text : fragments) {  
  73.                     name+=text;  
  74.                 }  
  75.                 source.put("title", name);  
  76.             }  
  77.             HighlightField highlightField2 = highlightFields.get("describe");  
  78.             if(highlightField2!=null){  
  79.                 Text[] fragments = highlightField2.fragments();  
  80.                 String describe = "";  
  81.                 for (Text text : fragments) {  
  82.                     describe+=text;  
  83.                 }  
  84.                 source.put("describe", describe);  
  85.             }  
  86.             list.add(source);  
  87.         }  
  88.         map.put("dataList", list);  
  89.         return map;  
  90.     }  
  91.   
  92. //  public static void main(String[] args) {  
  93. //      Map<String, Object> search = Esutil.search("hbase", "tfjt", "doc", 0, 10);  
  94. //      List<Map<String, Object>> list = (List<Map<String, Object>>) search.get("dataList");  
  95. //  }  
  96. }  
复制代码

8、使用spring控制层处理
在里面的spring配置这里就不说了,代码文末提供。
  1. @RequestMapping("/search.do")  
  2. public String serachArticle(Model model,  
  3.         @RequestParam(value="keyWords",required = false) String keyWords,  
  4.         @RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,  
  5.         @RequestParam(value = "pageSize", defaultValue = "3") Integer pageSize){  
  6.     try {  
  7.         keyWords = new String(keyWords.getBytes("ISO-8859-1"),"UTF-8");  
  8.     } catch (UnsupportedEncodingException e) {  
  9.         e.printStackTrace();  
  10.     }  
  11.     Map<String,Object> map = new HashMap<String, Object>();  
  12.     int count = 0;  
  13.     try {  
  14.         map = Esutil.search(keyWords,"tfjt","doc",(pageNum-1)*pageSize, pageSize);  
  15.         count = Integer.parseInt(((Long) map.get("count")).toString());  
  16.     } catch (Exception e) {  
  17.         logger.error("查询索引错误!{}",e);  
  18.         e.printStackTrace();  
  19.     }  
  20.     PageUtil<Map<String, Object>> page = new PageUtil<Map<String, Object>>(String.valueOf(pageNum),String.valueOf(pageSize),count);  
  21.     List<Map<String, Object>> articleList = (List<Map<String, Object>>)map.get("dataList");  
  22.     page.setList(articleList);  
  23.     model.addAttribute("total",count);  
  24.     model.addAttribute("pageNum",pageNum);  
  25.     model.addAttribute("page",page);  
  26.     model.addAttribute("kw",keyWords);  
  27.     return "index.jsp";  
  28. }  
复制代码

9、页面
  1. <center>  
  2. <form action="search.do" method="get">  
  3.   <input type="text" name="keyWords" />  
  4.   <input type="submit" value="百度一下">  
  5.   <input type="hidden" value="1" name="pageNum">  
  6. </form>  
  7. <c:if test="${! empty page.list }">  
  8. <h3>百度为您找到相关结果约${total}个</h3>  
  9. <c:forEach items="${page.list}" var="bean">  
  10.   <a href="/es/detailDocById/${bean.id}.do">${bean.title}</a>  
  11.   <br/>  
  12.   <br/>  
  13.   <span>${bean.describe}</span>  
  14.   <br/>  
  15.   <br/>  
  16. </c:forEach>  
  17.   
  18. <c:if test="${page.hasPrevious }">  
  19.   <a href="search.do?pageNum=${page.previousPageNum }&keyWords=${kw}"> 上一页</a>  
  20. </c:if>  
  21. <c:forEach begin="${page.everyPageStart }" end="${page.everyPageEnd }" var="n">  
  22.   <a href="search.do?pageNum=${n }&keyWords=${kw}"> ${n }</a>     
  23. </c:forEach>  
  24.   
  25. <c:if test="${page.hasNext }">  
  26.   <a href="search.do?pageNum=${page.nextPageNum }&keyWords=${kw}"> 下一页</a>  
  27. </c:if>  
  28. </c:if>  
  29. </center>  
复制代码

10、发布项目
项目发布在IntelliJ IDEA 中配置好常用的项目,这里发布名Application context名字为es,当然你也可以自定义设置。

20170101163334348.jpg

最终效果如下:搜索COS会得到结果,速度非常快。
20170101162525173.jpg

总结:这个案例的操作流程还是挺多的,要有细心和耐心,特别是服务器配置,各种版本要匹配好,不然会出各种头疼的问题,当然了,这个还是需要有一定基础,不然搞不定这个事情。。。。。

源码地址:https://github.com/sdksdk0/es


来源:CSDN
作者:朱培


已有(11)人评论

跳转到指定楼层
leletuo2012 发表于 2017-1-6 09:34:27
es是solr的50倍 这个数据从何而得
回复

使用道具 举报

supertianxiang 发表于 2017-1-6 09:39:58
大牛,你好,没看到HBASE怎么和ES关联起来的呀
回复

使用道具 举报

wuwei 发表于 2017-1-6 09:47:19
没看出来hbase有啥用啊
回复

使用道具 举报

nextuser 发表于 2017-1-6 10:04:08
wuwei 发表于 2017-1-6 09:47
没看出来hbase有啥用啊

导入索引那块
回复

使用道具 举报

wuwei 发表于 2017-1-6 10:43:52

那他就是单单拿hbase做底层存储,数据备份了。然而es也并没有从hbase取数据。

点评

更多可以查看下源码  发表于 2017-1-6 18:10
回复

使用道具 举报

karo_lee 发表于 2017-1-6 18:11:42
是啊 hbase没有用到额??
回复

使用道具 举报

einhep 发表于 2017-1-9 19:58:16
supertianxiang 发表于 2017-1-6 09:39
大牛,你好,没看到HBASE怎么和ES关联起来的呀

hbase存储数据,es存储索引。通过查询讲查询的数据,插入es,es找到索引,然后根据索引找到对应的rowkey,然后找到对应的数据。
这就是二者的关系。其实hbase有二级索引这么一说,es个人认为可以是多级索引。这样hbase查询速度杠杠的
回复

使用道具 举报

supertianxiang 发表于 2017-1-10 09:18:56
einhep 发表于 2017-1-9 19:58
hbase存储数据,es存储索引。通过查询讲查询的数据,插入es,es找到索引,然后根据索引找到对应的rowkey ...

     的确ES可以存储索引,HBASE可以存储数据,但是楼主代码中可以看出ES中存储有完整的数据+索引,只是在导入数据的时候,ES和HBASE中各导入了一份,后面也没有看到那里用到了HBASE。
     按楼主的思路推断的话,我觉得应该是ES提供搜索服务,HBASE只是用于数据备份(其实跟搜索没有关系),对于以后增量或实时数据数据如何处理也没有提到,所以看的不是太明白。
     感谢
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条