Ray Source Code Analysis: Executing a Remote Function

Ray version: 0.7.4

Once a remote function has been defined, what happens when we execute it with remote_func.remote()?

Applying the @ray.remote decorator to a function gives us a remote function.
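To make this concrete, here is a Ray-free toy model of what a decorator like this hands back. All names (`toy_remote`, `ToyRemoteFunction`, `FakeObjectID`) are hypothetical. Unlike Ray, the sketch runs the function eagerly in the same process and keeps results in a dict instead of an object store. It only illustrates that decorating runs nothing: work starts when `.remote()` is called, and the caller gets an ID back rather than the value.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeObjectID:
    """Hypothetical stand-in for an ObjectID: an opaque integer handle."""
    value: int


class ToyRemoteFunction:
    """Hypothetical wrapper returned by the toy decorator below."""

    _ids = itertools.count()  # shared ID counter for all toy remote functions

    def __init__(self, function):
        self._function = function
        self.results = {}  # FakeObjectID -> value; stands in for the object store

    def __call__(self, *args, **kwargs):
        # Toy rule: only .remote() may be used to invoke the function.
        raise TypeError("use .remote() to call this function")

    def remote(self, *args, **kwargs):
        # Toy behavior: run eagerly and file the result under a fresh ID.
        # Ray would instead submit a task and return without waiting.
        object_id = FakeObjectID(next(ToyRemoteFunction._ids))
        self.results[object_id] = self._function(*args, **kwargs)
        return object_id


def toy_remote(function):
    """Toy decorator: it only wraps the function; nothing executes here."""
    return ToyRemoteFunction(function)


@toy_remote
def add(a, b):
    return a + b


oid = add.remote(1, 2)
print(oid, add.results[oid])
```

In Ray the returned ID would be passed to ray.get() to obtain the value; in this toy, `add.results[oid]` plays that role.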

When the function is actually executed, the core of the work is done here:

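```python
# Excerpt: invocation() is a closure defined inside RemoteFunction._remote;
# self, worker, num_return_vals and resources come from the enclosing scope.
def invocation(args, kwargs):
    # Normalize positional and keyword arguments against the function signature.
    args = ray.signature.extend_args(self._function_signature, args, kwargs)

    if worker.mode == ray.worker.LOCAL_MODE:
        # Local mode: run the function in the driver process (for debugging).
        object_ids = worker.local_mode_manager.execute(
            self._function, self._function_descriptor, args,
            num_return_vals)
    else:
        # Normal mode: hand the task to the scheduler and get its return IDs.
        object_ids = worker.submit_task(
            self._function_descriptor,
            args,
            num_return_vals=num_return_vals,
            resources=resources)

    # One return value yields a single ObjectID; several yield a list.
    if len(object_ids) == 1:
        return object_ids[0]
    elif len(object_ids) > 1:
        return object_ids
```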

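Leaving LOCAL_MODE aside, invocation does two things: it calls submit_task to submit the task, and it returns the resulting ObjectID (or a list of ObjectIDs when the function has several return values), which the caller can later pass to ray.get(object_id) to fetch the result.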
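The non-local branch of invocation can be modeled in plain Python. This is a sketch, not Ray's code: `bind_args` is a hypothetical stand-in for `ray.signature.extend_args` (it uses `inspect.signature(...).bind` to check the call and produce one positional list), and `fake_submit_task` is a hypothetical scheduler hook that just allocates one ID per return value. What it reproduces faithfully is the unwrapping rule: one return value gives a single ID, more than one gives a list, and zero falls through to `None`.

```python
import inspect
import itertools

_counter = itertools.count()


def bind_args(func, args, kwargs):
    """Hypothetical stand-in for ray.signature.extend_args: check the call
    against func's signature and return one flat positional list.
    Assumes func has no *args/**kwargs parameters."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return list(bound.arguments.values())


def fake_submit_task(name, args, num_return_vals):
    """Hypothetical scheduler hook: allocate one fresh ID per return value.
    A real task spec would carry args; this toy ignores them."""
    return ["{}-ret{}".format(name, next(_counter)) for _ in range(num_return_vals)]


def make_invocation(func, num_return_vals, submit_task=fake_submit_task):
    """Build an invocation() closure following the non-local branch."""

    def invocation(args, kwargs):
        flat_args = bind_args(func, args, kwargs)
        object_ids = submit_task(func.__name__, flat_args, num_return_vals)
        # Same unwrapping rule as the excerpt.
        if len(object_ids) == 1:
            return object_ids[0]
        elif len(object_ids) > 1:
            return object_ids
        # With zero return values neither branch runs, so None is returned.

    return invocation


def scale(x, factor=2):
    return x * factor


one = make_invocation(scale, num_return_vals=1)([3], {})
many = make_invocation(scale, num_return_vals=2)([3], {"factor": 5})
none = make_invocation(scale, num_return_vals=0)([3], {})
print(one, many, none)
```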

Here is the submit_task method:

```python
def submit_task(...):
    """Submit a remote task to the scheduler.

    Returns:
        The return object IDs for this task.
    """
    with profiling.profile("submit_task"):
        ...

        # Increment the worker's task index to track how many tasks
        # have been submitted by the current task so far.
        self.task_context.task_index += 1
        # The parent task must be set for the submitted task.
        assert not self.current_task_id.is_nil()
        # Current driver id must not be nil when submitting a task.
        # Because every task must belong to a driver.
        assert not self.current_job_id.is_nil()
        # Submit the task to raylet.
        function_descriptor_list = (
            function_descriptor.get_function_descriptor_list())

        ...

        task = ray._raylet.TaskSpec(...)
        self.raylet_client.submit_task(task)

        # The return IDs are handed back right away; the task runs asynchronously.
        return task.returns()
```
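The bookkeeping in submit_task can be mimicked with a few small classes. Everything here is hypothetical (`ToyTaskSpec`, `ToyRayletClient`, `ToyWorker`, the descriptor list, and the hash-based way task and return IDs are derived); the real `ray._raylet.TaskSpec` carries many more fields and computes IDs in native code. The sketch only preserves the order of operations in the excerpt: bump the task index, check that a parent task and a job exist, build the spec, hand it to the raylet client, and return the spec's return IDs without waiting for execution.

```python
import hashlib
from dataclasses import dataclass, field
from typing import List


def _derive_id(*parts):
    """Hypothetical deterministic ID: a short hash of the given parts."""
    return hashlib.sha1("/".join(map(str, parts)).encode()).hexdigest()[:12]


@dataclass
class ToyTaskSpec:
    job_id: str
    function_descriptor_list: List[str]
    args: list
    num_return_vals: int
    parent_task_id: str
    parent_counter: int
    task_id: str = field(init=False)

    def __post_init__(self):
        # Toy rule: derive the task ID from job, parent task and submission counter.
        self.task_id = _derive_id(self.job_id, self.parent_task_id, self.parent_counter)

    def returns(self):
        # One return ID per declared return value, derived from the task ID.
        return [_derive_id(self.task_id, i) for i in range(self.num_return_vals)]


class ToyRayletClient:
    """Stands in for raylet_client: it only queues the spec."""

    def __init__(self):
        self.queue = []

    def submit_task(self, task):
        self.queue.append(task)


class ToyWorker:
    def __init__(self, job_id, current_task_id):
        self.current_job_id = job_id
        self.current_task_id = current_task_id
        self.task_index = 0
        self.raylet_client = ToyRayletClient()

    def submit_task(self, function_descriptor_list, args, num_return_vals=1):
        # Count how many tasks the current task has submitted so far.
        self.task_index += 1
        # A submitted task needs a parent task and must belong to a job.
        assert self.current_task_id, "parent task must be set"
        assert self.current_job_id, "job id must be set"
        task = ToyTaskSpec(self.current_job_id, function_descriptor_list, list(args),
                           num_return_vals, self.current_task_id, self.task_index)
        self.raylet_client.submit_task(task)
        # Return immediately: the IDs exist before the task has run.
        return task.returns()


worker = ToyWorker(job_id="job-1", current_task_id="driver-task")
first = worker.submit_task(["add"], [1, 2])
second = worker.submit_task(["add"], [1, 2], num_return_vals=2)
print(first, second, len(worker.raylet_client.queue))
```

Because the IDs depend on the submission counter, two otherwise identical calls still get distinct return IDs.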

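At scheduling time, the driver submits its N tasks to the local scheduler (in this version, the raylet that raylet_client.submit_task talks to), and the local scheduler then distributes those tasks among its workers.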

By default, Ray starts as many workers as there are CPU cores.
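A rough single-process analogy of this dispatch: a pool sized to the CPU count receives N tasks from a "driver" and runs each on whichever worker is free. Here `concurrent.futures.ThreadPoolExecutor` is a swapped-in standard-library mechanism playing the role of the local scheduler plus its workers, and `square` and `run_driver` are hypothetical names; threads share one interpreter, so this models only the dispatch pattern, not Ray's separate worker processes.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def default_num_workers():
    # One worker per CPU core; os.cpu_count() can return None, so fall back to 1.
    return os.cpu_count() or 1


def square(x):
    # Record which pool thread executed the task, to show tasks being spread out.
    return x * x, threading.current_thread().name


def run_driver(num_tasks):
    """Toy driver: submit num_tasks tasks; the executor's internal queue stands
    in for the local scheduler handing them to free workers."""
    num_workers = default_num_workers()
    with ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix="worker") as pool:
        # Like .remote(), submit() returns a handle immediately.
        futures = [pool.submit(square, i) for i in range(num_tasks)]
        # Like ray.get(), result() blocks until the value is ready.
        results = [f.result() for f in futures]
    values = [value for value, _ in results]
    workers_used = {name for _, name in results}
    return num_workers, values, workers_used


num_workers, values, workers_used = run_driver(num_tasks=20)
print(num_workers, values[:5], len(workers_used))
```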