ps-lite

ps-lite再一次研读源码

Postoffice

Postoffice是个单例类,是整个ps-lite的核心,相当于整个ps-lite的调控中心,包括对调起Van负责整个网络的拉起、通信、命令管理如增加节点、移除节点、恢复节点等等;整个集群基本信息的管理,比如worker、server数的获取、server端feature分布的获取、worker/server Rank与node id的互转、节点角色身份等等;

Van 和ZMQVan: 网络如何被构建, 如何通信

Van是ps-lite的一个基类, 实现了基础的公共函数,ZMQVan是基于zeromq的Van的实现,Van是整个Parameter Server的通信模块;在整个训练任务的生命周期中,有以下几点值得注意:

  1. 任务启动时,所有nodes,发送消息到scheduler,;
  2. 启动好scheduler后, worker与server会互相连接,注意worker之间、server之间不会连接;
  3. 框架运行过程中,通信中包括以下多种信息类型,如数据信息:worker向server更新梯度、心跳信息:worker/server向scheduler发送心跳、server和worker的连接、scheduler端的处理命令:如添加节点、恢复dead节点等等;
  4. Message中的Meta,如是否request、app_id、timestamp、nodes的ip、port、role等等在网络通信过程中会打包成protobuf,减少通信压力;
  5. 在节点挂掉(心跳时间内没回应)会恢复节点,这个过程中会将挂掉节点的id赋给恢复的节点;

Customer 消息如何被处理

Customer主要是request、response,比如新建一次request,会返回一个timestamp,这个timestamp会作为这次request的id,每次请求会自增1,相应的res也会自增1,调用wait时会保证 后续比如做Wait以此为ID识别,tracker_是Customer内用来记录request(使用request id)和对应的response的次数的一个map;recv_handle_绑定Customer接收到request后的处理函数(SimpleApp::Process);Customer会新拉起一个线程,用于在customer生命周期内,使用recv_handle_来处理接受的请求,这里是使用了一个线程安全队列,Accept()用于往队列中一直发送消息,对于Worker,比如KVWorker,recv_handle_保存拉取的msg中的数据,对于Server,需要使用set_request_handle来设置对应的处理函数,如KVServerDefaultHandle,使用std::unordered_map<Key, Val> store保存server的参数,当请求为push时,对store参数做更新,请求为pull时对参数进行拉取;

Message: 消息数据结构

Message封装包括Meta, Control, Node等消息, 其中data的部分,采用的SArray这个数据结构可以理解为一个零拷贝的vector,能兼容vector的数据结构,另外为了保证高效会对Message里的Meta,Control,Node使用protobuf来打包,这里有个疑问,为啥不会数据比如推送的梯度信息用protobuf打包呢?

PS如何构建网络

PS构建网络步骤如下:

  1. scheduler节点拉起;
  2. worker、server节点想scheduler发送请求,汇报ip、端口等等,scheduler分配node id给相应节点;
  3. 所有节点启动后,scheduler发送消息周知;
  4. 所有节点内部启动线程,一直发送心跳, scheduler会来处理相应的命令;

同步操作

ps-lite里面有两个涉及到等待同步的地方:

  1. Worker pull时是异步操作,通常调用Wait来调用Customer::WaitRequest()来保证customer里面的request和response两者相等,即保证Pull完成后再做其他操作;

  2. 另外在一个worker内,可以存在多个Customer,当第一个发送barrier后,scheduler接收到request请求,然后根据msg判断是request,然后,向barrier_group里的所有node,node接到后, Postoffice::Get()->Manage(*msg)将barrier_done_中的customer_id对应的bool置true,完成同步操作,这里貌似没有我们常说的asp、bsp、ssp,可以通过增加相应的Command来完成;

  3. 当构建节点连接时,也可以进行一个barrier;

  4. 更复杂的比如Asp,bsp,ssp可以通过发送新定Command来完成

    void Van::ProcessBarrierCommand(Message* msg) {
    auto& ctrl = msg->meta.control;
    if (msg->meta.request) {
    if (barrier_count_.empty()) {
    barrier_count_.resize(8, 0);
    }
    int group = ctrl.barrier_group;
    ++barrier_count_[group];
    PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group];
    if (barrier_count_[group] ==
    static_cast(Postoffice::Get()->GetNodeIDs(group).size())) {
    barrier_count_[group] = 0;
    Message res;
    res.meta.request = false;
    res.meta.app_id = msg->meta.app_id;
    res.meta.customer_id = msg->meta.customer_id;
    res.meta.control.cmd = Control::BARRIER;
    for (int r : Postoffice::Get()->GetNodeIDs(group)) {
    int recver_id = r;
    if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
    res.meta.recver = recver_id;
    res.meta.timestamp = timestamp_++;
    CHECK_GT(Send(res), 0);
    }
    }
    }
    } else {
    Postoffice::Get()->Manage(*msg);
    }
    }

SampleApp

一个基类,封装了基本的PS app的操作,KVWorker、KVServer集成SampleApp,完成相应逻辑: 如push、pull、key的切片等等;

2020/03/08 posted in  参数服务器

