直接路径加载

直接路径加载功能用于将数据从外部文件(或流式数据)加载到数据库表中,

通过直接路径加载接口,ACI应用程序可以访问数据库的直接路径加载引擎以执行批量加载(类似Oracle的SQL * Loader)实用程序的功能。

下图表示的客户端,数据通过输入缓冲区输入列数组。 ACIDirPathColArrayToStream 调用通过流格式将数据移至服务器端。 这些将数据传递到使用块格式化程序的列数组,以将数据发送到数据库表。

../../../../_images/image174.png

ACI直接路径加载接口可以通过加载包含多行数据的直接路径流来加载多行。要使用直接路径API,客户端应用程序执行以下步骤:

  1. 初始化ACI。
  2. 分配直接路径上下文句柄并设置属性。
  3. 提供要加载的对象(表)的名称。
  4. 描述对象列的外部数据类型。
  5. 准备直接路径接口。
  6. 分配一个或多个列数组。
  7. 分配一个或多个直接路径流。
  8. 在列数组中设置条目以指向每一列的输入数据值。
  9. 将列数组转换为直接路径流格式。
  10. 加载直接路径流。
  11. 检索可能发生的任何错误。
  12. 调用直接路径完成函数。
  13. 释放句柄和数据结构。
  14. 与服务器断开连接

根据要加载的数据,步骤8到11可以重复多次。

直接加载操作要求锁定正在加载的对象,以防止对该对象执行DML操作。 请注意,查询是无锁的,在加载对象时是允许查询的。 DML锁定的模式以及获得哪些DML锁定取决于ACI_ATTR_DIRPATH_PARALLEL选项设置,以及是否要进行分区或子分区加载而不是整个表加载。

支持的数据类型

直接路径加载支持以下类型:

  • SQLT_CHR
  • SQLT_DAT
  • SQLT_INT
  • SQLT_UIN
  • SQLT_FLT
  • SQLT_BIN
  • SQLT_NUM
  • SQLT_PDN
  • SQLT_CLOB
  • SQLT_BLOB
  • SQLT_DATE
  • SQLT_TIMESTAMP
  • SQLT_TIMESTAMP_TZ
  • SQLT_TIMESTAMP_LTZ
  • SQLT_INTERVAL_YM
  • SQLT_INTERVAL_DS

直接路径加载的句柄

直接路径上下文

必须为要加载的每个对象(表或分区表的分区)分配直接路径上下文句柄。由于ACIDirPathCtx句柄是ACIDirPathFuncCtx,ACIDirPathColArray和ACIDirPathStream句柄的父句柄,因此释放ACIDirPathCtx句柄也将释放其子句柄(尽管出于良好的编码习惯,请在释放父句柄之前单独释放子句柄)。

直接路径上下文是使用 ACIHandleAlloc 分配的。 请注意,直接路径上下文的父句柄始终是环境句柄。 使用 ACIHandleFree 释放直接路径上下文。

直接加载列数组

直接路径列数组句柄和直接路径函数列句柄用于向直接路径接口传递的行数组。行由三个数组表示:列值,列长度和列标志。 在列数组上使用的方法包括:分配数组句柄并设置或获取与数组条目对应的值。

两个句柄共享相同的数据结构ACIDirPathColArray,但是这些列数组句柄在父句柄和句柄类型上不同。

使用 ACIHandleAlloc 分配直接路径列数组句柄。 以下的代码片段显示了直接路径列数组句柄的显式分配:

ACIDirPathCtx *dpctx;      /*直接路径上下文*/
ACIDirPathColArray *dpca;  /* 直接路径列数组 */
sword error;
error = ACIHandleAlloc((void  *)dpctx, (void  **)&dpca,
               ACI_HTYPE_DIRPATH_COLUMN_ARRAY,
               (size_t)0, (void  **)0);
//使用ACIHandleFree释放直接路径列数组句柄。
error = ACIHandleFree(dpca, ACI_HTYPE_DIRPATH_COLUMN_ARRAY);

直接路径流

直接路径流是线性加载数据库表的方式, ACIDirPathColArrayToStream 用于直接路径流转换操作, ACIDirPathLoadStream 用于加载操作。

用于程序使用 ACIHandleAlloc 分配直接路径流句柄。 ACIDirPathStream句柄的结构可以认为是成对形式(缓冲区,缓冲区长度)。

