问题导读:
1、如何通过数据库连接池（Commons DBCP）连接数据库，并用 SolrJ 为其中的数据建立索引?
2、如何实现SolrJ的CRUD操作?
HttpSolrServer 使用 Apache Commons HttpClient 连接 Solr。注意：在 Solr 4.x 中，CommonsHttpSolrServer 已更名为 HttpSolrServer，StreamingUpdateSolrServer 已更名为 ConcurrentUpdateSolrServer。ConcurrentUpdateSolrServer 更适合批量更新（update）操作，而 HttpSolrServer 更适合查询（query）操作。
添加或修改 document：如果具有相同 uniqueKey（本例中为 customer_id）的 document 已经存在，新的 document 会覆盖（update）它。代码片段如下:
- public void indexDocs() throws IOException, SolrServerException {
- server.setParser(new XMLResponseParser());
-
- //Adds the docs and commit them.
- Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
- /* i is used as identification of a document, which is treated as unique key.*/
- SolrInputDocument doc2 ;
- /*一千条数据,花费大约一小时,生产660M。使用多线程并发执行估计更好的*/
- for(int i =10000000; i < 10000002; i++){
- doc2 = new SolrInputDocument();
- doc2.addField("customer_id", i);
- doc2.addField("name", "John Natch-" + i);
- doc2.addField("level", "VIP");
- doc2.addField("sex", "男");
- doc2.addField("address", "【【【【【金刚金刚金刚金刚金刚金】】】】" + i);
- System.out.println("add doc "+ i);
- docs.add(doc2);
- if(docs.size() == 1000){
- server.add(docs);
- server.commit();
- logger.info("commit 1000 doc "+ i);
- docs.clear();
- }
- /*
- To immediately commit after adding documents, you could use:
-
- UpdateRequest req = new UpdateRequest();
- req.setAction( UpdateRequest.ACTION.COMMIT, false, false );
- req.add( docs );
- UpdateResponse rsp = req.process( server );
- */
- }
- server.add(docs);
- server.commit();
- logger.info("Commits successfully!......");
- }
复制代码
在运行代码之前，需要先在 Solr core 的配置文件 schema.xml 中定义相应的字段。- <!-- core 'customer' schema field definition -->
-
- <field name="customer_id" type="int" indexed="true" stored="true" required="true" multiValued="false"/>
- <field name="name" type="string" indexed="true" stored="true"/>
- <field name="sex" type="string" indexed="true" stored="false"/>
- <field name="level" type="string" indexed="true" stored="true"/>
- <field name="address" type="string" indexed="true" multiValued="true" stored="true"/>
复制代码
- <uniqueKey>customer_id</uniqueKey>
复制代码
提交文档（添加后立即 commit；其中注释掉的 server.deleteById 演示了按 id 删除）:
- private void commitDocs(Collection<SolrInputDocument> docs){
- try {
- //server.deleteById(1) //specify the id list you want to be deleted.
- server.add(docs);
- server.commit();
- docs.clear();
- } catch (SolrServerException e) {
- logger.error("SolrServerException", e);
- } catch (IOException e) {
- logger.error("IOException", e) ;
- }
- }
复制代码
与数据集成,实现使用SolrJ操作数据库。当然,这个可以使用Solr DIH实现。两种各有其优缺点,根据实际的应用来选择具体的实现方式。- public void indexDocsWithDB(){
- PoolingDataSourceDemo dataSource = new PoolingDataSourceDemo();
- List<List<Object>> rows = dataSource.executeQuerySQL("select * from customer");
- String[] columnNames = dataSource.getColNames();
- Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
- SolrInputDocument doc ;
- for(List row : rows) {
- int size = row.size() + 1;
- doc = new SolrInputDocument();
- for(int i = 1; i < size ; i++){
- doc.addField(columnNames[i], row.get(i-1)) ;
- logger.info(columnNames[i]+"add filed "+ row.get(i-1)) ;
- }
- docs.add(doc);
- if(docs.size() > 100){
- commitDocs(docs);
- }
- }
- if(docs.size() > 0){
- commitDocs(docs);
- }
- }
复制代码
完整的代码:
PoolingDataSourceDemo.java：使用 Commons DBCP 连接池连接数据库。
- import net.spy.memcached.compat.log.Logger;
- import net.spy.memcached.compat.log.LoggerFactory;
- import org.apache.commons.dbcp.*;
- import org.apache.commons.pool.impl.GenericObjectPool;
-
- import javax.sql.DataSource;
- import java.sql.*;
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
-
- /**
- * @author John Liu
- * @see
- */
- public class PoolingDataSourceDemo {
-
- private final static Logger logger = LoggerFactory.getLogger(PoolingDataSourceDemo.class) ;
- /* These properties can be configured in a properties type file*/
- private final static String CONNECTION_URL = "jdbc:mysql://localhost/pythondb?autoReconnect=true";
- private final static String DRIVER_CLASS = "com.mysql.jdbc.Driver";
- private final static String USER_NAME = "elite";
- private final static String PASSWORD = "elite";
-
- private final static int MAX_ACTIVE_NUMBER = 10;
-
- private static GenericObjectPool connectionPool = null;
-
- private String[] colNames ;
-
-
-
- private static DataSource dataSource;
-
- static {
- dataSource = initDataSource();
- }
-
- public GenericObjectPool getConnectionPool() {
- return connectionPool;
- }
-
- public List<List<Object>> executeQuerySQL(String querySQL){
- Connection conn = null;
- Statement stmt = null;
- ResultSet resultSet = null;
- List<List<Object>> result = new LinkedList<List<Object>>();
- try {
- logger.info("Creating connection.");
- conn = dataSource.getConnection();
- stmt = conn.createStatement();
- resultSet = stmt.executeQuery(querySQL);
- //show the connection pool status
- printDataSourceStats();
- logger.info("Results:");
- int columnCount = resultSet.getMetaData().getColumnCount();
-
- ResultSetMetaData rsm = resultSet.getMetaData();
- colNames = new String[columnCount + 1];
- for (int i = 1; i < (columnCount + 1); i++) {
- colNames[i] = rsm.getColumnName(i).toLowerCase();
- logger.info("column name: "+ colNames[i]) ;
- }
- List<Object> list ;
- while(resultSet.next()) {
- list = new ArrayList<Object>() ;
- for(int i=1; i<= columnCount; i++) {
- Object obj = getColumnValue(rsm, resultSet, colNames, i);
- list.add(obj) ;
- }
- result.add(list);
- }
- } catch(SQLException e) {
- e.printStackTrace();
- shutdownDataSource(dataSource);
- } finally {
- try { if (resultSet != null) resultSet.close(); } catch(Exception e) { }
- try { if (stmt != null) stmt.close(); } catch(Exception e) { }
- try { if (conn != null) conn.close(); } catch(Exception e) { }
- logger.info("result size: "+ result.size());
- return result;
- }
- }
-
- public Object getColumnValue(ResultSetMetaData rsm, ResultSet rs, String[] colNames, int j) throws SQLException {
- Object f = null;
- if (colNames[j] != null) {
- switch (rsm.getColumnType(j)){
- case Types.BIGINT:{
- f = rs.getLong(j);
- break;
- }
- case Types.INTEGER: {
- f = rs.getInt(j);
- break;
- }
- case Types.DATE:{
- f = rs.getDate(j);
- break;
- }
- case Types.FLOAT:{
- f = rs.getFloat(j);
- break;
- }
- case Types.DOUBLE:{
- f = rs.getDouble(j);
- break;
- }
- case Types.TIME: {
- f = rs.getDate(j);
- break;
- }
- case Types.BOOLEAN:{
- f = rs.getBoolean(j);
- break;
- }
- default:{
- f = rs.getString(j);
- }
- }
- }
- logger.info("column value: "+ f) ;
- return f;
- }
- /**
- * [mysql]
- * #hibernate.connection.driver_class com.mysql.jdbc.Driver
- #hibernate.connection.url jdbc:mysql:///test
- #hibernate.connection.username gavin
- #hibernate.connection.password
- * @return DataSource
- */
- public static DataSource initDataSource(){
- //
- // Load JDBC Driver class.
- //
- try {
- Class.forName(DRIVER_CLASS).newInstance();
- } catch (InstantiationException e) {
- logger.error("InstantiationException error", e);
- } catch (IllegalAccessException e) {
- logger.error("IllegalAccessException error", e);
- } catch (ClassNotFoundException e) {
- logger.error("ClassNotFoundException error", e);
- }
-
- //
- // Creates an instance of GenericObjectPool that holds our
- // pool of connections object.
- //
- connectionPool = new GenericObjectPool();
- connectionPool.setMaxActive(MAX_ACTIVE_NUMBER);
-
- //
- // Creates a connection factory object which will be use by
- // the pool to create the connection object. We passes the
- // JDBC url info, username and password.
- //
- ConnectionFactory cf = new DriverManagerConnectionFactory(
- CONNECTION_URL,
- USER_NAME,
- PASSWORD);
-
- //
- // Creates a PoolableConnectionFactory that will wraps the
- // connection object created by the ConnectionFactory to add
- // object pooling functionality.
- //
- PoolableConnectionFactory pcf =
- new PoolableConnectionFactory(cf, connectionPool,
- null, null, false, true);
- return new PoolingDataSource(connectionPool);
- }
-
- public void printDataSourceStats() {
- logger.info("Max : " + getConnectionPool().getMaxActive() + "; " +
- "Active: " + getConnectionPool().getNumActive() + "; " +
- "Idle : " + getConnectionPool().getNumIdle());
- }
-
- public void shutdownDataSource(DataSource ds) throws SQLException {
- BasicDataSource bds = (BasicDataSource) ds;
- bds.close();
- }
-
- public String[] getColNames() {
- return colNames;
- }
-
- public void setColNames(String[] colNames) {
- this.colNames = colNames;
- }
-
-
- }
复制代码
SolrIndex.java 实现SolrJ的CRUD操作。- public class SolrIndex {
-
- Logger logger = LoggerFactory.getLogger(SolrIndex.class) ;
-
- /*specified the core customer url*/
- private static final String CORE_CUSTOMER_URL= "http://localhost:8088/solr/customer";
-
- private static HttpSolrServer server;
- static {
- server = new HttpSolrServer(CORE_CUSTOMER_URL);
- server.setMaxRetries(1); // defaults to 0. > 1 not recommended.
- server.setConnectionTimeout(5000); // 5 seconds to establish TCP
- // Setting the XML response parser is only required for cross
- // version compatibility and only when one side is 1.4.1 or
- // earlier and the other side is 3.1 or later.
- server.setParser(new XMLResponseParser()); // binary parser is used by default
- // The following settings are provided here for completeness.
- // They will not normally be required, and should only be used
- // after consulting javadocs to know whether they are truly required.
- server.setSoTimeout(1000); // socket read timeout
- server.setDefaultMaxConnectionsPerHost(1000);
- server.setMaxTotalConnections(1000);
- server.setFollowRedirects(false);
- // defaults to false
- // allowCompression defaults to false.
- // Server side must support gzip or deflate for this to have any effect.
- server.setAllowCompression(true);
- }
- /**
- * Index a document with specified fields in doc.
- * @throws IOException
- * @throws SolrServerException
- */
- public void indexDocs() throws IOException, SolrServerException {
- server.setParser(new XMLResponseParser());
-
- //Adds the docs and commit them.
- Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
- /* i is used as identification of a document, which is treated as unique key.*/
- SolrInputDocument doc2 ;
- /*一千条数据,花费大约一小时,生产660M。使用多线程并发执行估计更好的*/
- for(int i =10000000; i < 10000002; i++){
- doc2 = new SolrInputDocument();
- doc2.addField("customer_id", i);
- doc2.addField("name", "John Natch-" + i);
- doc2.addField("level", "VIP");
- doc2.addField("sex", "男");
- doc2.addField("address", "【【【【【金刚金刚金刚金刚金刚金】】】】" + i);
- System.out.println("add doc "+ i);
- docs.add(doc2);
- if(docs.size() == 1000){
- server.add(docs);
- server.commit();
- logger.info("commit 1000 doc "+ i);
- docs.clear();
- }
- /*
- To immediately commit after adding documents, you could use:
-
- UpdateRequest req = new UpdateRequest();
- req.setAction( UpdateRequest.ACTION.COMMIT, false, false );
- req.add( docs );
- UpdateResponse rsp = req.process( server );
- */
- }
- server.add(docs);
- server.commit();
- logger.info("Commits successfully!......");
- }
-
- /**
- * solrJ与 database 集成,对数据库中的数据建立索引。当然,这个可以使用Solr DIH取代。
- */
- public void indexDocsWithDB(){
- PoolingDataSourceDemo dataSource = new PoolingDataSourceDemo();
- List<List<Object>> rows = dataSource.executeQuerySQL("select * from customer");
- String[] columnNames = dataSource.getColNames();
- Collection<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
- SolrInputDocument doc ;
- for(List row : rows) {
- int size = row.size() + 1;
- doc = new SolrInputDocument();
- for(int i = 1; i < size ; i++){
- doc.addField(columnNames[i], row.get(i-1)) ;
- logger.info(columnNames[i]+"add filed "+ row.get(i-1)) ;
- }
- docs.add(doc);
- if(docs.size() > 100){
- commitDocs(docs);
- }
- }
- if(docs.size() > 0){
- commitDocs(docs);
- }
- }
-
- private void commitDocs(Collection<SolrInputDocument> docs){
- try {
- //server.deleteById(1) //specify the id list you want to be deleted.
- server.add(docs);
- server.commit();
- docs.clear();
- } catch (SolrServerException e) {
- logger.error("SolrServerException", e);
- } catch (IOException e) {
- logger.error("IOException", e) ;
- }
- }
- /**
- * Query documents with specified query value.
- * @throws SolrServerException
- */
- public void queryDocs() throws SolrServerException {
- HttpSolrServer server = new HttpSolrServer(CORE_CUSTOMER_URL );
- server.setParser(new XMLResponseParser());
-
- /*query statement settings*/
- SolrQuery query = new SolrQuery();
- query.setQuery("李玲");
- query.setStart(0);
- query.setRows(10);
-
- QueryResponse response = server.query( query );
- SolrDocumentList documents = response.getResults();
- Iterator<SolrDocument> itr = documents.iterator();
- logger.info("id \t name");
- while (itr.hasNext()) {
- SolrDocument doc = itr.next();
- logger.info(doc.getFieldValue("customer_id") + ":" + "\t"+doc.
- getFieldValue("name"));
- }
- }
-
- public void delete(){
- try {
- server.deleteByQuery( "*:*" );
- server.commit();
- } catch (SolrServerException e) {
- logger.error("SolrServerException", e);
- } catch (IOException e) {
- logger.error("IOException", e);
- }
- }
- public static void main(String[] args){
- SolrIndex indexer = new SolrIndex();
- long startTime = System.currentTimeMillis();
-
- /*do index with specified documents*/
- try {
- indexer.indexDocs();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (SolrServerException e) {
- e.printStackTrace();
- }
-
- // try {
- // indexer.queryDocs();
- // } catch (SolrServerException e) {
- // e.printStackTrace();
- // }
- /*integration with db. It takes 1214 ms*/
- // indexer.delete();
- // indexer.indexDocsWithDB();
- System.out.println("--------It takes "+ (System.currentTimeMillis() - startTime) + " ms");
- }
-
- }
复制代码
另外，SolrJ 操作 SolrCloud 的方式与 HttpSolrServer 基本相同，只是要改用 CloudSolrServer，并传入 ZooKeeper 地址而不是 Solr 的 URL。
- <span style="color:#006600;">CloudSolrServer server = new CloudSolrServer("localhost:9983");
- server.setDefaultCollection("collection1");</span>
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField( "id", "1234");
- doc.addField( "name", "A lovely summer holiday");
- server.add(doc);
- server.commit();
复制代码
运行代码前，请先加入下列依赖:
|