分享

如何通过mapreduce 分析各种类型(word,pdf等)数字的文档

fc013 发表于 2016-11-13 19:23:07 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 12202


问题导读:

1.什么是Jaql?
2.什么是Tika?
3.怎样创建并使用Jaql模块?





本文将介绍如何使用 IBM InfoSphere BigInsights 分析各种类型的大量文档。对于接收不同格式的数据(例如法律文档、电子邮件和科学文章)的行业,InfoSphere BigInsights 可提供复杂的文本分析功能来帮助进行情绪预测、欺诈检测和其他高级数据分析。

学习如何集成 Apache Tika(一种可以提取文档的文本内容的开源库)与 InfoSphere BigInsights(构建于 Hadoop 平台之上,可扩展到数千个节点来分析数十亿个文档)。通常,Hadoop 用于处理大型文件,所以本文将介绍如何在大量小文档上高效地运行作业。您可以使用这里的步骤在 Jaql 创建一个模块来实现该集成。Jaql 是 Hadoop 中的一种处理数据的灵活语言。

基本上讲,Jaql 是 MapReduce 上的一层,支持轻松地分析和操作 Hadoop 中的数据。将 Jaql 模块与 Tika 相结合,使得在一个步骤中读取各种文档和使用 InfoSphere BigInsights 的分析功能(比如文本分析和数据挖掘)变得很容易,无需深厚的编程经验。

本文假设您对 Java™ 编程语言、Hadoop、MapReduce 和 Jaql 有一定的理解。有关这些技术的详细信息不属于本文的讨论范围,本文只重点介绍必须更新来容纳自定义代码的代码节。下载 本文中使用的示例数据。

概述:InfoSphere BigInsights、Tika、Jaql 和 MapReduce 类
InfoSphere BigInsights 构建于 Apache Hadoop 之上,并通过企业特性、分析功能和管理特性对 Apache Hadoop 进行了增强。Apache Hadoop 是一个开源项目,它使用商用服务器集群来实现对大量数据的处理。它可以从一个节点扩展到数千个节点,而且具有容错功能。Hadoop 可被视为一个笼统的词汇。它包含两个主要组件:
  • 分布式文件系统 (HDFS) 存储数据
  • MapReduce 框架处理数据
MapReduce 是一种编程范例,支持跨整个 Hadoop 集群的并行处理和大规模扩展能力。Hadoop 中的数据首先被分解为更小的部分,比如数据块,并分布在集群之上。然后,MapReduce 能够以并行方式分析这些数据块。

Apache Tika
Apache Tika 工具包是一个免费的开源项目,用于从各种类型的数字文档(比如 Word 文档、PDF 文件或富文本格式的文件)读取和提取文本和其他元数据。要查看 API 工作原理的基本示例,可以了解创建 Tika 类的一个实例并使用该实例打开一个流。

清单 1. Tika 示例
[mw_shl_code=java,true]import org.apache.tika.Tika;
...
private String read()
{
        Tika tika = new Tika();
        FileInputStream stream = new FileInputStream("/path_to_input_file.PDF");
        String output = tika.parseToString(stream);
        return output;
}[/mw_shl_code]

如果您的文档格式不受 Tika 支持(例如 Outlook PST 文件就不受支持),那么您可以在前面的代码清单中将它替换为一个不同的 Java 库。Tika 提供了提取元数据的能力,但这不属于本文的讨论范围。向代码中添加该功能相对比较简单。

Jaql
Jaql 主要是一种 JSON 查询语言,但它支持的不仅仅是 JSON。它支持您处理结构化和非传统的数据。使用 Jaql,您可以采用一种类似混合使用 Pig 和 Hive 的方式,选择、联结、分组和过滤 HDFS 中存储的数据。Jaql 查询语言的灵感来源于许多编程和查询语言,包括 Lisp、SQL、XQuery 和 Pig。Jaql 是一个函数式的声明性查询语言,专为处理大型数据集而设计。在并行性方面,Jaql 会在适当的时候将高级查询重写为包含 Java MapReduce 作业的低级查询。本文将演示如何在 Apache Tika 之上创建一个 Jaql I/O 适配器,以便读取各种文档格式,并在这一种语言中分析和转换它们。

用于分析小文件的 MapReduce 类
通常,MapReduce 将会处理存储在 HDFS 上的大型文件。在写入 HDFS 时,会根据 Hadoop 集群的配置将文件分解成较小的部分(数据块)。这些数据块位于这个分布式文件系统上。但是,如果需要使用 Hadoop 高效地处理大量小文件(具体地讲,是 PDF 或 RTF 文件等二进制文件),应该怎么办?

有多种途径可供选择。在许多情况下,可以通过创建一个序列文件 将小文件合并到一个大文件中,序列文件是针对 Hadoop 的原生存储格式。但是,在一个线程中创建序列文件可能会遇到瓶颈,您会面临着丢失原始文件的风险。本文提供了一种操作 MapReduce 中使用的一些 Java 类的不同方法。传统的类要求每个文件都有一个专用的映射器。但在有许多小文件时,这个过程会导致效率低下。

