分享

基于Solr的HBase多条件查询测试

xioaxu790 2014-10-31 09:21:40 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 18039
问题导读
1、基于Solr的HBase多条件查询原理是什么?
2、读取到了HBase源表的数据后,怎么在Solr中建立索引?
3、如何通过java客户端结合Solr查询HBase数据?







背景:
某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询。HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力。针对HBase的多条件查询也有多种方案,但是这些方案要么太复杂,要么效率太低,本文只对基于Solr的HBase多条件查询方案进行测试和验证。


原理:
基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询快速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBASE中通过指定rowkey进行查询。
1.PNG



测试环境:
solr 4.0.0版本,使用其自带的jetty服务端容器,单节点;

hbase-0.94.2-cdh4.2.1,10台Lunux服务器组成的HBase集群。

HBase中2512万条数据172个字段;

Solr索引HBase中的100万条数据;

测试结果:
1、100万条数据在Solr中对8个字段建立索引。在Solr中最多8个过滤条件获取51316条数据的rowkey值,基本在57-80毫秒。根据Solr返回的rowkey值在HBase表中获取所有51316条数据12个字段值,耗时基本在15秒;

2、数据量同上,过滤条件同上,采用Solr分页查询,每次获取20条数据,Solr获得20个rowkey值耗时4-10毫秒,拿到Solr传入的rowkey值在HBase中获取对应20条12个字段的数据,耗时6毫秒。

以下列出测试环境的搭建、以及相关代码实现过程。


一、Solr环境的搭建
因为初衷只是测试Solr的使用,Solr的运行环境也只是用了其自带的jetty,而非大多人用的Tomcat;没有搭建Solr集群,只是一个单一的Solr服务端,也没有任何参数调优。

1)在Apache网站上下载Solr 4:http://lucene.apache.org/solr/downloads.html,我们这里下载的是“apache-solr-4.0.0.tgz”;

2)在当前目录解压Solr压缩包:
  1. cd /opt
  2. tar -xvzf apache-solr-4.0.0.tgz
复制代码



3)修改Solr的配置文件schema.xml,添加我们需要索引的多个字段(配置文件位于“/opt/apache-solr-4.0.0/example/solr/collection1/conf/”)
  1.    <field name="rowkey" type="string" indexed="true" stored="true" required="true" multiValued="false" />
  2.    <field name="time" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  3.    <field name="tebid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  4.    <field name="tetid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  5.    <field name="puid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  6.    <field name="mgcvid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  7.    <field name="mtcvid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  8.    <field name="smaid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
  9.    <field name="mtlkid" type="string" indexed="true" stored="true" required="false" multiValued="false" />
复制代码


另外关键的一点是修改原有的uniqueKey,本文设置HBase表的rowkey字段为Solr索引的uniqueKey:
  1. <uniqueKey>rowkey</uniqueKey>
复制代码


type 参数代表索引数据类型,我这里将type全部设置为string是为了避免异常类型的数据导致索引建立失败,正常情况下应该根据实际字段类型设置,比如整型字段设置为int,更加有利于索引的建立和检索;

indexed 参数代表此字段是否建立索引,根据实际情况设置,建议不参与条件过滤的字段一律设置为false;

stored 参数代表是否存储此字段的值,建议根据实际需求只将需要获取值的字段设置为true,以免浪费存储,比如我们的场景只需要获取rowkey,那么只需把rowkey字段设置为true即可,其他字段全部设置flase;

required 参数代表此字段是否必需,如果数据源某个字段可能存在空值,那么此属性必需设置为false,不然Solr会抛出异常;

multiValued 参数代表此字段是否允许有多个值,通常都设置为false,根据实际需求可设置为true。

4)我们使用Solr自带的example来作为运行环境,定位到example目录,启动服务监听:
  1. cd /opt/apache-solr-4.0.0/example
  2. java -jar ./start.jar
复制代码


如果启动成功,可以通过浏览器打开此页面:http://192.168.1.10:8983/solr/
1.PNG



