分享

Thrift简单原理及使用(JAVA)

本帖最后由 howtodown 于 2014-3-16 20:36 编辑

    thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。    thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。
    目前thrift在国内很多互联网公司已经开始用,thrift开发比较方便,并且支持语言多,你定义一个简单的定义文件中的数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。


下面是thrift传输过程图:

4d086e061d950a7b386f9bee08d162d9f3d3572c11dfd17e.jpg
一. 定义service.thrift数据类型和服务接口
  1. struct User{
  2.   1:i64 id,
  3.   2:string name,
  4.   3:i64 timestamp,
  5.   4:bool vip        
  6. }
  7. service UserService{
  8.   User getById(1:i64 id)
  9. }
复制代码

二. 生成API文件
        首先下载和安装thrift客户端,比如在windows平台下,下载thrift.exe,不过此处需要提醒,不同的thrift客户端版本生成的API可能不兼容.本例使用thrift-0.9.0.exe;通过"--gen"指定生成API所适配的语言.本实例为生成java客户端API.
  1. > thrift.exe --gen java -o service service.thrift
复制代码

三. UserService实现类
  1. public class UserServiceImpl implements UserService.Iface {
  2.   @Override
  3.   public User getById(long id){
  4.     System.out.println("invoke...id:" + id);
  5.     return new User();//for test
  6.   }
  7. }
复制代码

四.原理分析

1. User.java : thrift生成API的能力还是非常的有限,比如在struct中只能使用简单的数据类型(不支持Date,Collection<?>等),不过我们能从User中看出,它生成的类实现了"Serializable"接口和"TBase"接口.

    其中Serializable接口表明这个类的实例是需要序列化之后在网络中传输的,为了不干扰JAVA本身的序列化和反序列化机制,它还重写了readObject和writeObject方法.不过这对thrift本身并没有帮助.

    TBase接口是thrift序列化和反序列化时使用的,它的两个核心方法:read和write.在上述的thrift文件中,struct定义的每个属性都有一个序号,比如:1:id,那么thrift在序列化时,将会根据序号的顺序依次将属性的"名称 + 值"写入inputStream中,反序列化也是如此.(具体参见read和write的实现).

    因为thrift的序列化和反序列化实例数据时,是根据"属性序号"进行,这可以保证数据在inputstream和outputstream中顺序是严格的.这一点也要求API开发者,如果更改了thrift文件中的struct定义,需要重新生成客户端API,否则服务将无法继续使用(可能报错,也可能数据错误).thrift序列化/反序列化的过程和JAVA自带的序列化机制不同,它将不会携带额外的class结构,此外thrift这种序列化机制更加适合网络传输,而且性能更加高效.

    2. UserService.Client:   在生成的UserService中,有个Client静态类,这个类就是一个典型的代理类,此类已经实现了UserService的所有方法.开发者需要使用Client类中的API方法与Thrift server端交互,它将负责与Thrift server的Socket链接中,发送请求和接收响应.

    需要注意的时,每次Client方法调用,都会在一个Socket链接中进行,这就意味着,在使用Client消费服务之前,需要和Thrift server建立有效的TCP链接.(稍后代码示例)

1) 发送请求:

  1. //参见:TServiceClient
  2. //API方法调用时,发送请求数据流
  3. protected void sendBase(String methodName, TBase args) throws TException {
  4.   oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//首先写入"方法名称"和"seqid_"
  5.   args.write(oprot_);//序列化参数
  6.   oprot_.writeMessageEnd();
  7.   oprot_.getTransport().flush();
  8. }
  9. protected void receiveBase(TBase result, String methodName) throws TException {
  10.   TMessage msg = iprot_.readMessageBegin();//如果执行有异常
  11.   if (msg.type == TMessageType.EXCEPTION) {
  12.     TApplicationException x = TApplicationException.read(iprot_);
  13.     iprot_.readMessageEnd();
  14.     throw x;
  15.   }//检测seqid是否一致
  16.   if (msg.seqid != seqid_) {
  17.     throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
  18.   }
  19.   result.read(iprot_);//反序列化
  20.   iprot_.readMessageEnd();
  21. }
