大容量导入¶
大容量导入接口是神通数据库扩展的一套快速加载接口,和直接路径加载类似,大容量导入的设计更加简易。同时,直接路径加载在ACI的实现层也是基于大容量导入的方法进行实现的。大容量导入的设计采用异步模式,应用程序将数据持续的放到加载缓存中,如果缓存中的数据超过了设定的提交行数或缓存大小,会自动的将缓存中的数据提交到数据库表中。
大容量导入无需引入专属头文件,只要引入了aci.h即可使用,同时需要分配一个专属的导入句柄ACIHIMP* ,句柄类型为ACI_HTYPE_BULKIMP, ACIHIMP的父句柄为环境句柄,因此环境句柄释放时,导入句柄也会释放。
句柄分配用方式如下:
r = ACIHandleAlloc (env,(void **)&imp,ACI_HTYPE_BULKIMP,0,0);
句柄释放方式如下:
r = ACIHandleFree(imp,ACI_HTYPE_BULKIMP);
BULK句柄相关属性¶
- ACI_ATTR_BUF_SIZE:
缓冲区的默认大小。默认情况下,缓冲区为10m。
- ACI_ATTR_NUM_ROWS:
执行 ACIBulkExec 后,获得导入句柄本次成功导入的数据行数。
int total_count=0;//总行数
r = ACIAttrGet(imp, ACI_HTYPE_BULKIMP, &total_count,
0, ACI_ATTR_NUM_ROWS, err);
相关函数和属性介绍¶
| 接口名称 | 作用 |
|---|---|
ACIBulkInit |
初始化一个导入句柄,需要传入表名 |
ACIBulkSetColumn |
将一列数据放入缓存,同时需要明确数据类型 |
ACIBulkEndRow |
结束一行数据,做一个结束标记 |
ACIBulkExec |
用户需要使用 ACIBulkExec 接口向后台发送数据。提交后,Bulk句柄内内容将会重置,如果需要继续使用大容量数据导入功能,需要对句柄进行初始化工作 |
导入流程详解¶
- 初始化Bulk句柄
想要使用大容量数据导入功能,需要在申请ACI_HTYPE_BULKIMP类型句柄后提供Svc句柄、表的名称等数据使用 ACIBulkInit 接口对其进行初始化:
r = ACIBulkInit(svc,imp,NULL,"bulkTab",NULL,err);
bulkTab为表名。
在插入的数据的顺序与表中的顺序不一致 或者 并非导入数据库中所有列时,需要提供符合格式的列描述语句,如“B text,A text”
//设置列顺序
char * collist = "B text,A text"
r = ACIBulkInit(svc,imp,NULL,"bulkTab", collist,err);
//设置部分列:数据库中有A、B、C三列,只想导入A、B两列,如下设置也可以
char * collist = "B text,A text"
r = ACIBulkInit(svc,imp,NULL,"bulkTab", collist,err);
- 添加数据
在使用 ACIBulkSetColumn 接口添加数据时,用户需要提供数据的类型,数据指针及数据的长度。对于数据类型,目前暂时只支持字符串SQLT_CHR和二进制SQLT_LBI类型,不支持的类型都可以转换成字符串,然后用SQLT_CHR类型进行数据绑定。需要注意的是,当数据长度为-1时代表该数据为空,当数据长度为其他非法值时,插入的是该属性的默认值。
添加完一列数据时,需要使用 ACIBulkEndRow 接口标识行的结束。在添加数据过程中,如果缓冲区的剩余空间不足,工具将使用新的线程将缓冲区内数据提交到数据库服务器端,并创建新的缓冲区存放新的数据,这个过程是透明的,开发者感知不到。
- 提交数据
添加完所有数据后,用户需要使用 ACIBulkExec 接口向后台发送数据。提交后,Bulk句柄内内容将会重置,如果需要继续使用大容量数据导入功能,需要对句柄进行初始化工作(从步骤1开始重新进行)。
大容量导入示例¶
表定义:bulkTab(id int,col1 varchar2(10))
#include "aci.h"
#define RECORD 100 /*大容量导入的记录数*/
ACIEnv* env = NULL;
ACIError* err = NULL;
ACIServer* srv = NULL;
ACISvcCtx* svc = NULL;
ACISession* ses = NULL;
ACIStmt* stmt = NULL;
ACIBind* pBind[10];
ACIDefine* pDef[10];
char *dblink = "localhost:2003/osrdb";
char *dbuser = "SYSDBA";
char *dbpwd = "szoscar55";
void bulkinsert;
void checkerr(ACIError* err, sword status)
{ … … }
int main(int args, char* argv[])
{
sword r = 0;
//初始化环境句柄
checkerr(err, ACIInitialize(ACI_DEFAULT, NULL, NULL, NULL, NULL));
checkerr(err, ACIEnvInit(&env, ACI_DEFAULT, 0, 0));
//分配并初始化各类应用句柄
checkerr(err, ACIHandleAlloc(env, (void**)&err, ACI_HTYPE_ERROR, 0, 0));
checkerr(err, ACIHandleAlloc(env, (void**)&srv, ACI_HTYPE_SERVER, 0, 0));
checkerr(err, ACIHandleAlloc(env, (void**)&svc, ACI_HTYPE_SVCCTX, 0, 0));
checkerr(err, ACIHandleAlloc(env, (void**)&ses, ACI_HTYPE_SESSION, 0, 0));
//连接数据库
checkerr(err, ACIServerAttach(srv, err, (const OraText *)dblink, (dblink != NULL ? (sb4)strlen(dblink):0), ACI_DEFAULT));
checkerr(err, ACIAttrSet(svc, ACI_HTYPE_SVCCTX, srv, sizeof(srv),ACI_ATTR_SERVER, err));
checkerr(err, ACIAttrSet(ses, ACI_HTYPE_SESSION, dbuser, (ub4)strlen(dbuser),ACI_ATTR_USERNAME, err));
checkerr(err, ACIAttrSet(ses, ACI_HTYPE_SESSION, dbpwd, (ub4)strlen(dbpwd),ACI_ATTR_PASSWORD, err));
checkerr(err, ACISessionBegin(svc, err, ses, ACI_CRED_RDBMS, ACI_DEFAULT));
checkerr(err, ACIAttrSet(svc, ACI_HTYPE_SVCCTX, ses, sizeof(ses),ACI_ATTR_SESSION, err));
//分配语句句柄
checkerr(err, ACIHandleAlloc(env, (void**)&stmt, ACI_HTYPE_STMT, 0, 0));
//大容量导入
bulkinsert;
return 0;
}
void bulkinsert
{
/*bulk insert*/
int sdwRowsPerBatch=20; //每批中数据的行数
char col1arr[RECORD][32];
int bufferSize = 1024 * 1024;
int total_count=0;//总行数
char str_i[32];
int fetchrowids=0;
ACIHIMP *imp;
ub4 i = 0;
sword r = 0;
r = ACIHandleAlloc(env,(void **)&imp,ACI_HTYPE_BULKIMP,0,0);
for(i=0;i<RECORD;i++)
{
sprintf(col1arr[i],"col1=%d",i);
}
checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &bufferSize,
sizeof(bufferSize), ACI_ATTR_BUF_SIZE, err));
checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &fetchrowids,
sizeof(fetchrowids), ACI_ATTR_FETCH_ROWIDS, err));
checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &sdwRowsPerBatch,
sizeof(sdwRowsPerBatch), ACI_ATTR_NUM_ROWS, err));
checkerr(err, ACIBulkInit(svc,imp,NULL,"bulkTab",NULL,err));
for(i=0;i<RECORD;i++)
{
itoa(i,str_i,10);
checkerr(err, ACIBulkSetColumn(imp,SQLT_CHR,(void*)str_i, (sb2)sizeof(str_i),err));
checkerr(err, ACIBulkSetColumn(imp,SQLT_CHR,(void *)col1arr[i],
(sb2)sizeof(col1arr[i]),err));
checkerr(err, ACIBulkEndRow(imp,err)); //结束一行的插入
}
checkerr(err, ACIBulkExec(imp,err));
checkerr(err, ACIAttrGet(imp, ACI_HTYPE_BULKIMP, &total_count,
0, ACI_ATTR_NUM_ROWS, err));
printf("导入的记录条数为:%d\\n", total_count);
}