二、读取HBase源表的数据,在Solr中建立索引
一种方案是通过HBase的普通API获取数据建立索引,此方案的缺点是效率较低每秒只能处理100多条数据(或许可以通过多线程提高效率):

  1. package com.ultrapower.hbase.solrhbase;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.KeyValue;
  6. import org.apache.hadoop.hbase.client.HTable;
  7. import org.apache.hadoop.hbase.client.Result;
  8. import org.apache.hadoop.hbase.client.ResultScanner;
  9. import org.apache.hadoop.hbase.client.Scan;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import org.apache.solr.client.solrj.SolrServerException;
  12. import org.apache.solr.client.solrj.impl.HttpSolrServer;
  13. import org.apache.solr.common.SolrInputDocument;
  14. public class SolrIndexer {
  15.     /**
  16.      * @param args
  17.      * @throws IOException
  18.      * @throws SolrServerException
  19.      */
  20.     public static void main(String[] args) throws IOException,
  21.             SolrServerException {
  22.         final Configuration conf;
  23.         HttpSolrServer solrServer = new HttpSolrServer(
  24.                 "http://192.168.1.10:8983/solr"); // 因为服务端是用的Solr自带的jetty容器,默认端口号是8983
  25.         conf = HBaseConfiguration.create();
  26.         HTable table = new HTable(conf, "hb_app_xxxxxx"); // 这里指定HBase表名称
  27.         Scan scan = new Scan();
  28.         scan.addFamily(Bytes.toBytes("d")); // 这里指定HBase表的列族
  29.         scan.setCaching(500);
  30.         scan.setCacheBlocks(false);
  31.         ResultScanner ss = table.getScanner(scan);
  32.         System.out.println("start ...");
  33.         int i = 0;
  34.         try {
  35.             for (Result r : ss) {
  36.                 SolrInputDocument solrDoc = new SolrInputDocument();
  37.                 solrDoc.addField("rowkey", new String(r.getRow()));
  38.                 for (KeyValue kv : r.raw()) {
  39.                     String fieldName = new String(kv.getQualifier());
  40.                     String fieldValue = new String(kv.getValue());
  41.                     if (fieldName.equalsIgnoreCase("time")
  42.                             || fieldName.equalsIgnoreCase("tebid")
  43.                             || fieldName.equalsIgnoreCase("tetid")
  44.                             || fieldName.equalsIgnoreCase("puid")
  45.                             || fieldName.equalsIgnoreCase("mgcvid")
  46.                             || fieldName.equalsIgnoreCase("mtcvid")
  47.                             || fieldName.equalsIgnoreCase("smaid")
  48.                             || fieldName.equalsIgnoreCase("mtlkid")) {
  49.                         solrDoc.addField(fieldName, fieldValue);
  50.                     }
  51.                 }
  52.                 solrServer.add(solrDoc);
  53.                 solrServer.commit(true, true, true);
  54.                 i = i + 1;
  55.                 System.out.println("已经成功处理 " + i + " 条数据");
  56.             }
  57.             ss.close();
  58.             table.close();
  59.             System.out.println("done !");
  60.         } catch (IOException e) {
  61.         } finally {
  62.             ss.close();
  63.             table.close();
  64.             System.out.println("erro !");
  65.         }
  66.     }
  67. }
复制代码


另外一种方案是用到HBase的Mapreduce框架,分布式并行执行效率特别高,处理1000万条数据仅需5分钟,但是这种高并发需要对Solr服务器进行配置调优,不然会抛出服务器无法响应的异常:
  1. Error: org.apache.solr.common.SolrException: Server at http://192.168.1.10:8983/solr returned non ok status:503, message:Service Unavailable
复制代码



MapReduce入口程序:

  1. package com.ultrapower.hbase.solrhbase;
  2. import java.io.IOException;
  3. import java.net.URISyntaxException;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.client.Scan;
  7. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  8. import org.apache.hadoop.hbase.util.Bytes;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
  11. public class SolrHBaseIndexer {
  12.     private static void usage() {
  13.         System.err.println("输入参数: <配置文件路径> <起始行> <结束行>");
  14.         System.exit(1);
  15.     }
  16.     private static Configuration conf;
  17.     public static void main(String[] args) throws IOException,
  18.             InterruptedException, ClassNotFoundException, URISyntaxException {
  19.         if (args.length == 0 || args.length > 3) {
  20.             usage();
  21.         }
  22.         createHBaseConfiguration(args[0]);
  23.         ConfigProperties tutorialProperties = new ConfigProperties(args[0]);
  24.         String tbName = tutorialProperties.getHBTbName();
  25.         String tbFamily = tutorialProperties.getHBFamily();
  26.         Job job = new Job(conf, "SolrHBaseIndexer");
  27.         job.setJarByClass(SolrHBaseIndexer.class);
  28.         Scan scan = new Scan();
  29.         if (args.length == 3) {
  30.             scan.setStartRow(Bytes.toBytes(args[1]));
  31.             scan.setStopRow(Bytes.toBytes(args[2]));
  32.         }
  33.         scan.addFamily(Bytes.toBytes(tbFamily));
  34.         scan.setCaching(500); // 设置缓存数据量来提高效率
  35.         scan.setCacheBlocks(false);
  36.         // 创建Map任务
  37.         TableMapReduceUtil.initTableMapperJob(tbName, scan,
  38.                 SolrHBaseIndexerMapper.class, null, null, job);
  39.         // 不需要输出
  40.         job.setOutputFormatClass(NullOutputFormat.class);
  41.         // job.setNumReduceTasks(0);
  42.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  43.     }
  44.     /**
  45.      * 从配置文件读取并设置HBase配置信息
  46.      *
  47.      * @param propsLocation
  48.      * @return
  49.      */
  50.     private static void createHBaseConfiguration(String propsLocation) {
  51.         ConfigProperties tutorialProperties = new ConfigProperties(
  52.                 propsLocation);
  53.         conf = HBaseConfiguration.create();
  54.         conf.set("hbase.zookeeper.quorum", tutorialProperties.getZKQuorum());
  55.         conf.set("hbase.zookeeper.property.clientPort",
  56.                 tutorialProperties.getZKPort());
  57.         conf.set("hbase.master", tutorialProperties.getHBMaster());
  58.         conf.set("hbase.rootdir", tutorialProperties.getHBrootDir());
  59.         conf.set("solr.server", tutorialProperties.getSolrServer());
  60.     }
  61. }
