Python: Distributed and Parallel Computing

  1. Data-parallel model
  2. Thread-based parallelism
  3. Process-based parallelism
  4. Synchronization
  5. Deadlock
  6. Distributed-systems theory
  • Multithreading model
  • Message-passing model
    • Actor (each actor's mailbox is its message storage mechanism)
    • CSP
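
To make the mailbox idea concrete, here is a minimal sketch of an actor built from a thread and a `queue.Queue`. The `CounterActor` class, its `send`/`stop` methods, and the `send_many` helper are hypothetical names for illustration, not part of any actor library. The point is only that senders never touch the actor's state directly; they drop messages into its mailbox and the actor handles them one at a time.

```python
import queue
import threading


class CounterActor:
    """Hypothetical actor: private state plus a mailbox, handled one message at a time."""

    _STOP = object()  # sentinel message that tells the actor to shut down

    def __init__(self):
        self.mailbox = queue.Queue()  # stores messages until the actor handles them
        self.total = 0                # state that only the actor's own thread modifies
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def send(self, message):
        # Senders only enqueue; they never modify self.total themselves
        self.mailbox.put(message)

    def stop(self):
        self.mailbox.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self.mailbox.get()
            if message is self._STOP:
                break
            self.total += message


def send_many(actor, count):
    for _ in range(count):
        actor.send(1)


actor = CounterActor()
senders = [threading.Thread(target=send_many, args=(actor, 1000)) for _ in range(4)]
for s in senders:
    s.start()
for s in senders:
    s.join()
actor.stop()  # the stop sentinel is enqueued after all 4000 messages
print(actor.total)  # 4000
```

No lock is needed around `total` here, because only the actor's thread ever writes to it.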

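Data-parallel model

• Every node holds a full copy of the model
• Each node takes a different batch of the data (batch_size samples)
• Each node runs the gradient computation on its own batch
• The parameters are updated (a parameter server applies the updates according to some rule)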
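
The four steps above can be sketched in plain Python. This is a single-process simulation under simplifying assumptions: a one-parameter model y = w * x, two "nodes" represented by two data shards, and a parameter server that simply averages the gradients. The names `gradient` and `train` are made up for illustration.

```python
def gradient(w, batch):
    """Gradient of the mean squared error of the model y = w * x on one batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def train(shards, lr=0.01, steps=200):
    w = 0.0  # the parameter, held by the (simulated) parameter server
    for _ in range(steps):
        # Every node evaluates the same model copy on its own shard of data
        grads = [gradient(w, shard) for shard in shards]
        # Assumed update rule: the parameter server averages the gradients
        w -= lr * sum(grads) / len(grads)
    return w


# Training data generated from y = 2 * x, split across two nodes
data = [(x, 2.0 * x) for x in range(1, 9)]
shards = [data[0::2], data[1::2]]

w = train(shards)
print(round(w, 3))  # 2.0
```

In a real cluster the list comprehension over `shards` would run on separate machines, and only the gradients and the updated parameter would travel over the network.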

Task farming (Master/Source <==> Slave/Worker)
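
A minimal sketch of task farming, assuming threads stand in for worker nodes and a `queue.Queue` stands in for the channel between master and workers. The names `worker` and `farm` are illustrative only.

```python
import queue
import threading


def worker(tasks, results):
    # Each worker keeps pulling tasks until it sees the None sentinel
    while True:
        n = tasks.get()
        if n is None:
            break
        results.put((n, n * n))


def farm(numbers, num_workers=3):
    numbers = list(numbers)
    tasks, results = queue.Queue(), queue.Queue()
    workers = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for n in numbers:   # the master hands out the work
        tasks.put(n)
    for _ in workers:   # one sentinel per worker so that all of them stop
        tasks.put(None)
    for w in workers:
        w.join()
    # Collect exactly one result per task
    return dict(results.get() for _ in numbers)


squares = farm(range(5))
print(sorted(squares.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```

The master never decides which worker gets which task; whichever worker is free takes the next item, which balances the load automatically.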

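Thread-based parallelism

• A thread is a component of a process; one process can contain multiple threads
• Threads share the memory space of their parent process
• Threads are scheduled by the OS
• Examples: browsers, media players

"Fake multithreading"

In CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, which keeps the interpreter's state consistent across threads. Execution is effectively a single pipeline: each thread in turn gets a slice of execution time, much like CPU time slices.

Example (multithreading is not necessarily faster than a single thread):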
Multithreaded version:

```python
"""
Multithreading demo
"""
from threading import Thread
import time


def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1


def main():
    start_time = time.time()
    threads = []
    for _ in range(2):
        t = Thread(target=my_count)
        t.start()
        threads.append(t)
    # Wait only after both threads have been started; joining inside
    # the loop above would run them strictly one after another.
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")


if __name__ == '__main__':
    main()
# Output
# Total time : 1.7221925258636475
```

Single-threaded version:

```python
from threading import Thread
import time


def my_count():
    i = 0
    while i <= 10000000:
        i = i + 1


def main():
    start_time = time.time()
    # Run the same counting loop once, in a single worker thread
    t = Thread(target=my_count)
    t.start()
    t.join()
    end_time = time.time()
    print(f"Total time : {end_time - start_time}")


if __name__ == '__main__':
    main()
# Output
# Total time : 0.922821044921875
```

Because of the GIL, two CPU-bound threads cannot execute Python bytecode at the same time, so running the loop twice in two threads takes about twice as long as running it once (about 1.72 s versus 0.92 s). The second thread buys no speedup.

Example (thread.join):

```python
import threading


def f():
    print("Function called by thread", threading.current_thread().name)
    for i in range(8):
        print(f"thread {threading.current_thread().name} >>> {i}")

    print(f"thread {threading.current_thread().name} ended")


print(f'thread {threading.current_thread().name} is running...')
t = threading.Thread(target=f, name='LoopThread')
t.start()
# Without join(), the main thread would not wait for the child thread to finish
t.join()
print(f'thread {threading.current_thread().name} ended.')
# Output
# thread MainThread is running...
# Function called by thread LoopThread
# thread LoopThread >>> 0
# thread LoopThread >>> 1
# thread LoopThread >>> 2
# thread LoopThread >>> 3
# thread LoopThread >>> 4
# thread LoopThread >>> 5
# thread LoopThread >>> 6
# thread LoopThread >>> 7
# thread LoopThread ended
# thread MainThread ended.
```

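Process-based parallelism

• Each process has its own independent memory space
• Makes full use of multiple cores
• Higher overhead than threads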
```python
"""
Multiprocessing demo
Size the process pool according to the number of CPU cores
"""
from multiprocessing import Process, Pool
import multiprocessing


def f(name):
    print(f"process {multiprocessing.current_process().name}")
    print(f"Hello {name}")


def f_1(x):
    return x * x


if __name__ == '__main__':
    print(f"process {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")

    # Running a single function in one child process:
    # p = Process(target=f, args=("Rambo", ))
    # p.start()
    # p.join()

    # Spread the calls to f_1 over a pool with one process per core
    with Pool(cpus) as p:
        print(p.map(f_1, list(range(3))))
```
    
Processes can communicate with each other through Queue():

```python
from multiprocessing import Process, Queue
import multiprocessing


def f_2(q):
    # Send a message from the child process
    q.put([1, 2, "xx", "yy"])


if __name__ == '__main__':
    print(f"process {multiprocessing.current_process().name}")
    cpus = multiprocessing.cpu_count()
    print(f"Cpus: {cpus}")
    q = Queue()
    p = Process(target=f_2, args=(q, ))
    p.start()
    # Receive the message in the parent process
    print(q.get())
    p.join()
# Output (the CPU count depends on the machine)
# process MainProcess
# Cpus: 4
# [1, 2, 'xx', 'yy']
```

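Synchronization

• Threads and processes alike have to deal with contention for shared resources
• Example: a bank account

Put a lock around the operations: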
```python
from threading import Thread, Lock

# One lock shared by every thread that touches the account
_lock = Lock()


class Account:
    def __init__(self, money):
        self.money = money

    def add(self, money):
        self.money = self.money + money

    def desc(self, money):
        self.money = self.money - money


# Why the lock is needed: `money = money + n` is a read followed by a write.
# Without a lock, two threads can interleave like this:
#
#   Thread A              Thread B
#   tmp = money + n
#                         tmp = money + n
#   money = tmp
#                         money = tmp    <- overwrites A's update
#
# Acquiring the lock before the read and releasing it after the write
# rules this interleaving out.
account = Account(1000)


def change(money):
    # Net effect is zero, so the balance must still be 1000 at the end
    account.add(money)
    account.desc(money)


def run(money):
    for _ in range(1000000):
        # Every thread must use the same lock object. Creating a new
        # Lock() inside change() would protect nothing.
        with _lock:
            change(money)


if __name__ == '__main__':
    t1 = Thread(target=run, args=(100, ))
    t2 = Thread(target=run, args=(200, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"total money: {account.money}")
# Output
# total money: 1000
```

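Deadlock

Example: the dining philosophers (here, eating noodles). If every philosopher picks up the fork on one side and then waits for the fork on the other side, nobody can ever start eating.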
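
A minimal sketch of the problem and one common way out, assuming five philosophers and one `threading.Lock` per fork. The deadlock-prone variant has every philosopher take the left fork first. The version below instead always takes the lower-numbered fork first (a fixed global lock order), so a cycle of philosophers waiting on each other cannot form. The names `philosopher`, `forks`, and `meals` are illustrative.

```python
import threading

N = 5
forks = [threading.Lock() for _ in range(N)]
meals = [0] * N  # meals[i] is only ever written by philosopher i


def philosopher(i, rounds=100):
    left, right = i, (i + 1) % N
    # Fixed lock order: always pick up the lower-numbered fork first
    first, second = sorted((left, right))
    for _ in range(rounds):
        with forks[first]:
            with forks[second]:
                meals[i] += 1  # "eat"


threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(meals)  # [100, 100, 100, 100, 100]
```

With the naive order (`first, second = left, right`) the program can hang when all five philosophers hold their left fork at the same moment.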

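Distributed-systems theory

1. CAP
  Consistency, Availability, Partition tolerance
  Trade off C against A while guaranteeing partition tolerance

2. Distributed processes based on multiprocessing.managers

  • The managers module exposes a Queue over the network
  • Processes on other machines access the Queue over the network
  • This gives a simple multi-machine Master/Worker model
3. Celery
  Overview of the distributed task framework:
  Message broker: Celery does not provide a messaging service itself; it integrates with third-party message brokers, commonly Redis, MongoDB, and RabbitMQ
  Worker (task execution unit): the unit provided by Celery that executes tasks; workers run concurrently on the nodes of the distributed system
  Task result store: stores the results of the tasks executed by workers; Celery supports several ways of storing results, including Redis, MongoDB, and AMQP

4. Distributed computing in practice
  MapReduce implemented on Ray, used for a distributed word-frequency count (WordCount)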
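
Returning to item 2, the Master/Worker setup built on multiprocessing.managers can be sketched as follows. Several things here are assumptions made so the example is self-contained: the class names `MasterManager` and `WorkerManager`, the registered name `get_task_queue`, the auth key, and the loopback address. The server also runs in a background thread of the same script; in a real deployment the master would serve on one machine and the worker would connect from another.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()  # lives on the master; workers only see a proxy


def get_task_queue():
    return task_queue


class MasterManager(BaseManager):
    pass


class WorkerManager(BaseManager):
    pass


# Master side: publish the queue under a name
MasterManager.register("get_task_queue", callable=get_task_queue)
# Worker side: only the name is registered; calls are forwarded over the network
WorkerManager.register("get_task_queue")


def demo():
    # Port 0 asks the OS for any free port (assumption, for the demo only)
    master = MasterManager(address=("127.0.0.1", 0), authkey=b"demo-key")
    server = master.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()

    # The master puts tasks into its local queue
    for n in range(3):
        task_queue.put(n)

    # The worker connects over TCP and pulls tasks through a proxy object
    worker = WorkerManager(address=server.address, authkey=b"demo-key")
    worker.connect()
    remote_tasks = worker.get_task_queue()
    return [remote_tasks.get() * 10 for _ in range(3)]


if __name__ == "__main__":
    print(demo())  # [0, 10, 20]
```

A result queue can be exposed the same way, by registering a second name, so that workers can send their answers back to the master.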


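Feedback is welcome.

Title: Python: Distributed and Parallel Computing

Author: 顺强

Published: 2019-09-10, 23:59:00

Original link: http://shunqiang.ml/python-python-distri/

License: "Attribution-NonCommercial-ShareAlike 4.0". Please keep the original link and the author's name when reposting.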