转换操作始终附加到流的末尾。 加载操作始终从流的开头开始。 流完全加载后,必须通过调用 ACIDirPathStreamReset 来重置流。

使用 ACIHandleAlloc 分配的直接路径流句柄。 父句柄始终是ACIDirPathCtx句柄:

ACIDirPathCtx *dpctx;    /* direct path context */
ACIDirPathStream *dpstr; /* direct path stream */
sword error;
error = ACIHandleAlloc((void  *)dpctx, (void  **)&dpstr,
               ACI_HTYPE_DIRPATH_STREAM, (size_t)0,(void  **)0);

使用 ACIHandleFree 释放直接路径流句柄:

error = ACIHandleFree(dpstr, ACI_HTYPE_DIRPATH_STREAM);

直接路径相关函数说明

直接路径上下文函数:

函数 作用
ACIDirPathAbort 终止加载操作
ACIDirPathDataSave 执行数据保存
ACIDirPathFinish 提交已经加载的数据
ACIDirPathFlushRow 刷已经加载的行到数据库服务端,这个方法不推荐使用。
ACIDirPathLoadStream 加载已经完成转换的流数据
ACIDirPathPrepare 准备一个加载操作

直接路径列数组上的操作相关函数:

函数 作用
ACIDirPathColArrayEntryGet 获得一个列数组实体
ACIDirPathColArrayEntrySet 设置一个列数组实体
ACIDirPathColArrayRowGet 给列数组中的行数
ACIDirPathColArrayReset 重置列数组
ACIDirPathColArrayToStream 将列数组转到直接路径流格式

操作直接路径流由函数 ACIDirPathStreamReset 进行重置。

直接路径加载示例

/*表定义
create table dp_test (col_1 int,col_2 varchar(20),col_3 float,col_4 varchar(19),col_5 varchar(5) not NULL,col_6 varchar(15),col_7 varchar(20),col_8 varchar(22),col_9 varchar(10),col_10 varchar(5))
*/
#define COL_NUM 1500
#define MAX_NAME_LEN  128              //最大的对像名长度
//在获取语句或存贮过程返回的结果集时,一次性从服务器提取的行数
#define MAX_FETCH_ROWS  10
#define TABLE_NAME          "dp_test" //表名
#define MAX_COL_NUM         10                        //表的列数
#define MAX_RECORD          1                 //加载数据的行数
#define MAX_BUFFER_RECORD   1000                      //数据缓冲区行数
char  dfltdatemask_tbl[100] = "YYYY-MM-DD HH24:MI:SS";
ub1* data_buffer;
ub1* data_bufder_point;
int data_buffer_size = 0;
int all_record= 10;
ub4 buffer_record = 1;
ub4 rownum=0;
ub2 colIdx=0;

ub1 dirpathinput = ACI_DIRPATH_INPUT_STREAM;
ACIParam           *colLstDesc = NULL;
ub1 *cvalp= NULL;

//定义列属性结构
typedef struct ColAttr{
      char    col_name[MAX_NAME_LEN + 1];
      ub2             data_type;  //ACI的数据类型
      ub4             data_size;  //数据的大小,以字节计算
      ub4             data_offset; //这个参数是留给ACI内部自己使用的
      ub2             precision; //数据类型的精度
      ub1             scale;  //数据的刻度
//注意,如果在使用行绑定的方式调用DirPathLoadStream,那么这个参数是没有作用
      void    *data;
//这个参数可以为NULL,表示该列的数据大小没有空值,如果要指定该列的某一行为NULL
      sb2             *indp;
}ColAttr_t;

ColAttr_t col_info[MAX_COL_NUM];
//设置列信息的函数
set_colinfo (char *name,  ub2 type, ub4 size)
{
    strcpy(col_info[i].col_name,name);
    col_info[i].data_type = type;
    col_info[i].data_size = size;
}

for(i=0;i<MAX_COL_NUM;i++)
   {
       switch(i)
        {
        case 0: set_colinfo ("col_1",SQLT_INT,sizeof(int));break;
        case 1: set_colinfo ("col_2",SQLT_CHR,20);break;
        case 2: set_colinfo ("col_3",SQLT_FLT,sizeof(float));break;
        case 3: set_colinfo ("col_4",SQLT_CHR,19);break;
        case 4: set_colinfo ("col_5",SQLT_CHR,5);break;
        case 5: set_colinfo ("col_6",SQLT_CHR,15);break;
        case 6: set_colinfo ("col_7",SQLT_CHR,20);break;
        case 7: set_colinfo ("col_8",SQLT_CHR,22);break;
        case 8: set_colinfo ("col_9",SQLT_CHR,10);break;
        case 9: set_colinfo ("col_10",SQLT_CHR,5);break;
        }
 }