复制代码


对应的Mapper:

  1. package com.ultrapower.hbase.solrhbase;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.KeyValue;
  5. import org.apache.hadoop.hbase.client.Result;
  6. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  7. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.solr.client.solrj.SolrServerException;
  10. import org.apache.solr.client.solrj.impl.HttpSolrServer;
  11. import org.apache.solr.common.SolrInputDocument;
  12. public class SolrHBaseIndexerMapper extends TableMapper<Text, Text> {
  13.     public void map(ImmutableBytesWritable key, Result hbaseResult,
  14.             Context context) throws InterruptedException, IOException {
  15.         Configuration conf = context.getConfiguration();
  16.         HttpSolrServer solrServer = new HttpSolrServer(conf.get("solr.server"));
  17.         solrServer.setDefaultMaxConnectionsPerHost(100);
  18.         solrServer.setMaxTotalConnections(1000);
  19.         solrServer.setSoTimeout(20000);
  20.         solrServer.setConnectionTimeout(20000);
  21.         SolrInputDocument solrDoc = new SolrInputDocument();
  22.         try {
  23.             solrDoc.addField("rowkey", new String(hbaseResult.getRow()));
  24.             for (KeyValue rowQualifierAndValue : hbaseResult.list()) {
  25.                 String fieldName = new String(
  26.                         rowQualifierAndValue.getQualifier());
  27.                 String fieldValue = new String(rowQualifierAndValue.getValue());
  28.                 if (fieldName.equalsIgnoreCase("time")
  29.                         || fieldName.equalsIgnoreCase("tebid")
  30.                         || fieldName.equalsIgnoreCase("tetid")
  31.                         || fieldName.equalsIgnoreCase("puid")
  32.                         || fieldName.equalsIgnoreCase("mgcvid")
  33.                         || fieldName.equalsIgnoreCase("mtcvid")
  34.                         || fieldName.equalsIgnoreCase("smaid")
  35.                         || fieldName.equalsIgnoreCase("mtlkid")) {
  36.                     solrDoc.addField(fieldName, fieldValue);
  37.                 }
  38.             }
  39.             solrServer.add(solrDoc);
  40.             solrServer.commit(true, true, true);
  41.         } catch (SolrServerException e) {
  42.             System.err.println("更新Solr索引异常:" + new String(hbaseResult.getRow()));
  43.         }
  44.     }
  45. }
复制代码


