Python process and thread

Use multithreading when your tasks need to share a lot of data or communicate with each other frequently; in every other case, prefer multiprocessing.

start — launch the child task and hand it over to run alongside the parent
join — make the parent wait until the child task has finished
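
Below is a minimal sketch of start/join with multiprocessing.Process (the worker function and its arguments are made up for illustration; threading.Thread exposes the same start/join interface):

import multiprocessing

def worker(n):
    # the child task: pretend to do some work
    print('worker %s finished' % n)

if __name__ == '__main__':
    procs = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()    # launch the child process
        procs.append(p)

    for p in procs:
        p.join()     # the parent blocks here until this child finishes

The script below applies the same pattern to a real job: the main process reads a CSV file and pushes rows onto a multiprocessing.Queue, while a pool of worker processes batch-inserts them into MySQL.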

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# NOTE: this is Python 2 code (MySQLdb, unicode, str.decode/encode).

import codecs
import csv
import logging
import multiprocessing
import os
import warnings

import click
import MySQLdb
import sqlalchemy

warnings.filterwarnings('ignore', category=MySQLdb.Warning)

# number of rows per batch insert
BATCH = 5000

DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'

engine = sqlalchemy.create_engine(DB_URI)


def get_table_cols(table):
    sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)
    res = engine.execute(sql)
    return res.keys()


def insert_many(table, cols, rows, cursor):
    # "cursor" is actually the per-worker SQLAlchemy engine created in insert_worker()
    sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(
        table=table,
        cols=', '.join(cols),
        marks=', '.join(['%s'] * len(cols)))
    cursor.execute(sql, *rows)
    logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)


def insert_worker(table, cols, queue):
    rows = []
    # each worker process creates its own engine object
    cursor = sqlalchemy.create_engine(DB_URI)
    while True:
        row = queue.get()
        if row is None:
            if rows:
                insert_many(table, cols, rows, cursor)
            break

        rows.append(row)
        if len(rows) == BATCH:
            insert_many(table, cols, rows, cursor)
            rows = []


def insert_parallel(table, reader, w=10):
    cols = get_table_cols(table)

    # data queue: the main process reads the file and puts rows in,
    # worker processes take rows out. Cap the queue size so a slow
    # consumer does not pile up rows and eat too much memory.
    queue = multiprocessing.Queue(maxsize=w * BATCH * 2)
    workers = []
    for i in range(w):
        p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))
        p.start()
        workers.append(p)
        logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)

    dirty_data_file = './{}_dirty_rows.csv'.format(table)
    xf = open(dirty_data_file, 'w')
    writer = csv.writer(xf, delimiter=reader.dialect.delimiter)

    for line in reader:
        # log and skip dirty rows whose field count does not match the columns
        if len(line) != len(cols):
            writer.writerow(line)
            continue

        # replace the string 'NULL' with None
        clean_line = [None if x == 'NULL' else x for x in line]

        # classic producer/consumer: the main process keeps feeding the queue
        # while the worker processes keep pulling rows off it and inserting them
        queue.put(tuple(clean_line))
        if reader.line_num % 500000 == 0:
            logging.info('put %s tasks into queue.', reader.line_num)

    xf.close()

    # send a "no more work" signal to every worker
    logging.info('send close signal to worker processes')
    for i in range(w):
        queue.put(None)

    for p in workers:
        p.join()


def convert_file_to_utf8(f, rv_file=None):
    if not rv_file:
        name, ext = os.path.splitext(f)
        if isinstance(name, unicode):
            name = name.encode('utf8')
        rv_file = '{}_utf8{}'.format(name, ext)
    logging.info('start to process file %s', f)
    with open(f) as infd:
        with open(rv_file, 'w') as outfd:
            lines = []
            loop = 0
            chunk = 200000
            first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'
            lines.append(first_line)
            for line in infd:
                clean_line = line.decode('gb18030').encode('utf8')
                clean_line = clean_line.rstrip() + '\n'
                lines.append(clean_line)
                if len(lines) == chunk:
                    outfd.writelines(lines)
                    lines = []
                    loop += 1
                    logging.info('processed %s lines.', loop * chunk)

            outfd.writelines(lines)
            logging.info('processed %s lines.', loop * chunk + len(lines))


@click.group()
def cli():
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')


@cli.command('gbk_to_utf8')
@click.argument('f')
def convert_gbk_to_utf8(f):
    convert_file_to_utf8(f)


@cli.command('load')
@click.option('-t', '--table', required=True, help='table name')
@click.option('-i', '--filename', required=True, help='input file')
@click.option('-w', '--workers', default=10, help='number of workers, default 10')
def load_fac_day_pro_nos_sal_table(table, filename, workers):
    with open(filename) as fd:
        fd.readline()  # skip header
        reader = csv.reader(fd)
        insert_parallel(table, reader, w=workers)


if __name__ == '__main__':
    cli()
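
Assuming the script above is saved as load_csv.py (the script name, table name and CSV names below are placeholders), a typical run converts the encoding first and then loads the data: python load_csv.py gbk_to_utf8 sales.csv, followed by python load_csv.py load -t sales -i sales_utf8.csv -w 10.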


Nmap

I used to rely on telnet to check whether a port was open; nmap is another command that works very well for this.

# Check which ports are open on the local machine
ubuntu@ip-10-10-1-74:~$ nmap -F -v 127.0.0.1

Starting Nmap 7.60 ( https://nmap.org ) at 2018-12-09 05:30 UTC
Initiating Ping Scan at 05:30
Scanning 127.0.0.1 [2 ports]
Completed Ping Scan at 05:30, 0.00s elapsed (1 total hosts)
Initiating Connect Scan at 05:30
Scanning localhost (127.0.0.1) [100 ports]
Discovered open port 22/tcp on 127.0.0.1
Discovered open port 80/tcp on 127.0.0.1
Discovered open port 9100/tcp on 127.0.0.1
Completed Connect Scan at 05:30, 0.00s elapsed (100 total ports)
Nmap scan report for localhost (127.0.0.1)
Host is up (0.00014s latency).
Not shown: 97 closed ports
PORT     STATE SERVICE
22/tcp   open  ssh
80/tcp   open  http
9100/tcp open  jetdirect
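
Here -F is nmap's fast mode, which probes only the 100 most common ports (hence the "100 ports" in the report above); to check one specific port the way telnet would, something like nmap -p 9100 127.0.0.1 works as well.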


docker-compose syntax and commands

TODO


Quickly locating large files

Today I had to track down a large file. Using du --max-depth=1 -h took a long time; afterwards I found ncdu, a tool built exactly for this kind of job.
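
For example, running ncdu /data (any suspect directory works) builds an interactive, size-sorted listing you can drill into with the arrow keys, which is much faster than repeated du runs.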


Redis

QueryTaskTracker

QueryTaskTracker is built on Redis and implements create, save, update, get_by_task_id, all and prune.

create builds the tracker object.

save is roughly a database INSERT (one sorted set per state: done, waiting, in progress; plus a string key holding the task's serialized data).

update is roughly a database UPDATE.

get_by_task_id is roughly SELECT * FROM ... WHERE id = 'xxxx'.

all is roughly SELECT * FROM ... LIMIT xxxx (a rank range over the sorted set).

prune is roughly a database DELETE.

Create: save

Delete: prune

Update: update

Read: all / get_by_task_id


# Excerpt from redash; assumes the module-level imports used there
# (time, json, logging, redis, urlparse, urllib, and redash's utils / settings).
class QueryTaskTracker(object):
    DONE_LIST = 'query_task_trackers:done'
    WAITING_LIST = 'query_task_trackers:waiting'
    IN_PROGRESS_LIST = 'query_task_trackers:in_progress'
    ALL_LISTS = (DONE_LIST, WAITING_LIST, IN_PROGRESS_LIST)

    def __init__(self, data):
        self.data = data

    @classmethod
    def create(cls, task_id, state, query_hash, data_source_id, scheduled, metadata):
        data = dict(task_id=task_id, state=state,
                    query_hash=query_hash, data_source_id=data_source_id,
                    scheduled=scheduled,
                    username=metadata.get('Username', 'unknown'),
                    query_id=metadata.get('Query ID', 'unknown'),
                    retries=0,
                    scheduled_retries=0,
                    created_at=time.time(),
                    started_at=None,
                    run_time=None)

        return cls(data)

    def save(self, connection=None):
        if connection is None:
            connection = redis_connection

        self.data['updated_at'] = time.time()
        key_name = self._key_name(self.data['task_id'])
        connection.set(key_name, utils.json_dumps(self.data))
        connection.zadd(self._get_list(), time.time(), key_name)

        for l in self.ALL_LISTS:
            if l != self._get_list():
                connection.zrem(l, key_name)

    # TODO: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this.
    def update(self, **kwargs):
        self.data.update(kwargs)
        self.save()

    @staticmethod
    def _key_name(task_id):
        return 'query_task_tracker:{}'.format(task_id)

    def _get_list(self):
        if self.state in ('finished', 'failed', 'cancelled'):
            return self.DONE_LIST

        if self.state in ('created',):
            return self.WAITING_LIST

        return self.IN_PROGRESS_LIST

    @classmethod
    def get_by_task_id(cls, task_id, connection=None):
        if connection is None:
            connection = redis_connection

        key_name = cls._key_name(task_id)
        data = connection.get(key_name)
        return cls.create_from_data(data)

    @classmethod
    def create_from_data(cls, data):
        if data:
            data = json.loads(data)
            return cls(data)

        return None

    @classmethod
    def all(cls, list_name, offset=0, limit=-1):
        if limit != -1:
            limit -= 1

        if offset != 0:
            offset -= 1

        # fetch member ids from the sorted set, highest score first
        ids = redis_connection.zrevrange(list_name, offset, limit)
        pipe = redis_connection.pipeline()
        for id in ids:
            pipe.get(id)

        tasks = [cls.create_from_data(data) for data in pipe.execute()]
        return tasks

    # delete old trackers
    @classmethod
    def prune(cls, list_name, keep_count, max_keys=100):
        # number of members in the sorted set
        count = redis_connection.zcard(list_name)

        if count <= keep_count:
            return 0

        # remove at most max_keys (100 by default) per call
        remove_count = min(max_keys, count - keep_count)

        # the remove_count oldest (lowest-scored) members of the sorted set
        keys = redis_connection.zrange(list_name, 0, remove_count - 1)
        # delete the string values those members point to
        redis_connection.delete(*keys)

        # then drop the members themselves from the sorted set
        redis_connection.zremrangebyrank(list_name, 0, remove_count - 1)

        return remove_count

    def __getattr__(self, item):
        return self.data[item]

    def __contains__(self, item):
        return item in self.data


def create_redis_connection():
    logging.debug("Creating Redis connection (%s)", settings.REDIS_URL)
    redis_url = urlparse.urlparse(settings.REDIS_URL)

    if redis_url.scheme == 'redis+socket':
        qs = urlparse.parse_qs(redis_url.query)
        if 'virtual_host' in qs:
            db = qs['virtual_host'][0]
        else:
            db = 0

        r = redis.StrictRedis(unix_socket_path=redis_url.path, db=db)
    else:
        if redis_url.path:
            redis_db = redis_url.path[1]
        else:
            redis_db = 0
        # Redis passwords might be quoted with special characters
        redis_password = redis_url.password and urllib.unquote(redis_url.password)
        r = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)

    return r


redis_connection = create_redis_connection()

# String


redis_connection.set(key_name, utils.json_dumps(self.data))


# Hash

Get:
status = redis_connection.hgetall('redash:status')

status.get('last_refresh_at', now)

Set:
redis_connection.hmset('redash:status', {
    'outdated_queries_count': outdated_queries_count,
    'last_refresh_at': now,
    'query_ids': json.dumps(query_ids)
})


# SortedSet: used the most; members are ordered by score, so it can be treated as a sorted list

Fetch members by rank (ordered by score, newest first) with zrevrange:

ids = redis_connection.zrevrange(list_name, offset, limit)
pipe = redis_connection.pipeline()
for id in ids:
    pipe.get(id)

tasks = [cls.create_from_data(data) for data in pipe.execute()]
return tasks


# Set

Get all members:
blacklist = [int(ds_id) for ds_id in
             redis_connection.smembers('data_sources:schema:blacklist') if ds_id]


redis> SADD language Ruby Python Clojure
(integer) 3

redis> SMEMBERS language
1) "Python"
2) "Ruby"
3) "Clojure"


Add (on the state sorted sets): the key is one of the state lists (done / waiting / in progress), the member (key_name) is the task's key, and time.time() is the score.

connection.zadd(self._get_list(), time.time(), key_name)

Remove: drop key_name from every list other than the one for the current state.

for l in self.ALL_LISTS:
    if l != self._get_list():
        connection.zrem(l, key_name)


def _get_list(self):
    if self.state in ('finished', 'failed', 'cancelled'):
        return self.DONE_LIST

    if self.state in ('created',):
        return self.WAITING_LIST

    return self.IN_PROGRESS_LIST
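
As a self-contained sketch of the sorted-set pattern the tracker relies on (assuming a local Redis and redis-py 3.x, whose zadd takes a mapping rather than the older zadd(name, score, member) form used in the redash code above; the key names are made up):

import time
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# track a task as "in progress": member = task key, score = timestamp
r.zadd('demo:in_progress', {'demo:task:1': time.time()})

# move it to the "done" list: add it there, remove it from the old list
r.zadd('demo:done', {'demo:task:1': time.time()})
r.zrem('demo:in_progress', 'demo:task:1')

# newest first, like QueryTaskTracker.all()
print(r.zrevrange('demo:done', 0, -1))

# drop the oldest entry, like QueryTaskTracker.prune()
r.zremrangebyrank('demo:done', 0, 0)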


SSH config tutorial

SSH config is a client-side configuration mechanism for SSH on Linux: options you would otherwise pass on the ssh command line can be put into a config file and are read whenever ssh runs, which simplifies day-to-day usage.

Configuring the ssh config file


# configuration 1
Host cluster
    HostName 192.168.11.11
    User tom


# configuration 2
Host aliyun
    HostName 202.44.2.2
    User tom


# configuration 3
Host elk
    HostName 10.10.1.74
    User ubuntu

    # jump host
    ProxyCommand ssh -W %h:%p fhp@54.157.70.250
    ForwardAgent yes
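
With this in ~/.ssh/config, a plain ssh elk logs in to 10.10.1.74 as ubuntu through the jump host fhp@54.157.70.250, instead of spelling out the user, host and ProxyCommand on every invocation; ssh cluster and ssh aliyun work the same way.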


Configuring local domain routing in nginx

When configuring proxy_pass forwarding in nginx, a trailing / on the proxy_pass URL means an absolute root path on the upstream; without the /, the path is relative and the matched location prefix is forwarded to the upstream as well.

Suppose each of the four configurations below is accessed with http://192.168.1.1/proxy/test.html
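
As a minimal illustration of the rule (the upstream address is made up): with location /proxy/ { proxy_pass http://127.0.0.1/; } the upstream receives /test.html, whereas with proxy_pass http://127.0.0.1; (no trailing slash) it receives the full /proxy/test.html.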


Nginx load balancing

Reposted from https://www.shixinke.com/nginx/nginx-upstream

1. What is load balancing?
Requests are distributed across multiple servers according to some rule, so the servers share the load instead of a single server handling every request.
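
In nginx this is typically done with an upstream block listing the backend servers and a proxy_pass pointing at it; the default strategy is round-robin, and parameters such as weight or the ip_hash directive select other strategies.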


Configuring local domain routing in nginx

How a request is handled

When you type a URL into the browser, the local hosts file is checked first; if there is a mapping for the domain, the request goes straight to the mapped IP of your web server (nginx here). Once nginx receives the request, it checks whether the request's domain matches a server_name: if it does, nginx responds according to that server block's configuration; if not, it falls back to the default configuration.
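
For example (hypothetical names), adding 127.0.0.1 myapp.local to /etc/hosts plus a server block containing server_name myapp.local; makes http://myapp.local hit that block, while any unmatched Host header falls through to the default server.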
