编程步骤¶
- 导入sqlalchemy包和类型和函数等,示例如下:
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, MetaData, or_, join, outerjoin
from sqlalchemy import Column, String, Integer, Float, Boolean, DECIMAL
from sqlalchemy import Enum, Date, DateTime, Time, Text
from sqlalchemy import LargeBinary, UnicodeText, TIMESTAMP
from sqlalchemy import func, Table
from sqlalchemy.dialects.oscar import CLOB, BLOB
- 打开与数据库关联的Engine实例:使用“oscar+stpython”指定使用oscar方言和stpython驱动,以及使用连接字符串打开特定的数据库。程序运行过程中一般保持打开且只打开一次。echo表示是否显示执行过程产生的信息。
engine = create_engine(“oscar+stpython://sysdba:szoscar55@10.1.203.42:2003/osrdb”, echo=False)
- 关联数据表,打开session,进行数据的增删改查等工作,关闭session。
- 程序结束前关闭Engine实例。
数据类型¶
Sqlalchemy和oscar方言支持的数据类型和数据库数据类型以及python类型的对应关系如下表:
| Sqlalchemy/oscar方言类型 | 数据库数据类型 | Python类型 | 备注 |
|---|---|---|---|
| Boolean | SMALLINT | bool | |
| Integer | INT | int | |
| Float | FLOAT | float | |
| String(n) | VARCHAR(n) | string | n表示最大长度 |
| Enum | VARCHAR | string | |
| DECIMAL(m,n) | NUMERIC | float | |
| Date | DATE | datetime.date | |
| Datetime | DATE | datetime.datetime | |
| Text | CLOB | string | |
| LargeBinary | BLOB | Bytes | |
| oscar.CLOB | CLOB | string | |
| oscar.BLOB | BLOB | Bytes | |
| TIMESTAMP | TIMESTAMP | datetime.datetime | |
| LONG | CLOB | 整形 | |
| DOUBLE_PRECISION | DOUBLE PRECISION | Float | |
| BINARY_DOUBLE | DOUBLE PRECISION | Float | |
| RAW(n) | RAW(n)/BINARY(n) | Bytes | n表示最大长度 |
| BINARY_FLOAT | FLOAT | Float |
建立连接¶
连接字符串¶
格式:“oscar+stpython://用户名:密码@主机名:端口/数据库名” 例:
oscar+stpython://sysdba:szoscar55@10.1.203.42:2003/osrdb
打开数据库¶
通过sqlalchemy.create_engin函数使用连接字符串打开指定数据库
from sqlalchemy import create_engine
engine = create_engine(“oscar+stpython://sysdba:szoscar55@10.1.203.42:2003/osrdb”, echo=False)
建立连接session¶
调用sessionmaker函数,并绑定engine实例,通过返回的对象建立连接,如下:
session = sessionmaker(bind=engine)
mysession = session()
数据库查询等操作¶
直接执行SQL语句¶
使用session的execute函数执行sql语句,示例如下:
connect_string = 'oscar+stpython://sysdba:szoscar55@10.1.203.42:2003/osrdb'
engine_oscar = create_engine(connect_string, echo=True)
print(engine_oscar)
db_session = sessionmaker(bind=engine_oscar)
session = db_session()
sql = 'select * from dual'
result = session.execute(sql).fetchall()
print(result[0][0])
数据表关联等操作¶
数据表映射¶
通过declarative_base()函数创建一个基类,然后根据该基类映射多个数据表。在如下的示例中,Info类映射表info, User类表映射为userabc表,Uer.tstring映射userabc表的username列,同时示例中如果不指定username,则列名就是tsring。 在接下来的数据操作过程,就可以使用User代表userabce表,使用Uer.tstring代表userabc表的username列, 其中username的类型是String(64),可以为null。
Base = declarative_base()
class Info(Base):
__tablename__ = 'info'
ttint = Column(Integer, primary_key=True)
ttstring = Column(String(64))
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.ttstring)
class User(Base):
__tablename__ = 'userabc'
#id = Column(Integer, primary_key=True)
#指定将name映射为username2。如果不指定列名username2,则列名为name
#name = Column('username2', String(64), nullable=False)
tint = Column(Integer, primary_key=True) #一定要指定主键
tstring = Column('username',String(64), nullable=False)
tfloat = Column(Float)
tbool = Column(Boolean)
tdecimal = Column(DECIMAL(10, 4))
tenum = Column(Enum('1', '234'))
tdate = Column(Date)
ttime = Column(Time)
tdatetime = Column(DateTime)
ttext = Column(Text)
tlargebinary = Column(LargeBinary)
tclob = Column(CLOB)
tblob = Column(BLOB)
ttimestamp = Column(TIMESTAMP)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.tstring)
数据表删除¶
使用Base.metadata.drop_all()函数从数据库中删除与Base关联所有的表,示例如下。注意:该方式会从数据库中删除与Base关联的所有表,需谨慎操作!
Base.metadata.drop_all(engine)
基础数据类型插入¶
通过映射类获取对象,然后给每个成员变量(对应数据表的每列)赋值,然后使用session.add()方式将数据插入到数据中, 最后使用session.comnit()函数提交事务。示例如下:
def get_user():
user1 = User()
user1.tint = 6
user1.tstring = 'hello'
user1.tfloat = 1234.1234
user1.tbool = True
user1.tdecimal = 1234.1234
user1.tenum = '234'
user1.tdate = date(2021, 2, 27)
user1.ttime = '17:00:00'
user1.tdatetime = datetime(2021,2,27,17,1,59)
user1.ttext = 'testtext'
user1.tlargebinary = bytes(b'testlargebinary')
user1.tclob = 'testclob'
user1.tblob = bytes(b'testblob')
user1.ttimestamp = datetime.now()
return user1
#插入数据
session = sessionmaker(bind=engine)
mysession = session()
mysession.add(get_user())
mysession.commit()
基础数据类型读取¶
调用session.query()函数查询表,参数可以是映射表或多个列。
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User)
for i in result:
print(str(i.tint) + '\t'+ str(i.tstring) + '\t'+ str(i.tfloat)
+'\t'+str(i.tbool)+'\t'+str(i.tdecimal)+'\t'+str(i.tenum)
+'\t'+str(i.tdate)+'\t'+str(i.ttime)+ '\t'+str(i.tdatetime)
+'\t'+str(i.ttext)+'\t'+str(i.tlargebinary)+'\t'+str(i.tclob)
+'\t'+str(i.tblob)+'\t'+str(i.ttimestamp))
数据更新¶
update方式更新¶
使用update()函数更新,参数为字典对象,更新后使用commit函数更新 示例:更新User表的tstring列
session = sessionmaker(bind=engine)
mysession = session()
data = {}
data[User.tstring] = 'update'
result = mysession.query(User).filter(User.tint == 6).update(data) #update({User.tstring:'update'})
mysession.commit()
通过修改结果集方式更新¶
修改完后使用commit函数提交
示例1:更新User表的tstring列
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(User.tint == 7).first() # first, all, one
result.tstring = 'update_by_flush'
mysession.commit()
示例2:更新User表的tstring列
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(User.tint > 7)
for i in result:
i.tstring = 'update_by_flush'
mysession.commit()
数据删除¶
使用delete函数删除,然后使用commit函数提交 示例:删除User表中tint字段等于9的行
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(User.tint == 9).delete()
mysession.commit()
数据过滤和筛选¶
使用filter函数过滤数据,对应sql语句中where条件。
等值查询¶
使用 “==”
示例:获取User表tbool列为true的数据,并从结果集的第二行开始获取,即offset(1)
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(User.tbool == True).offset(1)
print_user_result(result)
与 或 条件使用¶
使用or_()函数表达 或 关系,如果不使用or_()函数,则默认多个表达式为 与 关系
示例:获取User表数据,条件是tint列的值为7或tbool的值为True,通过tint列降序输出,并限制只获取一条数据
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(or_(User.tint > 7,
User.tbool == True)).order_by(User.tint.desc()).limit(1) #获取一条数据
print_user_result(result)
模糊匹配¶
使用like函数 示例:
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User.tint, User.tstring).filter(User.tstring.like('%el%'))
print_user_result(result)
包含¶
使用in_()函数
示例:返回User表tint列的值出现在Info表的ttint集合的所有行
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User).filter(User.tint.in_(mysession.query(Info.ttint)))
print_user_result(result)
分页查询¶
使用limit(m).offset(n)的方式进行分页查询,表示从查询结果第n+1行开始,输出m行数据
示例:
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User.tint).filter(User.tbool == True).limit(3).offset(2)
print_user_result(result)
分组以及函数使用¶
(1)使用group_by()函数按某字段值进行分组。 (2)使用func.count(),func.max(), func.min()等函数针对某一列数据操作,通过label()函数指定输出别名。
示例:
session = sessionmaker(bind=engine)
mysession = session()
counts = func.count(User.tstring).label('cc')
result = mysession.query(User.tstring, counts).filter(User.tint > 5).group_by(User.tstring).having(counts > 0)
for i in result:
print(str(i.tstring) + '\t' + str(i.cc))
表连接¶
自然连接¶
使用join()函数表达表的自然连接方式。
示例:User表和Info自然连接,连接条件是Usertint == Info.ttint
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User.tint, User.tstring).join(Info, User.tint == Info.ttint).filter(User.tint > 6)
for i in result:
print(str(i.tint) + '\t' + str(i.tstring))
外连接¶
使用outjoiner进行外连接
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User.tint, User.tstring, Info.ttstring).outerjoin(Info, User.tint == Info.ttint).filter(User.tint > 6)
for i in result:
print(str(i.tint) + '\t' + str(i.tstring) + '\t' + str(i.ttstring))
事务操作¶
方式1¶
通过sessionmaker开启的session默认开启了事务,写入数据后使用session.commit()提交事务,失败后使用session.rollback()回滚事务。
示例:
session = sessionmaker(bind=engine)
mysession = session()
try:
result = mysession.query(User).filter(User.tint == 9).update({User.tstring:'updatebytran'})
mysession.commit();
except BaseException as e:
print(str(e))
mysession.rollback()
mysession.close()
方式2¶
通过engine.connect()方式开启的连接,使用connect.begin()开启事务,并返回base.RootTransaction类型的对象,使用该对象的commit()函数提交事务,使用该对象的rollback()函数回滚事务。
示例:
mysession = engine.connect()
trans = mysession.begin()
print(type(trans))
try:
result = mysession.execute("update userabc set username = 'hello3' where tint = 7")
trans.commit();
except BaseException as e:
print(str(e))
trans.rollback()
mysession.close()
大对象操作¶
Clob¶
通过python 的string类型读写clob
示例:
session = sessionmaker(bind=engine)
mysession = session()
cl = '1'*1024*1024*5
user = get_user()
user.tint = 12
user.tclob = cl.. code-block:: Python
mysession.add(user) #插入数据
mysession.commit()
result = mysession.query(User.tclob).filter(User.tint == 12).first()#读取clob
print(result.tclob[1024*1024*5 - 1] +' len: '+ str(len(result.tclob)))
Blob¶
通过python 的bytes类型读写Blob
示例:
session = sessionmaker(bind=engine)
mysession = session()
cl = b'a'*1024*1024*50
user = get_user()
user.tint = 13
user.tblob = cl
mysession.add(user)
mysession.commit()
result = mysession.query(User.tblob).filter(User.tint == 13).first()
print(str(result.tblob[1024*1024*6 - 1]) +' len: '+ str(len(result.tblob)))
Metadata¶
通过metadata映射表和创建表等操作,与7.2中效果相同。
示例:
metadata = MetaData(engine)
student = Table('tb_test', metadata, Column('id', Float, primary_key=True), Column('c_varchar', String(50)),
Column('c_decimal', DECIMAL(10,3)),Column('c_blob', LargeBinary),Column('c_clob', Text),
Column('c_binary', RAW(10)),Column('c_double', DOUBLE_PRECISION))
metadata.create_all()
获取所有表信息¶
from sqlalchemy import create_engine, MetaData,Table
from sqlalchemy import Table, Column, Date, Integer, String, ForeignKey,Boolean,JSON
from sqlalchemy.ext.declarative import declarative_base, AbstractConcreteBase
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
from sqlalchemy.orm import mapper
connect_string = 'oscar+stpython://sysdba:szoscar55@10.1.203.18:2003/osrdb'
engine = create_engine(connect_string, echo=False)
Base = declarative_base()
meta = MetaData()
meta.reflect(bind=engine)
db_tables = meta.sorted_tables
print(db_tables)
engine.dispose()
Json操作¶
Json支持中文、能力与Mysql相同。
from sqlalchemy import create_engine, MetaData
from sqlalchemy import Table, Column, Date, Integer, String, ForeignKey,Boolean
from sqlalchemy.ext.declarative import declarative_base, AbstractConcreteBase
from sqlalchemy import create_engine
from sqlalchemy.dialects.oscar import JSON
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
from sqlalchemy.orm import mapper
connect_string = 'oscar+stpython://sysdba:szoscar55@10.1.203.18:2003/osrdb'
engine = create_engine(connect_string, echo=False)
metadata = MetaData(engine)
class_registry = {}
Base = declarative_base(class_registry=class_registry)
metadata = MetaData(engine)
job_grade = Table('job_grade', metadata, Column('id', Integer, primary_key=True), Column('flex_data', JSON))
metadata.create_all()
class User(Base):
__tablename__ = 'job_grade'
tint = Column('id',Integer, primary_key=True,autoincrement=True)
tstring = Column('flex_data',JSON)
def get_user():
user1 = User()
user1.tstring = dict(ka=[dict(kb='waz啊bc中'), dict(kb='v2')])
return user1
def add_data():
session = sessionmaker(bind=engine)
mysession = session()
tmp = get_user();
mysession.add(tmp)
mysession.commit()
print(tmp.tint)
def query_data():
session = sessionmaker(bind=engine)
mysession = session()
result = mysession.query(User.tstring["ka"][0]["kb"]).first()
print(result)
if __name__ == '__main__':
add_data()
query_data()
engine.dispose()