作为传统类的替代方案,可以在 Hadoop 中处理小文件,创建一组自定义类来告知任务,这些文件很小,可以采用与传统方法不同的方式来对待它们。

在映射阶段,定义了称为碎片 (split) 的逻辑容器,而且在每个碎片上放置了一个映射处理任务。可以使用自定义类定义一个固定大小的碎片,在其中填入它可以容纳的任意多个小文件。碎片装满后,作业会创建一个新碎片并进行填充,直到该碎片被装满。然后,每个碎片会被分配给一个映射器。

读取文件的 MapReduce 类
在一个 MapReduce 作业期间,可以使用 3 个主要的 MapReduce Java 类来定义碎片并读取数据:InputSplit、InputFormat 和 RecordReader。

将文件从一个本地文件系统传输到 HDFS 时,该文件会被转换为 128 MB 大小的数据块。(可以在 InfoSphere BigInsights 中更改这个默认值。)设想一个大到可占用 10 个数据块的文件。从 HDFS 读取该文件作为 MapReduce 作业的输入时,这些数据块通常会逐个映射到碎片。在这种情况下,该文件被拆分为 10 个碎片(这意味着有 10 个任务)来处理。默认情况下,数据块大小和碎片大小相同,但这些大小取决于 InputSplit 类的配置设置。

从 Java 编程角度讲,负责此转换的类称为 InputFormat,它是从 HDFS 读取数据的主要入口点。根据文件的数据块,它创建了一个InputSplits 列表。对于每个碎片,都会创建一个映射器。然后,使用 RecordReader 类将每个 InputSplit 分解为记录。每个记录表示一个键-值对。

对比FileInputFormatCombineFileInputFormat
在运行 MapReduce 作业之前,可以指定要使用的 InputFormat 类。要实现 FileInputFormat,则需要您创建 RecordReader 的一个实例,而且前面已经提到过,RecordReader 会为映射器创建键-值对。

FileInputFormat 是一个抽象类,它是 InputFormat 的大多数实现的基础。它包含输入文件的位置,以及从这些文件生成碎片的过程的实现。将碎片转换为键-值对的方式是在子类中定义的。它的子类的一些示例包括 TextInputFormatKeyValueTextInputFormatCombineFileInputFormat

Hadoop 处理大文件(占用多于一个 数据块的文件)的效率更高。FileInputFormat 将每个大文件都转换为碎片,而且创建的每个碎片都包含一个文件的一部分。前面已经提到过,会为每个碎片生成一个映射器。图 1 描绘了如何在映射阶段如何使用 FileInputFormatRecordReader 处理文件。

图 1. 使用 FileInputFormat 处理大文件
fig1.png

但是,当输入文件小于默认的数据块大小时,会创建许多碎片(从而会创建许多映射器)。这种安排会降低作业的效率。 图 2 显示了在为许多小文件使用 FileInputFormat 时,会创建太多的映射器。

图 2. 使用 FileInputFormat 处理许多小文件
fig2.png

为了避免出现这种情形,我们引入了 CombineFileInputFormat。这个 InputFormat 能够很好地处理小文件,因为它将许多文件打包到一个碎片中,所以映射器更少,而且每个映射器有更多的数据要处理。不同于 FileInputFormat 的其他子类,CombineFileInputFormat 是一个抽象类,在使用它之前需要执行额外的更改。除了这些更改之外,您必须确保已阻止对输入进行拆分。 图 3 显示了CombineFileInputFormat 如何处理小文件,以便创建更少的映射器。

图 3. 使用CombineFileInputFormat 处理许多小文件
fig3.png

用于写入文件的 MapReduce 类
需要将文档的文本内容保存在容易在 Hadoop 中处理的文件中。可以使用序列文件,但在本例中,创建了分隔的文本文件,在每条记录中包含了一个文件的内容。此方法使内容很容易读取,很容易在下游的 MapReduce 作业中使用。用于在 MapReduce 中写入文件的 Java 类是 OutputFormatRecordWriter。这些类类似于 InputFormatRecordReader,但它们用于输出。FileOutputFormat 实现了OutputFormat。它包含输出文件和目录的路径,而且包含必须如何运行写入作业的指令。

RecordWriterOutputFormat 类中创建,定义从映射器传递的每条记录写入到输出路径中的方式。

实现自定义的 MapReduce 类
在本文中使用的试验场景中,您可能希望在 Hadoop 中处理并对大量较小的二进制文件进行归档。例如,您可能需要让 Hadoop 分析多个 PDF 格式的研究论文。使用传统的 MapReduce 技术,需要花费相对较长的时间才能完成此作业,因为您使用了太多的小文件作为输入。此外,MapReduce 无法自然地读取您文件的 PDF 格式。除了这些限制之外,在 Hadoop 分布式文件系统中存储许多小文件也会消耗 NameNode 上的大量内存。每 100 万个文件或数据块大约需要使用 1 GB 内存。因此,使用传统的 MapReduce 技术处理小于一个数据块的文件的效率较低。开发一个具有以下特征的程序会提高效率:
  • 针对处理大量小文件而进行优化
  • 可读取二进制文件
  • 生成更少、更大的文件作为输出
