分享

Thrift了解4:C#通过Thrift操作HBase实战

howtodown 2014-3-16 23:48:33 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 27627
通过前面三篇
thrift了解1:让你真正明白到底什么是thrift
Thrift了解2: Thrift使用方法
Thrift了解3:Thrift使用实例--生成各种语言指导


了解了Thrift,这里面我们进一步了解Thrift,来达到我们能够通过Thrift来操作我们所关心的大数据。


同样我们需要知道:
1.C#如何通过Thrift来操作HBase?
2.Hbase.thrift的作用是什么?
如果能够明白第二问题,那么整个Thrift也就通透了。

已有(2)人评论

跳转到指定楼层
howtodown 发表于 2014-3-16 23:54:25
在基于HBase数据库的开发中,对应Java语言来说,可以直接使用HBase的原生API来操作HBase表数据,当然你要是不嫌麻烦可以使用Thrift客户端Java API。对于具有其他编程语言背景的开发人员,为了获取HBase带来的好处,那么就可以选择使用HBase Thrift客户端对应编程语言的API,来实现与HBase的交互。

这里,我们使用C#客户端来操作HBase。HBase的Thrift接口的定义,我们需要安装Thrift编译器,才能生成HBase跨语言的API,这里,我使用的版本是0.9.0。需要注意的是,一定要保证,安装了某个版本Thrift的Thrift编译器,在导入对应语言库的时候,版本一定要统一,否则就会出现各种各样的问题,因为不同Thrift版本,对应编程语言的库API可能有变化。

