分享

Storm 多语言支持之ShellBolt原理及改进

xioaxu790 发表于 2014-8-10 15:03:01 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 15489
问题导读
1、ShellBolt原理是什么?
2、ShellBolt的改进空间有哪些?




Storm是一个实时分布式流处理框架,现在正被越来越多的人使用。众所周知,Storm是一个Java平台,这就给我们的使用带来了一个问题:我们在实际工作中很少从0开始,往往是在一些已有的基础执行进行开发,而如果我们已有的基础程序不是Java平台而是C/C++,python等,如何将其移植到Storm中运行呢?

为了解决这个问题,Storm本身提出了ShellBolt,用于支持由不同语言便编写的程序在Storm平台中运行。


1.ShellBolt原理
Storm中以Topology作为运行的基本单位。而Topology又是由Spout和Bolt组成,实际上Spout是数据接入者,而Bolt才是Topology中数据的真正处理者。

于是我们只需要能将程序封装为一个Topology中Bolt的就可以了。而Storm提出的ShellBolt就完成了该功能,ShellBolt本质上是一个壳子程序,他允许开发者将自己的程序(任意的程序)封装成一个ShellBolt,从而加入到Topology中运行。是不是很神奇呢?

下面我们就来看一下ShellBolt的原理。


1)ShellBolt本质上是一个Bolt;

2)ShellBolt中接收Shell命令,根据Shell命令调用创建一个ShellProcess。并分别启动两个线程,向该ShellProcess发送消息和读取消息;

3)再进一步,ShellProcess实际上又是调用了ProcessBuild创建了一个Process,并通过该Process的InputStream,OutputStream,ErrorStream和该Process进行交互。
到这里,事情似乎变得清晰了:
ShellBolt本质上就是通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。
方法看似简单,其实蕴含了至理“简单的才是最好的”。

2.ShellBolt可能存在的问题
上面说到ShellBolt通过通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。这样做可能可回导致一下三个问题:

1)潜藏风险

   交互完全是读写进程的StdIn,StdOut;这就对编程者提出了要求,不能私自向stdIn和StdOUt上输出东西,也就是说在程序中绝对不能有printf,scanf,cout,cin,System.Out,System.in之类的。

  这一点,对于小规模程序或者完全新开发的程序可以进行约定。可是对于打响程序,或者基于已有程序进行重构的时候就变得不靠谱了,只有上帝才知道有没有人偷偷的写了stdin或stdout,这必然会导致程序崩溃。而且你根本无从查起。

2)效率低
  还是交互,通过读写进程的StdIn,StdOut交互,所有数据必须是文本的,Storm中通过Json编码实现。而编码解码,写Stdio,Stdout,这种交互方式的效率无疑是比较低的。

3)僵尸进程

  ShellBolt根据Shell命令,启动新的进程,而目前还没有很好的方法保证它会杀死所有他启动的进程。本人在使用的过程中就经常发现一个topology停止后,后台经常会驻留有还没有死掉的进程。

4)占用资源

  ShellBolt根据Shell命令,启动新的进程。也即是说在task较多时,它会启动很多进程,比较占用资源。当然这个不能说是缺点,因为进程是独立空间,当你的程序需要的资源比较多时,启动单独的进程是很好的选择。


3.ShellBolt的改进空间
针对上面分析的问题,有以下几点改进空间:

1)不再是使用stdio,stdout通讯,而是使用其它通讯方式进行进程间的通讯,如pipe,共享内存,socket等。由于Java本身的限制(没有进程通讯的管道和共享内存),所以最后是使用socket通讯。这样不但避免了使用stdio和stdout时的风险,而且不再需要Json编码,从而可以提交效率。

     大概的方法就是:

     改写ShellProcess,启动一个ServerSocket,并监听端口。通过Shell命令将该端口传递给子Process,子Process通过该端口和ShellProcess进行通讯。

    貌似挺复杂的,因此作者也没有花时间去实现。而且不同的客户端都得去连接该socket,解析消息协议。

    这就说明了:虽然是一个小小的改动,但代价是巨大的!

2)可以通过JNI封装与C/C++进行通讯,这个效率最高,实现也比较简单。但应用场景有限,只能适用于C++。不过他刚好能满足作者的需求。

   实现方法和 1)中类似,改写ShellProcess,其中实现一个JNIInvoker,通过JNIInvoker调用C++,写消息,读消息。

   相比1)中方法效率更高,实现难度小,但只适用于C++。
