*SQLAlchemy とは
Pythonでデータベースとやり取りをするためのORMライブラリのひとつです。用意されている関数を使うことでSQLを実行できます。
また、メタデータを使うことでDBのテーブルとPythonのモデルクラスをマッピングすることができ、コードとテーブルを完全に同期させることができます。このため、開発中のマイグレーションの手間を省略することができるといったメリットがあります。
今まで触ったことがなかったので、SQLAlchemy を使って基本的なデータ操作をしてみました。
今回は MySQL を使っていますが、そのほか
PostgreSQL
やOracle
にも対応しています。*環境
- MacOS
- SQLAlchemy 1.2.16
- mysql client 1.4.1
- Python 2.7.14
*参考
*インストール
以下の作業は、すべて仮想環境の中で行います。下記コマンドを実行してインストールします。
$ pip install sqlalchemy
*MySQL の起動
MySQLをインストールして起動します。$ brew install mysql
$ pip install mysqlclient
$ mysql.server start
下記コマンドを実行して、ユーザやパスワードなどMySQLの初期設定をします。(2回目以降は不要です)
$ mysql_secure_installation
MySQLにログインします。
$ mysql -u root -p
私はしばらくMySQLを触っていなかったせいで、下記のエラーが発生しました。
mysql> show databases;
ERROR 1449 (HY000): The user specified as a definer ('mysql.infoschema'@'localhost') does not
原因はMySQLが5.8にアップデータした際に、ユーザーの設定がうまく更新できなかったからのようです。
なので、
SET GLOBAL innodb_fast_shutdown = 1;
を打って、ログアウトしてから mysql_upgrade
で更新してログインし直すとエラーを解消することができました。$ mysql -u root -p
mysql> SET GLOBAL innodb_fast_shutdown = 1;
mysql> exit
$ mysql_upgrade -u root -p
...
Upgrade process completed successfully.
Checking if update is needed.
$ mysql -u root -p
mysql> show databases;
*DBの接続設定
任意のディレクトリを作成し、その中でsetting.py
を新規作成します。データベースの設定は、自分の環境に合わせて設定してください。
<setting.py>
# coding=utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
DATABASE = 'mysql://%s:%s@%s/%s?charset=utf8' % (
"user_name",
"password",
"host(127.0.0.1)",
"database_name",
)
ENGINE = create_engine(
DATABASE,
encoding="utf-8",
echo=True
)
session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
bind=ENGINE
)
)
Base = declarative_base()
Base.query = session.query_property()
*モデルの作成
setting.py
と同じディレクトリでuser.py
を新規作成します。モデルクラスでテーブル定義を作成し、
create_all()
でDBにテーブルを作成します。<user.py>
# coding=utf-8
import sys
from setting import ENGINE, Base
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = 'users'
id = Column('id', Integer, primary_key=True)
name = Column('name', String(200))
age = Column('age', Integer)
email = Column('email', String(100))
def main(args):
# Base と ENGINE をDB接続設定からインポート
Base.metadata.create_all(bind=ENGINE)
if __name__ == '__main__':
main(sys.argv)
MySQLに接続して、テーブルが作成されているか確認をします。
mysql> use sample2;
Database changed
mysql> show tables;
Empty set (0.00 sec)
mysql> show tables;
+-------------------+
| Tables_in_sample2 |
+-------------------+
| users |
+-------------------+
1 row in set (0.00 sec)
mysql> desc users;
+-------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(200) | YES | | NULL | |
| age | int(11) | YES | | NULL | |
| email | varchar(100) | YES | | NULL | |
+-------+--------------+------+-----+---------+----------------+
4 rows in set (0.01 sec)
*登録(INSERT)
データの登録は、モデルのインスタンスを作成してadd()
でデータを追加したあとcommit()
でクエリを実行します。# coding=utf-8
from setting import session
from user import User
user = User()
user.name = '平成太郎'
user.age = 20
user.email = 'taro@aaa.bbb.ccc.com'
session.add(user)
session.commit()
*全データの取得(SELECT)
all()
で全データを取得します。結果はリストになります。# coding=utf-8
from setting import session
from user import User
users = session.query(User).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
平成太郎
21
taro@aaa.bbb.ccc.com
平成花子
18
hanako@aaa.bbb.ccc.com
昭和太郎
31
taro@aaa.bbb.ccc.com
昭和花子
33
hanako@aaa.bbb.ccc.com
*特定カラムのみ取得(SELECT)
session.query()
でカラムを指定すると、特定のカラムのみ取得することができます。# coding=utf-8
from setting import session
from user import User
users = session.query(User.name, User.email).all()
for user in users:
print(user.name)
print(user.email)
<実行結果>
平成太郎
taro@aaa.bbb.ccc.com
平成花子
hanako@aaa.bbb.ccc.com
昭和太郎
taro@aaa.bbb.ccc.com
昭和花子
hanako@aaa.bbb.ccc.com
*条件で取得(WHERE)
filter()
を使って条件で絞り込むことができます。# coding=utf-8
from setting import session
from user import User
users = session.query(User).filter(User.age > 30).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
昭和太郎
31
taro@aaa.bbb.ccc.com
昭和花子
33
hanako@aaa.bbb.ccc.com
*条件で取得(IN)
条件の絞り込みにはin_()
も使うことができますが、処理は遅くなるようです。# coding=utf-8
from setting import session
from user import User
ages = [18, 33]
users = session.query(User).filter(User.age.in_(ages)).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
平成花子
18
hanako@aaa.bbb.ccc.com
昭和花子
33
hanako@aaa.bbb.ccc.com
*指定件数を取得(limit)
limit()
を使うことで、指定した件数分のみのデータを取得することができます。# coding=utf-8
from setting import session
from user import User
users = session.query(User).limit(3).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
平成太郎
21
taro@aaa.bbb.ccc.com
平成花子
18
hanako@aaa.bbb.ccc.com
昭和太郎
31
taro@aaa.bbb.ccc.com
*並び順の指定(ORDER BY)
order_by()
で並び替えをすることができます。desc
など指定する場合は、個別に import する必要があります。# coding=utf-8
from setting import session
from user import User
from sqlalchemy import desc
users = session.query(User).order_by(desc(User.age)).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
昭和花子
33
hanako@aaa.bbb.ccc.com
昭和太郎
31
taro@aaa.bbb.ccc.com
平成太郎
21
taro@aaa.bbb.ccc.com
平成花子
18
hanako@aaa.bbb.ccc.com
*更新(UPDATE)
更新をする際は、データを取得して値を変更してからcommit()
します。# coding=utf-8
from setting import session
from user import User
user1 = session.query(User).filter(User.id == 1).first()
user1.age = 21
session.commit()
users = session.query(User).filter(User.id == 1).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
平成太郎
21
taro@aaa.bbb.ccc.com
*削除(DELETE)
データを取得してdelete()
してからcommit()
をすると削除することができます。# coding=utf-8
from setting import session
from user import User
user1 = session.query(User).filter(User.age == 33).delete()
session.commit()
users = session.query(User).all()
for user in users:
print(user.name)
print(user.age)
print(user.email)
<実行結果>
平成太郎
21
taro@aaa.bbb.ccc.com
平成花子
18
hanako@aaa.bbb.ccc.com
昭和太郎
31
taro@aaa.bbb.ccc.com
*SQLを直接実行
SQL文を直接実行したい場合はexecute()
を使います。# coding=utf-8
from setting import session
from user import User
user_id = 1
sql = 'select name from users where id = %s' % (user_id)
users = session.execute(sql)
for user in users:
print(user.name)
<実行結果>
平成太郎
*所感
提供されている関数名がわかりやすかったので、直感的に使うことができました。ただ、データベースごとに少し書き方が異なるようだったので、使い方をよく理解しておく必要がありそうです。公式ドキュメントもありますが、全て英語なので少し学習コストがかかりそうです。時間のあるときによく読んで理解を深めておこうと思います。
Sign up here with your email
ConversionConversion EmoticonEmoticon