分布式计算框架
开源地址:
Ray是一个高性能的分布式计算框架,在AI和大模型领域得到了广泛应用,OpenAI的训练底层框架就是Ray。Ray提供了统一的分布式计算抽象,可以像在本机上执行python函数或类的实例,而不用关注该函数或实例在哪些机器上执行。
使用方法
安装Ray:
代码语言:txt复制pip install ray
主节点启动:
代码语言:txt复制ray start --head --num-gpus=1
# num-gpus用于指定使用主节点上几张卡
启动后看输出日志,记录下来主节点的ip和port,从节点连接的时候需要。
从节点启动:
代码语言:txt复制ray start --address='主节点ip:主节点端口' --num-gpus=1
# num-gpus用于指定使用从节点上几张卡
可以随意启动多个从节点
在集群内任意节点都可以查看集群状态,命令`ray status`
分布式任务执行:
在主节点上运行python程序,Ray会自动把任务分到多台机器上执行。下面是我自己写的,一个简单的三机三卡分布式python例子。
代码语言:txt复制# pipeline_actor.py
import ray
from transformers import AutoModelForCausalLM
@ray.remote(num_gpus=1) # 每个Actor分配1块GPU
class PipelineStage:
def __init__(self, model_path: int, max_length: int):
self.device = "cuda:0"
# 加载模型
self.model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto", device_map="auto")
self.max_length = max_length
def forward(self, inputs: dict):
# 将输入数据移动到当前设备
inputs = {
"input_ids": inputs["input_ids"].to(self.device),
"attention_mask": inputs["attention_mask"].to(self.device)
}
# 执行当前阶段计算
generated_ids = self.model.generate(**inputs, max_length=self.max_length)
return generated_ids
代码语言:txt复制# test_ray.py
import ray
import torch
from example.ray_dist.pipeline_actor import PipelineStage
from transformers import AutoTokenizer
master_node = "master_ip"
slave_node1 = "slave1_ip"
slave_node2 = "slave2_ip"
prompt = "Explain the theory of relativity in simple terms."
model_path = "./Llama-3.2-3B-Instruct/"
def main():
# 初始化Ray集群
ray.init(
address="auto",
runtime_env={"env_vars": {"RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1"}},
_node_ip_address=master_node
)
# 在3台机器上各启动一个Actor
stage1 = PipelineStage.options(
resources={f"node:{master_node}": 0.01}, # 绑定到master node
num_gpus=1
).remote(model_path=model_path, max_length=20)
stage2 = PipelineStage.options(
resources={f"node:{slave_node1}": 0.01}, # 绑定到slave node
num_gpus=1
).remote(model_path=model_path, max_length=30)
stage3 = PipelineStage.options(
resources={f"node:{slave_node2}": 0.01}, # 绑定到slave node
num_gpus=1
).remote(model_path=model_path, max_length=40)
# 准备输入数据
tokenizer = AutoTokenizer.from_pretrained(model_path)
inputs = tokenizer(prompt, return_tensors="pt")
# 执行pipeline推理
generated_ids_1 = ray.get(stage1.forward.remote(inputs))
inputs = {
"input_ids": generated_ids_1,
"attention_mask": torch.ones_like(generated_ids_1)
}
generated_ids_2 = ray.get(stage2.forward.remote(inputs))
inputs = {
"input_ids": generated_ids_2,
"attention_mask": torch.ones_like(generated_ids_2)
}
generated_ids_3 = ray.get(stage3.forward.remote(inputs))
# 解码输出
print(tokenizer.batch_decode(generated_ids_1, skip_special_tokens=True))
print(tokenizer.batch_decode(generated_ids_2, skip_special_tokens=True))
print(tokenizer.batch_decode(generated_ids_3, skip_special_tokens=True))
ray.shutdown()
if __name__ == "__main__":
main()
核心原理
组件
Ray集群由一个或多个worker节点构成,每一个worker节点构成如下:
1、一个或多个worker进程,负责任务提交和执行。每一个worker进程负责处理一个具体的函数或类的实例,初始化的workers数量等于CPU数量。
2、raylet负责管理一个节点上的资源,主要由一个调度器和对象存储构成。调度器负责动态资源管理、任务分配,对象存储负责存储、转换大的对象。
其中一个worker节点会被指定为head节点,head节点除了具备上述worker节点的功能外,还有以下功能:
1、Global Control Services:是一个管理集群级别原数据的服务器,例如:以键值对形式缓存实例的位置、集群成员管理(成员新增、删除、健康检测)、actor管理(创建、销毁、重建)、资源管理、任务的调度和执行。总之,GCS管理的原数据访问频率较低,但会被集群中的大多数worker使用。在Ray 2.0中,GCS也可以运行在head节点外。代码位置:ray/src/ray/gcs at master · ray-project/ray
2、驱动进程:是一个特殊的工作进程,为了执行python main函数。可以提交任务,但不会执行。值得注意的是,驱动进程能够运行在任何节点,但通常默认在主节点。
内存模型
1、Ray worker在task或actor执行期间使用堆内存
2、大型Ray对象使用的共享内存(由‘ Ray .put() ’创建或由Ray任务返回的值),当worker调用‘ Ray.put() ’或从task返回时,它会将提供的值复制到Ray的共享内存对象存储中,然后Ray将使这些对象在整个集群中可用。
3、小Ray对象使用的堆内存(由Ray task返回),如果对象足够小(默认100KB), Ray将直接将值存储在所有者的“内存中”对象存储中,而不是Raylet共享内存对象存储中。任何读取该对象的worker(例如,通过‘ ray.get ’)都会将该值直接复制到自己的堆内存中。
4、Ray元数据使用的堆内存,这是Ray分配的内存,用于管理应用程序的元数据。
语言运行时
所有Ray核心组件都是用c++实现的,Ray通过一个称为“core worker”的通用c++库支持Python、Java和(实验性的)c++前端。这个库实现了所有权表、进程内存储,并管理gRPC与其他worker和raylet之间的通信。由于该库是用c++实现的,所以所有语言运行时都共享一个Ray worker协议的通用高性能实现。