大容量导入

大容量导入接口是神通数据库扩展的一套快速加载接口,和直接路径加载类似,大容量导入的设计更加简易。同时,直接路径加载在ACI的实现层也是基于大容量导入的方法进行实现的。大容量导入的设计采用异步模式,应用程序将数据持续的放到加载缓存中,如果缓存中的数据超过了设定的提交行数或缓存大小,会自动的将缓存中的数据提交到数据库表中。

大容量导入无需引入专属头文件,只要引入了aci.h即可使用,同时需要分配一个专属的导入句柄ACIHIMP* ,句柄类型为ACI_HTYPE_BULKIMP, ACIHIMP的父句柄为环境句柄,因此环境句柄释放时,导入句柄也会释放。

句柄分配用方式如下:

r = ACIHandleAlloc (env,(void **)&imp,ACI_HTYPE_BULKIMP,0,0);

句柄释放方式如下:

r = ACIHandleFree(imp,ACI_HTYPE_BULKIMP);

支持的数据类型

目前支持以下数据类型,对于不支持的类型,可以转换成字符串方式进行导入:

  • SQLT_CHR : 字符串
  • SQLT_LBI : 二进制

BULK句柄相关属性

  • ACI_ATTR_BUF_SIZE:

缓冲区的默认大小。默认情况下,缓冲区为10m。

  • ACI_ATTR_NUM_ROWS:

执行 ACIBulkExec 后,获得导入句柄本次成功导入的数据行数。

int total_count=0;//总行数
r = ACIAttrGet(imp, ACI_HTYPE_BULKIMP, &total_count,
      0, ACI_ATTR_NUM_ROWS, err);

相关函数和属性介绍

接口名称 作用
ACIBulkInit 初始化一个导入句柄,需要传入表名
ACIBulkSetColumn 将一列数据放入缓存,同时需要明确数据类型
ACIBulkEndRow 结束一行数据,做一个结束标记
ACIBulkExec 用户需要使用 ACIBulkExec 接口向后台发送数据。提交后,Bulk句柄内内容将会重置,如果需要继续使用大容量数据导入功能,需要对句柄进行初始化工作

导入流程详解

  1. 初始化Bulk句柄

想要使用大容量数据导入功能,需要在申请ACI_HTYPE_BULKIMP类型句柄后提供Svc句柄、表的名称等数据使用 ACIBulkInit 接口对其进行初始化:

r = ACIBulkInit(svc,imp,NULL,"bulkTab",NULL,err);

bulkTab为表名。

在插入的数据的顺序与表中的顺序不一致 或者 并非导入数据库中所有列时,需要提供符合格式的列描述语句,如“B text,A text”

//设置列顺序
char * collist = "B text,A text"
r = ACIBulkInit(svc,imp,NULL,"bulkTab", collist,err);

//设置部分列:数据库中有A、B、C三列,只想导入A、B两列,如下设置也可以
char * collist = "B text,A text"
r = ACIBulkInit(svc,imp,NULL,"bulkTab", collist,err);
  1. 添加数据

在使用 ACIBulkSetColumn 接口添加数据时,用户需要提供数据的类型,数据指针及数据的长度。对于数据类型,目前暂时只支持字符串SQLT_CHR和二进制SQLT_LBI类型,不支持的类型都可以转换成字符串,然后用SQLT_CHR类型进行数据绑定。需要注意的是,当数据长度为-1时代表该数据为空,当数据长度为其他非法值时,插入的是该属性的默认值。

添加完一列数据时,需要使用 ACIBulkEndRow 接口标识行的结束。在添加数据过程中,如果缓冲区的剩余空间不足,工具将使用新的线程将缓冲区内数据提交到数据库服务器端,并创建新的缓冲区存放新的数据,这个过程是透明的,开发者感知不到。

  1. 提交数据

添加完所有数据后,用户需要使用 ACIBulkExec 接口向后台发送数据。提交后,Bulk句柄内内容将会重置,如果需要继续使用大容量数据导入功能,需要对句柄进行初始化工作(从步骤1开始重新进行)。

大容量导入示例

表定义:bulkTab(id int,col1 varchar2(10))

#include "aci.h"
#define RECORD 100 /*大容量导入的记录数*/

