Ray Source Code Analysis: the remote Decorator

Ray version: 0.7.3

Source code analysis of the @ray.remote decorator

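Background: Python decorators. A decorator in Python is a function that takes a function as its argument and returns a function. Decorators make it easy to add to or change what a function does without modifying the original function's code.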

See: Liao Xuefeng's Python tutorial, section "Decorator".
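A minimal sketch of the idea using only the standard library (the decorator name `count_calls` is hypothetical, not from Ray): the decorator wraps a function to count its calls without touching the function's body, and applying it with the @ syntax has the same effect as reassigning the name by hand.

```python
import functools


def count_calls(func):
    """Decorator: take a function, return a new function that wraps it."""

    @functools.wraps(func)  # copy func's __name__ and __doc__ onto wrapper
    def wrapper(*args, **kwargs):
        wrapper.calls += 1  # the added behavior
        return func(*args, **kwargs)  # the original behavior, unchanged

    wrapper.calls = 0
    return wrapper


@count_calls
def square(x):
    return x * x


def cube(x):
    return x ** 3


cube = count_calls(cube)  # same effect as writing @count_calls above def cube

print(square(3), square.calls, square.__name__)  # 9 1 square
print(cube(2), cube.calls)  # 8 1
```

The same equivalence (`@dec` above `def f` versus `f = dec(f)`) is what the @ray.remote examples later in this post rely on.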

The main definition is in ray/python/ray/worker.py:

def remote(*args, **kwargs):
    """...brief description of usage..."""
    worker = get_global_worker()

    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # The decorator is used bare, i.e. just @ray.remote
        return make_decorator(worker=worker)(args[0])

    # Called with arguments: parse the keyword arguments
    # .....
    return make_decorator(
        num_return_vals=num_return_vals,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        resources=resources,
        max_calls=max_calls,
        max_reconstructions=max_reconstructions,
        worker=worker)
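The bare-versus-arguments check at the top of remote is a common pattern for decorators that can be used either as @dec or as @dec(...). Below is a minimal, hypothetical sketch of the same dispatch; `toy_remote` and `toy_make_decorator` are illustrative names, not Ray APIs, and the wrapper only records its options instead of creating a remote function.

```python
import functools


def toy_make_decorator(num_cpus=None, max_calls=None):
    """Decorator factory: build a decorator that closes over its options."""

    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            return function(*args, **kwargs)

        # Record the options so the effect of each form is visible.
        wrapper.options = {"num_cpus": num_cpus, "max_calls": max_calls}
        return wrapper

    return decorator


def toy_remote(*args, **kwargs):
    """Mimics the dispatch at the top of ray.remote (hypothetical)."""
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # Bare form @toy_remote: args[0] is the function being decorated.
        return toy_make_decorator()(args[0])
    # Argument form @toy_remote(num_cpus=...): return the decorator itself,
    # which Python then applies to the function defined on the next line.
    if args:
        raise TypeError("use @toy_remote or @toy_remote(keyword=value, ...)")
    return toy_make_decorator(**kwargs)


@toy_remote
def f():
    return 1


@toy_remote(num_cpus=2)
def g():
    return 2


print(f(), f.options)  # 1 {'num_cpus': None, 'max_calls': None}
print(g(), g.options)  # 2 {'num_cpus': 2, 'max_calls': None}
```

Because the check relies on `callable(args[0])`, the bare form is recognized only when the decorator receives exactly one positional callable and no keywords; every other call is treated as the "with arguments" form.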

make_decorator(...), also defined in ray/python/ray/worker.py:

Main logic: decide whether to create a remote function or an actor, then build the corresponding object.

The code is as follows:

def make_decorator(num_return_vals=None,
                   num_cpus=None,
                   num_gpus=None,
                   resources=None,
                   max_calls=None,
                   max_reconstructions=None,
                   worker=None):
    def decorator(function_or_class):
        if (inspect.isfunction(function_or_class) or is_cython(function_or_class)):
            # Set the remote function default resources.
            if max_reconstructions is not None:
                raise Exception("...")

            return ray.remote_function.RemoteFunction(
                function_or_class, num_cpus, num_gpus, resources,
                num_return_vals, max_calls)

        if inspect.isclass(function_or_class):
            if num_return_vals is not None:
                raise Exception("...")
            if max_calls is not None:
                raise Exception("...")

            return worker.make_actor(function_or_class, num_cpus, num_gpus,
                                     resources, max_reconstructions)

        raise Exception("...")

    return decorator

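make_decorator then inspects the object the decorator is applied to and decides whether to build a remote function or an actor (an object whose method calls are executed in order). Options that only make sense for the other kind raise an exception: max_reconstructions for a function, num_return_vals or max_calls for a class.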
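The type dispatch and option checks in make_decorator can be sketched in isolation with `inspect.isfunction` and `inspect.isclass`. This is a simplified, hypothetical version: `ToyRemoteFunction`, `ToyActorClass`, `toy_dispatch_decorator`, and the error messages are made up here (the original elides its messages as "..."), and Cython functions are not handled.

```python
import inspect


class ToyRemoteFunction:
    """Placeholder for what a decorated function turns into."""

    def __init__(self, function, max_calls):
        self.function = function
        self.max_calls = max_calls


class ToyActorClass:
    """Placeholder for what a decorated class turns into."""

    def __init__(self, cls, max_reconstructions):
        self.cls = cls
        self.max_reconstructions = max_reconstructions


def toy_dispatch_decorator(max_calls=None, max_reconstructions=None):
    def decorator(function_or_class):
        if inspect.isfunction(function_or_class):
            # Functions: an actor-only option is rejected.
            if max_reconstructions is not None:
                raise ValueError("max_reconstructions applies only to actors")
            return ToyRemoteFunction(function_or_class, max_calls)
        if inspect.isclass(function_or_class):
            # Classes: a function-only option is rejected.
            if max_calls is not None:
                raise ValueError("max_calls applies only to remote functions")
            return ToyActorClass(function_or_class, max_reconstructions)
        # Anything else (e.g. a builtin such as len) is not supported.
        raise TypeError("expected a function or a class")

    return decorator


def work():
    return "done"


class Counter:
    pass


print(type(toy_dispatch_decorator()(work)).__name__)     # ToyRemoteFunction
print(type(toy_dispatch_decorator()(Counter)).__name__)  # ToyActorClass
```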

Our example is a remote function, which is implemented in ray/python/ray/remote_function.py with the following code:

class RemoteFunction(object):
    """A remote function.

    This is a decorated function. It can be used to spawn tasks.

    (parameter descriptions)
    """

    def __init__(self, function, num_cpus, num_gpus, resources,
                 num_return_vals, max_calls):
        # ....
        # Override task.remote's signature and docstring
        @wraps(function)
        def _remote_proxy(*args, **kwargs):
            return self._remote(args=args, kwargs=kwargs)

        self.remote = _remote_proxy

    def __call__(self, *args, **kwargs):
        raise Exception("Remote functions cannot be called directly. Instead "
                        "of running '{}()', try '{}.remote()'.".format(
                            self._function_name, self._function_name))

    # Deprecated method: the blog post cited in the references analyzed
    # version 0.4.0, which still used _submit.
    def _submit(self,
                args=None,
                kwargs=None,
                num_return_vals=None,
                num_cpus=None,
                num_gpus=None,
                resources=None):
        logger.warning(
            "WARNING: _submit() is being deprecated. Please use _remote().")
        return self._remote(
            args=args,
            kwargs=kwargs,
            num_return_vals=num_return_vals,
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            resources=resources)

    def _remote(self,
                args=None,
                kwargs=None,
                num_return_vals=None,
                num_cpus=None,
                num_gpus=None,
                resources=None):
        """An experimental alternate way to submit remote functions."""
        worker = ray.worker.get_global_worker()
        worker.check_connected()

        if self._last_export_session_and_job != worker.current_session_and_job:
            # If this function was not exported in this session and job,
            # we need to export this function again, because current GCS
            # doesn't have it.
            # Also relevant to actors: avoids exporting repeatedly.
            self._last_export_session_and_job = worker.current_session_and_job
            worker.function_actor_manager.export(self)

        kwargs = {} if kwargs is None else kwargs
        args = [] if args is None else args

        # ....

        resources = ray.utils.resources_from_resource_arguments(
            self._num_cpus, self._num_gpus, self._resources, num_cpus,
            num_gpus, resources)

        def invocation(args, kwargs):
            args = ray.signature.extend_args(self._function_signature, args,
                                             kwargs)

            if worker.mode == ray.worker.LOCAL_MODE:
                ...
            else:
                object_ids = worker.submit_task(
                    self._function_descriptor,
                    args,
                    num_return_vals=num_return_vals,
                    resources=resources)
            # Return a single ObjectID, or a list of them if there are several
            if len(object_ids) == 1:
                return object_ids[0]
            elif len(object_ids) > 1:
                return object_ids

        if self._decorator is not None:
            invocation = self._decorator(invocation)

        return invocation(args, kwargs)

That completes the code path of the remote decorator.
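To see how the pieces of RemoteFunction fit together without a Ray cluster, here is a hypothetical, in-process sketch. `ToyRemoteFunction` and its `session` argument are illustrative inventions: the class keeps the structure of the code above (a .remote proxy built with functools.wraps, a `__call__` that refuses direct calls, an export that happens once per session, call-time options overriding the decorator defaults), but it runs the function immediately and returns the plain result instead of an ObjectID. It also raises TypeError where Ray 0.7.3 raises a plain Exception.

```python
import functools


class ToyRemoteFunction:
    """Hypothetical, in-process imitation of RemoteFunction's structure."""

    def __init__(self, function, num_cpus=None):
        self._function = function
        self._function_name = function.__name__
        self._num_cpus = num_cpus
        self._last_export_session = None  # like _last_export_session_and_job
        self.export_count = 0
        self.last_num_cpus = None

        # Give .remote the wrapped function's name and docstring.
        @functools.wraps(function)
        def _remote_proxy(*args, **kwargs):
            return self._remote(args=args, kwargs=kwargs)

        self.remote = _remote_proxy

    def __call__(self, *args, **kwargs):
        raise TypeError("Remote functions cannot be called directly. Instead "
                        "of running '{0}()', try '{0}.remote()'.".format(
                            self._function_name))

    def _remote(self, args=None, kwargs=None, num_cpus=None, session="s1"):
        # "Export" only when the session differs from the last export.
        if self._last_export_session != session:
            self._last_export_session = session
            self.export_count += 1

        kwargs = {} if kwargs is None else kwargs
        args = [] if args is None else args

        # A call-time value wins over the decorator default.
        self.last_num_cpus = (num_cpus if num_cpus is not None
                              else self._num_cpus)

        # Stand-in for worker.submit_task: run synchronously, return the value.
        return self._function(*args, **kwargs)


def add(a, b):
    """Add two numbers."""
    return a + b


toy_add = ToyRemoteFunction(add, num_cpus=1)
print(toy_add.remote(1, 2), toy_add.last_num_cpus)  # 3 1
print(toy_add._remote(args=[2, 3], num_cpus=4), toy_add.last_num_cpus)  # 5 4
print(toy_add.remote.__name__, toy_add.export_count)  # add 1
```

The proxy exists because `__call__` is overridden to raise, so `f(...)` fails loudly and `f.remote(...)` is the only submission path; functools.wraps keeps `help(f.remote)` showing the original function's name, signature, and docstring.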

Flow analysis:

remote is a general-purpose decorator: it can decorate an ordinary Python function or a Python class.

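On entering the body of remote, it first obtains the global Worker and then calls make_decorator, a decorator factory. Depending on the type of the decorated object, make_decorator creates either a remote function or an actor.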

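Here the object created is a RemoteFunction. Before that, remote checks whether the decorator is used without arguments or with arguments. Without arguments, the code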

@ray.remote
def f():
    time.sleep(1)
    return 1

is equivalent to

def f():
    ...

f = ray.remote(f)

Example:

import time

import ray

ray.init()

@ray.remote
def f1():
    time.sleep(1)
    return 1

print(ray.get(f1.remote()))

def f2():
    time.sleep(1)
    return 2

f2 = ray.remote(f2)
result = f2.remote()

print(ray.get(result))

With arguments, the example looks like this:

@ray.remote(num_cpus=1)
def remote_add(a, b):
    time.sleep(1)
    return a + b

print(ray.get(remote_add.remote(1, 2)))

def local_add(a, b):
    time.sleep(1)
    return a + b

change_to_remote = ray.remote(num_cpus=1)(local_add)
print(ray.get(change_to_remote.remote(1, 2)))

That completes the analysis of the remote decorator.