一种更好的方法是使用 Apache Tika 读取所有受支持的文档格式的文本,开发一个 TikaInputFormat 类来使用 MapReduce 任务读取和处理小文件,并使用 TikaOutputFormat 显示结果。使用 InputFormatRecordReaderRecordWriter 来处理解决方案。我们的目的是读取许多较小的 PDF 文件,生成具有类似于以下代码的分隔格式的输出。

清单 2. 想要的输出
[mw_shl_code=text,true]<file1.pdf>|<content of file1>
<file2.pdf>|<content of file2>
<file3.pdf>|<content of file3>
...[/mw_shl_code]

此输出可以在以后用于下游分析。以下各节将会详细介绍每个类。

TikaHelper 将二进制数据转换为文本
这个帮助器类的用途是将一个二进制数据流转换为文本格式。它接收一个 Java I/O 流作为输入,返回与该流等效的 string

如果您熟悉 MapReduce,就会知道所有任务都包含在运行时设置的一些配置参数。通过使用这些参数,可以定义作业应如何运行 — 例如输出所在的位置。您还可以添加将供这些类使用的参数。

在这个应用程序中,假设您希望输出一个分隔文件。那么您需要采用某种方式来将原始文本字段中所选的分隔字符替换为不同的字符,还需要采用某种方式来将文本中的新行替换为同样的取代字符。出于此用途,我们将添加两个参数:com.ibm.imte.tika.delimiter 和 com.ibm.imte.tika.replaceCharacterWith。如 清单 3 中所示,在 TikaHelper 类中,从一个 Configuration 实例读取这些参数来获得替换选项。Configuration 从 RecordReader 传递而来,它创建了 TikaHelper 实例,我们将在本文的下一节中介绍它。

清单 3. TikaHelper.java 构造函数
[mw_shl_code=java,true]public TikaHelper(Configuration conf)
{
        tika = new Tika();
        String confDelimiter = conf.get("com.ibm.imte.tika.delimiter");
        String confReplaceChar =
                conf.get("com.ibm.imte.tika.replaceCharacterWith");
        if (confDelimiter != null )
                this.delimiter = "["+ confDelimiter + "]";
        if (confReplaceChar != null )
                this.replaceWith = confReplaceChar;
        logger.info("Delimiter: " + delimiter);
        logger.info("Replace With character:" + replaceWith);
}[/mw_shl_code]

准备好选项后,请调用 readPath 方法来获取要转换为文本的数据流。替换了配置中所有想要替换的字符后,返回文件内容的 string 表示。

在一个 string 对象上调用 replaceAll 方法,将所有循环出现的字符替换为该参数中指定的字符。因为它接受一个正则表达式作为输入,所以在字符两边加上正则表达式分组字符 [ 和 。在解决方案中,指示如果 com.ibm.imte.tika.replaceCharacterWith 未指定,则将所有字符替换为一个空字符串。

在本文中,将输出保存为分隔文件。这会使得它们很容易被读取和处理。但是,您需要删除原始文本中的换行符和分隔字符。在情绪分析或欺诈检测等用例中,这些字符并不重要。如果需要 100% 地保留原始文本,那么可以将结果输出为二进制 Hadoop 序列文件。

清单 4. TikaHelper 构造函数
[mw_shl_code=java,true]public  String readPath(InputStream stream)
{
        try
        {
                String content = tika.parseToString(stream);
                content = content.replaceAll(delimiter, replaceWith);
                content = content.replaceAll(endLine, replaceWith);        
                return content;
        }
        catch (Exception e)
        {
                logger.error("Malformed PDF for Tika: " + e.getMessage());
        }
        return "Malformed PDF";
}[/mw_shl_code]

TikaInputFormat 定义作业
每个 MapReduce 任务都必须有一个 InputFormat。TikaInputFormat 是此解决方案中开发的 InputFormat。它从 CombineFileInputFormat 类扩展而来,使用 Text 作为键和值的输入参数。Text 是一个可写对象 (writable),它是 Hadoop 要用于键-值对的序列化格式。

TikaInputFormat 用于验证作业的配置,拆分输入数据块,并创建一个合适的 RecordReader。如 createRecordReader 方法中的 清单 5 中所示,可以返回一个 RecordReader 实例。前面已经介绍过,不需要拆分 TikaInputFormat 格式的文件,因为这些文件被视为很小。无论如何,TikaHelper 都无法读取文件的各个部分。因此,必须将 isSplitable 方法的返回值设置为 false。

清单 5. TikaInputFormat.java
[mw_shl_code=java,true]public class TikaInputFormat extends CombineFileInputFormat<Text, Text>
{
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                        TaskAttemptContext context) throws IOException
        {
                return new TikaRecordReader((CombineFileSplit) split, context);
        }
        
        @Override
        protected boolean isSplitable(JobContext context, Path file)
        {
                return false;
        }
}[/mw_shl_code]

