Ray 框架
创始人
2024-04-03 21:18:57
0

一 Ray是什么,优势

Spark则是一个面向数据处理的产品,RDD则是以数据作为抽象对象的,你关心的应该是数据如何处理,而不是去如何拆解任务,关心资源如何被分配,这其中涉及的概念比如Job,Stage,task你最好都不要管,RDD自己来决定。

Ray 是一个任务级别分配的分布式框架, Ray的系统层是以Task为抽象粒度的,用户可以在代码里任意生成和组合task,比如拆分成多个Stage,每个Task执行什么逻辑,每个task需要多少资源,非常自由,对资源把控力很强

Ray 可以用来在多个核心或机器上扩展 Python 应用。它有几个主要的优点,包括:

简单性:你可以扩展你的 Python 应用,而不需要重写,同样的代码可以在一台机器或多台机器上运行。

稳健性:应用程序可以优雅地处理机器故障和进程抢占。

性能:任务以毫秒级的延迟运行,可扩展到数万个内核,并以最小的序列化开销处理数值数据。

架构图
在这里插入图片描述
GlobalScheduler(全局调度器)— Master上启动一个全局调度器用于接收本地调度器提交的任务;并将任务分发给合适的本地任务调度器执行。
RedisServer Master(重新分配任务)— 启动一到多个RedisServer用于保存分布式任务的状态信息(Control State),包括对象机器的映射、任务描述、任务debug信息等。
LocalScheduler(局部调度器)— 每个Slave上启动一个本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的Worker进程。
Worker(工人)— 每个Slave上可以启动多个Worker进程执行分布式任务;并将计算结果存储到ObjectStore。
ObjectStore(对象存储)— 每个Slave上启动一个ObjectStore存储只读数据对象;Worker可以通过共享内存的方式访问这些对象数据;这样可以有效地减少内存拷贝和对象序列化成本,ObjectStore底层由Apache Arrow实现。
Plasma — 每个Slave上的ObjectStore都由一个名为Plasma的对象管理器进行管理;它可以在Worker访问本地ObjectStore上不存在的远程数据对象时主动拉取其它Slave上的对象数据到当前机器。

在这里插入图片描述
Ray的Driver节点和和Slave节点启动的组件几乎相同;不过却有以下区别:

Driver上的工作进程DriverProcess一般只有一个,即用户启动的PythonShell;Slave可以根据需要创建多个WorkerProcess。
Driver只能提交任务却不能来自全局调度器分配的任务。Slave可以提交任务也可以接收全局调度器分配的任务。
Driver可以主动绕过全局调度器给Slave发送Actor调用任务(此处设计是否合理尚不讨论);Slave只能接收全局调度器分配的计算任务。

二 Ray高效实现使用

  1. 无状态任务并行,同时发送多个square函数,此函数会异步提交到ray 服务中,并行执行任务,并返回task id并不会直接返回值,相当于指针一样,只有调用ray.get(futures) 可以得到原始值
import rayray.init()# Define the square task.
@ray.remote
def square(x):return x * x# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(400)]# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]
  1. 有状态actor 类调用

增加remote装饰器,此类会在共享内存中实例化,其中类方法调用可以异步执行,类属性可以共享存在

import rayray.init()# Define the Counter actor.
@ray.remote
class Counter:def __init__(self):self.i = 0def get(self):return self.idef incr(self, value):self.i += value# Create a Counter actor.
c = Counter.remote()# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):c.incr.remote(1)# Retrieve final actor state.
print(ray.get(c.get.remote()))
# -> 10
  1. ray版本paillier 加密,比单核版本快10倍以上,当然数据分流级别比分任务稍快一些
import ray
import time
from phe import paillierray.init(address="10.10.10.241:9937")class Test:def gen_pubk(self):pubk, prik = paillier.generate_paillier_keypair(n_length=1024)return pubk, prik@staticmethod@ray.remotedef encrypt(pubk, num):return pubk.encrypt(num)@staticmethod@ray.remotedef encrypt_batch(pubk, num):lis = []for i in range(num):lis.append(pubk.encrypt(i))return lisclass Run(object):def encrypt_test(self):t = Test()pubk, prik = t.gen_pubk()task = []t0 = time.time()for i in range(10000):task.append(t.encrypt.remote(pubk=pubk, num=i))ray.get(task)print("encrypt_test func time is {}".format(time.time() - t0))# 9sdef encrypt_batch(self):t = Test()pubk, prik = t.gen_pubk()t0 = time.time()task = []for i in range(20):task.append(t.encrypt_batch.remote(pubk=pubk, num=500))ray.get(task)print("encrypt_test func time is {}".format(time.time() - t0))# 8.3sif __name__ == '__main__':r = Run()r.encrypt_test()
  1. Map
# 单线程mapitems = list(range(100))
map_func = lambda i : i*2
output = [map_func(i) for i in items]
# ray map func@ray.remote
def map(obj, f):return f(obj)items = list(range(100))
map_func = lambda i : i*2
output = ray.get([map.remote(i, map_func) for i in items])
  1. reduce
# 单线程reduceitems = list(range(100))
map_func = lambda i : i*2
output = sum([map_func(i) for i in items])
# ray reduce@ray.remote
def map(obj, f):return f(obj)
@ray.remote
def sum_results(*elements):return np.sum(elements)items = list(range(100))
map_func = lambda i : i*2
remote_elements = [map.remote(i, map_func) for i in items]# simple reduce
remote_final_sum = sum_results.remote(*remote_elements)
result = ray.get(remote_final_sum)
  1. cal pi example
