最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

分布式计算框架

网站源码admin4浏览0评论

分布式计算框架

开源地址:

Ray是一个高性能的分布式计算框架,在AI和大模型领域得到了广泛应用,OpenAI的训练底层框架就是Ray。Ray提供了统一的分布式计算抽象,可以像在本机上执行python函数或类的实例,而不用关注该函数或实例在哪些机器上执行。

Ray框架图

使用方法

安装Ray:

代码语言:txt复制
pip install ray

主节点启动:

代码语言:txt复制
ray start --head --num-gpus=1
# num-gpus用于指定使用主节点上几张卡

启动后看输出日志,记录下来主节点的ip和port,从节点连接的时候需要。

从节点启动:

代码语言:txt复制
ray start --address='主节点ip:主节点端口' --num-gpus=1
# num-gpus用于指定使用从节点上几张卡

可以随意启动多个从节点

在集群内任意节点都可以查看集群状态,命令`ray status`

分布式任务执行:

在主节点上运行python程序,Ray会自动把任务分到多台机器上执行。下面是我自己写的,一个简单的三机三卡分布式python例子。

代码语言:txt复制
# pipeline_actor.py

import ray
from transformers import AutoModelForCausalLM


@ray.remote(num_gpus=1)  # 每个Actor分配1块GPU
class PipelineStage:
    def __init__(self, model_path: int, max_length: int):
        self.device = "cuda:0"

        # 加载模型
        self.model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto", device_map="auto")
        self.max_length = max_length
        
    def forward(self, inputs: dict):
        # 将输入数据移动到当前设备
        inputs = {
            "input_ids": inputs["input_ids"].to(self.device),
            "attention_mask": inputs["attention_mask"].to(self.device)
        }

        # 执行当前阶段计算
        generated_ids = self.model.generate(**inputs, max_length=self.max_length)

        return generated_ids
代码语言:txt复制
# test_ray.py

import ray
import torch
from example.ray_dist.pipeline_actor import PipelineStage

from transformers import AutoTokenizer

master_node = "master_ip"
slave_node1 = "slave1_ip"
slave_node2 = "slave2_ip"
prompt = "Explain the theory of relativity in simple terms."
model_path = "./Llama-3.2-3B-Instruct/"

def main():
    # 初始化Ray集群
    ray.init(
        address="auto",
        runtime_env={"env_vars": {"RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1"}},
        _node_ip_address=master_node
    )

    # 在3台机器上各启动一个Actor
    stage1 = PipelineStage.options(
        resources={f"node:{master_node}": 0.01},  # 绑定到master node
        num_gpus=1
    ).remote(model_path=model_path, max_length=20)
    
    stage2 = PipelineStage.options(
        resources={f"node:{slave_node1}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=30)
    
    stage3 = PipelineStage.options(
        resources={f"node:{slave_node2}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=40)
    
    # 准备输入数据
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    inputs = tokenizer(prompt, return_tensors="pt")

    # 执行pipeline推理
    generated_ids_1 = ray.get(stage1.forward.remote(inputs))
    
    inputs = {
        "input_ids": generated_ids_1,
        "attention_mask": torch.ones_like(generated_ids_1)
    }
    generated_ids_2 = ray.get(stage2.forward.remote(inputs))

    inputs = {
        "input_ids": generated_ids_2,
        "attention_mask": torch.ones_like(generated_ids_2)
    }
    generated_ids_3 = ray.get(stage3.forward.remote(inputs))
    

    # 解码输出
    print(tokenizer.batch_decode(generated_ids_1, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_2, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_3, skip_special_tokens=True))

    ray.shutdown()


if __name__ == "__main__":
    main()

核心原理

组件

Ray集群由一个或多个worker节点构成,每一个worker节点构成如下:

1、一个或多个worker进程,负责任务提交和执行。每一个worker进程负责处理一个具体的函数或类的实例,初始化的workers数量等于CPU数量。

2、raylet负责管理一个节点上的资源,主要由一个调度器和对象存储构成。调度器负责动态资源管理、任务分配,对象存储负责存储、转换大的对象。

其中一个worker节点会被指定为head节点,head节点除了具备上述worker节点的功能外,还有以下功能:

1、Global Control Services:是一个管理集群级别原数据的服务器,例如:以键值对形式缓存实例的位置、集群成员管理(成员新增、删除、健康检测)、actor管理(创建、销毁、重建)、资源管理、任务的调度和执行。总之,GCS管理的原数据访问频率较低,但会被集群中的大多数worker使用。在Ray 2.0中,GCS也可以运行在head节点外。代码位置:ray/src/ray/gcs at master · ray-project/ray

2、驱动进程:是一个特殊的工作进程,为了执行python main函数。可以提交任务,但不会执行。值得注意的是,驱动进程能够运行在任何节点,但通常默认在主节点。

内存模型

1、Ray worker在task或actor执行期间使用堆内存

2、大型Ray对象使用的共享内存(由‘ Ray .put() ’创建或由Ray任务返回的值),当worker调用‘ Ray.put() ’或从task返回时,它会将提供的值复制到Ray的共享内存对象存储中,然后Ray将使这些对象在整个集群中可用。

3、小Ray对象使用的堆内存(由Ray task返回),如果对象足够小(默认100KB), Ray将直接将值存储在所有者的“内存中”对象存储中,而不是Raylet共享内存对象存储中。任何读取该对象的worker(例如,通过‘ ray.get ’)都会将该值直接复制到自己的堆内存中。

4、Ray元数据使用的堆内存,这是Ray分配的内存,用于管理应用程序的元数据。

语言运行时

所有Ray核心组件都是用c++实现的,Ray通过一个称为“core worker”的通用c++库支持Python、Java和(实验性的)c++前端。这个库实现了所有权表、进程内存储,并管理gRPC与其他worker和raylet之间的通信。由于该库是用c++实现的,所以所有语言运行时都共享一个Ray worker协议的通用高性能实现。

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论