读取参数配置文件的辅助类:

  1. package com.ultrapower.hbase.solrhbase;
  2. import java.io.File;
  3. import java.io.FileReader;
  4. import java.io.IOException;
  5. import java.util.Properties;
  6. public class ConfigProperties {
  7.     private static Properties props;
  8.     private String HBASE_ZOOKEEPER_QUORUM;
  9.     private String HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT;
  10.     private String HBASE_MASTER;
  11.     private String HBASE_ROOTDIR;
  12.     private String DFS_NAME_DIR;
  13.     private String DFS_DATA_DIR;
  14.     private String FS_DEFAULT_NAME;
  15.     private String SOLR_SERVER; // Solr服务器地址
  16.     private String HBASE_TABLE_NAME; // 需要建立Solr索引的HBase表名称
  17.     private String HBASE_TABLE_FAMILY; // HBase表的列族
  18.     public ConfigProperties(String propLocation) {
  19.         props = new Properties();
  20.         try {
  21.             File file = new File(propLocation);
  22.             System.out.println("从以下位置加载配置文件: " + file.getAbsolutePath());
  23.             FileReader is = new FileReader(file);
  24.             props.load(is);
  25.             HBASE_ZOOKEEPER_QUORUM = props.getProperty("HBASE_ZOOKEEPER_QUORUM");
  26.             HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT = props.getProperty("HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT");
  27.             HBASE_MASTER = props.getProperty("HBASE_MASTER");
  28.             HBASE_ROOTDIR = props.getProperty("HBASE_ROOTDIR");
  29.             DFS_NAME_DIR = props.getProperty("DFS_NAME_DIR");
  30.             DFS_DATA_DIR = props.getProperty("DFS_DATA_DIR");
  31.             FS_DEFAULT_NAME = props.getProperty("FS_DEFAULT_NAME");
  32.             SOLR_SERVER = props.getProperty("SOLR_SERVER");
  33.             HBASE_TABLE_NAME = props.getProperty("HBASE_TABLE_NAME");
  34.             HBASE_TABLE_FAMILY = props.getProperty("HBASE_TABLE_FAMILY");
  35.         } catch (IOException e) {
  36.             throw new RuntimeException("加载配置文件出错");
  37.         } catch (NullPointerException e) {
  38.             throw new RuntimeException("文件不存在");
  39.         }
  40.     }
  41.     public String getZKQuorum() {
  42.         return HBASE_ZOOKEEPER_QUORUM;
  43.     }
  44.     public String getZKPort() {
  45.         return HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT;
  46.     }
  47.     public String getHBMaster() {
  48.         return HBASE_MASTER;
  49.     }
  50.     public String getHBrootDir() {
  51.         return HBASE_ROOTDIR;
  52.     }
  53.     public String getDFSnameDir() {
  54.         return DFS_NAME_DIR;
  55.     }
  56.     public String getDFSdataDir() {
  57.         return DFS_DATA_DIR;
  58.     }
  59.     public String getFSdefaultName() {
  60.         return FS_DEFAULT_NAME;
  61.     }
  62.     public String getSolrServer() {
  63.         return SOLR_SERVER;
  64.     }
  65.     public String getHBTbName() {
  66.         return HBASE_TABLE_NAME;
  67.     }
  68.     public String getHBFamily() {
  69.         return HBASE_TABLE_FAMILY;
  70.     }
  71. }
复制代码


参数配置文件“config.properties”:

  1. HBASE_ZOOKEEPER_QUORUM=slave-1,slave-2,slave-3,slave-4,slave-5
  2. HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT=2181
  3. HBASE_MASTER=master-1:60000
  4. HBASE_ROOTDIR=hdfs:///hbase
  5. DFS_NAME_DIR=/opt/data/dfs/name
  6. DFS_DATA_DIR=/opt/data/d0/dfs2/data
  7. FS_DEFAULT_NAME=hdfs://192.168.1.10:9000
  8. SOLR_SERVER=http://192.168.1.10:8983/solr
  9. HBASE_TABLE_NAME=hb_app_m_user_te
  10. HBASE_TABLE_FAMILY=d
复制代码


三、结合Solr进行HBase数据的多条件查询:
可以通过web页面操作Solr索引,

查询:
  1. http://192.168.1.10:8983/solr/select?(time:201307 AND tetid:1 AND mgcvid:101 AND smaid:101 AND puid:102)
复制代码
1.PNG


删除所有索引:
  1. http://192.168.1.10:8983/solr/update/?stream.body=<delete><query>*:*</query></delete>&stream.contentType=text/xml;charset=utf-8&commit=true
复制代码