//计算一行数据的总字节大小
for(i=0;i<MAX_COL_NUM;i++)
{
    data_buffer_size += col_info[i].data_size + sizeof(sb2);
}
//分配数据缓冲区
data_buffer = (ub1*)calloc(MAX_BUFFER_RECORD,data_buffer_size);
    r=ACIHandleAlloc(m_penv,(void**)&dpctx,ACI_HTYPE_DIRPATH_CTX,0,0);
    ACI_CHECK_ERROR(m_perr,r);

//分配直接路径上下文句柄,设置表和日期格式、导入方式、列数
ACIHandleAlloc((dvoid *)m_penv,(dvoid **)&dpctx,(ub4)ACI_HTYPE_DIRPATH_CTX,(size_t)0, (dvoid **)0);
ACIAttrSet((dvoid *)dpctx, (ub4)ACI_HTYPE_DIRPATH_CTX,(dvoid *)table,(ub4)strlen((const char *)table),(ub4)ACI_ATTR_NAME, m_perr);
ACIAttrSet((dvoid *)dpctx, (ub4)ACI_HTYPE_DIRPATH_CTX,(dvoid *)dfltdatemask_tbl,(ub4)strlen((const char *)dfltdatemask_tbl),(ub4)ACI_ATTR_DATEFORMAT, m_perr);
ACIAttrSet ((dvoid *)dpctx, ACI_HTYPE_DIRPATH_CTX,(dvoid *)&dirpathinput, (ub4)0,ACI_ATTR_DIRPATH_INPUT, m_perr) ;
ACIAttrSet((dvoid *)dpctx, (ub4)ACI_HTYPE_DIRPATH_CTX,(dvoid *)&cols,(ub4)0, (ub4)ACI_ATTR_NUM_COLS, m_perr) ;

//获得元数据,并设置导入的类型信息
ACIAttrGet((dvoid *)dpctx,ACI_HTYPE_DIRPATH_CTX,(dvoid *)&colLstDesc, (ub4 *)0,ACI_ATTR_LIST_COLUMNS, m_perr);
for (i = 0, pos = 1; i<cols; i++, pos++)
{
      /* 获得列的描述信息 */
ACIParamGet((CONST dvoid *)colLstDesc,(ub4)ACI_DTYPE_PARAM, m_perr,(dvoid **)&colDesc, pos);
      *colLstDescpp = colLstDesc;
      /* 列名 */
ACIAttrSet((dvoid *)colDesc, (ub4)ACI_DTYPE_PARAM,(dvoid *)col[i].col_name,(ub4)strlen(col[i].col_name),(ub4)ACI_ATTR_NAME, m_perr);
      /* 列类型 */
      if (dirpathinput == ACI_DIRPATH_INPUT_STREAM)
      {
      ACIAttrSet((dvoid *)colDesc, (ub4)ACI_DTYPE_PARAM,(dvoid *)&col[i].data_type, (ub4)0,(ub4)ACI_ATTR_DATA_TYPE, m_perr);
      }
      else
      {
      ACIAttrSet((dvoid *)colDesc, (ub4)ACI_DTYPE_PARAM,(dvoid *)&dtype, (ub4)0,(ub4)ACI_ATTR_DATA_TYPE, m_perr);
      }
      /* 数据长度 */
ACIAttrSet((dvoid *)colDesc, (ub4)ACI_DTYPE_PARAM,(dvoid *)&col[i].data_size, (ub4)0,(ub4)ACI_ATTR_DATA_SIZE, m_perr);
      /* 释放描述句柄 */
      ACIDescriptorFree((dvoid *)colDesc,ACI_DTYPE_PARAM));
      *colLstDescpp = NULL;
}
      buf_size = 64 *1024;
      ACIAttrSet((dvoid *)dpctx, (ub4)ACI_HTYPE_DIRPATH_CTX,(dvoid *)&buf_size,(ub4)0, (ub4)ACI_ATTR_BUF_SIZE, m_perr);
      return TRUE;
}

