分享

ZooKeeper Java API 使用样例

阿飞 2014-4-12 02:24:31 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 9 60968
1.使用什么API可以创建ZK(ZooKeeper)连接?
2.如何关闭ZK(ZooKeeper)连接?
3.如何创建节点?
4.如何读取数据?
5.如何删除指定节点?
6.如何收到来自Server的Watcher通知后的处理?



ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合。通过这些原语言的组合使用,能够帮助我们解决更高层次的分布式问题。

本文主要针对ZooKeeper提供的Java API,通过实际代码讲述如何使用API。
  1. package com.taobao.taokeeper.research.sample;
  2. import java.io.IOException;
  3. import java.util.concurrent.CountDownLatch;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.Watcher;
  8. import org.apache.zookeeper.Watcher.Event.KeeperState;
  9. import org.apache.zookeeper.ZooDefs.Ids;
  10. import org.apache.zookeeper.ZooKeeper;
  11. import common.toolkit.java.util.ObjectUtil;
  12. /**
  13. * ZooKeeper Java Api 使用样例<br>
  14. * ZK Api Version: 3.4.3
  15. *  
  16. * @author nileader/nileader@gmail.com
  17. */
  18. public class JavaApiSample implements Watcher {
  19.     private static final int SESSION_TIMEOUT = 10000;
  20.     private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181";
  21.     private static final String ZK_PATH = "/nileader";
  22.     private ZooKeeper zk = null;
  23.      
  24.     private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
  25.     /**
  26.      * 创建ZK连接
  27.      * @param connectString  ZK服务器地址列表
  28.      * @param sessionTimeout   Session超时时间
  29.      */
  30.     public void createConnection( String connectString, int sessionTimeout ) {
  31.         this.releaseConnection();
  32.         try {
  33.             zk = new ZooKeeper( connectString, sessionTimeout, this );
  34.             connectedSemaphore.await();
  35.         } catch ( InterruptedException e ) {
  36.             System.out.println( "连接创建失败,发生 InterruptedException" );
  37.             e.printStackTrace();
  38.         } catch ( IOException e ) {
  39.             System.out.println( "连接创建失败,发生 IOException" );
  40.             e.printStackTrace();
  41.         }
  42.     }
  43.     /**
  44.      * 关闭ZK连接
  45.      */
  46.     public void releaseConnection() {
  47.         if ( !ObjectUtil.isBlank( this.zk ) ) {
  48.             try {
  49.                 this.zk.close();
  50.             } catch ( InterruptedException e ) {
  51.                 // ignore
  52.                 e.printStackTrace();
  53.             }
  54.         }
  55.     }
  56.     /**
  57.      *  创建节点
  58.      * @param path 节点path
  59.      * @param data 初始数据内容
  60.      * @return
  61.      */
  62.     public boolean createPath( String path, String data ) {
  63.         try {
  64.             System.out.println( "节点创建成功, Path: "
  65.                     + this.zk.create( path, //
  66.                                               data.getBytes(), //
  67.                                               Ids.OPEN_ACL_UNSAFE, //
  68.                                               CreateMode.EPHEMERAL )
  69.                     + ", content: " + data );
  70.         } catch ( KeeperException e ) {
  71.             System.out.println( "节点创建失败,发生KeeperException" );
  72.             e.printStackTrace();
  73.         } catch ( InterruptedException e ) {
  74.             System.out.println( "节点创建失败,发生 InterruptedException" );
  75.             e.printStackTrace();
  76.         }
  77.         return true;
  78.     }
  79.     /**
  80.      * 读取指定节点数据内容
  81.      * @param path 节点path
  82.      * @return
  83.      */
  84.     public String readData( String path ) {
  85.         try {
  86.             System.out.println( "获取数据成功,path:" + path );
  87.             return new String( this.zk.getData( path, false, null ) );
  88.         } catch ( KeeperException e ) {
  89.             System.out.println( "读取数据失败,发生KeeperException,path: " + path  );
  90.             e.printStackTrace();
  91.             return "";
  92.         } catch ( InterruptedException e ) {
  93.             System.out.println( "读取数据失败,发生 InterruptedException,path: " + path  );
  94.             e.printStackTrace();
  95.             return "";
  96.         }
  97.     }
  98.     /**
  99.      * 更新指定节点数据内容
  100.      * @param path 节点path
  101.      * @param data  数据内容
  102.      * @return
  103.      */
  104.     public boolean writeData( String path, String data ) {
  105.         try {
  106.             System.out.println( "更新数据成功,path:" + path + ", stat: " +
  107.                                                         this.zk.setData( path, data.getBytes(), -1 ) );
  108.         } catch ( KeeperException e ) {
  109.             System.out.println( "更新数据失败,发生KeeperException,path: " + path  );
  110.             e.printStackTrace();
  111.         } catch ( InterruptedException e ) {
  112.             System.out.println( "更新数据失败,发生 InterruptedException,path: " + path  );
  113.             e.printStackTrace();
  114.         }
  115.         return false;
  116.     }
  117.     /**
  118.      * 删除指定节点
  119.      * @param path 节点path
  120.      */
  121.     public void deleteNode( String path ) {
  122.         try {
  123.             this.zk.delete( path, -1 );
  124.             System.out.println( "删除节点成功,path:" + path );
  125.         } catch ( KeeperException e ) {
  126.             System.out.println( "删除节点失败,发生KeeperException,path: " + path  );
  127.             e.printStackTrace();
  128.         } catch ( InterruptedException e ) {
  129.             System.out.println( "删除节点失败,发生 InterruptedException,path: " + path  );
  130.             e.printStackTrace();
  131.         }
  132.     }
  133.     public static void main( String[] args ) {
  134.         JavaApiSample sample = new JavaApiSample();
  135.         sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT );
  136.         if ( sample.createPath( ZK_PATH, "我是节点初始内容" ) ) {
  137.             System.out.println();
  138.             System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + "\n" );
  139.             sample.writeData( ZK_PATH, "更新后的数据" );
  140.             System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + "\n" );
  141.             sample.deleteNode( ZK_PATH );
  142.         }
  143.         sample.releaseConnection();
  144.     }
  145.     /**
  146.      * 收到来自Server的Watcher通知后的处理。
  147.      */
  148.     @Override
  149.     public void process( WatchedEvent event ) {
  150.         System.out.println( "收到事件通知:" + event.getState() +"\n"  );
  151.         if ( KeeperState.SyncConnected == event.getState() ) {
  152.             connectedSemaphore.countDown();
  153.         }
  154.     }
  155. }
