async await

https://juejin.im/post/5b3f540af265da0f742ec5e1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

import asyncio
import sys

def set_loop():
    """Install the fastest available asyncio event loop policy.

    On Windows, prefer a proactor-based policy (the stdlib one when
    available, otherwise a minimal custom policy). On other platforms,
    use uvloop when it is importable; otherwise leave the default policy
    untouched. Returns None.
    """
    # NOTE: the redundant function-local `import asyncio` was removed;
    # the module is already imported at file scope.
    from asyncio.events import BaseDefaultEventLoopPolicy

    policy = None

    if sys.platform == 'win32':
        if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'):
            # Python 3.7+ ships a ready-made proactor policy.
            # pylint: disable=no-member
            policy = asyncio.WindowsProactorEventLoopPolicy()
        else:
            class ProactorPolicy(BaseDefaultEventLoopPolicy):
                """Event loop policy to create proactor loops."""

                _loop_factory = asyncio.ProactorEventLoop

            policy = ProactorPolicy()
    else:
        try:
            import uvloop
        except ImportError:
            # uvloop is optional; keep the stdlib default loop.
            pass
        else:
            policy = uvloop.EventLoopPolicy()

    if policy is not None:
        asyncio.set_event_loop_policy(policy)

async def slow_operation(n):
    """Sleep for *n* seconds, report completion, and return *n*."""
    await asyncio.sleep(n)
    print('Slow operation {} complete'.format(n))
    return n
# Install the loop policy *before* creating any loop or task. The original
# called asyncio.ensure_future() at import time, which relies on the
# deprecated implicit global event loop.
set_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# asyncio.wait() no longer accepts bare coroutines (removed in Python 3.11),
# so wrap every coroutine in a Task explicitly. Same delays as before.
all_tasks = [
    loop.create_task(slow_operation(delay))
    for delay in (1, 2, 9, 2, 1, 2, 3)
]

dones, pendings = loop.run_until_complete(asyncio.wait(all_tasks))
for fut in dones:
    print("return value is {}".format(fut.result()))


# Equivalent with gather (results keep task order):
# results = loop.run_until_complete(asyncio.gather(*all_tasks))
# for result in results:
#     print('Task ret: ', result)

Read More

celery for prometheus monitoring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
import sys
from threading import Thread
import time
import json
import os
from app.tasks import celery as app

# Metrics exported to Prometheus.

# Number of workers currently answering pings.
WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')

# Number of tasks in each state.
TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])
# Number of tasks per (state, task name): an overview of all celery tasks.
TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])
# Per-task runtime; used to spot slow tasks (e.g. SQL worth optimising).
TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])
# Seconds between a task being received and started; sustained high latency
# means the workers are saturated — a signal to adjust worker count.
LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')

logger = logging.getLogger(__name__)


class WorkerMonitoring:
    """Periodically ping workers and export how many respond."""

    def __init__(self, app):
        self._app = app

    def run(self):
        """Refresh the worker-count gauge every 5 seconds, forever."""
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        """Set WORKERS to the number of workers answering a ping."""
        try:
            WORKERS.set(len(self._app.control.ping(timeout=5)))
        except Exception:
            # Log with traceback but keep the monitoring loop alive.
            # (Dropped the unused `as exc` binding of the original.)
            logger.exception("Error while pinging workers")