TikaRecordReader 生成键-值对
TikaRecordReader 使用提供给 TikaInputFormat 的数据生成键-值对。这个类派生自 RecordReader 抽象类。本节将介绍该构造函数和 nextKeyValue 方法。

在 清单 6 中所示的构造函数中,存储执行从 TikaInputFormat 传送的作业所需的信息。Path[] paths 存储每个文件的路径,FileSystem fs 表示 Hadoop 中的一个文件系统,CombineFileSplit split 包含碎片的条件。请注意,我们还使用 Configuration 创建了一个 TikaHelper 实例,以便解析 TikaRecordReader 类中的文件。

清单 6. TikaRecordReader.java 构造函数
[mw_shl_code=java,true]public TikaRecordReader(CombineFileSplit split, TaskAttemptContext context)
                        throws IOException
{
        this.paths = split.getPaths();
        this.fs = FileSystem.get(context.getConfiguration());
        this.split = split;
        this.tikaHelper = new TikaHelper(context.getConfiguration());
}[/mw_shl_code]

在 清单 7 中所示的nextKeyValue 方法中,遍历Path[] 中的每个文件,返回一个 Text 类型的键和值,其中分别包含文件路径和每个文件的内容。为此,首先确定是否已位于文件数组的末尾。如果不是,则前进到数组中的下一个可用文件。然后打开一个连接该文件的FSDataInputStream 流。在这种情况下,键是文件的路径,值是文本内容。将该流传递给 TikaHelper 来读取值的内容。(始终指向迭代中的当前文件的 currentStream 字段。)接下来,关闭用完的流。

为输入中的每个文件运行此方法一次。每个文件都生成一个键-值对。之前已经解释过,读取一个碎片时,会打开下一个碎片来获取记录,依此类推。此过程也会在其他碎片上并行发生。最终,通过返回 false 值来停止循环。

除了以下代码之外,还必须覆盖一些默认函数,如可通过 下载 获得的完整代码所示。

清单 7. TikaInputFormat.java nextKeyValue
[mw_shl_code=java,true]@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
        if (count >= split.getNumPaths())
        {
                done = true;
                return false;
                //we have no more data to parse
        }
        
        Path path = null;
        key = new Text();
        value = new Text();
               
               
        try {
                path = this.paths[count];
        } catch (Exception e) {
                return false;
        }
               
        currentStream = null;
        currentStream = fs.open(path);

        key.set(path.getName());
        value.set(tikaHelper.readPath(currentStream));

        currentStream.close();
        count++;

        return true; //we have more data to parse
}[/mw_shl_code]

TikaOutputFormat 指定了输出详细信息
这个类确定作业的输出在何处和如何存储。它必须从 OutputFormat 类扩展。在本例中,它从 FileOutputFormat 扩展而来。如 清单 8 中所示,首先分配输出的路径,然后创建一个 TikaRecordWriter 实例来生成输出文件。就像 TikaInputFormat 类一样,这个类必须在 main 方法中指定为用作 OutputFormat 类。

清单 8. TikaOutputFormat.java

[mw_shl_code=java,true]public class TikaOutputFormat extends FileOutputFormat<Text, Text>
{

        @Override
        public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
                        throws IOException, InterruptedException
        {
                //to get output files in part-r-00000 format
                Path path = getDefaultWorkFile(context, "");
                FileSystem fs = path.getFileSystem(context.getConfiguration());
                FSDataOutputStream output = fs.create(path, context);
                return new TikaRecordWriter(output, context);
        }

}[/mw_shl_code]

TikaRecordWriter 创建了输出
这个类用于创建输出。它必须从 RecordWriter 抽象类扩展。

在 清单 9 中所示的构造函数中,您获得输出流、上下文和自定义的配置参数,这个参数用作文件名和文件内容之间的分隔符。这个参数可在运行时进行设置(main 方法)。如果未指定分隔符,默认情况下会选择 | 作为分隔符。

清单 9. TikaRecordWriter.java 构造函数

[mw_shl_code=java,true]public TikaRecordWriter(DataOutputStream output, TaskAttemptContext context)
{
        this.out = output;
        String cDel = context.getConfiguration().get("com.ibm.imte.tika.delimiter");
        if (cDel != null)
                delimiter = cDel;
        logger.info("Delimiter character: " + delimiter);
}[/mw_shl_code]

在 清单 10 中所示的 write 方法中,使用了映射器中创建的 Text 类型的键和值,它们将被写入到输出流中。键包含文件名,值包含文件的文本内容。将键和值写入到输出中后,可以使用分隔符将它们分开,然后使用换行符将每行分开。