ps-lite再一次研读源码

Postoffice

Postoffice是个单例类,是整个ps-lite的核心,相当于整个ps-lite的调控中心,包括对调起Van负责整个网络的拉起、通信、命令管理如增加节点、移除节点、恢复节点等等;整个集群基本信息的管理,比如worker、server数的获取、server端feature分布的获取、worker/server Rank与node id的互转、节点角色身份等等;

Van 和ZMQVan: 网络如何被构建, 如何通信

Van是ps-lite的一个基类, 实现了基础的公共函数,ZMQVan是基于zeromq的Van的实现,Van是整个Parameter Server的通信模块;在整个训练任务的生命周期中,有以下几点值得注意:

  1. 任务启动时,所有nodes,发送消息到scheduler,;
  2. 启动好scheduler后, worker与server会互相连接,注意worker之间、server之间不会连接;
  3. 框架运行过程中,通信中包括以下多种信息类型,如数据信息:worker向server更新梯度、心跳信息:worker/server向scheduler发送心跳、server和worker的连接、scheduler端的处理命令:如添加节点、恢复dead节点等等;
  4. Message中的Meta,如是否request、app_id、timestamp、nodes的ip、port、role等等在网络通信过程中会打包成protobuf,减少通信压力;
  5. 在节点挂掉(心跳时间内没回应)会恢复节点,这个过程中会将挂掉节点的id赋给恢复的节点;

Customer 消息如何被处理

Customer主要是request、response,比如新建一次request,会返回一个timestamp,这个timestamp会作为这次request的id,每次请求会自增1,相应的res也会自增1,调用wait时会保证 后续比如做Wait以此为ID识别,tracker_是Customer内用来记录request(使用request id)和对应的response的次数的一个map;recv_handle_绑定Customer接收到request后的处理函数(SimpleApp::Process);Customer会新拉起一个线程,用于在customer生命周期内,使用recv_handle_来处理接受的请求,这里是使用了一个线程安全队列,Accept()用于往队列中一直发送消息,对于Worker,比如KVWorker,recv_handle_保存拉取的msg中的数据,对于Server,需要使用set_request_handle来设置对应的处理函数,如KVServerDefaultHandle,使用std::unordered_map<Key, Val> store保存server的参数,当请求为push时,对store参数做更新,请求为pull时对参数进行拉取;

Message: 消息数据结构

Message封装包括Meta, Control, Node等消息, 其中data的部分,采用的SArray这个数据结构可以理解为一个零拷贝的vector,能兼容vector的数据结构,另外为了保证高效会对Message里的Meta,Control,Node使用protobuf来打包,这里有个疑问,为啥不会数据比如推送的梯度信息用protobuf打包呢?

PS如何构建网络

PS构建网络步骤如下:

  1. scheduler节点拉起;
  2. worker、server节点想scheduler发送请求,汇报ip、端口等等,scheduler分配node id给相应节点;
  3. 所有节点启动后,scheduler发送消息周知;
  4. 所有节点内部启动线程,一直发送心跳, scheduler会来处理相应的命令;

同步操作

ps-lite里面有两个涉及到等待同步的地方:

  1. Worker pull时是异步操作,通常调用Wait来调用Customer::WaitRequest()来保证customer里面的request和response两者相等,即保证Pull完成后再做其他操作;

  2. 另外在一个worker内,可以存在多个Customer,当第一个发送barrier后,scheduler接收到request请求,然后根据msg判断是request,然后,向barrier_group里的所有node,node接到后, Postoffice::Get()->Manage(*msg)将barrier_done_中的customer_id对应的bool置true,完成同步操作,这里貌似没有我们常说的asp、bsp、ssp,可以通过增加相应的Command来完成;

  3. 当构建节点连接时,也可以进行一个barrier;

  4. 更复杂的比如Asp,bsp,ssp可以通过发送新定Command来完成

    void Van::ProcessBarrierCommand(Message* msg) {
    auto& ctrl = msg->meta.control;
    if (msg->meta.request) {
    if (barrier_count_.empty()) {
    barrier_count_.resize(8, 0);
    }
    int group = ctrl.barrier_group;
    ++barrier_count_[group];
    PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group];
    if (barrier_count_[group] ==
    static_cast(Postoffice::Get()->GetNodeIDs(group).size())) {
    barrier_count_[group] = 0;
    Message res;
    res.meta.request = false;
    res.meta.app_id = msg->meta.app_id;
    res.meta.customer_id = msg->meta.customer_id;
    res.meta.control.cmd = Control::BARRIER;
    for (int r : Postoffice::Get()->GetNodeIDs(group)) {
    int recver_id = r;
    if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
    res.meta.recver = recver_id;
    res.meta.timestamp = timestamp_++;
    CHECK_GT(Send(res), 0);
    }
    }
    }
    } else {
    Postoffice::Get()->Manage(*msg);
    }
    }

SampleApp

一个基类,封装了基本的PS app的操作,KVWorker、KVServer集成SampleApp,完成相应逻辑: 如push、pull、key的切片等等;

2019/03/26 posted in  参数服务器