class TaskMonitoring:
    """Consume the celery event stream and export per-task metrics.

    Maintains an in-memory ``celery.events.State`` replica of cluster
    task state and updates TASKS, TASKS_NAME, TASKS_RUNTIME and LATENCY
    as events arrive from the broker.
    """

    def __init__(self, app):
        self._app = app
        # In-memory task state, rebuilt from the event stream.
        self._state = self._app.events.State()
        # States / (state, name) pairs seen so far, so series that drop
        # back to zero keep being updated instead of going stale.
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        self._monitor()

    def _process_event(self, event):
        print(event)
        # Events may arrive concurrently; serialize access to shared state.
        with self._state._mutex:
            if event['type'] != 'worker-heartbeat':
                # Strip the 'task-' prefix and map the event type to a
                # celery task state (e.g. 'task-succeeded' -> SUCCESS).
                event_type = event['type'][5:]
                state = celery.events.state.TASK_EVENT_TO_STATE[event_type]
                if state == celery.states.STARTED:
                    # Record queue latency (received -> started).
                    self._observe_latency(event)
                self._collect_tasks(event, state)

    def _observe_latency(self, event):
        """Record the delay between RECEIVED and STARTED for one task."""
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            # First time we see this task: no reference point to measure.
            pass
        else:
            if prev_evt.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        """Route the event: count terminal states, fold others into state."""
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            self._state._event(event)
        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        # Terminal states: 'FAILURE', 'REVOKED', 'SUCCESS'.
        TASKS.labels(state=state).inc()
        try:
            # Forget the finished task and record its runtime, if known.
            name = self._state.tasks.pop(event['uuid']).name
            runtime = event.get('runtime')
            if name is not None and runtime is not None:
                TASKS_RUNTIME.labels(name=name).observe(runtime)
        except (KeyError, AttributeError):
            # Task unknown to our state, or event lacks expected fields.
            pass

    def _collect_unready_tasks(self):
        # Non-terminal states: 'PENDING', 'RECEIVED', 'REJECTED', 'RETRY', 'STARTED'.
        cnt = collections.Counter(t.state for t in self._state.tasks.values())
        self._known_states.update(cnt.elements())
        for task_state in self._known_states:
            TASKS.labels(state=task_state).set(cnt[task_state])

        # Same breakdown, additionally keyed by task name.
        cnt = collections.Counter((t.state, t.name) for t in self._state.tasks.values() if t.name)
        self._known_states_names.update(cnt.elements())
        for task_state in self._known_states_names:
            TASKS_NAME.labels(state=task_state[0], name=task_state[1], ).set(cnt[task_state])

    def _monitor(self):
        """Consume broker events forever, reconnecting on failure with a
        5 second back-off."""
        while True:
            try:
                with self._app.connection() as conn:
                    # Receive all events from the broker and hand each one
                    # to _process_event.
                    logger.info("Try to connect to broker")
                    recv = self._app.events.Receiver(conn, handlers={'*': self._process_event, })
                    setup_metrics(self._app)
                    # NOTE(review): capture() blocks until the connection
                    # drops, so the log line below is only reached on
                    # disconnect — its message is misleading.
                    recv.capture(limit=None, timeout=None, wakeup=True)
                    logger.info("Connected to broker")
            except Exception as e:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                time.sleep(5)


def setup_metrics(app):
    """Zero out all metric series so early scrapes see a complete set.

    Tries to enumerate the tasks registered on the workers; when the
    broker/workers are unreachable, falls back to resetting whichever
    series already exist in the registry.
    """
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Inspection failed (broker down, no workers, ...). Log instead of
        # silently swallowing, then reset the already-known series.
        logger.exception("Could not enumerate registered tasks; resetting existing series")
        # sample[1] is the labels dict both for the old plain 3-tuples and
        # for the newer 5-field Sample namedtuples of prometheus_client;
        # the original `name, labels, cnt` unpack breaks on the latter.
        for metric in TASKS.collect():
            for sample in metric.samples:
                TASKS.labels(**sample[1]).set(0)
        for metric in TASKS_NAME.collect():
            for sample in metric.samples:
                TASKS_NAME.labels(**sample[1]).set(0)
    else:
        # Hoisted out of the loop: the name set is loop-invariant.
        task_names = set(chain.from_iterable(registered_tasks))
        # All celery states: 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY',
        # 'REVOKED', 'STARTED', 'SUCCESS'.
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            for task_name in task_names:
                TASKS_NAME.labels(state=state, name=task_name).set(0)


class EnableEvents:
    """Keep event sending switched on for all workers.

    Celery sometimes does not emit events even with CELERY_SEND_EVENTS
    configured, so events are re-enabled periodically via remote control.
    """

    def __init__(self, app):
        self._app = app

    def run(self):  # pragma: no cover
        """Re-enable events every 5 seconds, forever."""
        while True:
            try:
                self.enable_events()
            except Exception:
                # BUG FIX: the original called self.log.exception(), but the
                # class has no `log` attribute, so any failure here raised
                # AttributeError and killed the monitoring thread. Use the
                # module-level logger instead.
                logger.exception("Error while trying to enable events")
            time.sleep(5)

    def enable_events(self):
        """Ask all workers to start sending task events."""
        self._app.control.enable_events()