清单 10. TikaRecordWriter.java write
[mw_shl_code=java,true]@Override
public void write(Text key, Text value) throws IOException,
                InterruptedException
{
        out.writeBytes(key.toString());
        out.writeBytes(delimiter);
        out.writeBytes(value.toString());
        out.writeBytes("\n");
}[/mw_shl_code]

TikaDriver 使用应用程序
要运行 MapReduce 作业,需要定义一个驱动程序类 TikaDriver,它包含 main 方法,如 清单 11 中所示。可以将 TikaInputFormat 设置为自定义的 InputFormat,类似地,对于作业,可以将 TikaOutputFormat 设置为自定义的 OutputFormat。

清单 11. Main 方法
[mw_shl_code=java,true]public static void main(String[] args) throws Exception
{
        int exit = ToolRunner.run(new Configuration(), new TikaDriver(), args);
        System.exit(exit);
}

@Override
public int run(String[] args) throws Exception
{
        Configuration conf = new Configuration();
        //setting the input split size 64MB or 128MB are good.
        conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 67108864);
        Job job = new Job(conf, "TikaMapreduce");
        conf.setStrings("com.ibm.imte.tika.delimiter", "|");
        conf.setStrings("com.ibm.imte.tika.replaceCharacterWith", "");
        job.setJarByClass(getClass());
        job.setJobName("TikaRead");
        
        job.setInputFormatClass(TikaInputFormat.class);
        job.setOutputFormatClass(TikaOutputFormat.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
               
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
}[/mw_shl_code]

请注意加粗的第一行。如果未定义最大碎片大小,那么该任务会将所有输入文件归为一个碎片,所以只有一个映射任务。为了预防出现这种情况,可以定义最大碎片大小。此值可通过为 mapreduce.input.fileinputformat.split.maxsize 配置参数定义一个值来更改。这样,每个碎片都有一个可配置的大小,该大小在本例中为 64MB。

现在,我们已经完成了 MapReduce 作业。它夺取 HDFS 输入文件夹中的所有文件,将它们转码为分隔的输出文件。然后,您可以很方便地使用文本分析工具继续分析数据,比如 IBM Annotation Query Language (AQL)。如果想要一种不同的输出格式或想要直接转换数据,则必须相应地修改代码。因为许多人都对 Java 代码编程不太熟悉,所以本文将介绍如何在 Jaql 模块中使用同样的技术。

使用 Jaql 模块而不是 Java 类
本节将介绍如何使用上一节中的相同技术创建一个 Jaql 模块,如何使用此模块转换文档,从外部文件系统加载它们,以及如何直接分析它们。Jaql 模块使您能够使用一种直观的语法执行所有这些处理工作,无需编写任何 Java 代码。

前面描述的 InputFormatOutputFormatRecordReaderRecordWriter 类位于 org.apache.hadoop.mapreduceorg.apache.hadoop.mapreduce.lib.output 包中,这些包被称为新的 Hadoop API。

要为 Jaql 使用相同的方法,需要实现 org.apache.hadoop.mapred 包中的类,该包是一个较旧的 MapReduce API 版本。

首先,了解如何向旧包应用相同的方法。

TikaJaqlInputFormat 验证输入

这个类用于验证作业的输入配置,拆分输入数据块,以及创建 RecordReader。它扩展自org.apache.hadoop.mapred.MultiFileInputFormat 类,包含两个方法。

如 清单 12 中所示,构造函数创建了一个 TikaJaqlRecordReader 实例,isSplitable 方法被设置为返回 false 来覆盖阻止 InputFormat拆分文件的默认行为。为了能够在载入 Jaql 中后操作输入,可以使用一般类型 JsonHolder。

清单 12. TikaJaqlInputFormat.java
[mw_shl_code=java,true]public class TikaJaqlInputFormat extends MultiFileInputFormat<JsonHolder, JsonHolder>
{

        @Override
        public RecordReader<JsonHolder, JsonHolder> getRecordReader(
                        InputSplit split, JobConf job, Reporter reporter)
                        throws IOException
        {
                return new TikaJaqlRecordReader(job, (MultiFileSplit) split);
        }
        @Override
        protected boolean isSplitable(FileSystem fs, Path filename)
        {
                return false;
        }
}[/mw_shl_code]

TikaJaqlRecordReader 生成键-值对
此类用于生成 MapReduce 中使用的键-值对。它派生自 org.apache.hadoop.mapred.RecordReader 类,以便保持与 Jaql 的兼容性。本节将介绍该构造函数和 next 方法。

在 清单 13 中所示的构造函数中,初始化所要的类变量。获取包含有关文件的信息的碎片,然后创建一个新的 TikaHelper 实例来读取二进制文件。

清单 13. TikaJaqlRecordReader 构造函数
[mw_shl_code=java,true]public TikaJaqlRecordReader(Configuration conf, MultiFileSplit split)
                throws IOException
{
        this.split = split;
        this.conf = conf;
        this.paths = split.getPaths();
        this.tikaHelper = new TikaHelper(conf);
}[/mw_shl_code]