//准备导入
ACIDirPathPrepare(dpctx,m_psvc,m_perr);
ACIHandleAlloc(dpctx,(void**)&dpca,
ACI_HTYPE_DIRPATH_COLUMN_ARRAY,0,0);
ACIHandleAlloc(dpctx,(void**)&dpstr,ACI_HTYPE_DIRPATH_STREAM,0,0);

//准备数据
while(all_record > 0)
    {
        //决定一批次数据行数
        if(all_record > 5)
            buffer_record = 5;
        else
            buffer_record = all_record;

        //在数据缓冲区中构造一批次数据
        data_bufder_point = data_buffer;
        for(i=0;i<buffer_record;i++)
        {
            //第i行第一列
            *((int*)data_bufder_point) = j++;
            data_bufder_point += col_info[0].data_size;
            *((sb2*)data_bufder_point) = 0;
            data_bufder_point += sizeof(sb2);

            //第i行第二列
            strcpy((char*)data_bufder_point,"ABCDE12345ABCDE12345");
            data_bufder_point += col_info[1].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第三列
            *((float*)data_bufder_point) = (float)12345.123;
            data_bufder_point += col_info[2].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第四列
            sprintf((char*)data_bufder_point,
                "%04d-%02d-%02d %02d:%02d:%02d",2011,
                (rand%12+1),(rand%28+1),(rand%24),
                (rand%60),(rand%60));
            data_bufder_point += col_info[3].data_size;
            *((sb2*)data_bufder_point) = 0;
            data_bufder_point += sizeof(sb2);

            //第i行第五列
            strcpy((char*)data_bufder_point,"12345");
            data_bufder_point += col_info[4].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第六列
            strcpy((char*)data_bufder_point,"1234567890a");
            data_bufder_point += col_info[5].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第七列
            strcpy((char*)data_bufder_point,"12345678901234567890");
            data_bufder_point += col_info[6].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第八列
            strcpy((char*)data_bufder_point,"1234567890123456789012");
            data_bufder_point += col_info[7].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);

            //第i行第九列
            sprintf((char*)data_bufder_point,"%04d-%02d-%02d",
                2011,(rand%12+1),(rand%28+1));
            data_bufder_point += col_info[8].data_size;
            *((sb2*)data_bufder_point) = 0;
            data_bufder_point += sizeof(sb2);

            //第i行第十列
            strcpy((char*)data_bufder_point,"0101010101");
            data_bufder_point += col_info[9].data_size;
            *((sb2*)data_bufder_point) = (rand%2)-1;
            data_bufder_point += sizeof(sb2);
        }
//重置列数组和流的状态
        ACIDirPathColArrayReset(dpca,m_perr);
        ACIDirPathStreamReset(dpstr,m_perr);
        cvalp=(ub1*)data_buffer;

//将数据写入列数组中
        for (rownum = 0; rownum<buffer_record; rownum++)
        {
            for (colIdx = 0; colIdx<MAX_COL_NUM; colIdx++)
            {
                if (*(sb2*)(cvalp +col_info[colIdx].data_size) == -1)
                {
                    ACIDirPathColArrayEntrySet(dpca, m_perr,rownum,
                        colIdx,cvalp,col_info[colIdx].data_size,
                        ACI_DIRPATH_COL_NULL);
                }
                else
                {
                    ACIDirPathColArrayEntrySet(dpca,m_perr,rownum, colIdx,
                        cvalp,col_info[colIdx].data_size,ACI_DIRPATH_COL_COMPLETE);
                }

                cvalp += col_info[colIdx].data_size + sizeof(sb2);
            }
        }
//将数据从列数组中转换到流中
        ACIDirPathColArrayToStream(dpca,dpctx,dpstr,m_perr,buffer_record,0);
        /* 加载流 */
ACIDirPathLoadStream(dpctx,dpstr,m_perr);
      /*保存数据*/
ACIDirPathDataSave(dpctx,m_perr,ACI_DIRPATH_DATASAVE_FINISH));
        /*结束*/
ACIDirPathFinish(dpctx,m_perr);
        //完成一批次加载,减少总行数
        all_record -= MAX_BUFFER_RECORD;
    }// end of while(all_record > 0)
  /*此处省略句柄释放,请释放每一个分配的句柄*/