def start_httpd(addr):
    """Start the Prometheus metrics HTTP endpoint.

    addr -- "host:port" string, e.g. '0.0.0.0:49792'.
    """
    host, port = addr.split(':')
    # Consistency fix: use the module-level logger (as the rest of this
    # file does) instead of the root logger via logging.info.
    logger.info('Starting HTTPD on {}:{}'.format(host, port))
    prometheus_client.start_http_server(int(port), host)


def celery_monitoring():
    """Wire everything together: reset metrics, start the three monitor
    loops in daemon threads, and serve /metrics over HTTP."""
    setup_metrics(app)

    # Daemon threads so the process can exit when the main thread dies.
    e = Thread(target=EnableEvents(app).run)
    e.daemon = True
    e.start()

    w = Thread(target=WorkerMonitoring(app).run)
    w.daemon = True
    w.start()

    t = Thread(target=TaskMonitoring(app).run)
    t.daemon = True
    t.start()

    # Serve metrics on all interfaces, port 49792.
    start_httpd('0.0.0.0:49792')

    # Every run() loop is infinite, so these joins block forever and keep
    # the process alive.
    t.join()
    w.join()
    e.join()

@manager.command
def start_celery_monitoring():
    """Management-command entry point for the monitoring process.

    Usage:
        nohup python manage.py start_celery_monitoring &
    """
    celery_monitoring()

Read More

缓存property

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

from threading import RLock

# Sentinel distinguishing "not cached yet" from a cached None.
_missing = object()


class locked_cached_property(object):
    """Descriptor that turns a method into a lazily computed property.

    The wrapped function runs at most once per instance: its result is
    stored in the instance ``__dict__`` under the function's name and
    returned on every later access. Equivalent to Werkzeug's
    ``cached_property`` but guarded by an ``RLock`` for thread safety.
    """

    def __init__(self, func, name=None, doc=None):
        self.__name__ = name or func.__name__
        self.__module__ = func.__module__
        self.__doc__ = doc or func.__doc__
        self.func = func
        self.lock = RLock()

    def __get__(self, obj, type=None):
        # Class-level access returns the descriptor itself.
        if obj is None:
            return self
        with self.lock:
            cached = obj.__dict__.get(self.__name__, _missing)
            if cached is not _missing:
                return cached
            result = self.func(obj)
            obj.__dict__[self.__name__] = result
            return result

In [27]: class B:
...: @locked_cached_property
...: def foo(self):
...: import time
...:
...: print('processing....')
...: time.sleep(10)
...: return 'b'
...:
...:

In [28]: b = B()

In [29]: b.foo
processing....
Out[29]: 'b'

In [30]: b.foo
Out[30]: 'b'

In [31]:

Read More

ELK stack

Docker-compose 配置

Read More

proc目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
以下是/proc目录中进程N的信息

/proc/N pid为N的进程信息

/proc/N/cmdline 进程启动命令

/proc/N/cwd 链接到进程当前工作目录

/proc/N/environ 进程环境变量列表

/proc/N/exe 链接到进程的执行命令文件

/proc/N/fd 包含进程相关的所有的文件描述符

/proc/N/maps 与进程相关的内存映射信息

/proc/N/mem 指代进程持有的内存,不可读

/proc/N/root 链接到进程的根目录

/proc/N/stat 进程的状态

/proc/N/statm 进程使用的内存的状态

/proc/N/status 进程状态信息,比stat/statm更具可读性

/proc/self 链接到当前正在运行的进程

Read More

磁盘

df

df查看的是file system, 也就是文件系统层的所占的磁盘大小。

# lsblk

lsblk 查看的是block device,也就是逻辑磁盘真实大小。

Read More

Sqlalchemy 的级联删除配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class BaseModel(Model):
    # Columns shared by every model: surrogate key plus audit timestamps.
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime(True), default=func.now(), nullable=False)
    updated_at = Column(DateTime(True), default=func.now(), onupdate=func.now(), nullable=False)

    @classmethod
    def create(cls, **kw):
        """Get-or-create helper.

        If an 'id' is supplied and a row with that primary key exists,
        return it unchanged; otherwise insert a new row from *kw* and
        commit. Returns the (existing or new) instance.
        """
        session = db.session
        if 'id' in kw:
            obj = session.query(cls).get(kw['id'])
            if obj:
                return obj
        obj = cls(**kw)
        session.add(obj)
        session.commit()
        return obj

    def to_dict(self):
        """Return a dict mapping column names to this row's values."""
        columns = self.__table__.columns.keys()
        return {key: getattr(self, key) for key in columns}