执行如下命令,生成C#编程语言的HBase Thrift客户端API:(通过下面,我们知道首先需要Hbase.thrift)
  1. [hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
  2. [hadoop@master hbase]$ ls
  3. gen-csharp
复制代码
这里,我们基于C#语言,使用HBase 的Thrift 客户端API访问HBase表。事实上,如果使用Java来实现对HBase表的操作,最好是使用HBase的原生API,无论从性能还是便利性方面,都会提供更好的体验。使用Thrift API访问,实际也是在HBase API之上进行了一层封装,可能初次使用Thrift API感觉很别扭,有时候还要参考Thrift服务端的实现代码。
准备工作如下:

  • 下载Thrift软件包,解压缩后,拷贝thrift-0.9.0/lib/java/src下面的代码到工作区(开发工具中)
  • 将上面生成的gen-csharp目录中代码拷贝到工作区
  • 保证HBase集群正常运行,接着启动HBase的Thrift服务,执行如下命令:
  1. bin/hbase thrift -b master -p 9090 start
复制代码
上面,HBase的Thrift服务端口为9090,下面通过Thrift API访问的时候,需要用到,而不是HBase的服务端口(默认60000)。
接着,实现一个简单的例子,访问Hbase表。

首先,我们通过HBase Shell创建一个表:
  1. create 'test_info', 'info'
复制代码
表名为test_info,列簇名称为info。
然后,我们开始基于上面生成的Thrift代码来实现对HBase表的操作。

这里,我们实际上是对HBase Thrift客户端Java API实践中的Java代码进行了翻译,改写成C#语言的相关操作。我们在客户端,进行了一层抽象,更加便于传递各种参数,抽象类为AbstractHBaseThriftService,对应的命名空间为HbaseThrift.HBase.Thrift,该类实现代码如下所示:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Thrift.Transport;
  7. using Thrift.Protocol;
  8. namespace HbaseThrift.HBase.Thrift
  9. {
  10.     public abstract class AbstractHBaseThriftService
  11.     {
  12.         protected static readonly string CHARSET = "UTF-8";
  13.             private string host = "localhost";
  14.             private int port = 9090;
  15.             private readonly TTransport transport;
  16.             protected readonly Hbase.Client client;
  17.         public AbstractHBaseThriftService() : this("localhost", 9090)
  18.         {
  19.             
  20.         }
  21.         public AbstractHBaseThriftService(string host, int port)
  22.         {
  23.             this.host = host;
  24.             this.port = port;
  25.             transport = new TSocket(host, port);
  26.             TProtocol protocol = new TBinaryProtocol(transport, true, true);
  27.             client = new Hbase.Client(protocol);
  28.         }
  29.         public void Open() {
  30.             if (transport != null)
  31.             {
  32.                 transport.Open();
  33.             }
  34.             }
  35.         public void Close()
  36.         {
  37.             if (transport != null)
  38.             {
  39.                 transport.Close();
  40.             }
  41.         }
  42.         public abstract List<string> GetTables();
  43.         
  44.             public abstract void Update(string table, string rowKey, bool writeToWal,
  45.                         string fieldName, string fieldValue, Dictionary<string, string> attributes);
  46.         public abstract void Update(string table, string rowKey, bool writeToWal,
  47.                         Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes);
  48.         
  49.             public abstract void DeleteCell(string table, string rowKey, bool writeToWal,
  50.                             string column, Dictionary<string, string> attributes);
  51.             public abstract void DeleteCells(string table, string rowKey, bool writeToWal,
  52.                             List<string> columns, Dictionary<string, string> attributes);
  53.         
  54.              public abstract void DeleteRow(string table, string rowKey,
  55.                             Dictionary<string, string> attributes);
  56.                         
  57.             public abstract int ScannerOpen(string table, string startRow, List<string> columns,
  58.                     Dictionary<string, string> attributes);
  59.             public abstract int ScannerOpen(string table, string startRow, string stopRow, List<string> columns,
  60.                     Dictionary<string, string> attributes);
  61.             public abstract int ScannerOpenWithPrefix(string table, string startAndPrefix,
  62.                 List<string> columns, Dictionary<string, string> attributes);
  63.             public abstract int ScannerOpenTs(string table, string startRow,
  64.                     List<string> columns, long timestamp, Dictionary<string, string> attributes);
  65.             public abstract int ScannerOpenTs(string table, string startRow, string stopRow,
  66.                     List<string> columns, long timestamp, Dictionary<string, string> attributes);
  67.                         
  68.             public abstract List<TRowResult> ScannerGetList(int id, int nbRows);
  69.             public abstract List<TRowResult> ScannerGet(int id);
  70.         
  71.             public abstract List<TRowResult> GetRow(string table, string row,
  72.                             Dictionary<string, string> attributes);
  73.             public abstract List<TRowResult> GetRows(string table,
  74.                  List<string> rows, Dictionary<string, string> attributes);
  75.             public abstract List<TRowResult> GetRowsWithColumns(string table,
  76.                  List<string> rows, List<string> columns, Dictionary<string, string> attributes);
  77.         
  78.             public abstract void ScannerClose(int id);
  79.         
  80.             /**
  81.              * Iterate result rows(just for test purpose)
  82.              * @param result
  83.              */
  84.             public abstract void IterateResults(TRowResult result);
  85.     }
  86. }
复制代码
这里,简单叙述一下,我们提供的客户端API的基本功能:

  • 建立到Thrift服务的连接:Open()
  • 获取到HBase中的所有表名:GetTables()
  • 更新HBase表记录:Update()
  • 删除HBase表中一行的记录的数据(cell):DeleteCell()和DeleCells()
  • 删除HBase表中一行记录:deleteRow()
  • 打开一个Scanner,返回id:ScannerOpen()、ScannerOpenWithPrefix()和ScannerOpenTs();然后用返回的id迭代记录:ScannerGetList()和ScannerGet()
  • 获取一行记录结果:GetRow()、GetRows()和GetRowsWithColumns()
  • 关闭一个Scanner:ScannerClose()
  • 迭代结果,用于调试:IterateResults()




比如,我们想要实现分页的逻辑,可能和传统的关系型数据库操作有些不同。基于HBase表的实现是,首先打开一个Scanner实例(例如调用ScannerOpen()),返回一个id,然后再使用该id,调用ScannerGetList()方法(可以指定每次返回几条记录的变量nbRows的值),返回一个记录列表,反复调用该ScannerGetList()方法,直到此次没有结果返回为止。后面会通过测试用例来实际体会。


现在,我们基于上抽象出来的客户端操作接口,给出一个基本的实现,代码如下所示:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. namespace HbaseThrift.HBase.Thrift
  7. {
  8.     class HBaseThriftClient : AbstractHBaseThriftService
  9.     {
  10.         public HBaseThriftClient() : this("localhost", 9090)
  11.         {
  12.         }
  13.         public HBaseThriftClient(string host, int port) : base(host, port)
  14.         {
  15.             
  16.         }
  17.         public override List<string> GetTables()
  18.         {
  19.             List<byte[]> tables = client.getTableNames();
  20.             List<String> list = new List<String>();
  21.             foreach(byte[] table in tables)
  22.             {
  23.                 list.Add(Decode(table));
  24.             }
  25.             return list;
  26.         }
  27.         public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes)
  28.         {
  29.             byte[] tableName = Encode(table);
  30.             byte[] row = Encode(rowKey);
  31.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  32.             List<Mutation> mutations = new List<Mutation>();
  33.             Mutation mutation = new Mutation();
  34.             mutation.IsDelete = false;
  35.             mutation.WriteToWAL = writeToWal;
  36.             mutation.Column = Encode(fieldName);
  37.             mutation.Value = Encode(fieldValue);
  38.             mutations.Add(mutation);
  39.             client.mutateRow(tableName, row, mutations, encodedAttributes);
  40.         }
  41.         public override void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
  42.         {
  43.             byte[] tableName = Encode(table);
  44.             byte[] row = Encode(rowKey);
  45.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  46.             List<Mutation> mutations = new List<Mutation>();
  47.             foreach (KeyValuePair<String, String> pair in fieldNameValues)
  48.             {
  49.                 Mutation mutation = new Mutation();
  50.                 mutation.IsDelete = false;
  51.                 mutation.WriteToWAL = writeToWal;
  52.                 mutation.Column = Encode(pair.Key);
  53.                 mutation.Value = Encode(pair.Value);
  54.                 mutations.Add(mutation);
  55.             }
  56.             client.mutateRow(tableName, row, mutations, encodedAttributes);
  57.         }
  58.         public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes)
  59.         {
  60.             byte[] tableName = Encode(table);
  61.             byte[] row = Encode(rowKey);
  62.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  63.             List<Mutation> mutations = new List<Mutation>();
  64.             Mutation mutation = new Mutation();
  65.             mutation.IsDelete = true;
  66.             mutation.WriteToWAL = writeToWal;
  67.             mutation.Column = Encode(column);
  68.             mutations.Add(mutation);
  69.             client.mutateRow(tableName, row, mutations, encodedAttributes);
  70.         }
  71.         public override void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes)
  72.         {
  73.             byte[] tableName = Encode(table);
  74.             byte[] row = Encode(rowKey);
  75.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  76.             List<Mutation> mutations = new List<Mutation>();
  77.             foreach (string column in columns)
  78.             {
  79.                 Mutation mutation = new Mutation();
  80.                 mutation.IsDelete = true;
  81.                 mutation.WriteToWAL = writeToWal;
  82.                 mutation.Column = Encode(column);
  83.                 mutations.Add(mutation);
  84.             }
  85.             client.mutateRow(tableName, row, mutations, encodedAttributes);
  86.         }
  87.         public override void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes)
  88.         {
  89.             byte[] tableName = Encode(table);
  90.             byte[] row = Encode(rowKey);
  91.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  92.             client.deleteAllRow(tableName, row, encodedAttributes);
  93.         }
  94.         public override int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes)
  95.         {
  96.             byte[] tableName = Encode(table);
  97.             byte[] start = Encode(startRow);
  98.             List<byte[]> encodedColumns = EncodeStringList(columns);
  99.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  100.             return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
  101.         }
  102.         public override int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes)
  103.         {
  104.             byte[] tableName = Encode(table);
  105.             byte[] start = Encode(startRow);
  106.             byte[] stop = Encode(stopRow);
  107.             List<byte[]> encodedColumns = EncodeStringList(columns);
  108.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  109.             return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
  110.         }
  111.         public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes)
  112.         {
  113.             byte[] tableName = Encode(table);
  114.             byte[] prefix = Encode(startAndPrefix);
  115.             List<byte[]> encodedColumns = EncodeStringList(columns);
  116.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  117.             return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
  118.         }
  119.         public override int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
  120.         {
  121.             byte[] tableName = Encode(table);
  122.             byte[] start = Encode(startRow);
  123.             List<byte[]> encodedColumns = EncodeStringList(columns);
  124.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  125.             return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
  126.         }
  127.         public override int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
  128.         {
  129.             byte[] tableName = Encode(table);
  130.             byte[] start = Encode(startRow);
  131.             byte[] stop = Encode(stopRow);
  132.             List<byte[]> encodedColumns = EncodeStringList(columns);
  133.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  134.             return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
  135.         }
  136.         public override List<TRowResult> ScannerGetList(int id, int nbRows)
  137.         {
  138.             return client.scannerGetList(id, nbRows);
  139.         }
  140.         public override List<TRowResult> ScannerGet(int id)
  141.         {
  142.             return client.scannerGet(id);
  143.         }
  144.         public override List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes)
  145.         {
  146.             byte[] tableName = Encode(table);
  147.             byte[] startRow = Encode(row);
  148.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  149.             return client.getRow(tableName, startRow, encodedAttributes);
  150.         }
  151.         public override List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes)
  152.         {
  153.             byte[] tableName = Encode(table);
  154.             List<byte[]> encodedRows = EncodeStringList(rows);
  155.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  156.             return client.getRows(tableName, encodedRows, encodedAttributes);
  157.         }
  158.         public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes)
  159.         {
  160.             byte[] tableName = Encode(table);
  161.             List<byte[]> encodedRows = EncodeStringList(rows);
  162.             List<byte[]> encodedColumns = EncodeStringList(columns);
  163.             Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
  164.             return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
  165.         }
  166.         public override void ScannerClose(int id)
  167.         {
  168.             client.scannerClose(id);
  169.         }
  170.         public override void IterateResults(TRowResult result)
  171.         {
  172.             foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
  173.             {
  174.                 Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
  175.             }
  176.         }
  177.         private String Decode(byte[] bs)
  178.         {
  179.             return UTF8Encoding.Default.GetString(bs);
  180.         }
  181.         private byte[] Encode(String str)
  182.         {
  183.             return UTF8Encoding.Default.GetBytes(str);
  184.         }
  185.         private Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<String, String> attributes)
  186.         {
  187.             Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
  188.             foreach (KeyValuePair<String, String> pair in attributes)
  189.             {
  190.                 encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
  191.             }
  192.             return encodedAttributes;
  193.         }
  194.         private List<byte[]> EncodeStringList(List<String> strings)
  195.         {
  196.             List<byte[]> list = new List<byte[]>();
  197.             if (strings != null)
  198.             {
  199.                 foreach (String str in strings)
  200.                 {
  201.                     list.Add(Encode(str));
  202.                 }
  203.             }
  204.             return list;
  205.         }
  206.     }
  207. }