复制代码
Thrift提供了简单的容错方式:每次方法调用,都会在Client端标记一个seqid,这是一个自增的本地ID,在TCP请求时将此seqid追加到流中,同时Server端响应时,也将此seqid原样返回过来;这样客户端就可以根据此值用来判断"请求--响应"是对应的,如果出现乱序,将会导致此请求以异常的方式结束.

  2) 响应

  1. //参考: TBaseProcessor.java
  2. @Override
  3. public boolean process(TProtocol in, TProtocol out) throws TException {
  4.   TMessage msg = in.readMessageBegin();
  5.   ProcessFunction fn = processMap.get(msg.name);
  6.   if (fn == null) {
  7.     TProtocolUtil.skip(in, TType.STRUCT);
  8.     in.readMessageEnd();
  9.     TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
  10.     out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
  11.     x.write(out);//序列化响应结果,直接输出
  12.     out.writeMessageEnd();
  13.     out.getTransport().flush();
  14.     return true;
  15.   }
  16.   fn.process(msg.seqid, in, out, iface);
  17.   return true;
  18. }
复制代码
3) Server端Socket管理和执行策略
  1. TThreadPoolServer
  2. public void serve() {
  3.   try {
  4.     //启动服务
  5.     serverTransport_.listen();
  6.   } catch (TTransportException ttx) {
  7.     LOGGER.error("Error occurred during listening.", ttx);
  8.     return;
  9.   }
  10.   // Run the preServe event
  11.   if (eventHandler_ != null) {
  12.     eventHandler_.preServe();
  13.   }
  14.   stopped_ = false;
  15.   setServing(true);
  16.   //循环,直到被关闭
  17.   while (!stopped_) {
  18.     int failureCount = 0;
  19.     try {
  20.     //accept客户端Socket链接,
  21.     //对于每个新链接,将会封装成runnable,并提交给线程或者线程池中运行.
  22.     TTransport client = serverTransport_.accept();
  23.     WorkerProcess wp = new WorkerProcess(client);
  24.     executorService_.execute(wp);
  25.     } catch (TTransportException ttx) {
  26.     if (!stopped_) {
  27.       ++failureCount;
  28.       LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
  29.     }
  30.     }
  31.   }
  32.   //....
  33. }
复制代码
Thrift Server端,设计思路也非常的直接...当前Service server启动之后,将会以阻塞的方式侦听Socket链接(代码参考TThreadPoolServer),每建立一个Socket链接,都会将此Socket经过封装之后,放入线程池中,本质上也是一个Socket链接对应一个Worker Thread.这个Thread只会处理此Socket中的所有数据请求,直到Socket关闭.
  1. //参考:WorkerProcess
  2. while (true) {
  3.   if (eventHandler != null) {
  4.     eventHandler.processContext(connectionContext, inputTransport, outputTransport);
  5.   }
  6.   if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
  7.     break;
  8.   }
  9. }
复制代码
当有Socket链接不是很多的时候,TThreadPoolServer并不会有太大的性能问题,可以通过指定ThreadPool中线程的个数进行简单的调优..如果Socket链接很多,我们只能使用TThreadedSelectorServer来做支撑,TThreadedSelectorServer内部基于NIO + Socket模式,具有异步的特性;不过在绝大多数情况下,在thrift中使用"异步"似乎不太容易让人接受,毕竟这意味着Client端需要阻塞,并且这个阻塞时间是不可控的.单SelecorServer确实可以有效的提升Server的并发能力.
3. Client端代码示例
  1. public class UserServiceClient {
  2.     public void startClient() {
  3.         TTransport transport;
  4.         try {
  5.             transport = new TSocket("localhost", 1234);
  6.             TProtocol protocol = new TBinaryProtocol(transport);
  7.             UserService.Client client = new UserService.Client(protocol);
  8.             transport.open();
  9.             User user = client.getById(1000);
  10.       ////
  11.             transport.close();
  12.         } catch (TTransportException e) {
  13.             e.printStackTrace();
  14.         } catch (TException e) {
  15.             e.printStackTrace();
  16.         }
  17.     }
  18. }
复制代码
    4. Server端代码示例
  1. public class Server {
  2.     public void startServer() {
  3.         try {
  4.             TServerSocket serverTransport = new TServerSocket(1234);
  5.             UserService.Processor process = new Processor(new UserServiceImpl());
  6.             Factory portFactory = new TBinaryProtocol.Factory(true, true);
  7.             Args args = new Args(serverTransport);
  8.             args.processor(process);
  9.             args.protocolFactory(portFactory);
  10.             TServer server = new TThreadPoolServer(args);
  11.             server.serve();
  12.         } catch (TTransportException e) {
  13.             e.printStackTrace();
  14.         }
  15.     }
  16. }
复制代码
由于thrift对java socket封装一层为TSOCKET,大家在使用thrift客户端连接的时候,一定要用连接池方式,我后面也会将我以前写过的连接池分享给大家。thrift将传输数据封装成thrift对象传输所以在传输过程发送数据字节数要比socket传输大。一般thrift性能体现在使用多线程发送。




欢迎加入about云群9037177932227315139327136 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条