# Flask application backed by a local SQLite file; all models below
# inherit the shared columns from BaseModel.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///../cnblogblog.db'

db = SQLAlchemy(app, model_class=BaseModel)


######################################################################################################

class Parent(db.Model):
    """One side of the one-to-many relationship."""
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))


class Child(db.Model):
    """Many side: each child optionally references one parent."""
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))

    parent_id = Column(Integer, ForeignKey('parent.id'))
    parent = relationship("Parent", backref=backref("child"))

### Delete only the parent; children are unaffected (DB-level cascade)
# 1. parent_id = Column(Integer, ForeignKey('parent.id', ondelete="CASCADE"))
#    parent = relationship("Parent", backref=backref("child", passive_deletes=True))

### Children are deleted along with the parent (ORM-level cascade)
# 2. parent = relationship("Parent", backref = backref("child", cascade="all, delete-orphan"))
# 3. parent = relationship("Parent", backref = backref("child", cascade="all,delete"))

## Parent deleted, children kept; their foreign key is set to NULL (default)
# 4. parent = relationship("Parent", backref = backref("child"))


if __name__ == '__main__':
    # Demo: create two parents with a child each, then delete the first
    # parent to observe the configured cascade behaviour.
    db.create_all()
    Parent.create(name='ZhangTian')
    Parent.create(name='LiTian')
    Child.create(name='ZhangDi', parent_id=1)
    Child.create(name='LiDi', parent_id=2)

    parent = db.session.query(Parent).first()
    db.session.delete(parent)
    db.session.commit()

Read More

参数装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import functools


# 第一种
##################################################
# Variant 1: parametrized decorator built from nested closures.
##################################################
def require_permission(permissions):
    """Run the wrapped callable only when *permissions* is truthy.

    When permission is denied, print 403 and return None instead of
    calling the function.
    """
    def decorated(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not permissions:
                print(403)
                return None
            return func(*args, **kwargs)

        return wrapper

    return decorated


@require_permission(True)
def foo():
    print('foo')


# NOTE(review): this second definition shadows the one above — after this
# line, foo() refers to the permission-denied variant only. Presumably
# intentional for the demo; rename one of them for real use.
@require_permission(False)
def foo():
    print('foo')




# 第二种
#####################################################################
# Variant 2: the same decorator implemented as a callable class.
#####################################################################
class require_permissions(object):
    """Parametrized decorator: permission flag stored on the instance.

    Applying the instance to a function yields a wrapper that only calls
    through when the stored permission is truthy; otherwise it prints
    403 and returns None.
    """

    def __init__(self, permissions):
        self.permissions = permissions

    def __call__(self, fn):
        @functools.wraps(fn)
        def decorated(*args, **kwargs):
            if not self.permissions:
                print(403)
                return None
            return fn(*args, **kwargs)

        return decorated


def require_permission(permission):
    """Functional facade over the class-based decorator above."""
    return require_permissions(permission)


@require_permission(True)
def foo1():
    # Permission granted, so calling foo1() prints 'foo'.
    print('foo')


if __name__ == '__main__':
    # foo is the last definition above (permission False): prints 403.
    foo()
    foo1()

Read More

进程的通信方式

Python的进程间通信
进程间通讯有多种方式,包括信号,管道,消息队列,信号量,共享内存,socket等

#共享内存

Python可以通过mmap模块实现进程之间的共享内存

mmap文件对象既像一个字符串也像一个普通文件对象。像字符串是因为我们可以改变其中的单个字符,如,obj[index] = 'a',同时我们也可以改变一小段的字符,如 obj[2:5]='aaa'。像文件对象是因为在mmap中会有操作标记,我们可以使用seek()方法来改变mmap对象的操作标记
mmap对象通过mmap()方法来构建,Windows系统和Unix系统下,构建方法是不同的。
window的构造方法:
class mmap.mmap(fileno, length[, tagname[, access[, offset]]])
linux的构造方法:
class mmap.mmap(fileno, length[, flags[, prot[, access[, offset]]]])

Read More

日志搜索

1
2

grep -RF 'error' celery-default.log* | cut -d ':' -f1 | uniq -c

Read More