在 next 方法中,如 清单 14 中所示,逐个迭代碎片中的所有文件。打开每个文件的一个流后,将它的名称和内容作为元素分配给一个新的 BufferedJsonRecord 实例。BufferedJsonRecord 帮助您以合适的格式保留各项。Jaql 在内部在 JSON 文档上运行,所以所有数据都由 I/O 适配器转换为有效的 JSON 对象。然后,BufferedJsonRecord被分配为记录的值。但是,键仍然是空的。

清单 14. TikaJaqlRecordReader next 方法
[mw_shl_code=java,true]public boolean next(JsonHolder key, JsonHolder value) throws IOException
{
        if (count >= split.getNumPaths())
        {
                done = true;
                return false;
        }
        
        Path file = paths[count];
        fs = file.getFileSystem(conf);
        InputStream stream = fs.open(file);
        

        BufferedJsonRecord bjr = new BufferedJsonRecord();
        
        bjr.setNotSorted();
        bjr.add(new JsonString("path"), new JsonString(file.getName()));

        bjr.add(new JsonString("content"),
                new JsonString(this.tikaHelper.readPath(stream)));
               
               
        value.setValue(bjr);
               
        stream.close();
               
        count++;

        return true;
}[/mw_shl_code]

创建 Jaql 模块
Jaql 模块使用户能够创建可重用的 Jaql 函数和资源的包。创建一个包含 I/O 适配器的 tika 模块。I/O 适配器被传递给 I/O 函数,允许 Jaql 从各种类型的来源读取或写入数据,这些来源包括分隔文件、序列文件、AVRO 文件、HBase 和 Hive 表,等等。这个 tika 模块使用户能够读取 Apache Tika 所支持的二进制文件(比如 Word 文件或 PDF 文档),提取文件名和文本内容。要创建 tika 模块,可以导出之前开发的 TikaJaql 类作为 JAR 文件。Jaql 可动态地加载 Java 资源,使用函数 addRelativeClassPath() 将它们添加到类路径中,以便注册这些额外的库。

在 Jaql 中,创建和引用模块很简单。通过将每个 Jaql 脚本添加到 Jaql 的搜索路径中,可以将它们添加为模块。实现此操作的最简单的方式是:在 $JAQL_HOME/modules 目录中创建一个新文件夹,并在这里包含您的文件。在本例中,该模块名为 tika,所以您需要创建文件夹 $JAQL_HOME/modules/tika。然后,可以使用 Jaql 脚本创建函数,并将它们包含在此文件夹中。

创建一个名为 tikaRead() 的自定义函数,它使用 com.ibm.imte.tika.jaql.TikaJaqlInputFormat 作为输入格式组件。此函数用于读取数据,所以只需更改 inoptions(无需更改 outoptions)。基于上一节中开发的已实现的类,调用 tikaRead() 函数作为读取的输入,这会为每个输入文件生成一个包含两个字段的记录:path 是完整的文件名,content 是文件的文本内容。对 tikaRead() 函数的调用类似于调用其他任何 Jaql 输入 I/O 适配器,比如 lines() 或 del()。后面的一节中包含一些使用示例。

创建文件 tika.jaql,如 清单 15 中所示,并将它放在 $JAQL_HOME/modules/tika 目录中,以便能够轻松地将它导入到其他 Jaql 脚本中。Jaql 文件的名称无关紧要,但在 modules 文件夹下创建的文件夹的名称很重要。您还可以使用命令行选项从 Jaql 支持的终端动态地添加模块。

此代码在 /home/biadmin/ 中寻找已生成的 JAR 文件。您需要将 Tika JAR 文件复制到此文件夹,将创建的类文件作为 TikaJaql.jar 导出到此文件夹。在 Eclipse 中,可以使用 Export 命令从一个项目创建一个 JAR 文件。

