Copyright © 2015 Powered by MWeb, Theme used GitHub CSS.
Postoffice是个单例类,是整个ps-lite的核心,相当于整个ps-lite的调控中心,包括对调起Van负责整个网络的拉起、通信、命令管理如增加节点、移除节点、恢复节点等等;整个集群基本信息的管理,比如worker、server数的获取、server端feature分布的获取、worker/server Rank与node id的互转、节点角色身份等等;
Van是ps-lite的一个基类, 实现了基础的公共函数,ZMQVan是基于zeromq的Van的实现,Van是整个Parameter Server的通信模块;在整个训练任务的生命周期中,有以下几点值得注意:
Customer主要是request、response,比如新建一次request,会返回一个timestamp,这个timestamp会作为这次request的id,每次请求会自增1,相应的res也会自增1,调用wait时会保证 后续比如做Wait以此为ID识别,tracker_是Customer内用来记录request(使用request id)和对应的response的次数的一个map;recv_handle_绑定Customer接收到request后的处理函数(SimpleApp::Process);Customer会新拉起一个线程,用于在customer生命周期内,使用recv_handle_来处理接受的请求,这里是使用了一个线程安全队列,Accept()用于往队列中一直发送消息,对于Worker,比如KVWorker,recv_handle_保存拉取的msg中的数据,对于Server,需要使用set_request_handle来设置对应的处理函数,如KVServerDefaultHandle,使用std::unordered_map<Key, Val> store保存server的参数,当请求为push时,对store参数做更新,请求为pull时对参数进行拉取;
Message封装包括Meta, Control, Node等消息, 其中data的部分,采用的SArray这个数据结构可以理解为一个零拷贝的vector,能兼容vector的数据结构,另外为了保证高效会对Message里的Meta,Control,Node使用protobuf来打包,这里有个疑问,为啥不会数据比如推送的梯度信息用protobuf打包呢?
PS构建网络步骤如下:
ps-lite里面有两个涉及到等待同步的地方:
Worker pull时是异步操作,通常调用Wait来调用Customer::WaitRequest()来保证customer里面的request和response两者相等,即保证Pull完成后再做其他操作;
另外在一个worker内,可以存在多个Customer,当第一个发送barrier后,scheduler接收到request请求,然后根据msg判断是request,然后,向barrier_group里的所有node,node接到后, Postoffice::Get()->Manage(*msg)将barrier_done_中的customer_id对应的bool置true,完成同步操作,这里貌似没有我们常说的asp、bsp、ssp,可以通过增加相应的Command来完成;
当构建节点连接时,也可以进行一个barrier;
更复杂的比如Asp,bsp,ssp可以通过发送新定Command来完成
void Van::ProcessBarrierCommand(Message* msg) {
auto& ctrl = msg->meta.control;
if (msg->meta.request) {
if (barrier_count_.empty()) {
barrier_count_.resize(8, 0);
}
int group = ctrl.barrier_group;
++barrier_count_[group];
PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group];
if (barrier_count_[group] ==
static_cast
barrier_count_[group] = 0;
Message res;
res.meta.request = false;
res.meta.app_id = msg->meta.app_id;
res.meta.customer_id = msg->meta.customer_id;
res.meta.control.cmd = Control::BARRIER;
for (int r : Postoffice::Get()->GetNodeIDs(group)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
res.meta.recver = recver_id;
res.meta.timestamp = timestamp_++;
CHECK_GT(Send(res), 0);
}
}
}
} else {
Postoffice::Get()->Manage(*msg);
}
}
一个基类,封装了基本的PS app的操作,KVWorker、KVServer集成SampleApp,完成相应逻辑: 如push、pull、key的切片等等;
Postoffice是个单例类,是整个ps-lite的核心,相当于整个ps-lite的调控中心,包括对调起Van负责整个网络的拉起、通信、命令管理如增加节点、移除节点、恢复节点等等;整个集群基本信息的管理,比如worker、server数的获取、server端feature分布的获取、worker/server Rank与node id的互转、节点角色身份等等;
Van是ps-lite的一个基类, 实现了基础的公共函数,ZMQVan是基于zeromq的Van的实现,Van是整个Parameter Server的通信模块;在整个训练任务的生命周期中,有以下几点值得注意:
Customer主要是request、response,比如新建一次request,会返回一个timestamp,这个timestamp会作为这次request的id,每次请求会自增1,相应的res也会自增1,调用wait时会保证 后续比如做Wait以此为ID识别,tracker_是Customer内用来记录request(使用request id)和对应的response的次数的一个map;recv_handle_绑定Customer接收到request后的处理函数(SimpleApp::Process);Customer会新拉起一个线程,用于在customer生命周期内,使用recv_handle_来处理接受的请求,这里是使用了一个线程安全队列,Accept()用于往队列中一直发送消息,对于Worker,比如KVWorker,recv_handle_保存拉取的msg中的数据,对于Server,需要使用set_request_handle来设置对应的处理函数,如KVServerDefaultHandle,使用std::unordered_map<Key, Val> store保存server的参数,当请求为push时,对store参数做更新,请求为pull时对参数进行拉取;
Message封装包括Meta, Control, Node等消息, 其中data的部分,采用的SArray这个数据结构可以理解为一个零拷贝的vector,能兼容vector的数据结构,另外为了保证高效会对Message里的Meta,Control,Node使用protobuf来打包,这里有个疑问,为啥不会数据比如推送的梯度信息用protobuf打包呢?
PS构建网络步骤如下:
ps-lite里面有两个涉及到等待同步的地方:
Worker pull时是异步操作,通常调用Wait来调用Customer::WaitRequest()来保证customer里面的request和response两者相等,即保证Pull完成后再做其他操作;
另外在一个worker内,可以存在多个Customer,当第一个发送barrier后,scheduler接收到request请求,然后根据msg判断是request,然后,向barrier_group里的所有node,node接到后, Postoffice::Get()->Manage(*msg)将barrier_done_中的customer_id对应的bool置true,完成同步操作,这里貌似没有我们常说的asp、bsp、ssp,可以通过增加相应的Command来完成;
当构建节点连接时,也可以进行一个barrier;
更复杂的比如Asp,bsp,ssp可以通过发送新定Command来完成
void Van::ProcessBarrierCommand(Message* msg) {
auto& ctrl = msg->meta.control;
if (msg->meta.request) {
if (barrier_count_.empty()) {
barrier_count_.resize(8, 0);
}
int group = ctrl.barrier_group;
++barrier_count_[group];
PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group];
if (barrier_count_[group] ==
static_cast
barrier_count_[group] = 0;
Message res;
res.meta.request = false;
res.meta.app_id = msg->meta.app_id;
res.meta.customer_id = msg->meta.customer_id;
res.meta.control.cmd = Control::BARRIER;
for (int r : Postoffice::Get()->GetNodeIDs(group)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
res.meta.recver = recver_id;
res.meta.timestamp = timestamp_++;
CHECK_GT(Send(res), 0);
}
}
}
} else {
Postoffice::Get()->Manage(*msg);
}
}
一个基类,封装了基本的PS app的操作,KVWorker、KVServer集成SampleApp,完成相应逻辑: 如push、pull、key的切片等等;
Copyright © 2015 Powered by MWeb, Theme used GitHub CSS.