3)记录ShellBolt启动的进程,在程序结束时,kill掉。


4.最后附上ShellBolt和ShellProcess代码
ShellBolt.Java

  1. package backtype.storm.task;  
  2.   
  3. import backtype.storm.generated.ShellComponent;  
  4. import backtype.storm.tuple.MessageId;  
  5. import backtype.storm.tuple.Tuple;  
  6. import backtype.storm.utils.Utils;  
  7. import backtype.storm.utils.ShellProcess;  
  8. import java.io.IOException;  
  9. import java.util.ArrayList;  
  10. import java.util.Arrays;  
  11. import java.util.concurrent.ConcurrentHashMap;  
  12. import java.util.concurrent.LinkedBlockingQueue;  
  13. import static java.util.concurrent.TimeUnit.SECONDS;  
  14. import java.util.List;  
  15. import java.util.Map;  
  16. import java.util.Random;  
  17. import org.apache.log4j.Logger;  
  18. import org.json.simple.JSONObject;  
  19.   
  20. /**
  21. * A bolt that shells out to another process to process tuples. ShellBolt
  22. * communicates with that process over stdio using a special protocol. An ~100
  23. * line library is required to implement that protocol, and adapter libraries
  24. * currently exist for Ruby and Python.
  25. *
  26. * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
  27. * in the resources directory within the jar submitted to the master.
  28. * During development/testing on a local machine, that resources directory just
  29. * needs to be on the classpath.</p>
  30. *
  31. * <p>When creating topologies using the Java API, subclass this bolt and implement
  32. * the IRichBolt interface to create components for the topology that use other languages. For example:
  33. * </p>
  34. *
  35. * <pre>
  36. * public class MyBolt extends ShellBolt implements IRichBolt {
  37. *      public MyBolt() {
  38. *          super("python", "mybolt.py");
  39. *      }
  40. *
  41. *      public void declareOutputFields(OutputFieldsDeclarer declarer) {
  42. *          declarer.declare(new Fields("field1", "field2"));
  43. *      }
  44. * }
  45. * </pre>
  46. */  
  47. public class ShellBolt implements IBolt {  
  48.     public static Logger LOG = Logger.getLogger(ShellBolt.class);  
  49.     Process _subprocess;  
  50.     OutputCollector _collector;  
  51.     Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();  
  52.   
  53.     private String[] _command;  
  54.     private ShellProcess _process;  
  55.     private volatile boolean _running = true;  
  56.     private volatile Throwable _exception;  
  57.     private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();  
  58.     private Random _rand;  
  59.       
  60.     private Thread _readerThread;  
  61.     private Thread _writerThread;  
  62.   
  63.     public ShellBolt(ShellComponent component) {  
  64.         this(component.get_execution_command(), component.get_script());  
  65.     }  
  66.   
  67.     public ShellBolt(String... command) {  
  68.         _command = command;  
  69.     }  
  70.   
  71.     public void prepare(Map stormConf, TopologyContext context,  
  72.                         final OutputCollector collector) {  
  73.         _rand = new Random();  
  74.         _process = new ShellProcess(_command);  
  75.         _collector = collector;  
  76.   
  77.         try {  
  78.             //subprocesses must send their pid first thing  
  79.             Number subpid = _process.launch(stormConf, context);  
  80.             LOG.info("Launched subprocess with pid " + subpid);  
  81.         } catch (IOException e) {  
  82.             throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);  
  83.         }  
  84.   
  85.         // reader  
  86.         _readerThread = new Thread(new Runnable() {  
  87.             public void run() {  
  88.                 while (_running) {  
  89.                     try {  
  90.                         JSONObject action = _process.readMessage();  
  91.                         if (action == null) {  
  92.                             // ignore sync  
  93.                         }  
  94.   
  95.                         String command = (String) action.get("command");  
  96.                         if(command.equals("ack")) {  
  97.                             handleAck(action);  
  98.                         } else if (command.equals("fail")) {  
  99.                             handleFail(action);  
  100.                         } else if (command.equals("error")) {  
  101.                             handleError(action);  
  102.                         } else if (command.equals("log")) {  
  103.                             String msg = (String) action.get("msg");  
  104.                             LOG.info("Shell msg: " + msg);  
  105.                         } else if (command.equals("emit")) {  
  106.                             handleEmit(action);  
  107.                         }  
  108.                     } catch (InterruptedException e) {  
  109.                     } catch (Throwable t) {  
  110.                         die(t);  
  111.                     }  
  112.                 }  
  113.             }  
  114.         });  
  115.          
  116.         _readerThread.start();  
  117.   
  118.         _writerThread = new Thread(new Runnable() {  
  119.             public void run() {  
  120.                 while (_running) {  
  121.                     try {  
  122.                         Object write = _pendingWrites.poll(1, SECONDS);  
  123.                         if (write != null) {  
  124.                             _process.writeMessage(write);  
  125.                         }  
  126.                     } catch (InterruptedException e) {  
  127.                     } catch (Throwable t) {  
  128.                         die(t);  
  129.                     }  
  130.                 }  
  131.             }  
  132.         });  
  133.          
  134.         _writerThread.start();  
  135.     }  
  136.   
  137.     public void execute(Tuple input) {  
  138.         if (_exception != null) {  
  139.             throw new RuntimeException(_exception);  
  140.         }  
  141.   
  142.         //just need an id  
  143.         String genId = Long.toString(_rand.nextLong());  
  144.         _inputs.put(genId, input);  
  145.         try {  
  146.             JSONObject obj = new JSONObject();  
  147.             obj.put("id", genId);  
  148.             obj.put("comp", input.getSourceComponent());  
  149.             obj.put("stream", input.getSourceStreamId());  
  150.             obj.put("task", input.getSourceTask());  
  151.             obj.put("tuple", input.getValues());  
  152.             _pendingWrites.put(obj);  
  153.         } catch(InterruptedException e) {  
  154.             throw new RuntimeException("Error during multilang processing", e);  
  155.         }  
  156.     }  
  157.   
  158.     public void cleanup() {  
  159.         _running = false;  
  160.         _process.destroy();  
  161.         _inputs.clear();  
  162.     }  
  163.   
  164.     private void handleAck(Map action) {  
  165.         String id = (String) action.get("id");  
  166.         Tuple acked = _inputs.remove(id);  
  167.         if(acked==null) {  
  168.             throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);  
  169.         }  
  170.         _collector.ack(acked);  
  171.     }  
  172.   
  173.     private void handleFail(Map action) {  
  174.         String id = (String) action.get("id");  
  175.         Tuple failed = _inputs.remove(id);  
  176.         if(failed==null) {  
  177.             throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);  
  178.         }  
  179.         _collector.fail(failed);  
  180.     }  
  181.   
  182.     private void handleError(Map action) {  
  183.         String msg = (String) action.get("msg");  
  184.         _collector.reportError(new Exception("Shell Process Exception: " + msg));  
  185.     }  
  186.   
  187.     private void handleEmit(Map action) throws InterruptedException {  
  188.         String stream = (String) action.get("stream");  
  189.         if(stream==null) stream = Utils.DEFAULT_STREAM_ID;  
  190.         Long task = (Long) action.get("task");  
  191.         List<Object> tuple = (List) action.get("tuple");  
  192.         List<Tuple> anchors = new ArrayList<Tuple>();  
  193.         Object anchorObj = action.get("anchors");  
  194.         if(anchorObj!=null) {  
  195.             if(anchorObj instanceof String) {  
  196.                 anchorObj = Arrays.asList(anchorObj);  
  197.             }  
  198.             for(Object o: (List) anchorObj) {  
  199.                 Tuple t = _inputs.get((String) o);  
  200.                 if (t == null) {  
  201.                     throw new RuntimeException("Anchored onto " + o + " after ack/fail");  
  202.                 }  
  203.                 anchors.add(t);  
  204.             }  
  205.         }  
  206.         if(task==null) {  
  207.             List<Integer> outtasks = _collector.emit(stream, anchors, tuple);  
  208.             Object need_task_ids = action.get("need_task_ids");  
  209.             if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {  
  210.                 _pendingWrites.put(outtasks);  
  211.             }  
  212.         } else {  
  213.             _collector.emitDirect((int)task.longValue(), stream, anchors, tuple);  
  214.         }  
  215.     }  
  216.   
  217.     private void die(Throwable exception) {  
  218.         _exception = exception;  
  219.     }  
  220. }  
