Distributed Parallelism in Python
Multithreading model
Message-passing model
Actor
Actors: each actor stores incoming messages in its mailbox
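A minimal actor sketch. It assumes one thread per actor and uses the standard library's `queue.Queue` as the mailbox. `CounterActor` and its message names are hypothetical. Callers never touch the actor's state directly. They only drop messages into its mailbox, and the actor's own thread handles those messages one at a time, so the state needs no lock.

```python
import queue
import threading

class CounterActor:
    """Hypothetical actor: private state plus a mailbox, served by one thread."""

    def __init__(self):
        self._count = 0                  # private state, touched only by the actor thread
        self._mailbox = queue.Queue()    # messages wait here until the actor handles them
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message, reply_to=None):
        # Asynchronous send: enqueue the message and return immediately
        self._mailbox.put((message, reply_to))

    def _run(self):
        while True:
            message, reply_to = self._mailbox.get()   # one message at a time, FIFO
            if message == "incr":
                self._count += 1
            elif message == "get":
                reply_to.put(self._count)             # reply through a queue the caller supplied
            elif message == "stop":
                break

actor = CounterActor()
for _ in range(5):
    actor.send("incr")
reply = queue.Queue()
actor.send("get", reply_to=reply)
print(reply.get())   # prints 5
actor.send("stop")
```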
CSP
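Python has no built-in CSP channels. The sketch below approximates one with `queue.Queue(maxsize=1)`, so a sender blocks while the previous value has not yet been taken. This only approximates CSP's rendezvous, because a true unbuffered channel blocks until the receiver takes the value. The producer and consumer know only the channel, not each other. The `DONE` sentinel is a hypothetical convention for closing the channel.

```python
import queue
import threading

DONE = object()   # sentinel meaning "channel closed" (our own convention)

def producer(ch, n):
    for i in range(n):
        ch.put(i)          # blocks while the one-slot channel is still full
    ch.put(DONE)

def consumer(ch, out):
    while True:
        item = ch.get()    # blocks until a value arrives
        if item is DONE:
            break
        out.append(item * item)

channel = queue.Queue(maxsize=1)   # the only thing both sides share
results = []
p = threading.Thread(target=producer, args=(channel, 5))
c = threading.Thread(target=consumer, args=(channel, results))
p.start()
c.start()
p.join()
c.join()
print(results)   # [0, 1, 4, 9, 16]
```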
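Data-parallel model
- Each node holds a full copy of the model
- Each node takes a different slice of the data (one batch of batch_size samples)
- Each node computes gradients on its own batch
- The parameters are updated (a parameter server merges the updates according to some rule)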
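The steps above can be sketched in a single process. This sketch assumes a one-parameter linear model `y = w * x` trained with squared error. The "nodes" are simulated by a loop over data shards, and the "parameter server" averages their gradients. Averaging is one common update rule, chosen here for illustration.

```python
def gradient(w, batch):
    """Mean-squared-error gradient of y = w * x on one node's batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)

def train(data, num_nodes=4, batch_size=2, lr=0.001, steps=200):
    w = 0.0   # the parameter held by the (simulated) parameter server
    shards = [data[i::num_nodes] for i in range(num_nodes)]   # each node gets different data
    for step in range(steps):
        grads = []
        for shard in shards:
            # every node uses the same w (its model copy) but its own batch
            start = (step * batch_size) % len(shard)
            batch = shard[start:start + batch_size]
            grads.append(gradient(w, batch))
        # parameter server: average the node gradients, then apply one update
        w -= lr * sum(grads) / len(grads)
    return w

data = [(x, 3.0 * x) for x in range(1, 17)]   # toy data generated with w = 3
print(round(train(data), 3))   # 3.0
```

With equal-size batches, the average of the node gradients equals the gradient on the union of their batches. That is why this synchronous scheme behaves like ordinary gradient descent on a larger batch.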
Task farming (Master/Source <==> Slave/Worker)
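A task-farming sketch using the standard library's `concurrent.futures.ProcessPoolExecutor`, which is one possible tool among several. The main process acts as the master/source, the pool's processes are the workers, and the pool hands each queued task to whichever worker is free. `slow_square` is a hypothetical task with uneven run times.

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def slow_square(n):
    time.sleep(0.1 * (n % 3))   # simulate tasks of uneven length
    return n, n * n

def farm(tasks, workers=4):
    results = {}
    with ProcessPoolExecutor(max_workers=workers) as pool:
        # master: submit every task up front
        futures = [pool.submit(slow_square, t) for t in tasks]
        for fut in as_completed(futures):   # collect in completion order
            n, sq = fut.result()
            results[n] = sq
    return results

if __name__ == "__main__":
    print(dict(sorted(farm(range(6)).items())))   # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
```

Collecting with `as_completed` lets the master handle each result as soon as its worker finishes, instead of waiting in submission order.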
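Thread-based parallelism
- A thread is a component of a process; one process can contain several threads
- Threads share the memory space of their parent process
- Threads are scheduled by the OS
- Examples: web browsers, media players
Python's "fake" multithreading
- The Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, which keeps threads from corrupting the interpreter's state
- There is one shared pipeline: each thread gets a slice of execution time in turn, much like CPU time slicing
Example (multithreading is not necessarily faster than a single thread):
Multithreaded version:
```python
from threading import Thread
import time

"""
Multithreading demo
"""

def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1

def main():
    start_time = time.time()
    threads = []
    for _ in range(2):
        t = Thread(target=my_count)
        t.start()
        threads.append(t)
    # Join only after both threads have started; joining inside the loop would run them one after the other
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")

if __name__ == '__main__':
    main()
# res (timings vary by machine)
# Total time : 1.7221925258636475
```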
单线程版本:
```python
from threading import Thread
import time
def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1
def main():
    start_time = time.time()
    t = Thread(target=my_count)
    t.start()
    t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")
if __name__ == '__main__':
    main()
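# res
# Total time : 0.922821044921875
```
The two-thread run takes roughly twice as long as the one-thread run. Each thread does the full count, and under the GIL the two threads take turns on one interpreter instead of running in parallel on two cores.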
Example (`thread.join`):
```python
import threading

def f():
    print(f"Function called by thread {threading.current_thread().name}")
    for i in range(8):
        print(f"thread {threading.current_thread().name} >>> {i}")
    print(f"thread {threading.current_thread().name} ended")

print(f"thread {threading.current_thread().name} is running...")
t = threading.Thread(target=f, name='LoopThread')
t.start()
# Without join(), the main thread would not wait for the child thread to finish
t.join()
print(f"thread {threading.current_thread().name} ended.")
# res
# thread MainThread is running...
# Function called by thread LoopThread
# thread LoopThread >>> 0
# thread LoopThread >>> 1
# thread LoopThread >>> 2
# thread LoopThread >>> 3
# thread LoopThread >>> 4
# thread LoopThread >>> 5
# thread LoopThread >>> 6
# thread LoopThread >>> 7
# thread LoopThread ended
# thread MainThread ended.
```
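The CPU-bound counting example gains nothing from a second thread. Threads do help with I/O-bound work, because a thread that blocks on I/O releases the GIL. The small sketch below uses `time.sleep` as a stand-in for a network or disk wait. That substitution is an assumption: real I/O gives noisier timings.

```python
import threading
import time

def fake_io(delay):
    time.sleep(delay)   # blocking wait; the GIL is released while sleeping

def run_threads(n, delay):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

# The four 0.5 s waits overlap, so this takes about 0.5 s rather than 2 s
print(f"Total time : {run_threads(4, 0.5):.2f}")
```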
Process-based parallelism
- Each process has its own memory space
- Processes can make full use of multiple cores
- Processes have higher overhead than threads
```python
from multiprocessing import Process, Pool
import multiprocessing

"""
Multiprocessing demo
Size the process pool by the number of CPU cores
"""

def f(name):
    print(f"thread {multiprocessing.current_process().name}")
    print(f"Hello {name}")

def f_1(x):
    return x * x

if __name__ == '__main__':
    print(f"thread {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")
    # p = Process(target=f, args=("Rambo", ))
    # p.start()
    # p.join()
    # One worker process per core; map() splits the inputs across them
    with Pool(cpus) as p:
        print(p.map(f_1, list(range(3))))
```
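Because each process has its own memory space, a write made in a child process is not visible to the parent. A write made in a thread is visible, since threads share their parent's memory. The quick check below shows the difference. The names `shared` and `append_item` are illustrative.

```python
import threading
from multiprocessing import Process

shared = []   # module-level list

def append_item(tag):
    shared.append(tag)

if __name__ == "__main__":
    t = threading.Thread(target=append_item, args=("from thread",))
    t.start()
    t.join()
    p = Process(target=append_item, args=("from process",))
    p.start()
    p.join()
    # Only the thread's write is visible; the child process appended to its own copy
    print(shared)   # ['from thread']
```

This isolation is why processes need an explicit channel, such as a queue, to exchange data.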
Processes can communicate with each other through a `Queue()`:
```python
from multiprocessing import Process, Pool, Queue
import multiprocessing
def f(name):
    print(f"thread {multiprocessing.current_process().name}")
    print(f"Hello {name}")
def f_1(x):
    return x * x
def f_2(q):
    # 发送消息
    q.put([1, 2, "xx", "yy"])
if __name__ == '__main__':
    print(f"thread {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")
    q = Queue()
    p = Process(target=f_2, args=(q, ))
    p.start()
    # 拿消息
    print(q.get())
    p.join()
# res
# thread MainProcess
# Cpus: 4
# [1, 2, 'xx', 'yy']
```
Synchronization
- Both threads and processes run into contention for shared resources
- Example: a bank account
Put a lock around the operations:
```python
from threading import Thread, Lock

_lock = Lock()

class Account:
    def __init__(self, money):
        self.money = money
        self._lock = Lock()

    def add(self, money):
        self.money = self.money + money

    def desc(self, money):
        self.money = self.money - money

"""
money = money + n
Thread A                  Thread B
get lock
tmp = money + n           tmp = money + n
money = tmp               money = tmp
release lock
"""

account = Account(1000)

def change(money):
    account.add(money)
    account.desc(money)

def run(money):
    for _ in range(1000000):
        # Every thread must use the same module-level lock
        with _lock:
            change(money)

# Anti-pattern: a new Lock() created on every call is never shared, so it protects nothing
# def change(money):
#     temp = money
#     with threading.Lock():
#         account.add(temp)
#         account.desc(temp)

if __name__ == '__main__':
    t1 = Thread(target=run, args=(100, ))
    t2 = Thread(target=run, args=(200, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"total money: {account.money}")
# res
# total money: 1000
```
Deadlock
Example: the dining philosophers problem
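A sketch of the dining philosophers with threads. It assumes five philosophers and one `threading.Lock` per fork. If every philosopher grabbed the left fork first, all five could end up holding one fork each and waiting forever for the other, which is a deadlock. Here each philosopher picks up the lower-numbered fork first. This global lock ordering is one standard fix, among others such as a waiter or semaphore, and it rules out the circular wait.

```python
import threading

N = 5                                        # philosophers, and forks
forks = [threading.Lock() for _ in range(N)]
meals = [0] * N                              # meals[i] is written only by philosopher i

def philosopher(i, rounds):
    left, right = i, (i + 1) % N
    # Always take the lower-numbered fork first, so no cycle of waits can form
    first, second = min(left, right), max(left, right)
    for _ in range(rounds):
        with forks[first]:
            with forks[second]:
                meals[i] += 1                # "eat" while holding both forks

threads = [threading.Thread(target=philosopher, args=(i, 1000)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(meals)   # [1000, 1000, 1000, 1000, 1000]
```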
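Distributed-systems theory
- CAP: Consistency, Availability, Partition tolerance
  - Network partitions cannot be ruled out, so partition tolerance must be kept, and a system trades off between C and A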
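- Distributed processes with `multiprocessing.managers`
  - The managers module exposes a `Queue` on the network
  - Processes on other machines access that `Queue` over the network
  - Together this gives a simple multi-machine Master/Worker model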
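A minimal Master/Worker sketch with `multiprocessing.managers.BaseManager`. The `authkey` is a placeholder. The manager binds to `127.0.0.1` on a port chosen by the OS (port `0`) so the demo runs on one machine. A real deployment would instead listen on a fixed port that the worker machines can reach, and `run_worker` would run on those machines. Named functions are registered instead of lambdas so the callables can also be pickled under the `spawn` start method.

```python
import queue
from multiprocessing.managers import BaseManager

AUTHKEY = b"change-me"   # hypothetical shared secret; master and workers must agree

_task_queue = queue.Queue()
_result_queue = queue.Queue()

def _get_task_queue():
    return _task_queue

def _get_result_queue():
    return _result_queue

class MasterManager(BaseManager):
    pass

class WorkerManager(BaseManager):
    pass

# The master registers the callables that return the shared queues ...
MasterManager.register("get_task_queue", callable=_get_task_queue)
MasterManager.register("get_result_queue", callable=_get_result_queue)
# ... workers register only the names and receive proxies over the network
WorkerManager.register("get_task_queue")
WorkerManager.register("get_result_queue")

def run_worker(address, authkey=AUTHKEY):
    manager = WorkerManager(address=address, authkey=authkey)
    manager.connect()
    tasks, results = manager.get_task_queue(), manager.get_result_queue()
    while True:
        try:
            n = tasks.get(timeout=1)
        except queue.Empty:      # no work left
            break
        results.put((n, n * n))

def run_master(numbers, address=("127.0.0.1", 0), authkey=AUTHKEY):
    manager = MasterManager(address=address, authkey=authkey)
    manager.start()              # serves both queues from a child process
    try:
        tasks, results = manager.get_task_queue(), manager.get_result_queue()
        for n in numbers:
            tasks.put(n)
        # Demo only: run the worker here; normally it runs on another machine
        run_worker(manager.address, authkey)
        return dict(results.get(timeout=10) for _ in numbers)
    finally:
        manager.shutdown()

if __name__ == "__main__":
    print(run_master([1, 2, 3]))   # {1: 1, 2: 4, 3: 9}
```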
 
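- Celery
  Overview of the distributed framework:
  - Message broker: Celery does not provide a messaging service itself; it integrates with third-party message brokers, commonly Redis, MongoDB, or RabbitMQ
  - Task execution unit (worker): the unit Celery provides for running tasks; workers run concurrently on the nodes of the distributed system
  - Task result store: stores the results of the tasks executed by workers; Celery can store results in several ways, including Redis, MongoDB, and AMQP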
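To make the three roles concrete, here is a toy single-machine model built only from the standard library. A `queue.Queue` plays the broker, threads play the workers, and a dict plays the result store. It illustrates the architecture only and is not Celery's API. `publish_task`, `TASKS`, and the message format are hypothetical.

```python
import queue
import threading
import uuid

broker = queue.Queue()     # stands in for Redis/RabbitMQ: holds task messages
result_store = {}          # stands in for the result backend: task id -> result
store_lock = threading.Lock()

def add(x, y):
    return x + y

TASKS = {"add": add}       # hypothetical registry of task names

def publish_task(name, *args):
    """Client side: put a message on the broker and return its task id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, name, args))
    return task_id

def worker():
    """Worker: take messages from the broker, run them, save the results."""
    while True:
        msg = broker.get()
        if msg is None:            # stop signal
            break
        task_id, name, args = msg
        result = TASKS[name](*args)
        with store_lock:
            result_store[task_id] = result

workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()
ids = [publish_task("add", i, i) for i in range(5)]
for _ in workers:
    broker.put(None)               # one stop signal per worker, queued after all tasks
for w in workers:
    w.join()
print([result_store[t] for t in ids])   # [0, 2, 4, 6, 8]
```

The client keeps only task ids and looks results up later, which mirrors how a Celery client reads results from the result store rather than from the worker directly.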
- Hands-on distributed computing
  Use Ray to implement MapReduce for a distributed word-frequency count (WordCount)
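To keep the example dependency-free, the sketch below uses the standard library's `multiprocessing.Pool` in place of Ray. It shows the map, shuffle, and reduce structure of WordCount, not Ray's API. In a Ray version, the map and reduce functions would typically become `@ray.remote` tasks launched with `.remote()`, and `ray.get()` would collect their results.

```python
from collections import defaultdict
from multiprocessing import Pool

def map_words(doc):
    """Map: emit a (word, 1) pair for every word in one document."""
    return [(word, 1) for word in doc.lower().split()]

def shuffle(mapped):
    """Shuffle: group the emitted counts by word."""
    groups = defaultdict(list)
    for pairs in mapped:
        for word, one in pairs:
            groups[word].append(one)
    return groups

def reduce_counts(item):
    """Reduce: sum the counts for one word."""
    word, ones = item
    return word, sum(ones)

def word_count(docs, processes=2):
    with Pool(processes) as pool:
        mapped = pool.map(map_words, docs)                                # map phase in parallel
        reduced = pool.map(reduce_counts, list(shuffle(mapped).items()))  # reduce phase in parallel
    return dict(reduced)

if __name__ == "__main__":
    docs = ["the quick brown fox", "the lazy dog", "The fox"]
    print(word_count(docs))   # {'the': 3, 'quick': 1, 'brown': 1, 'fox': 2, 'lazy': 1, 'dog': 1}
```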
Comments and corrections are welcome.
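Title: Distributed Parallelism in Python
Author: 顺强
Published: 2019-09-10, 23:59:00
Original link: http://shunqiang.ml/python-python-distri/
License: "Attribution-NonCommercial-ShareAlike 4.0" (CC BY-NC-SA 4.0). Please keep the original link and author when reposting.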
 
            