通过java客户端结合Solr查询HBase数据:
  1. package com.ultrapower.hbase.solrhbase;
  2. import java.io.IOException;
  3. import java.nio.ByteBuffer;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.hbase.HBaseConfiguration;
  8. import org.apache.hadoop.hbase.client.Get;
  9. import org.apache.hadoop.hbase.client.HTable;
  10. import org.apache.hadoop.hbase.client.Result;
  11. import org.apache.hadoop.hbase.util.Bytes;
  12. import org.apache.solr.client.solrj.SolrQuery;
  13. import org.apache.solr.client.solrj.SolrServer;
  14. import org.apache.solr.client.solrj.SolrServerException;
  15. import org.apache.solr.client.solrj.impl.HttpSolrServer;
  16. import org.apache.solr.client.solrj.response.QueryResponse;
  17. import org.apache.solr.common.SolrDocument;
  18. import org.apache.solr.common.SolrDocumentList;
  19. public class QueryData {
  20.     /**
  21.      * @param args
  22.      * @throws SolrServerException
  23.      * @throws IOException
  24.      */
  25.     public static void main(String[] args) throws SolrServerException, IOException {
  26.         final Configuration conf;
  27.         conf = HBaseConfiguration.create();
  28.         HTable table = new HTable(conf, "hb_app_m_user_te");
  29.         Get get = null;
  30.         List<Get> list = new ArrayList<Get>();
  31.         
  32.         String url = "http://192.168.1.10:8983/solr";
  33.         SolrServer server = new HttpSolrServer(url);
  34.         SolrQuery query = new SolrQuery("time:201307 AND tetid:1 AND mgcvid:101 AND smaid:101 AND puid:102");
  35.         query.setStart(0); //数据起始行,分页用
  36.         query.setRows(10); //返回记录数,分页用
  37.         QueryResponse response = server.query(query);
  38.         SolrDocumentList docs = response.getResults();
  39.         System.out.println("文档个数:" + docs.getNumFound()); //数据总条数也可轻易获取
  40.         System.out.println("查询时间:" + response.getQTime());
  41.         for (SolrDocument doc : docs) {
  42.             get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
  43.             list.add(get);
  44.         }
  45.         
  46.         Result[] res = table.get(list);
  47.         
  48.         byte[] bt1 = null;
  49.         byte[] bt2 = null;
  50.         byte[] bt3 = null;
  51.         byte[] bt4 = null;
  52.         String str1 = null;
  53.         String str2 = null;
  54.         String str3 = null;
  55.         String str4 = null;
  56.         for (Result rs : res) {
  57.             bt1 = rs.getValue("d".getBytes(), "3mpon".getBytes());
  58.             bt2 = rs.getValue("d".getBytes(), "3mponid".getBytes());
  59.             bt3 = rs.getValue("d".getBytes(), "amarpu".getBytes());
  60.             bt4 = rs.getValue("d".getBytes(), "amarpuid".getBytes());
  61.             if (bt1 != null && bt1.length>0) {str1 = new String(bt1);} else {str1 = "无数据";} //对空值进行new String的话会抛出异常
  62.             if (bt2 != null && bt2.length>0) {str2 = new String(bt2);} else {str2 = "无数据";}
  63.             if (bt3 != null && bt3.length>0) {str3 = new String(bt3);} else {str3 = "无数据";}
  64.             if (bt4 != null && bt4.length>0) {str4 = new String(bt4);} else {str4 = "无数据";}
  65.             System.out.print(new String(rs.getRow()) + " ");
  66.             System.out.print(str1 + "|");
  67.             System.out.print(str2 + "|");
  68.             System.out.print(str3 + "|");
  69.             System.out.println(str4 + "|");
  70.         }
  71.         table.close();
  72.     }
  73. }
复制代码





小结:
通过测试发现,结合Solr索引可以很好的实现HBase的多条件查询,同时还能解决其两个难点:分页查询、数据总量统计。

实际场景中大多都是分页查询,分页查询返回的数据量很少,采用此种方案完全可以达到前端页面毫秒级的实时响应;若有大批量的数据交互,比如涉及到数据导出,实际上效率也是很高,十万数据仅耗时10秒。

另外,如果真的将Solr纳入使用,Solr以及HBase端都可以不断进行优化,比如可以搭建Solr集群,甚至可以采用SolrCloud基于hadoop的分布式索引服务。

总之,HBase不能多条件过滤查询的先天性缺陷,在Solr的配合之下可以得到较好的弥补,难怪诸如新蛋科技、国美电商、苏宁电商等互联网公司以及众多游戏公司,都使用Solr来支持快速查询。


点评

严重乱码  发表于 2014-10-31 11:51

已有(7)人评论

跳转到指定楼层
xioaxu790 发表于 2014-10-31 12:09:29
有部分网友,或许看到的代码是乱码,我不知道是什么原因造成的。但我看的不是乱码。如果是我的原因,我愿意真诚弥补。欢迎大家指正
回复

使用道具 举报

anyhuayong 发表于 2014-11-1 10:17:05
好文章,谢楼主分享
回复

使用道具 举报

wusang 发表于 2014-11-26 23:51:22
正需要,感谢楼主!好文章!
回复

使用道具 举报

EASONLIU 发表于 2014-12-17 10:19:31
路过,学习学习
回复

使用道具 举报

123456ddff 发表于 2015-1-8 17:56:18
真的很给力啊
回复

使用道具 举报

stephenKang 发表于 2016-7-15 08:59:36
请问楼主索引的建立是在数据插入时直接建立,还是有单独的程序在定时建立呢?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条