Ray Source Code Analysis: Scheduling

Ray version: 0.7.4. A walkthrough of the scheduling source code.

The scheduling code consists of six files:

  • ray-releases-0.7.3/src/ray/raylet/scheduling_policy.h
  • ray-releases-0.7.3/src/ray/raylet/scheduling_policy.cc
  • ray-releases-0.7.3/src/ray/raylet/scheduling_queue.h
  • ray-releases-0.7.3/src/ray/raylet/scheduling_queue.cc
  • ray-releases-0.7.3/src/ray/common/task/scheduling_resources.h
  • ray-releases-0.7.3/src/ray/common/task/scheduling_resources.cc

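The split is easy to follow. There is a header and a source file for each of three parts: the scheduling policy, the scheduling queue, and the scheduling resources.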

The raylet is written in C++ and connected to Python through Cython, which I haven't learned yet....

Scheduling policy

// ray-releases-0.7.3/src/ray/raylet/scheduling_policy.h

namespace raylet {

/// \class SchedulingPolicy
/// \brief Implements a scheduling policy for the node manager.
class SchedulingPolicy {
 public:
  /// \brief SchedulingPolicy constructor.
  ///
  /// \param scheduling_queue: reference to a scheduler queues object for access to
  /// tasks.
  /// \return Void.
  SchedulingPolicy(const SchedulingQueue &scheduling_queue);

  /// \brief Perform a scheduling operation, given a set of cluster resources and
  /// producing a mapping of tasks to raylets.
  ///
  /// \param cluster_resources: a set of cluster resources containing resource and load
  /// information for some subset of the cluster. For all client IDs in the returned
  /// placement map, the corresponding SchedulingResources::resources_load_ is
  /// incremented by the aggregate resource demand of the tasks assigned to it.
  /// \param local_client_id The ID of the node manager that owns this
  /// SchedulingPolicy object.
  /// \return Scheduling decision, mapping tasks to raylets for placement.
  std::unordered_map<TaskID, ClientID> Schedule(
      std::unordered_map<ClientID, SchedulingResources> &cluster_resources,
      const ClientID &local_client_id);

  /// \brief Given a set of cluster resources perform a spill-over scheduling operation.
  ///
  /// \param cluster_resources: a set of cluster resources containing resource and load
  /// information for some subset of the cluster. For all client IDs in the returned
  /// placement map, the corresponding SchedulingResources::resources_load_ is
  /// incremented by the aggregate resource demand of the tasks assigned to it.
  /// \return Scheduling decision, mapping tasks to raylets for placement.
  std::vector<TaskID> SpillOver(SchedulingResources &remote_scheduling_resources) const;

  /// \brief SchedulingPolicy destructor.
  virtual ~SchedulingPolicy();

 private:
  /// An immutable reference to the scheduling task queues.
  const SchedulingQueue &scheduling_queue_;
  /// Internally maintained random number generator.
  std::mt19937_64 gen_;
};

}  // namespace raylet

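The scheduling policy header describes the structure of the SchedulingPolicy class. It declares a constructor and a destructor. It also declares Schedule, which makes the key scheduling decision, and SpillOver.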

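The constructor receives the queue of tasks to be scheduled and stores it as an immutable reference (scheduling_queue_), so the policy can read the tasks but not modify the queue. Schedule takes two inputs: an unordered_map of cluster resources, and the local_client_id of the node manager that owns this policy. It returns the scheduling decision, also as an unordered_map.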

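The input map of Schedule is keyed by ClientID, the ID of a node (raylet) in the cluster, and its values are SchedulingResources. In other words, it maps each node to its resources. According to the header comment, the map may cover only a subset of the cluster.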
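To make the shape of this input concrete, here is a minimal sketch with simplified stand-in types. `NodeId`, `ResourceSet`, `NodeResources`, `Get`, `Fits` and `FeasibleNodes` are hypothetical names for illustration, not Ray's classes; the real `SchedulingResources` lives in scheduling_resources.h.

The header comment says each node's entry carries a load (`resources_load_`). This sketch therefore gives each node an "available" set and a "load" set. It assumes a node can accept a demand when available minus load still covers every requested resource.

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for Ray's ClientID and resource types (not Ray's API).
using NodeId = std::string;
using ResourceSet = std::unordered_map<std::string, double>;  // e.g. {"CPU": 4}

// Simplified analogue of SchedulingResources (assumption): what a node
// reports as available, and the load already promised to tasks.
struct NodeResources {
  ResourceSet available;
  ResourceSet load;
};

// Amount of `name` in `set`, treating a missing entry as zero.
double Get(const ResourceSet &set, const std::string &name) {
  auto it = set.find(name);
  return it == set.end() ? 0.0 : it->second;
}

// True if `demand` fits into (available - load) for every requested resource.
bool Fits(const NodeResources &node, const ResourceSet &demand) {
  for (const auto &[name, amount] : demand) {
    if (Get(node.available, name) - Get(node.load, name) < amount) return false;
  }
  return true;
}

// Walks the node -> resources map and collects the nodes that could accept
// `demand`, sorted so the result is deterministic.
std::vector<NodeId> FeasibleNodes(
    const std::unordered_map<NodeId, NodeResources> &cluster_resources,
    const ResourceSet &demand) {
  std::vector<NodeId> result;
  for (const auto &[node_id, resources] : cluster_resources) {
    if (Fits(resources, demand)) result.push_back(node_id);
  }
  std::sort(result.begin(), result.end());
  return result;
}
```

Consider one node with 4 CPUs that already carry a load of 3, and another node with 2 idle CPUs. A 2-CPU demand fits only the second node, while a 1-CPU demand fits both.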

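The output map of Schedule uses TaskID as the key and ClientID as the value. Each entry means the task identified by the TaskID is scheduled onto the node identified by the ClientID. There is also a side effect: for every node in the result, its resources_load_ in cluster_resources is incremented by the demand of the tasks assigned to it. This is why cluster_resources is passed by non-const reference.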
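The input/output contract of Schedule can be illustrated with a simplified, self-contained sketch. It is not Ray's implementation. `TaskId`, `NodeId`, `ResourceSet`, `NodeResources`, `PendingTask`, `Get`, `Fits` and `ScheduleSketch` are all hypothetical stand-ins.

The node-selection rule is also an assumption of this sketch:

  • Pick uniformly at random among nodes whose available resources minus load cover the task's demand.
  • Otherwise, leave the task unassigned.

The random choice echoes the `std::mt19937_64 gen_` member, but the real rule lives in scheduling_policy.cc, which is not shown here.

Only the contract comes from the header comment. The result maps each placed task to a node, and each chosen node's load grows by the demand assigned to it, so later tasks in the same pass see the updated load.

```cpp
#include <cstddef>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for Ray's TaskID, ClientID and resource classes.
using TaskId = std::string;
using NodeId = std::string;
using ResourceSet = std::unordered_map<std::string, double>;

struct NodeResources {
  ResourceSet available;  // resources the node reports
  ResourceSet load;       // demand already assigned to the node
};

struct PendingTask {
  TaskId id;
  ResourceSet demand;
};

// Amount of `name` in `set`, treating a missing entry as zero.
double Get(const ResourceSet &set, const std::string &name) {
  auto it = set.find(name);
  return it == set.end() ? 0.0 : it->second;
}

// True if `demand` fits into (available - load) for every requested resource.
bool Fits(const NodeResources &node, const ResourceSet &demand) {
  for (const auto &[name, amount] : demand) {
    if (Get(node.available, name) - Get(node.load, name) < amount) return false;
  }
  return true;
}

// Builds a TaskId -> NodeId placement map and adds each placed task's demand
// to its node's load, following the contract in the Schedule() header comment.
// The random choice among feasible nodes is an assumption of this sketch.
std::unordered_map<TaskId, NodeId> ScheduleSketch(
    const std::vector<PendingTask> &tasks,
    std::unordered_map<NodeId, NodeResources> &cluster_resources,
    std::mt19937_64 &gen) {
  std::unordered_map<TaskId, NodeId> decision;
  for (const auto &task : tasks) {
    std::vector<NodeId> candidates;
    for (const auto &[node_id, node] : cluster_resources) {
      if (Fits(node, task.demand)) candidates.push_back(node_id);
    }
    if (candidates.empty()) continue;  // no feasible node: leave the task unplaced
    std::uniform_int_distribution<std::size_t> pick(0, candidates.size() - 1);
    const NodeId &dst = candidates[pick(gen)];
    decision[task.id] = dst;
    // Record the extra load so later tasks in this pass see it.
    for (const auto &[name, amount] : task.demand) {
      cluster_resources[dst].load[name] += amount;
    }
  }
  return decision;
}
```

As a worked example, suppose node_a has 2 CPUs and node_b has 1 CPU. A 2-CPU task can only land on node_a, and that raises node_a's CPU load to 2. A following 1-CPU task then fits only on node_b. A GPU task fits nowhere and stays out of the result, regardless of the random seed.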

I'm too exhausted to keep going. To be continued...