清单 15. tika.jaql
[mw_shl_code=text,true]addRelativeClassPath(getSystemSearchPath(), '/home/biadmin/tika-app-1.5.jar,/hom
e/biadmin/TikaJaql.jar');

//creating the function
tikaRead = fn  (
                 location         : string,
                 inoptions        : {*}? = null,
                 outoptions        : {*}? = null
                )
{
  location,
  "inoptions": {
           "adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter",
           "format": "com.ibm.imte.tika.jaql.TikaJaqlInputFormat",
           "configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator"
  }
};[/mw_shl_code]

使用 Jaql
创建该模块后,使用以下示例帮助了解此函数的一些可能用法。

Jaql 非常灵活,可用于转换和分析数据。它拥有与分析工具的连接器,比如数据挖掘和文本分析 (AQL)。它拥有各种文件格式(比如线、序列和 Avro)和外部来源(比如 Hive 和 HBase)的连接器。您还可以使用它从本地文件系统或者甚至从 Web 直接读取文件。

下一节将演示在 Jaql 中使用 tika 模块的 3 个示例。第一个示例展示了 HDFS 上的二进制文档向包含文本内容的分隔文件的基本转换。这个示例演示了模块的基本功能;它等效于在前几节中使用 MapReduce 作业执行的任务。第二个示例将展示如何使用 Jaql 直接从外部文件系统来源将二进制文档加载到 HDFS 并进行转换。实践证明,如果不希望在 HDFS 中存储二进制文档,仅希望以文本或序列文件格式存储内容,那么这个示例是一个有用的过程。在这种情况下,加载是单线程的,所以它没有与第一个方法相同的吞吐量。第三个示例展示了在读取文件后,如何直接在 Jaql 中执行文本分析,无需首先提取并持久化文本内容。

使用 清单 16 中的代码,从 HDFS 的一个目录内读取文件,然后将结果写回 HDFS 中。此方法非常类似于第一节的 MapReduce 作业中完成的操作。必须先导入所创建的 tika 模块,然后才能使用 tikaRead() 功能。然后,使用 read() 函数读取指定文件夹中的文件,以带分隔符的文件格式将文件名称和文本内容写入到 HDFS 中的一个文件中。

在 InfoSphere BigInsights 知识中心 中,可以找到 Jaql 的更多信息。

演示输入是一个文件夹中的一组 Word 格式的客户评论,如 清单 16 中所示。在 10 条评论中,一些评论是正面的,一些评论是负面的。假设您希望提取文本并存储为分隔格式。随后,您可能希望在文本之上执行文本分析。您希望保留文件名,因为它会告诉您是谁创建了该评论。通常,此关系记录在一个单独的表中。

清单 16. hdfs:/tmp/reviews/ 中的输入文件
[mw_shl_code=text,true]review1.doc
review2.doc
review3.doc
...[/mw_shl_code]

如 清单 17 中所示,运行 Jaql 命令来读取此文件夹中的所有支持的文档,提取文本,将其保存为一个分隔文件中,每个原始文档对应这个文件中的一行。

清单 17. 使用 Jaql 将 HDFS 转换为 HDF
[mw_shl_code=text,true]import tika(*);

read(tikaRead("/tmp/reviews")) //You could put data transformations here
        -> write(del("/tmp/output",
                        {schema:schema{path,content}, delimiter:"|", quoted:true}));[/mw_shl_code]

现在,可以在 /tmp/output 文件夹中找到输出。此文件夹包含来自 /tmp/reviews 的 Word 文档的文本内容,内容格式如下。

清单 18. Jaql Tika 转换的输出
[mw_shl_code=text,true]::::::::::::::
part-00000
::::::::::::::
"review1.doc"|"I do not care for the camera.  "
"review10.doc"|"It was very reliable "
"review2.doc"|"The product was simply bad. "
"review3.doc"|"The user interface was simply atrocious. "
"review4.doc"|"The product interface is simply broken. "
...
::::::::::::::
part-00001
::::::::::::::
"review5.doc"|"The Windows client is simply crappy. "
"review6.doc"|"I liked the camera. It is a good product. "
"review7.doc"|"It is a phenomenal camera. "
"review8.doc"|"Just an awesome product. "
"review9.doc"|"I really liked the Camera. It is excellent. "
...[/mw_shl_code]

现在,可以使用其他工具(比如 Hive、Pig、MapReduce 或 Jaql)轻松地分析文档内容。每个映射任务都有一个部分文件。

使用 Jaql,只能从 HDFS 读取文件。通过将输入路径替换为指向(Jaql 实例的)一个本地磁盘的路径,可以从本地文件系统读取文件,并使用write() 方法将它们复制到 HDFS 中,如 清单 19 中所示。这种方法可以在单个步骤中将文档加载到 InfoSphere BigInsights 中并进行转换。转换不是并行完成的(因为最初未并行读取数据),但如果数据量不是太高,此方法还是很方便。

如果操作受到 CPU 限制,那么您还可以使用在 MapReduce 中运行的正常读取操作。但是,此方法需要您将文件放在网络文件系统上,并该系统挂载到所有数据节点上。localRead 命令在一个本地任务中运行转换。

清单 19. 使用 Jaql 将数据加载到 HDFS 中
[mw_shl_code=text,true]import tika(*);
localRead(tikaRead("file:///home/biadmin/Tika/CameraReviews"))
        -> write(seq("/tmp/output"));
[/mw_shl_code]

可以看到,这里的惟一区别是本地文件路径。Jaql 非常灵活,它可以动态地从在 MapReduce 中运行更改为本地运行模式。您可以在一个步骤中继续执行所有数据转换和分析。但是,Jaql 不会并行运行这些任务,因为本地文件系统不是并行的。请注意,在上面的示例中,输出格式被更改为 Jaql 序列文件。此方法是二进制的,而且更快,所以不需要替换原始文本中的字符。但是,不足之处在于,输出文件是人类无法理解的。此格式非常适合用作中间文件的高效、临时存储。

清单 20 中的最后一个示例展示了如何在一组二进制输入文档中运行情绪检测算法。(这里省略了为此用途创建 AQL 文本分析代码的步骤,因为已有其他综合性文章和参考资料进行了更详细的介绍。

清单 20. 使用 Jaql 执行文本分析
[mw_shl_code=text,true]import tika(*);
import systemT;
read(tikaRead("/tmp/reviews"))
        -> transform { label: $.path, text: $.content }
        -> transform { label: $.label,  sentiments:
        systemT::annotateDocument( $, ["EmotiveTone"],
                        ["file:///home/biadmin/Tika/"],
                        tokenizer="multilingual",
                        outputViews=["EmotiveTone.AllClues"])};[/mw_shl_code]

简单地讲,前几节中的命令可以读取二进制输入文档,从中提取文本内容,并使用 AQL 应用一个简单的情绪语气检测注释器。输出结果类似于 清单 21。

清单 21. Jaql 输出
[mw_shl_code=text,true][
   {
      "label": "review1.doc",
      "sentiments": {
         "EmotiveTone.AllClues": [
            {
               "clueType": "dislike",
               "match": "not care for"
            }
         ],
         "label": "review1.doc",
         "text": "I do not care for the camera.  "
      }
   },
   {
      "label": "review10.doc",
      "sentiments": {
         "EmotiveTone.AllClues": [
            {
               "clueType": "positive",
               "match": "reliable"
            }
         ],
         "label": "review10.doc",
         "text": "It was very reliable "
      }
   },
...[/mw_shl_code]

现在可以使用 Jaql 进一步聚合结果,比如按照产品统计积极和消极情绪来执行更深入的分析查询,以及直接将结果上传到数据库来执行更深入的分析查询。

归档文件

前面已经提到过,HDFS 无法高效地存储许多小文件。HDFS 中存储的每个数据块都需要 HDFS NameNode 中的少量内存(约为 100B)。因此,过量的小文件可能会增加 NameNode 上使用的内存量。因为您已经实现了一个解决方案来读取小二进制文件,并将它们转换为大文件作为输出,所以现在可以删除原始的小文件。但是,您在以后可能希望使用不同的方法重新分析二进制文件。通过使用 Hadoop Archive (HAR),可以将所选的小文件打包为更大的文件,从而减少 NameNodes 上的内存使用。它基本上等效于 Linux&#174; TAR 格式或 Windows&#8482; CAB 文件,但它位于 HDFS 上。

可以使用下面的模板运行 archive 命令。

清单 22. Archive 命令

[mw_shl_code=text,true]hadoop archive -archiveName archive_name.har -p /path_to_input_files
                                                        /path_to_output_directory[/mw_shl_code]

第一个参数指定了输出文件名,第二个参数指定了来源目录。这个示例仅包含一个来源目录,但此工具可以接受多个目录。

在创建归档后,可以浏览内容文件。

清单 23. 列出 HAR 文件
[mw_shl_code=text,true]hadoop fs -lsr har:///path_to_output_directory/archive_name.har[/mw_shl_code]

因为您拥有 HAR 格式的输入文件,所以现在可删除原始的小文件,以便实现此流程的用途。

一定要注意的是,HAR 文件可用作 MapReduce 的输入。但是,处理许多小文件(甚至在一个 HAR 中)仍然效率低下,因为没有可感知归档的InputFormat 来将包含多个小文件的 HAR 文件转换为一个 MapReduce 碎片。此限制意味着只能将 HAR 文件用作一种备份方法,作为减少 NameNode 上的内存使用的一种方法,而不是分析任务的输入的理想选择。出于这个原因,在创建 HAR 备份之前,需要提取原始文件的文本内容。

结束语
本文介绍了一种使用 Hadoop 和 Apache Tika 分析大量小二进制文档的方法。此方法显然不是实现此功能的惟一方式。您还可以从二进制文件创建序列文件,或者使用另一种存储方法,比如 Avro。但是,本文中描述的方法提供了一种分析各种类型的大量文件的便捷方式。将此方法与 Jaql 技术结合使用,就能够在从各种来源读取文件时直接提取内容。

Apache Tika 是最有用的例子之一,但几乎可以在其他任何 Java 库上采用相同的方法。例如,可以提取目前不受 Apache Tika 支持的二进制文档,比如 Outlook PST 文件。

仅使用 Java MapReduce,就可以实现本文中描述的一切功能。但是,本文第 2 部分中创建的 Jaql 模块,是一种将数据加载到 Hadoop 中并进行转换的便捷方式,无需 Java 编程技能。Jaql 模块使您能够在加载期间执行转换过程,以及使用文本或统计分析等分析功能,这些功能可在单个作业中完成。


代码网盘下载:
SampleCode.zip

http://url.cn/41UGq4x

链接:http://pan.baidu.com/s/1cMMtnK 密码:rrzy

来源:ibm
作者:ibm





已有(3)人评论

跳转到指定楼层
小伙425 发表于 2016-11-14 12:31:24
好东西 LZ辛苦啦,收藏啦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条