复制代码

输出结果:

  1. 收到事件通知:SyncConnected
  2. 节点创建成功, Path: /nileader, content: 我是节点初始内容
  3. 获取数据成功,path:/nileader
  4. 数据内容: 我是节点初始内容
  5. 更新数据成功,path:/nileader, stat: 42950186407,42950186408,1350820182392,1350820182406,1,0,0,232029990722229433,18,0,42950186407
  6. 获取数据成功,path:/nileader
  7. 数据内容: 更新后的数据
  8. 删除节点成功,path:/nileader
复制代码




http://nileader.blog.51cto.com/1381108/795265

已有(9)人评论

跳转到指定楼层
perfri 发表于 2014-4-29 22:44:39
这么好的帖子,为什么都没人顶呢
回复

使用道具 举报

星星星星笑 发表于 2014-9-18 22:38:15
定义个一个
回复

使用道具 举报

EASONLIU 发表于 2015-2-3 09:53:29
不错不错,先学着敲下
回复

使用道具 举报

ainubis 发表于 2015-3-29 06:44:05

学习了(*^__^*) 嘻嘻……
回复

使用道具 举报

zhangzh 发表于 2015-8-11 14:57:18
回复

使用道具 举报

ckwang17 发表于 2017-3-28 00:14:30
common.toolkit.java.util.ObjectUtil  这是什么包?
回复

使用道具 举报

spftoto 发表于 2018-11-3 11:14:13
简单的API哈
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条