复制代码



ShellProcess.java
  1. package backtype.storm.utils;  
  2.   
  3. import backtype.storm.task.TopologyContext;  
  4. import java.io.BufferedReader;  
  5. import java.io.InputStream;  
  6. import java.io.InputStreamReader;  
  7. import java.io.DataOutputStream;  
  8. import java.io.File;  
  9. import java.io.IOException;  
  10. import java.util.Map;  
  11. import java.util.List;  
  12.   
  13. import org.apache.commons.io.IOUtils;  
  14. import org.json.simple.JSONObject;  
  15. import org.json.simple.JSONValue;  
  16.   
  17. public class ShellProcess {  
  18.     private DataOutputStream processIn;  
  19.     private BufferedReader processOut;  
  20.     private InputStream processErrorStream;  
  21.     private Process _subprocess;  
  22.     private String[] command;  
  23.   
  24.     public ShellProcess(String[] command) {  
  25.         this.command = command;  
  26.     }  
  27.   
  28.     public Number launch(Map conf, TopologyContext context) throws IOException {  
  29.         ProcessBuilder builder = new ProcessBuilder(command);  
  30.         builder.directory(new File(context.getCodeDir()));  
  31.         _subprocess = builder.start();  
  32.   
  33.         processIn = new DataOutputStream(_subprocess.getOutputStream());  
  34.         processOut = new BufferedReader(new InputStreamReader(_subprocess.getInputStream()));  
  35.         processErrorStream = _subprocess.getErrorStream();  
  36.   
  37.         JSONObject setupInfo = new JSONObject();  
  38.         setupInfo.put("pidDir", context.getPIDDir());  
  39.         setupInfo.put("conf", conf);  
  40.         setupInfo.put("context", context);  
  41.         writeMessage(setupInfo);  
  42.   
  43.         return (Number)readMessage().get("pid");  
  44.     }  
  45.   
  46.     public void destroy() {  
  47.         _subprocess.destroy();  
  48.     }  
  49.   
  50.     public void writeMessage(Object msg) throws IOException {  
  51.         writeString(JSONValue.toJSONString(msg));  
  52.     }  
  53.   
  54.     private void writeString(String str) throws IOException {  
  55.         byte[] strBytes = str.getBytes("UTF-8");  
  56.         processIn.write(strBytes, 0, strBytes.length);  
  57.         processIn.writeBytes("\nend\n");  
  58.         processIn.flush();  
  59.     }  
  60.   
  61.     public JSONObject readMessage() throws IOException {  
  62.         String string = readString();  
  63.         JSONObject msg = (JSONObject)JSONValue.parse(string);  
  64.         if (msg != null) {  
  65.             return msg;  
  66.         } else {  
  67.             throw new IOException("unable to parse: " + string);  
  68.         }  
  69.     }  
  70.   
  71.     public String getErrorsString() {  
  72.         if(processErrorStream!=null) {  
  73.             try {  
  74.                 return IOUtils.toString(processErrorStream);  
  75.             } catch(IOException e) {  
  76.                 return "(Unable to capture error stream)";  
  77.             }  
  78.         } else {  
  79.             return "";  
  80.         }  
  81.     }  
  82.   
  83.     private String readString() throws IOException {  
  84.         StringBuilder line = new StringBuilder();  
  85.   
  86.         //synchronized (processOut) {  
  87.             while (true) {  
  88.                 String subline = processOut.readLine();  
  89.                 if(subline==null) {  
  90.                     StringBuilder errorMessage = new StringBuilder();  
  91.                     errorMessage.append("Pipe to subprocess seems to be broken!");  
  92.                     if (line.length() == 0) {  
  93.                         errorMessage.append(" No output read.\n");  
  94.                     }  
  95.                     else {  
  96.                         errorMessage.append(" Currently read output: " + line.toString() + "\n");  
  97.                     }  
  98.                     errorMessage.append("Shell Process Exception:\n");  
  99.                     errorMessage.append(getErrorsString() + "\n");  
  100.                     throw new RuntimeException(errorMessage.toString());  
  101.                 }  
  102.                 if(subline.equals("end")) {  
  103.                     break;  
  104.                 }  
  105.                 if(line.length()!=0) {  
  106.                     line.append("\n");  
  107.                 }  
  108.                 line.append(subline);  
  109.             }  
  110.             //}  
  111.   
  112.         return line.toString();  
  113.     }  
  114. }  
复制代码






本文转自:http://blog.csdn.net/jmppok/article/details/17752221


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条