C++访问操作读写HbaseWord下载.docx
《C++访问操作读写HbaseWord下载.docx》由会员分享,可在线阅读,更多相关《C++访问操作读写HbaseWord下载.docx(15页珍藏版)》请在冰点文库上搜索。
4
chmod+xconfigure
./configure--with-boost=/usr/local--prefix=/usr/local/thrift
至此,安装Thrift的工作就完成了。
(2)用Thrift生成访问Hbase所需的C++文件
访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用Thrift生成的。
解压Hbase源码安装包:
tarzxfhbase-0.90.4.tar.gz
cdhbase-0.90.4
在解压出来的文件中,你可以找到一个名为Hbase.thrift的文件,这个文件定义了如何通过Thrift接口来访问Hbase。
用这个Thrift文件,可以生成访问Hbase所需的C++文件:
[shel]
/usr/local/thrift/bin/thrift–gencpp./src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
[/shell]
会发现生成了gen-cpp目录:
lsgen-cpp/
Hbase_constants.cpp
Hbase_constants.h
Hbase.cpp
Hbase.h
Hbase_server.skeleton.cpp
Hbase_types.cpp
Hbase_types.h
除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。
(3)在程序中使用Thrift来访问Hbase
要能通过Thrift访问Hbase,你必须首先要打开HBase的Thrift服务,请参考其他文档确保这一点是可用的。
下一步,我们在程序中如何读取Hbase的数据?
我们先看看hbase源码安装包中自带的例子:
在解压出来的安装包中的examples/thrift/目录下的DemoClient.cpp文件,有如下代码:
5
6
7
8
9
10
11
12
13
boost:
:
shared_ptr<
TTransport>
socket(newTSocket("
localhost"
9090));
transport(newTBufferedTransport(socket));
TProtocol>
protocol(newTBinaryProtocol(transport));
HbaseClientclient(protocol);
try{
transport->
open();
//dosomething
close();
}catch(TException&
tx){
printf("
ERROR:
%s\n"
tx.what());
}
我们就仿照这个例子来做。
从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。
在操作Hbase前,需要先打开到HbaseThriftservice的连接,即transport->
open(),在操作完Hbase之后,需要关闭连接,即transport->
close(),这下就比较清楚了:
我们可以写一个自己的类CHbaseOperate,它应该有一个open函数和一个close函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。
读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。
下面上代码:
HbaseOperate.h:
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#ifndef__HBASE_OPERATE_H
#define__HBASE_OPERATE_H
#include<
string>
protocol/TBinaryProtocol.h>
transport/TSocket.h>
transport/TTransportUtils.h>
#include"
Hbase.h"
/**
*ClasstooperateHbase.
*
*@authorDarranZhang()
*@version11-08-24
*@declarationThesecodesareonlyfornon-commercialuse,andaredistributedonan"
ASIS"
basis,WITHOUTWARRANTYOFANYKIND,eitherexpressorimplied.
*Youmustnotremovethisdeclarationatanytime.
*/
usingnamespaceapache:
thrift;
thrift:
protocol;
transport;
hadoop:
hbase:
typedefstructhbaseRet{
std:
stringrowValue;
time_tts;
hbaseRet(){
ts=0;
}hbaseRet;
classCHbaseOperate
{
public:
CHbaseOperate();
virtual~CHbaseOperate();
private:
socket;
transport;
protocol;
HbaseClient*client;
string
hbaseServiceHost;
int
hbaseServicePort;
bool
isConnected;
connect();
connect(std:
stringhost,intport);
disconnect();
putRow(conststd:
string&
tableName,
conststd:
rowKey,
column,
rowValue);
getRow(hbaseRet&
result,
columnName);
};
#endif
HbaseOperate.cpp:
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
HbaseOperate.h"
log4cxx/log4cxx.h"
log4cxx/propertyconfigurator.h"
staticlog4cxx:
LoggerPtrlogger(log4cxx:
Logger:
getLogger("
HbaseOperate.cpp"
));
usingnamespacestd;
CHbaseOperate:
CHbaseOperate():
socket((TSocket*)NULL),transport((TBufferedTransport*)NULL),protocol((TBinaryProtocol*)NULL),client(NULL),hbaseServicePort(9090),isConnected(false)
~CHbaseOperate()
if(isConnected){
disconnect();
if(NULL!
=client){
deleteclient;
client=NULL;
*ConnectHbase.
boolCHbaseOperate:
connect()
LOG4CXX_INFO(logger,"
Alreadyconnected,don'
tneedtoconnectitagain"
);
returntrue;
socket.reset(newTSocket(hbaseServiceHost,hbaseServicePort));
transport.reset(newTBufferedTransport(socket));
protocol.reset(newTBinaryProtocol(transport));
client=newHbaseClient(protocol);
}catch(constTException&
LOG4CXX_ERROR(logger,"
ConnectHbaseerror:
"
<
<
tx.what());
returnfalse;
isConnected=true;
returnisConnected;
connect(std:
stringhost,intport)
hbaseServiceHost=host;
hbaseServicePort=port;
returnconnect();
*DisconnectfromHbase.
disconnect()
if(!
isConnected){
Haven'
tconnectedtoHbaseyet,can'
tdisconnectfromit"
=transport){
DisconnectHbaseerror:
}else{
isConnected=false;
*PutarowtoHbase.
*@paramtableName
[IN]Thetablename.
*@paramrowKey
[IN]Therowkey.
*@paramcolumn
[IN]The"
columnfamily:
qualifier"
.
*@paramrowValue
[IN]Therowvalue.
*@returnTrueforsuccessfullyputtherow,falseotherwise.
putRow(conststring&
tableName,conststring&
rowKey,conststring&
column,conststring&
rowValue)
tputrow"
vector<
Mutation>
mutations;
mutations.push_back(Mutation());
mutations.back().column=column;
mutations.back().value=rowValue;
client->
mutateRow(tableName,rowKey,mutations);
OperateHbaseerror:
*GetaHbaserow.
*@paramresult
[OUT]Theobjectwhichcontainsthereturneddata.
[IN]TheHbasetablename,e.g."
MyTable"
[IN]TheHbaserowkey,e.g."
kdr23790"
*@paramcolumnName
*@returnTrueforsuccessfullygettherow,falseotherwise.
getRow(hbaseRet&
result,conststd:
tableName,conststd:
rowKey,conststd:
columnName)
treaddatafromit"
columnNames;
columnNames.push_back(columnName);
TRowResult>
rowResult;
getRowWithColumns(rowResult,tableName,rowKey,columnNames);
if(0==rowResult.size()){
LOG4CXX_WARN(logger,"
Gotnorecordwiththekey:
["
rowKey<
]"
map<
string,TCell>
const_iteratorit=rowResult[rowResult.size()-1].columns.begin();
result.rowValue=it->
second.value;
result.ts=it->
second.timestamp;
注意我在程序中使用了Apachelog4cxx这个记录日志的库来打印/保存程序运行日志,使用方法可参考【此链接】。
如果你不想用,可以自己改为std: