Python类中使用__call__

对象通过提供call(slef, [,args [,*kwargs]])方法可以模拟函数的行为,如果一个对象x提供了该方法,就可以像函数一样使用它,也就是说x(arg1, arg2…) 等同于调用x.call(self, arg1, arg2) 。 模拟函数的对象可以用于创建防函数(functor) 或代理(proxy)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DistanceForm(object):
def __init__(self, origin):
self.origin = origin
print "origin :"+str(origin)
def __call__(self, x):
print "x :"+str(x)
p = DistanceForm(100)
p(2000)
输出
>>>
origin :100
x :2000

Sqlalchemy

使用sqlalchemy

自动orm

database.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('sqlite:///./test.db', convert_unicode=True) # 创建数据库引擎( 当前目录下保存数据库文件)
db_session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
def init_db():
# 在这里导入所有的可能与定义模型有关的模块,这样他们才会合适地
# 在 metadata 中注册。否则,您将不得不在第一次执行 init_db() 时
# 先导入他们。
import models
Base.metadata.create_all(bind=engine)

Models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy import Column, Integer, String
from database import Base
# 注意继承的是Base
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
email = Column(String(120), unique=True)
def __init__(self, name=None, email=None):
self.name = name
self.email = email
def __repr__(self):
return '%s (%r, %r)' % (self.__class__.__name__, self.name, self.email)

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from flask import Flask
from database import init_db, db_session
from models import User
app = Flask(__name__)
@app.teardown_request
def shutdown_session(exception=None):
db_session.remove()
@app.route('/')
def index():
return 'hello world flask'
@app.route('/add/<name>/<email>')
def add(name, email):
u = User(name=name, email=email)
try:
db_session.add(u)
db_session.commit()
except Exception, e:
return 'wrong'
return 'Add %s user successfully' % name
@app.route('/get/<name>')
def get(name):
try:
u = User.query.filter(User.name==name).first()
except Exception, e:
return 'there isnot %s' % name
return 'hello %s' % u.name
if __name__ == '__main__':
init_db()
app.debug = True
app.run()

使用flask-sqlalchemy

仅需要知道与普通的 SQLAlchemy 不同之处:

SQLAlchemy 允许您访问下面的东西:

  • sqlalchemy 和 sqlalchemy.orm 下所有的函数和类
    一个叫做 session 的预配置范围的会话(session)
    metadata 属性
    engine 属性

  • SQLAlchemy.create_all() 和 SQLAlchemy.drop_all(),根据模型用来创建以及删除表格的方法
    一个 Model 基类,即是一个已配置的声明(declarative)的基础(base)
    Model 声明基类行为类似一个常规的 Python 类,不过有个 query 属性,可以用来查询模型 (Model 和 BaseQuery)

  • 您必须提交会话,但是没有必要在每个请求后删除它(session),Flask-SQLAlchemy 会帮您完成删除操作。

flask 小型项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)
def __init__(self, username, email):
self.username = username
self.email = email
def __repr__(self):
return '<User %r>' % self.username
db.create_all()
db.session.add(admin)
db.session.add(guest)
db.session.commit()

flask 大型项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
db.init_app(app)
db.session
配置
SQLALCHEMY_TRACK_MODIFICATIONS = True
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_POOL_SIZE = 20
SQLALCHEMY_POOL_RECYCLE = 3000
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:12345@127.0.0.1/bi_dev'
SQLALCHEMY_BINDS = {
'orig_wpt': 'mysql+pymysql://wpt_read:wxoselctWxjJnv0PhpQz@172.28.14.115/wpt',
'orig_wpt_mall': 'mysql+pymysql://wpt_read:wxoselctWxjJnv0PhpQz@172.28.14.115/wptmall',
'orig_wpt_payment': 'mysql+pymysql://wpt_read:wxoselctWxjJnv0PhpQz@172.28.14.115/wpt_payment',
'orig_wpt_ods': 'mysql+pymysql://localhost/orig_wpt_ods',
}

一些通用的用法

简单查询

1
2
3
4
5
6
7
User.query.all()
session.query(User).all()
session.query(User.name, User.fullname).all()
session.query(User, User.name).all()

带条件查询

1
2
3
4
print(session.query(User).filter_by(name='user1').all())
print(session.query(User).filter(User.name == "user").all())
print(session.query(User).filter(User.name.like("user%")).all())

多条件查询

1
2
print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())

与原生sql条件对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#equals
query.filter(User.name == 'ed')
#not equals
query.filter(User.name != 'ed')
#LIKE
query.filter(User.name.like('%ed%'))
#IN
uery.filter(User.name.in_(['ed','wendy', 'jack']))
#not IN
query.filter(~User.name.in_(['ed','wendy', 'jack']))
#is None
query.filter(User.name == None)
#not None
query.filter(User.name != None)
from sqlalchemy import and_
query.filter(and_(User.name =='ed',User.fullname =='Ed Jones')) # and
query.filter(User.name == 'ed',User.fullname =='Ed Jones') # and
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# and
from sqlalchemy import or_
query.filter(or_(User.name =='ed', User.name =='wendy')) #or
query.filter(User.name.match('wendy')) #match

sql过滤

1
print(session.query(User).filter("id>:id").params(id=1).all())

关联查询

1
2
3
print(session.query(User, Address).filter(User.id == Address.user_id).all())
print(session.query(User).join(User.addresses).all())
print(session.query(User).outerjoin(User.addresses).all())

聚合查询

1
2
print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())

子查询

1
2
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())

exists

1
2
3
4
5
6
7
8
9
10
11
12
print(session.query(User).filter(exists().where(Address.user_id == User.id)))
print(session.query(User).filter(User.addresses.any()))
--deals=交易表,areas=地域表,例如香港;我们的目的:查看有交易的地域
select * from areas where id in (select city_id from deals);
select * from areas where id in (select city_id from deals where deals.city_id = areas.id);
select * from areas where exists (select null from deals where deals.city_id = areas.id);
区别:
EXISTS语法并没有说哪个字段落在了子查寻的结果中,而是说exists后面的语句执行的结果是不是有记录,只要有记录,则主查询语句就成立。它代表‘存在’,用来引领嵌套查询的子查询,它不返回任何数据,只产生逻辑真值‘true’与逻辑假值‘False’。由EXISTS引出的子查询,其目标列表达式通常都用*(用null也可以),因为带有EXISTS的子查询只返回真值或假值,给出列名没有实际意义

as

1
2
3
for row in session.query(User.name.label('name_label')).all():
print(row.name_label)

限制返回字段查询

1
2
3
4
5
person = session.query(Person.name, Person.created_at,Person.updated_at).filter_by(name="zhongwei").order_by(
Person.created_at).first()
session.query(User).order_by(User.id)[1:3]:

记录总数查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from sqlalchemy import func
session.query(func.count(User.id))
session.query(func.count(User.id)).group_by(User.name)
from sqlalchemy import distinct
session.query(func.count(distinct(User.name)))
也有简单点的
session.query(User).filter(User.name.like('%ed')).count()
为了实现简单计数SELECT count(*) FROM table,可以这么写:
session.query(func.count('*')).select_from(User).scalar()
如果我们明确表达计数是根据User表的主键的话,可以省略select_from(User):
session.query(func.count(User.id)).scalar()

对于返回结果的处理

# 使用原生sql

一些技巧

字符串能使Query更加灵活,通过text()构造指定字符串的使用,这种方法可以用在很多方法中,像filter()和order_by()。

1
2
from sqlalchemy import text
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all()

绑定参数可以指定字符串,用params()方法指定数值。

1
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()

使用原生sql

proxy = db.engine.execute(text(“”” SELECT :indication
FROM bi_statistic
WHERE on_day
BETWEEN :start_time
AND :end_time
“””), start_time=start_time, end_time=end_time, indication=’’.join(indication))

http://honmaple.org/tag/sqlalchemy/index.html#org9c10e0a

Python的__str__

str 直接打印对象的实现方法
————————————————————————————————————————————————————
在python语言里,str一般是格式是这样的。
class A:
def str(self):
return “this is in str”
事实上,str是被print函数调用的,一般都是return一个什么东西。这个东西应该是以字符串的形式表现的。如果不是要用str()函数转换。当你打印一个类的时候,那么print首先调用的就是类里面的定义的str,比如:str.py

#!/usr/bin/env python

class strtest:
def init(self):
print “init: this is only test”
def str(self):
return “str: this is only test”

if name == “main“:
st=strtest()
print st

$./str.py
init: this is only test
str: this is only test
从上面例子可以看出,当打印strtest的一个实例st的时候,str函数被调用到。
其实,python里面的对象基本上都默认有个str供print函数所用。比如字典里的str,见红色部分:

dir({})
[‘class‘, ‘cmp‘, ‘contains‘, ‘delattr‘, ‘delitem‘, ‘doc‘, ‘eq‘, ‘format‘, ‘ge‘, ‘getattribute‘, ‘getitem‘, ‘gt‘, ‘hash‘, ‘init‘, ‘iter‘, ‘le‘, ‘len‘, ‘lt‘, ‘ne‘, ‘new‘, ‘reduce‘, ‘reduce_ex‘, ‘repr‘, ‘setattr‘, ‘setitem‘, ‘sizeof‘, ‘str‘, ‘subclasshook‘, ‘clear’, ‘copy’, ‘fromkeys’, ‘get’, ‘has_key’, ‘items’, ‘iteritems’, ‘iterkeys’, ‘itervalues’, ‘keys’, ‘pop’, ‘popitem’, ‘setdefault’, ‘update’, ‘values’]
t={}
t[‘1’] = “hello”
t[‘2’] = “world”
t #等于 print t
{‘1’: ‘hello’, ‘2’: ‘world’}
t.str()
“{‘1’: ‘hello’, ‘2’: ‘world’}”
大家可以看到一个字典,print t 和 t.str()是一样的。只不过str()将字典内容以字符串形式输出。
看看下面的例子,如果在函数str里返回的不是字符串,请看str1.py

#!/us/bin/env/python

#metaclass = type

#if name == “main“:
class strtest:
def init(self):
self.val = 1
def str(self):
return self.val

if name == “main“:
st=strtest()
print st

$./str1.py
Traceback (most recent call last):
File “./str.py”, line 12, in
print st
TypeError: str returned non-string (type int)
错误的信息提示:str返回了一个非字符串。这时候我们应该这样做:请看str2.py

#!/usr/bin/env python

#metaclass = type

#if name == “main“:
class strtest:
def init(self):
self.val = 1
def str(self):
return str(self.val)

if name == “main“:
st=strtest()
print st

$./str2.py
1
我们用str()将整型变为了字符型。

http://blog.csdn.net/followingturing/article/details/7954204