# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
from threading import Thread
import time

from app.tasks import celery as app

# Metrics to export

# Number of workers currently online
WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')

# Number of tasks in each state
TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])
# Number of tasks per state and name; an overview of all Celery tasks
TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])
# Runtime of each task; monitors task performance (e.g. to spot SQL that needs optimising)
TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])
# Delay between a task being received and started; monitors backlog and helps tune the number of workers
LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')
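
# When scraped, these export time series such as the following (illustrative only;
# the task name is hypothetical):
#
#   celery_workers 2.0
#   celery_tasks{state="STARTED"} 3.0
#   celery_tasks_by_name{name="app.tasks.send_report",state="SUCCESS"} 10.0
#   celery_tasks_runtime_seconds_bucket{le="0.5",name="app.tasks.send_report"} 7.0
#   celery_task_latency_sum 1.25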

logger = logging.getLogger(__name__)


class WorkerMonitoring:

    def __init__(self, app):
        self._app = app

    def run(self):
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        try:
            WORKERS.set(len(self._app.control.ping(timeout=5)))
        except Exception:
            logger.exception("Error while pinging workers")


class TaskMonitoring:

    def __init__(self, app):
        self._app = app
        self._state = self._app.events.State()
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        self._monitor()

    def _process_event(self, event):
        logger.debug(event)
        # Events may arrive concurrently; use the state object's lock
        with self._state._mutex:
            if event['type'] != 'worker-heartbeat':
                event_type = event['type'][5:]
                state = celery.events.state.TASK_EVENT_TO_STATE[event_type]
                if state == celery.states.STARTED:
                    # Measure how long the task waited before it started
                    self._observe_latency(event)

                self._collect_tasks(event, state)

    def _observe_latency(self, event):
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            pass
        else:
            if prev_evt.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            self._state._event(event)

        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        # Tasks in 'FAILURE', 'REVOKED' or 'SUCCESS' state
        TASKS.labels(state=state).inc()
        try:
            name = self._state.tasks.pop(event['uuid']).name
            runtime = event.get('runtime')
            if name is not None and runtime is not None:
                TASKS_RUNTIME.labels(name=name).observe(runtime)
        except (KeyError, AttributeError):
            pass

    def _collect_unready_tasks(self):
        # Tasks in 'PENDING', 'RECEIVED', 'REJECTED', 'RETRY' or 'STARTED' state
        cnt = collections.Counter(t.state for t in self._state.tasks.values())
        self._known_states.update(cnt.elements())
        for task_state in self._known_states:
            TASKS.labels(state=task_state).set(cnt[task_state])

        cnt = collections.Counter((t.state, t.name) for t in self._state.tasks.values() if t.name)
        self._known_states_names.update(cnt.elements())
        for task_state in self._known_states_names:
            TASKS_NAME.labels(state=task_state[0], name=task_state[1]).set(cnt[task_state])

    def _monitor(self):
        while True:
            try:
                with self._app.connection() as conn:
                    # Receive every event from the broker and hand each one to _process_event
                    logger.info("Trying to connect to broker")
                    recv = self._app.events.Receiver(conn, handlers={'*': self._process_event})
                    setup_metrics(self._app)
                    logger.info("Connected to broker")
                    # capture() blocks until the connection drops
                    recv.capture(limit=None, timeout=None, wakeup=True)
            except Exception:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                time.sleep(5)


def setup_metrics(app):
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Could not inspect the workers (e.g. none are online yet): zero out
        # every label combination that has already been exported.
        for metric in TASKS.collect():
            for name, labels, value in metric.samples:
                TASKS.labels(**labels).set(0)
        for metric in TASKS_NAME.collect():
            for name, labels, value in metric.samples:
                TASKS_NAME.labels(**labels).set(0)
    else:
        # 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY', 'REVOKED', 'STARTED', 'SUCCESS'
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            for task_name in set(chain.from_iterable(registered_tasks)):
                TASKS_NAME.labels(state=state, name=task_name).set(0)


class EnableEvents:

    # Celery sometimes does not send events even when CELERY_SEND_EVENTS is
    # configured, so enable them explicitly (and periodically) from here.

    def __init__(self, app):
        self._app = app

    def run(self):  # pragma: no cover
        while True:
            try:
                self.enable_events()
            except Exception:
                logger.exception("Error while trying to enable events")
            time.sleep(5)

    def enable_events(self):
        self._app.control.enable_events()


def start_httpd(addr):
    host, port = addr.split(':')
    logging.info('Starting HTTPD on {}:{}'.format(host, port))
    prometheus_client.start_http_server(int(port), host)
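
# Once start_httpd() is running, the metrics defined above are exposed over HTTP,
# e.g. `curl http://localhost:49792/metrics`.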


def celery_monitoring():
    setup_metrics(app)

    e = Thread(target=EnableEvents(app).run)
    e.daemon = True
    e.start()

    w = Thread(target=WorkerMonitoring(app).run)
    w.daemon = True
    w.start()

    t = Thread(target=TaskMonitoring(app).run)
    t.daemon = True
    t.start()

    start_httpd('0.0.0.0:49792')

    t.join()
    w.join()
    e.join()

# NOTE: `manager` is assumed to be the project's Flask-Script Manager instance,
# imported alongside `app` elsewhere in the real module.
@manager.command
def start_celery_monitoring():
    """
    nohup python manage.py start_celery_monitoring &
    """
    celery_monitoring()
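

# A matching Prometheus scrape job might look like this (illustrative only;
# point the target at wherever this exporter actually runs):
#
#   scrape_configs:
#     - job_name: 'celery'
#       static_configs:
#         - targets: ['localhost:49792']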