ACIEnv* env = NULL;
ACIError* err = NULL;
ACIServer* srv = NULL;
ACISvcCtx* svc = NULL;
ACISession* ses = NULL;
ACIStmt* stmt = NULL;
ACIBind* pBind[10];
ACIDefine* pDef[10];
char *dblink = "localhost:2003/osrdb";
char *dbuser = "SYSDBA";
char *dbpwd = "szoscar55";
void bulkinsert;
void checkerr(ACIError* err, sword status)
{ … … }

int main(int args, char* argv[])
{
    sword r = 0;
    //初始化环境句柄
    checkerr(err, ACIInitialize(ACI_DEFAULT, NULL, NULL, NULL, NULL));
    checkerr(err, ACIEnvInit(&env, ACI_DEFAULT, 0, 0));
    //分配并初始化各类应用句柄
checkerr(err, ACIHandleAlloc(env, (void**)&err, ACI_HTYPE_ERROR, 0, 0));
checkerr(err, ACIHandleAlloc(env, (void**)&srv, ACI_HTYPE_SERVER, 0, 0));
    checkerr(err, ACIHandleAlloc(env, (void**)&svc, ACI_HTYPE_SVCCTX, 0, 0));
    checkerr(err, ACIHandleAlloc(env, (void**)&ses, ACI_HTYPE_SESSION, 0, 0));
    //连接数据库
    checkerr(err, ACIServerAttach(srv, err, (const OraText *)dblink, (dblink != NULL ? (sb4)strlen(dblink):0), ACI_DEFAULT));
    checkerr(err, ACIAttrSet(svc, ACI_HTYPE_SVCCTX, srv, sizeof(srv),ACI_ATTR_SERVER, err));
    checkerr(err, ACIAttrSet(ses, ACI_HTYPE_SESSION, dbuser, (ub4)strlen(dbuser),ACI_ATTR_USERNAME, err));
    checkerr(err, ACIAttrSet(ses, ACI_HTYPE_SESSION, dbpwd, (ub4)strlen(dbpwd),ACI_ATTR_PASSWORD, err));
    checkerr(err, ACISessionBegin(svc, err, ses, ACI_CRED_RDBMS, ACI_DEFAULT));
    checkerr(err, ACIAttrSet(svc, ACI_HTYPE_SVCCTX, ses, sizeof(ses),ACI_ATTR_SESSION, err));
    //分配语句句柄
    checkerr(err, ACIHandleAlloc(env, (void**)&stmt, ACI_HTYPE_STMT, 0, 0));
    //大容量导入
    bulkinsert;
    return 0;
}
void bulkinsert
{
    /*bulk insert*/
    int sdwRowsPerBatch=20; //每批中数据的行数
    char col1arr[RECORD][32];
    int bufferSize = 1024 * 1024;
    int total_count=0;//总行数
    char str_i[32];
    int fetchrowids=0;
    ACIHIMP *imp;
    ub4 i = 0;
    sword r = 0;
    r = ACIHandleAlloc(env,(void **)&imp,ACI_HTYPE_BULKIMP,0,0);
    for(i=0;i<RECORD;i++)
    {
        sprintf(col1arr[i],"col1=%d",i);
    }
    checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &bufferSize,
        sizeof(bufferSize), ACI_ATTR_BUF_SIZE, err));
    checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &fetchrowids,
        sizeof(fetchrowids), ACI_ATTR_FETCH_ROWIDS, err));
    checkerr(err, ACIAttrSet(imp, ACI_HTYPE_BULKIMP, &sdwRowsPerBatch,
        sizeof(sdwRowsPerBatch), ACI_ATTR_NUM_ROWS, err));
    checkerr(err, ACIBulkInit(svc,imp,NULL,"bulkTab",NULL,err));
    for(i=0;i<RECORD;i++)
    {
        itoa(i,str_i,10);
        checkerr(err, ACIBulkSetColumn(imp,SQLT_CHR,(void*)str_i, (sb2)sizeof(str_i),err));
        checkerr(err, ACIBulkSetColumn(imp,SQLT_CHR,(void *)col1arr[i],
            (sb2)sizeof(col1arr[i]),err));
        checkerr(err, ACIBulkEndRow(imp,err)); //结束一行的插入
    }
    checkerr(err, ACIBulkExec(imp,err));
    checkerr(err, ACIAttrGet(imp, ACI_HTYPE_BULKIMP, &total_count,
        0, ACI_ATTR_NUM_ROWS, err));
    printf("导入的记录条数为:%d\\n", total_count);
}