复制代码
上面代码,给出了基本的实现,接着我们给出测试用例,调用我们实现的客户端操作,与HBase表进行交互。实现的测试用例类如下所示:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. namespace HbaseThrift.HBase.Thrift
  7. {
  8.     class Test
  9.     {
  10.         private readonly AbstractHBaseThriftService client;
  11.         public Test(String host, int port)
  12.         {
  13.             client = new HBaseThriftClient(host, port);
  14.         }
  15.         public Test() : this("master", 9090)
  16.         {
  17.             
  18.         }
  19.         static String RandomlyBirthday()
  20.         {
  21.             Random r = new Random();
  22.             int year = 1900 + r.Next(100);
  23.             int month = 1 + r.Next(12);
  24.             int date = 1 + r.Next(30);
  25.             return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
  26.         }
  27.         static String RandomlyGender()
  28.         {
  29.             Random r = new Random();
  30.             int flag = r.Next(2);
  31.             return flag == 0 ? "M" : "F";
  32.         }
  33.         static String RandomlyUserType()
  34.         {
  35.             Random r = new Random();
  36.             int flag = 1 + r.Next(10);
  37.             return flag.ToString();
  38.         }
  39.         public void Close()
  40.         {
  41.             client.Close();
  42.         }
  43.         public void CaseForUpdate() {
  44.                     bool writeToWal = false;
  45.             Dictionary<String, String> attributes = new Dictionary<String, String>(0);
  46.                     string table = SetTable();
  47.                     // put kv pairs
  48.                     for (int i = 0; i < 10000000; i++) {
  49.                 string rowKey = i.ToString().PadLeft(4, '0');
  50.                 Dictionary<String, String> fieldNameValues = new Dictionary<String, String>();
  51.                             fieldNameValues.Add("info:birthday", RandomlyBirthday());
  52.                             fieldNameValues.Add("info:user_type", RandomlyUserType());
  53.                             fieldNameValues.Add("info:gender", RandomlyGender());
  54.                             client.Update(table, rowKey, writeToWal, fieldNameValues, attributes);
  55.                     }
  56.             }
  57.         public void CaseForDeleteCells() {
  58.                     bool writeToWal = false;
  59.             Dictionary<String, String> attributes = new Dictionary<String, String>(0);
  60.                     String table = SetTable();
  61.                     // put kv pairs
  62.                     for (long i = 5; i < 10; i++) {
  63.                             String rowKey = i.ToString().PadLeft(4, '0');
  64.                             List<String> columns = new List<String>(0);
  65.                             columns.Add("info:birthday");
  66.                             client.DeleteCells(table, rowKey, writeToWal, columns, attributes);
  67.                     }
  68.             }
  69.         public void CaseForDeleteRow() {
  70.                     Dictionary<String, String> attributes = new Dictionary<String, String>(0);
  71.                     String table = SetTable();
  72.                     // delete rows
  73.                     for (long i = 5; i < 10; i++) {
  74.                             String rowKey = i.ToString().PadLeft(4, '0');
  75.                             client.DeleteRow(table, rowKey, attributes);
  76.                     }
  77.             }
  78.         
  79.             public void CaseForScan() {
  80.                     Dictionary<String, String> attributes = new Dictionary<String, String>(0);
  81.                     String table = SetTable();
  82.                     String startRow = "0005";
  83.                     String stopRow = "0015";
  84.                     List<String> columns = new List<String>(0);
  85.                     columns.Add("info:birthday");
  86.                     int id = client.ScannerOpen(table, startRow, stopRow, columns, attributes);
  87.                     int nbRows = 2;
  88.                     List<TRowResult> results = client.ScannerGetList(id, nbRows);
  89.                     while(results != null) {
  90.                             foreach(TRowResult result in results) {
  91.                                     client.IterateResults(result);
  92.                             }
  93.                             results = client.ScannerGetList(id, nbRows);
  94.                     }
  95.                     client.ScannerClose(id);
  96.             }
  97.         
  98.             public void CaseForGet() {
  99.                     Dictionary<String, String> attributes = new Dictionary<String, String>(0);
  100.                     String table = SetTable();
  101.                     List<String> rows = new List<String>(0);
  102.                     rows.Add("0009");
  103.                     rows.Add("0098");
  104.                     rows.Add("0999");
  105.                     List<String> columns = new List<String>(0);
  106.                     columns.Add("info:birthday");
  107.                     columns.Add("info:gender");
  108.                     List<TRowResult> results = client.GetRowsWithColumns(table, rows, columns, attributes);
  109.                     foreach(TRowResult result in results) {
  110.                             client.IterateResults(result);
  111.                     }
  112.             }
  113.         private string SetTable()
  114.         {
  115.             string table = "test_info";
  116.             return table;
  117.         }
  118.         static void Main(string[] args)
  119.         {
  120.             Test test = new Test();
  121.             //test.CaseForUpdate(); // insert or update rows/cells
  122.             //test.CaseForDeleteCells(); // delete cells
  123.             //test.CaseForDeleteRow(); // delete rows
  124.             test.CaseForScan(); // scan rows
  125.             //test.CaseForGet(); // get rows
  126.             test.Close();
  127.         }
  128.     }
  129. }
复制代码
上面的测试可以实现操作Hbase表数据。另外,在生成的Thrift客户端代码中,Iface中给出了全部的服务接口,可以根据需要来选择,客户端Client实现了与Thrift交互的一些逻辑的处理,通过该类对象可以代理HBase提供的Thrift服务。

上一篇:
Thrift了解3:Thrift使用实例--生成各种语言指导
回复

使用道具 举报

xuliang123789 发表于 2016-3-24 10:11:27
谢谢楼主,学习一下,赞~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条