Distributed and Parallel Programming in Python
Multithreading model
Message-passing model
Actor
The Actor mailbox: how messages are stored
CSP
Data-parallel model
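A minimal sketch of the mailbox mechanism, assuming a thread-backed actor whose mailbox is a `queue.Queue` (all names here are illustrative, not a real actor library):

```python
import threading
import queue

class Actor:
    """A minimal actor: one thread draining a private mailbox queue."""
    def __init__(self):
        self.mailbox = queue.Queue()   # the actor's mailbox
        self.results = []
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def send(self, msg):
        # Asynchronous: the sender enqueues and never waits for processing
        self.mailbox.put(msg)

    def _loop(self):
        while True:
            msg = self.mailbox.get()   # messages are handled one at a time, in order
            if msg is None:            # poison pill: stop the actor
                break
            self.results.append(msg * 2)

    def stop(self):
        self.mailbox.put(None)
        self._thread.join()

actor = Actor()
for i in range(3):
    actor.send(i)
actor.stop()
print(actor.results)  # [0, 2, 4]
```

Because only the actor's own thread touches its state, no locks are needed: the mailbox serializes all access.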
- Every node holds a copy of the model
- Each node reads a different batch of data (batch_size)
- Each node runs gradient descent on its batch
- Parameters are updated (a parameter server applies the updates by some rule)
Task farming (Master/Source <==> Slave/Worker)
基于线程的并行
- 线程是进程的组件,一个进程中可以有多个线程
- 多个线程共享父进程的内存空间
- 由OS调度
- 例子:浏览器,播放器
"Fake" multithreading
In CPython, the Global Interpreter Lock (GIL) prevents multiple threads from executing Python bytecode at the same time, avoiding corruption of interpreter state.
The threads share one pipeline: each thread gets a slice of execution time in turn, similar to CPU time slicing.
Example (multithreading is not necessarily faster than a single thread):
Multithreaded version:
```python
from threading import Thread
import time

"""
Multithreading demo
"""

def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1

def main():
    start_time = time.time()
    threads = []
    for _ in range(2):
        t = Thread(target=my_count)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")

if __name__ == '__main__':
    main()

# res
# Total time : 1.7221925258636475
```
Single-threaded version:
```python
from threading import Thread
import time

def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1

def main():
    start_time = time.time()
    t = Thread(target=my_count)
    t.start()
    t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")

if __name__ == '__main__':
    main()

# res
# Total time : 0.922821044921875
```
Example (Thread.join):
```python
import threading

def f():
    print("Function called by thread ", threading.current_thread().name)
    for i in range(8):
        print(f"thread {threading.current_thread().name} >>> {i}")
    print(f"thread {threading.current_thread().name} ended")

print(f'thread {threading.current_thread().name} is running...')
t = threading.Thread(target=f, name='LoopThread')
t.start()
# Without join(), the main thread would not wait for the child thread to finish
t.join()
print(f'thread {threading.current_thread().name} ended.')

# res
# thread LoopThread >>> 0
# thread LoopThread >>> 1
# thread LoopThread >>> 2
# thread LoopThread >>> 3
# thread LoopThread >>> 4
# thread LoopThread >>> 5
# thread LoopThread >>> 6
# thread LoopThread >>> 7
# thread LoopThread ended
# thread MainThread ended.
```
Process-based parallelism
- Each process has its own independent memory space
- Can fully utilize multiple cores
- Higher overhead than threads
```python
from multiprocessing import Process, Pool
import multiprocessing

"""
Multiprocessing demo:
size the process pool according to the number of CPU cores
"""

def f(name):
    print(f"process {multiprocessing.current_process().name}")
    print(f"Hello {name}")

def f_1(x):
    return x * x

if __name__ == '__main__':
    print(f"process {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")
    # p = Process(target=f, args=("Rambo", ))
    # p.start()
    # p.join()
    with Pool(cpus) as p:
        print(p.map(f_1, list(range(3))))
```
Inter-process communication can use a Queue():
```python
from multiprocessing import Process, Queue
import multiprocessing

def f_2(q):
    # send a message
    q.put([1, 2, "xx", "yy"])

if __name__ == '__main__':
    print(f"process {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")
    q = Queue()
    p = Process(target=f_2, args=(q, ))
    p.start()
    # receive the message
    print(q.get())
    p.join()

# res
# process MainProcess
# Cpus: 4
# [1, 2, 'xx', 'yy']
```
Synchronization problems
- Whether with threads or processes, contention for shared resources is involved
Example: a bank account
Put a lock around the operations:
```python
from threading import Thread, Lock

_lock = Lock()

class Account:
    def __init__(self, money):
        self.money = money

    def add(self, money):
        self.money = self.money + money

    def desc(self, money):
        self.money = self.money - money

"""
Race on money = money + n without the lock:
Thread A              Thread B
get lock
tmp = money + n       tmp = money + n
money = tmp           money = tmp
release lock
"""

account = Account(1000)

def change(money):
    account.add(money)
    account.desc(money)

def run(money):
    for _ in range(1000000):
        with _lock:
            change(money)

# Note: creating a new Lock() inside change() would NOT provide mutual
# exclusion -- every call would lock a different Lock object.

if __name__ == '__main__':
    t1 = Thread(target=run, args=(100, ))
    t2 = Thread(target=run, args=(200, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"total money: {account.money}")

# res
# total money: 1000
```
Deadlock
Example: the dining philosophers
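A minimal sketch of the classic fix, assuming ordered lock acquisition (always pick up the lower-numbered fork first) to break the circular wait that causes deadlock:

```python
import threading

# Deadlock arises if every philosopher grabs the left fork and waits for the
# right one. Imposing a global lock order removes the cycle.
N = 3
forks = [threading.Lock() for _ in range(N)]
eaten = [0] * N

def philosopher(i):
    left, right = i, (i + 1) % N
    first, second = sorted((left, right))   # ordered acquisition breaks the cycle
    for _ in range(100):
        with forks[first]:
            with forks[second]:
                eaten[i] += 1               # "eat" while holding both forks

threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(eaten)  # [100, 100, 100]
```

With the naive left-then-right order, all N philosophers can each hold one fork and wait forever; with the sorted order at least one philosopher reaches for its forks in the opposite direction, so some thread always makes progress.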
Distributed theory
CAP
Consistency, Availability, Partition tolerance
Trade off between C and A to guarantee partition tolerance
Distributed processes based on multiprocessing.managers
- Use the managers module to expose a Queue over the network
- Processes on other machines access the Queue over the network
- This implements a simple multi-machine Master/Worker model
Celery
Introduction to the distributed framework:
Message broker: Celery does not provide a message service itself; it integrates with third-party message middleware, commonly Redis, MongoDB, or RabbitMQ
Task execution unit (worker): the task execution unit provided by Celery; workers run concurrently on the nodes of a distributed system
Task result store: stores the results of tasks executed by workers; Celery supports several result backends, including Redis, MongoDB, and AMQP
Distributed practice
Implement MapReduce on Ray for distributed word frequency counting (WordCount).
Comments and corrections are welcome.
Title: Distributed and Parallel Programming in Python
Author: 顺强
Published: 2019-09-10, 23:59:00
Original link: http://shunqiang.ml/python-python-distri/
License: CC Attribution-NonCommercial-ShareAlike 4.0. Please keep the original link and author when reposting.