當前位置:網站首頁>cyberRT 發現機制總結

cyberRT 發現機制總結

2022-05-15 08:31:54shenkaibo

cyberRT 發現機制總結

代碼目錄

cyber 發現機制的代碼都在cyber/service_discovery中,後面我們直接從topology_manager中去分析整個發現機制流程。
以下代碼流程中非重要代碼都用…替換。

發現機制初始化

bool TopologyManager::Init() {
  if (init_.exchange(true)) {
    return true;
  }
//創建三個類型節點的manager
  node_manager_ = std::make_shared<NodeManager>();
  channel_manager_ = std::make_shared<ChannelManager>();
  service_manager_ = std::make_shared<ServiceManager>();

  CreateParticipant();
//主要是這步處理,對所有類型的manager做初始化
  bool result =
      InitNodeManager() && InitChannelManager() && InitServiceManager();
  if (!result) {
	...
  }

  return true;
}

TopologyManager::Init中主要幹三件事,對node、channel、service這三個manager的初始化。init中創建了三個manager,node_manager_ 、channel_manager_ 、service_manager_ ,這三個manager都是繼承自Manager。並且三個manager的init函數中做的東西很簡單,都是調用了基類的StartDiscovery。我們看下StartDiscovery中的處理:

bool Manager::StartDiscovery(RtpsParticipant* participant) {
  if (participant == nullptr) {
    return false;
  }
  if (is_discovery_started_.exchange(true)) {
    return true;
  }
  //這裏創建的是fastRtps的pub和sub
  if (!CreatePublisher(participant) || !CreateSubscriber(participant)) {
    AERROR << "create publisher or subscriber failed.";
    StopDiscovery();
    return false;
  }
  return true;
}

可以看到StartDiscovery中做的處理就是創建出fastRtps的pub和sub,這裏我們也可以總結出cyber內部的發現機制通信都是通過fastRtps。這裏我理解直接使用fastRtps通信是為了避免自己再去實現trans local的機制。不管是同一機器內的通信,還是不同機器間的通信都使用了fastRtps方式。當然為了性能的極致化,也可以考慮同一機器內發現消息使用共享內存,但是這種的話你就要自己去實現trans local機制。確保晚加入的節點,能收到先加入節點的發現消息。

Topology的join和leave

這裏只舉例Join。
所有的節點join都是調用的基類方法Manager::Join,可以看下這個函數

bool Manager::Join(const RoleAttributes& attr, RoleType role,
                   bool need_publish) {
  if (is_shutdown_.load()) {
    ADEBUG << "the manager has been shut down.";
    return false;
  }
  RETURN_VAL_IF(!((1 << role) & allowed_role_), false);
  RETURN_VAL_IF(!Check(attr), false);
  ChangeMsg msg;
  Convert(attr, role, OperateType::OPT_JOIN, &msg);
  Dispose(msg);
  if (need_publish) {
    return Publish(msg);
  }
  return true;
}

這裏主要看Dispose和Publish這兩個。Dispose調用的是各個子類的方法,比如channel拓撲節點變化了,就會調用ChannelManager::Dispose來通過channelManager的節點變化,看下面:

void ChannelManager::Dispose(const ChangeMsg& msg) {
  if (msg.operate_type() == OperateType::OPT_JOIN) {
    DisposeJoin(msg);
  } else {
    DisposeLeave(msg);
  }
  Notify(msg);
}

Dispose會根據這個節點是join還是leave來調用子類的DisposeJoin或者DisposeLeave方法,目的就是更新各個子類manager中保存的節點關系。另外這裏還會調用Notify,裏面用了QT信號和槽的機制,其實就是觸發了一個callback。這個callback是使用方調用TopologyManager中的AddChangeListener設置的一個callback。

再看Publish函數,這函數就是將join或者leave消息通過fastRtps消息發給所有subscribe,讓所有sub都去更新拓撲節點關系。可以看下sub收到消息後的處理,收到消息的處理是Manager::OnRemoteChange:

void Manager::OnRemoteChange(const std::string& msg_str) {
  if (is_shutdown_.load()) {
    ADEBUG << "the manager has been shut down.";
    return;
  }

  ChangeMsg msg;
  RETURN_IF(!message::ParseFromString(msg_str, &msg));
  if (IsFromSameProcess(msg)) {
    return;
  }
  RETURN_IF(!Check(msg.role_attr()));
  Dispose(msg);
}

這裏的處理也很簡單,主要是收到sub拓撲節點變更消息後,觸發OnRemoteChange再調用Dispose 來更新自己保存的拓撲關系。

leave這裏就不細說了,和join流程差不多。

版權聲明
本文為[shenkaibo]所創,轉載請帶上原文鏈接,感謝
https://cht.chowdera.com/2022/135/202205150000357169.html

隨機推薦