问题导读:
1.什么是Spring XD?
2.Spring XD在Hadoop中做什么?
3.如何安装Spring XD?
4.Spring XD应用如何?
简介SpringXD(eXtreme Data,极限数据)是Pivotal的大数据产品。它结合了 Spring Boot和 Grails,组成Spring IO平台的执行部分。尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是一个类库或者框架,它包含带有服务器的bin目录,你可以通过命令行启动并与之交互。运行时可以运行在开发机上、客户端自己的服务器上、AWS EC2上或者Cloud Foundry上。 SpringXD中的关键组件是管理和容器服务器(Admin and ContainerServers)。使用一种 DSL,你可以把所需处理任务的描述通过HTTP提交给管理服务器。然后管理服务器会把处理的任务映射到处理模块(每个模块都是一个执行单元,作为Spring应用程序上下文实现)中。 该产品具有两种操作模式:-single和multi-node。第一种是由单独的进程负责所有处理和管理的工作。这对于入门很有用,同样适合于应用程序的快速开发和测试。本文中的所有实例都被设计为在单一节点模式下工作。第二种是一种分布式模式。分布式集成运行时(DistributedIntegration Runtime,DIRT)会在多个节点之间分发处理的任务。除了可以拥有VM或者物理服务器作为这些节点之外,Spring XD还让你可以在 Hadoop YARN集群上运行。 XD管理服务器会把处理的任务切分成彼此独立的模块定义,并把每个模块分配给使用Apache ZooKeeper的容器实例。每个容器都会监听分配给它的模块定义,然后部署模块,创建Spring应用程序上下文来运行它。需要注意的是,在我撰写这篇文章的时候,Spring XD中还不会自带Zookeeper。兼容的版本是3.4.6,你可以从 这里下载。 模块通过使用配置好的消息中间件传递消息来共享数据。传输层是可插拔的,并且支持其他两种Pivotal项目—— Redis和 Rabbit MQ——以及现成可用的内存数据库。 用例下图让你可以对Spring XD有个总体上的了解。 Spring XD团队认为,对于创建大数据解决方案来说,创建的主要用例有四种:数据吸纳、实时分析、工作流调度以及导出。 数据吸纳提供了一种能力,可以从各种输入源接收数据,并把它传输给大数据存储库,像HDFS(Hadoop文件系统)、Splunk或者MPP数据库。和文件一样,数据源可能包括来自于移动设备、支持MQ遥感传输协议(MQTT)的传感器以及像Twitter之类的社交流的事件。 吸纳过程会贯穿事件驱动数据的处理,以及针对其他类型数据的批处理(MR、PIG、Hive、Cascading、SQL等等)。流和作业的两个世界截然不同,但是Spring XD试图使用 通道抽象(channel abstraction)来模糊二者之间的边界,从而让流可以触发批处理作业,而批处理作业也可以发送事件从而触发其他流。 对于流来说,会通过叫做“Taps”的抽象来支持某些实时分析,像获取指标和计数值。从概念上,Taps让你可以介入到流中,执行实时分析,并有选择地为外部系统生成数据,像GemFire、Redis或者其他内存数据网格。 一旦你在大数据仓库中拥有数据,那么就需要某种工作流工具来对处理进行调度。调度非常必要,因为你编写的脚本或者map-reduce作业通常会长时间运行,并采用带有多个步骤的事件链的方式。理想状况下,你需要在事件失败的时候,能够从特定的步骤重新启动,而不是完全从头来过。 最后还需要导出步骤,从而把数据放到更适合展现的系统中,可能还会做进一步的分析。例如从HDFS到RDBMS(关系型数据库管理系统),在那里你可以使用更为传统的商业智能工具。 SpringXD想要提供一种统一、分布式和可扩展的服务来满足这些用例。它没有从头开始,而是利用了大量已经存在的Spring技术。例如,它使用了Spring Batch来支持工作流调度和导出用例,使用Spring Integration来支持流处理,此外还使用了各种各样的企业应用程序集成模式。其他关键的Spring产品包括:使用Spring Data处理NoSQL/Hadoop工作,使用Reactor为编写异步程序提供简化的API,特别是在使用 LMAX Disruptor的时候。 安装Spring XD在接下来的部分,我们会详细看一下每个用例。你可能想要自己来试验一下这些例子。起步非常简单。 为了开始,你要确保系统至少安装了Java JDK 6或者更新的版本。我推荐使用Java JDK 7。 对于OSX用户,如果还没有Homebrew的话,请安装,然后运行: brew tap pivotal/tap brew install springxd
这会安装到 /usr/local/Cellar/springxd/1.0.0.M7/libexec (依赖于SpringXD的库)。 注意:如果你随后想要安装更新的版本,那么使用brewupgrade springXD就可以。 红帽或者CentOS的用户可以使用Yum来安装。 Windows用户可以下载最新的.zip文件,解压,安装到文件夹,然后把XD_HOME这个环境变量设置成安装文件夹。 你可以通过键入以下命令,从而在单一节点上启动Spring XD: xd-singlenode
键入以下命令来打开另一个终端窗口并启动shell程序: xd-shell
你会看到下面这样的情况: 为了检查它是否正常工作,让我们创建一个快速的流: stream create --definition "time | log" --name ticktock --deploy
在你启动Spring XD的控制台中,你会看到下面这样的显示: 你可以从shell中使用stream destroy命令删除流。 stream destroy --name ticktock
数据吸纳流在Spring XD中,基本的流会定义事件驱动数据的吸纳,从源到目的地,经过任意多个处理器。 SpringXD外壳程序支持针对流定义的一种DSL,其中带有管道和过滤器语法 - source |processor | sink。 例如,像这样的命令 stream create --name filetest --definition "file | log" --deploy会记录文件内容的日志。
除了能够处理文件之外,Spring XD还支持很多其他源,包括: HTTP命令 HTTP POST /streams/myStream"http | file --deploy" -表示“从HTTP消费我的流,并转到文件”。这会默认到9000端口。你可以使用--port选项覆盖默认的端口设置。这是针对HTTP的唯一参数。 例如(从XD的外壳程序): xd:> stream create --name infoqhttptest9010 --definition "http --port=9010 | file" --deploy
你可以向这个新端口提交一些数据来测试: xd:> http post --target http://localhost:9010 --data "hello world"
你会在控制台窗口看到以下文本: > POST (text/plain;Charset=UTF-8) http://localhost:9010 hello world > 200 OK
打开另一个终端窗口并键入: $ cd /tmp/xd/output $ tail -f infoqhttptest9010.out
你会在输出中看到“helloworld”。 想要发送二进制数据,你需要把Content-Type头部说明设置为application/octet-string: $ curl --data-binary @foo.zip -H'Content-Type: application-octet-string' http://localhost:9000
键入 streamdestroy infoqhttptest9010 来完成清理工作。 MailMail是用来接收email的源模块。根据所使用的协议,它可以以池的形式工作,或者在可用的时候就接收email。 例如: xd:> stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles@c4media.com --password=secret --delete=false | file" --deploy
注意:这里的delete选项很重要,因为对于Spring XD来说一旦被消费,默认情况就会删除电子邮件。Spring XD也拥有markAsRead选项,但默认值是false。 Spring集成文档中对此做出了详细的说明,但主要问题是,POP3协议只知道在单独一个会话中读取了什么。作为POP3邮件适配器运行的结果,当邮件在每个池中变成可用状态时,就会被成功发送,且没有任何一个邮件消息会被多次发送。然而,当你重启适配器并开始新的会话时,所有位于上一个会话中已经获取过的邮件消息就可能会被再次获取。 如果你在控制台日志中看到这样的错误信息: WARN task-scheduler-1 org.springframework.integration.mail.ImapIdleChannelAdapter:230 - error occurred in idle task javax.mail.AuthenticationFailedException: failed to connect, no password specified?
试着在你的URL把@符号替换为URL编码的样子: %40: stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles%40c4media.com --password=secret --delete=false | file" --deploy
打开另一个终端窗口并键入: $ cd /tmp/xd/output $ tail -f infoqmailstream.out
给你自己发送一封邮件,以看到它在日志文件中显示的内容。 Twitter搜索例如: xd:> stream create --name twittersearchinfoq --definition "twittersearch --outputType=application/json --fixedDelay=1000 --consumerKey=afes2uqo6JAuFljdJFhqA --consumerSecret=0top8crpmd1MXGEbbgzAwVJSAODMcbeAbhwHXLnsg --query='infoq' | file" --deploy
它使用twittersearch的JSON输出格式,每1000毫秒使用令牌“infoq”在Twitter中进行查询。为了运行上面的内容,你需要一个消费者密钥(由Twitter发放的应用程序消费者密钥)以及它相关的密钥。 它的结果会通过管道以同步的方式传输给一个文件,默认是/tmp/xd/output/[streamName].out。 打开另一个终端窗口并键入: $ cd /tmp/xd/output $ tail -f twittersearchjava.out
稍等一会儿,你会发现超出了TwitterAPE搜索的限制,并且会在控制台窗口中(你在其中在单一节点上启动了XD)看到这样的消息: 11:27:01,468 WARN task-scheduler-1 client.RestTemplate:581 - GET request for "https://api.twitter.com/1.1/search/tweets.json?q=infoq&count=20&since_id=478845525597237248" resulted in 429 (Client Error (429)); invoking error handler11:27:01,471 ERROR task-scheduler-1 handler.LoggingHandler:145 - org.springframework.social.RateLimitExceededException: The rate limit has been exceeded.
键入 streamdestroy twittersearchinfoq 来完成清理工作。 其他输入流GemFire:在XD容器进程中配置一个缓存(cache)和副本区域,它和Spring Integration GemFire同时存在于通道适配器中,它们由CacheListener支持,而后者会输出区域中外部输入事件所触发的输出消息。它还支持连续的查询,那让客户端应用程序可以使用对象查询语言(OQL)来创建GemFire查询,并注册一个CQ监听器,它会订阅查询,每次查询的结果集发生变化的时候都会得到通知。 Syslog:有三种syslog源:reactor-syslog、syslog-udp和syslog-tcp。reactor-syslog适配器使用tcp,会构建Reactor项目中可用的功能,并提供超过syslog-tcp适配器中更好的吞吐量。 TCP:它会作为服务器,让远程的组织能够连接到XD,并通过原生的TCP socket提交数据。 MQTT:连接到MQTT服务器并接收遥测消息。 Taps从概念上说,你会在通道中插入一个简单的接收列表,它会把每个进入的消息发布到主通道和次通道中。流并不知道它的管道中任何tap的存在。删除流并不会自动删除tap——它们需要单独删除。然而,如果加入了tap的流被重新创建,那么已经存在的tap会继续起作用。 tap可以在流的任意位置(或者多个位置)插入。 处理器流中的数据可以以多种方式处理: 过滤器:它可以用于决定消息是否应该发送给输出通道。最简单的情况是,过滤器只是一个 SpEL布尔表达式,它会返回真或假。例如: xd:> stream create --name filtertest --definition "http | filter --expression=payload=='good' | log" --deploy
会记录带有“good”关键字的所有内容的日志。然而,过滤器也可以相当复杂。Spring XD支持JSONPath计算式以及自定义的Groovy脚本。 转换:用来转换消息的内容或结构。它支持简单的SpEL,对于更复杂的转换,可以使用Groovy脚本。 分割器:和Spring集成中的分割器概念类似,这里的分割器会使用SpEL表达式,它会计算一个数组或者集合的值,从而把单独一条消息切分成多个独立的消息。你可以使用JSON oath表达式,但无法使用自定义的Groovy脚本。 聚合器(Aggregator):和分割器相反,它会把多条消息组合成一条。 最后是脚本,可以用于调用特定的Groovy脚本作为处理步骤。 槽(Sinks)最简单的槽是日志和文件。其他可以支持的槽包括Hadoop(HDFS)、JDBC、TCP、Mail、RabbitMQ、GemFire服务器、Splunk服务器和MQQT。还有一个动态路由选项,允许基于SpEL表达式或Groovy脚本的值,把SpringXD消息路由到命名通道中。让我有一点奇怪的是,在这里缺少一般目的的JMS槽,尽管我们可以像 这里描述的一样构建自定义的槽模块。 实时分析Spring XD为各种机器学习评分算法的实时计算提供了支持,还为使用各种类型的计数器和计量器进行实时数据分析提供了支持。分析功能是通过可以添加到流中的模块实现的。在那种情况下,实时分析是通过和数据吸纳一样的模块完成的。 尽管流的主要角色可以是执行实时分析,但更为常见的是添加一个tap来初始化另一个流,其中分析——例如:一个字段值的计数器——会应用给通过主要流吸纳的同样数据之上。 Spring XD中自带提供了一些简单的分析工具,它们都实现为抽象API,针对内存数据库和Redis而实现,如下: · 简单计数器 · 字段值计数器:计算特定字段出现的次数。 · 聚合计数器: 在Mongo和Redis之类的工具中比较常见,让你可以对数据根据时间——例如分钟、小时、月、年等——进行分片。 · 计量器(Gauge):最新的值 · 富计量器:最新的值,运行的平均值,最大、最小值 产品还包含了一些抽象,可以在流处理应用程序中事件分析模型。在撰写这篇文章的时候,只支持预测性模块标记语言(PredictiveModel Markup Language,PMML), 他们期望,随着时间的推移能够在这个领域看到发展,并且对预测性建模提供额外的支持。 批处理作业、工作流调度和导出除了流之外,Spring XD还包含了基于Spring Batch启动和监控批处理作业的功能,而Spring Batch也被用于支持工作流调度和导出用例。 工作流的概念会被转换成批处理作业,那可以被认为是各个步骤的有向图,每个图都是一个处理步骤: 根据配置的情况,步骤可以顺序或者并行执行。它们可以复制或者处理来自于文件、数据库、MapReduce、Pig、Hive或Cascading作业的数据,并且和允许重启的检查点一起持久化。和流一样,作业支持单节点,或者可以和数据分区一起分布。 SpringXD自身带有少量预定义的作业,可以用来向Hadoop文件系统HDFS导出数据,或者从中导入数据。这些作业覆盖了FTP到HDFS、HDFS到JDBC、HDFS到MongoDB和JDBC到HDFS。还有一个作业用于向JDBC导出文件。你可以在/libexec/xd/modules/job文件夹中找到。 SpringXD提供了相当基础的、基于浏览器的图形化界面,当前让你可以执行和任务相关的批处理作业。对于启动Spring XD,管理员界面在 这里提供: 正如在上面的截屏中可以看到的,管理员界面当前包括四个标签页: 1. 模块:列举了可用的批处理作业和更多细节(像作业模块选项以及模块的XML配置文件)。 2. 定义:列举了XD批处理作业定义,并提供了部署或者卸载那些作业的动作。 3. 部署:列举了所有部署了的作业,并提供了一种选项来启动部署好的作业。一旦作业已经部署,它就可以通过管理员界面启动。 4. 执行:列举了批处理作业的执行状况,并提供了一种选项,如果批处理作业可以重启,并且处于停止或者失败状态,那么就重启。 结论 Spring XD当前还处于开发中。第一个里程碑版本已经在2013年六月发布,而GA版本期望在今年(2014年)七月发布。它基于Apache第二版许可。在GitHub上提供了 源代码和 示例。你还可以找到在线的 Sonar代码度量。 产品可能还很新,但正如我们看到的,它构建在成熟的基础之上——Spring Batch、Spring Integration和Sping Data,以及Reactor项目、LMAX Disruptor和Apache Hadoop——并提供了一种轻量级的运行时环境,可以通过DSL来配置和集成,只需要很少代码,甚至不需要。Spring XD为开发者提供了一种便利的方式,可以开始构建大数据应用程序,为构建和部署这样的应用程序提供了“一站式服务”。 转自:Charles Humble
|