import ray
import math
import time
import randomray.init(address="10.10.10.241:9937")@ray.remote
class ProgressActor:def __init__(self, total_num_samples: int):self.total_num_samples = total_num_samplesself.num_samples_completed_per_task = {}def report_progress(self, task_id: int, num_samples_completed: int) -> None:self.num_samples_completed_per_task[task_id] = num_samples_completeddef get_progress(self) -> float:return (sum(self.num_samples_completed_per_task.values()) / self.total_num_samples)@ray.remote
def sampling_task(num_samples: int, task_id: int,progress_actor: ray.actor.ActorHandle) -> int:num_inside = 0for i in range(num_samples):x, y = random.uniform(-1, 1), random.uniform(-1, 1)if math.hypot(x, y) <= 1:num_inside += 1# Report progress every 1 million samples.if (i + 1) % 1_000_000 == 0:# This is async.progress_actor.report_progress.remote(task_id, i + 1)# Report the final progress.progress_actor.report_progress.remote(task_id, num_samples)return num_inside# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 24
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)results = [sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)for i in range(NUM_SAMPLING_TASKS)
]# Query progress periodically.
while True:progress = ray.get(progress_actor.get_progress.remote())print(f"Progress: {int(progress * 100)}%")if progress == 1:breaktime.sleep(1)# Get all the sampling tasks results.
"""
Monte Carlo 方法估计 π 的值,该方法 通过在 2x2 正方形内随机采样点来工作。
我们可以使用以原点为中心的单位圆内包含的点的比例来估计圆的面积与正方形面积的比值
。鉴于我们知道真实比率为 π/4,我们可以将估计的比率乘以 4 来近似 π 的值。我们为计算该近似值而采样的点越多,该值应该越接近 π 的真实值。
"""
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

三 Ray数据

https://docs.ray.io/en/latest/data/dataset.html
object
在 Ray 中,任务和参与者在对象上创建和计算。我们将这些对象称为远程对象,因为它们可以存储在 Ray 集群中的任何位置,并且我们使用对象引用来引用它们。远程对象缓存在 Ray 的分布式共享内存 对象库中,集群中的每个节点都有一个对象库。对象 ref本质上是一个指针或唯一 ID,可用于在不查看其值的情况下引用远程对象。如果您熟悉期货,Ray 对象引用在概念上是相似的。

进程或者任务之间通信,可以通过对象的引用进行传递,避免了内存的拷贝

# !/usr/bin/python
# -*- coding: UTF-8 -*-"""Ray 将任务和参与者调用结果存储在其分布式对象存储中,返回可以稍后检索的对象引用。
对象引用也可以通过显式创建ray.put,并且对象引用可以作为参数值的替代物传递给任务:
"""
import ray
import numpy as npray.init()# Define a task that sums the values in a matrix.
@ray.remote
def sum_matrix(matrix):return np.sum(matrix)# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))
# -> 1000000.0

分布式data


import ray
import pandas as pd# create data 将数据拆分成10块
ds = ray.data.range(10000)
ds = ds.repartition(10)
print("take ", ds.take(5))

csv数据处理

import ray
import pandas as pd# read csv
ds = ray.data.read_csv("/Users/tian/Projects/python-BasicUsage/算法/data/my_data_guest.csv")# 将数据拆分为10个partition
ds = ds.repartition(10)def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:# Filter rows.print("df is ", df.count())return df# 设置batch_size ,会根据100一批并行的运行
ds.map_batches(pandas_transform, batch_size=100)

四 部署

安装:

直接安装"pip install 'ray" 会缺失一些package,资源监控,其他依赖的包等, ‘ray[default]’ 会默认安全所有功能的包

pip install 'ray[default]' -i https://mirrors.aliyun.com/pypi/simple --no-cache-dir

启动主节点

ray start --head --node-ip-address="10.10.10.123" --port="9937" --temp-dir="/data"

ray status 查看节点是否启动成功,目前存在两个节点,一个是主节点启动的节点,另外一个为父节点加入集群的节点
在这里插入图片描述
启动子节点:

输入主节点的IP 及 GCS服务端口,子节点会自动加入主节点作为集群一部分

ray start --address="10.10.10.123:9937" --temp-dir="/data"

额外常用的一些参数:
https://docs.ray.io/en/latest/ray-core/configure.html


--num-cpus="10" # 限制节点CPU可用数量
--object-store-memory="10737418240" # 限制对象存储量10G
--memory="10737418240" # 限制堆内存量
--dashboard-port="8265" # 设置监控端口
--dashboard-host='0.0.0.0' # 绑定所有ip,任何都可以访问
--temp-dir="/data" # 设置临时路径,各个服务日志都会保存在此
--system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'  # 内存占满时,磁盘移除路径设置

容器集群启动

首先保证服务器网络是可以互相访问的

yum install telnet -y

telnet ip port

容器以host模式启动容器,所有的端口都会与宿主机共享;或者一些主服务节点,在容器中暴漏出来

docker run --net=host -itd docker_image # 推荐这种,ray服务端口会比较多,逐一映射定义会有些麻烦docker run -p port:port -itd docker_image

Dashboard 服务无法运行: https://www.jianshu.com/p/40ff76cbf8ca

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...