本帖最后由 hyj 于 2019-4-22 18:58 编辑
问题导读
1.HBase Connector是用来干什么的?
2.Catalog的作用是什么?
3.HBase Connector该如何使用?
在github上有一个写hbase效率比较高的连接器,这里翻译把源码分享给大家。
Spark - HBase Connector是一个支持Spark访问HBase表作为外部数据源或接收器的库。 有了它,用户可以在DataFrame和DataSet级别上使用Spark-SQL操作HBase。
通过DataFrame和DataSet支持,该库利用优化器中的所有优化技术,实现数据局部性,分区修剪,谓词下推,扫描和BulkGet等。
Catalog
对于每个表,必须提供目录(Catalog),其中包括行键和具有预定义列族的数据类型的列,并定义hbase列和表模式之间的映射。 目录是用户定义的json格式。
数据类型转换
支持Java原始类型。 将来,将支持其他数据类型,这依赖于用户指定的serdes。 SHC支持三种内部serdes:Avro,Phoenix,PrimitiveType。 用户可以通过在其目录中定义“tableCoder”来指定他们想要使用的serde。 以Avro为例。 用户定义的serdes将负责将字节数组转换为Avro对象,连接器将负责将Avro对象转换为催化剂支持的数据类型。 当用户定义一个新的serde时,他们需要让它“实现”特征“SHCDataType”。
请注意,如果用户希望DataFrame仅处理字节数组,则可以指定二进制类型。 然后,用户可以将每列的催化剂行作为字节数组。 用户可以使用自定义反序列化器进一步反序列化,或直接对DataFrame的RDD进行操作。
数据本地性
当Spark work节点与hbase region服务器共处一地时,通过识别region服务器位置并将worker与region server共同定位来实现数据局部性。 每个执行程序仅对同一主机上共存的数据部分执行Scan / BulkGet。
分区Pruning
通过从谓词中提取行键,我们将scan / BulkGet拆分为多个非重叠区域,只有具有所请求数据的区域服务器才会执行scan / BulkGet。 目前,在行键的第一维上执行分区修剪。 请注意,需要仔细定义WHERE条件。 否则,结果扫描可以包括大于用户期望的区域。 例如,以下条件将导致完整扫描(rowkey1是rowkey的第一个维度,而column是常规hbase列)。 在哪里rowkey1>“abc”OR列=“xyz”
扫描和BulkGet
两者都通过指定WHERE CLAUSE向用户公开,例如,其中列> x和列<y用于扫描,其中column = x用于get。 所有操作都在执行程序中执行,驱动程序仅构造这些操作。 在内部,我们将它们转换为扫描或获取或两者的组合,将Iterator [Row]返回到catalyst 引擎。
创建数据源
该库支持从/向HBase读/写。
编译
mvn package -DskipTests
运行测试用例
运行测试
[mw_shl_code=bash,true]mvn clean package test
[/mw_shl_code]
运行独立测试
[mw_shl_code=bash,true]mvn -DwildcardSuites=org.apache.spark.sql.DefaultSourceSuite test
[/mw_shl_code]
运行SHC例子
[mw_shl_code=bash,true]./bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-cluster --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ --files /usr/hdp/current/hbase-client/conf/hbase-site.xml shc-examples-1.1.1-2.1-s_2.11-SNAPSHOT.jar
[/mw_shl_code]
以下说明了如何在真正的hbase群集中运行应用程序。 您需要提供hbase-site.xml。 它可能会根据特定群集配置进行更改。
[mw_shl_code=bash,true]./bin/spark-submit --class your.application.class --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ --files /etc/hbase/conf/hbase-site.xml /To/your/application/jar
[/mw_shl_code]
使用此连接器运行Spark应用程序,默认情况下将提取版本为1.1.2的HBase jar。 如果在HBase集群上启用Phoenix,则需要使用“--jars”来传递“phoenix-server.jar”。 例如:
[mw_shl_code=bash,true]./bin/spark-submit --class your.application.class --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ --jars /usr/hdp/current/phoenix-client/phoenix-server.jar --files /etc/hbase/conf/hbase-site.xml /To/your/application/jar
[/mw_shl_code]
应用程序用法
以下说明了如何使用连接器的基本步骤。 有关更多详细信息和高级用例,例如Avro和组合密钥支持,请参阅存储库中的示例。
定义了HBase目录
[mw_shl_code=bash,true]def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin[/mw_shl_code]
上面定义了一个HBase表的模式,其名称为table1,行键为key,列数为col1-col8。 请注意,还必须将rowkey详细定义为列(col0),该列具有特定的cf(rowkey)。
写入HBase表以填充数据
[mw_shl_code=scala,true]sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()[/mw_shl_code]
给定具有指定模式的DataFrame,上面将创建一个包含5个区域的HBase表,并将DataFrame保存在其中。 请注意,如果未指定HBaseTableCatalog.newTable,则必须预先创建表。
在HBase表之上执行DataFrame操作
[mw_shl_code=scala,true]def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}[/mw_shl_code]
复杂的查询
[mw_shl_code=bash,true]val df = withCatalog(catalog)
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005" ||
$"col0" === "row020" ||
$"col0" === "r20" ||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")
s.show[/mw_shl_code]
SQL 支持
[mw_shl_code=scala,true]// Load the dataframe
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show[/mw_shl_code]
配置Spark-package
用户可以将Spark-on-HBase连接器用作标准Spark包。 要在Spark应用程序中包含该软件包,请使用:
注意:com.hortonworks:shc-core:1.1.1-2.1-s_2.11尚未上传到spark-packages.org,但很快就会上传。
spark-shell,pyspark或spark-submit
[mw_shl_code=bash,true]$SPARK_HOME/bin/spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11
[/mw_shl_code]
用户也可以将包作为依赖项包含在SBT文件中。 格式是build.sbt文件中的spark-package-name:版本。
[mw_shl_code=bash,true]libraryDependencies += “com.hortonworks/shc-core:1.1.1-2.1-s_2.11”
[/mw_shl_code]
在安全集群中运行
要在启用Kerberos的群集中运行,用户必须将HBase相关的jar包含到类路径中,因为HBase令牌检索和续订由Spark完成,并且独立于连接器。 换句话说,用户需要通过kinit或提供principal / keytab以正常方式启动环境。 以下示例显示如何在具有yarn-client和yarn-cluster模式的安全集群中运行。 请注意,如果您的Spark不包含Apache Spark 2.1.1+中的SPARK-20059和Apache Spark 2.3.0+中的SPARK-21377,则需要为这两种模式设置SPARK_CLASSPATH(请参阅此处)。
假设hrt_qa是无头帐户,用户可以使用以下命令进行kinit:
[mw_shl_code=bash,true]kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit --class your.application.class --master yarn-client --files /etc/hbase/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ /To/your/application/jar
/usr/hdp/current/spark-client/bin/spark-submit --class your.application.class --master yarn-cluster --files /etc/hbase/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ /To/your/application/jar[/mw_shl_code]
如果上述解决方案不起作用,您会遇到如下错误:
[mw_shl_code=bash,true]org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181
[/mw_shl_code]
或则
[mw_shl_code=bash,true]ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)][/mw_shl_code]
通过创建指向主hbase-site.xml的符号链接,在提交spark作业的主机上包含SPARK_CONF_DIR(/ etc / spark / conf)下的hbase-site.xml(以便与您的平台同步) 更新)。
使用SHCCredentialsManager
Spark仅支持访问单个安全HBase集群的用例。 如果您的应用程序需要访问多个安全HBase集群,则用户需要使用SHCCredentialsManager。 SHCCredentialsManager支持单个安全HBase集群以及多个安全HBase集群。 默认情况下禁用它,但用户可以将spark.hbase.connector.security.credentials.enabled设置为true以启用它。 此外,用户需要在运行其应用程序之前配置principal和keytab,如下所示。
[mw_shl_code=bash,true] spark.hbase.connector.security.credentials.enabled true
spark.hbase.connector.security.credentials ambari-qa-c1@EXAMPLE.COM
spark.hbase.connector.security.keytab /etc/security/keytabs/smokeuser.headless.keytab[/mw_shl_code]
或则:
[mw_shl_code=bash,true] spark.hbase.connector.security.credentials.enabled true
spark.yarn.principal ambari-qa-c1@EXAMPLE.COM
spark.yarn.keytab /etc/security/keytabs/smokeuser.headless.keytab[/mw_shl_code]
其它
支持Avro架构:
连接器完全支持所有avro架构。 用户可以使用完整的记录模式或部分字段模式作为其目录中的数据类型(有关详细信息,请参阅此处)。
[mw_shl_code=bash,true]val schema_array = s"""{"type": "array", "items": ["string","null"]}""".stripMargin
val schema_record =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [ {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
val catalog = s"""{
|"table":{"namespace":"default", "name":"htable"},
|"rowkey":"key1",
|"columns":{
|"col1":{"cf":"rowkey", "col":"key1", "type":"double"},
|"col2":{"cf":"cf1", "col":"col1", "avro":"schema_array"},
|"col3":{"cf":"cf1", "col":"col2", "avro":"schema_record"},
|"col4":{"cf":"cf1", "col":"col3", "type":"double"},
|"col5":{"cf":"cf1", "col":"col4", "type":"string"}
|}
|}""".stripMargin
val df = sqlContext.read.options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").load()
df.write.options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()[/mw_shl_code]
TODO
[mw_shl_code=bash,true]val complex = s"""MAP<int, struct<varchar:string>>"""
val schema =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [ {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
val catalog = s"""{
|"table":{"namespace":"default", "name":"htable"},
|"rowkey":"key1:key2",
|"columns":{
|"col1":{"cf":"rowkey", "col":"key1", "type":"binary"},
|"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
|"col3":{"cf":"cf1", "col":"col1", "avro":"schema1"},
|"col4":{"cf":"cf1", "col":"col2", "type":"string"},
|"col5":{"cf":"cf1", "col":"col3", "type":"double", "sedes":"org.apache.spark.sql.execution.datasources.hbase.DoubleSedes"},
|"col6":{"cf":"cf1", "col":"col4", "type":"$complex"}
|}
|}""".stripMargin
val df = sqlContext.read.options(Map("schema1"->schema, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").load()
df.write.options(Map("schema1"->schema, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()[/mw_shl_code]
上图说明了我们的下一步,其中包括复合键支持,复杂数据类型,支持定制的serde和avro。 请注意,虽然所有主要部分都包含在当前代码库中,但它现在可能无法正常运行。
GitHub源码下载:
shc-master.zip
(122.33 KB, 下载次数: 7)
|
|