18#ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19#define IGNITION_TRANSPORT_DISCOVERY_HH_
31 #include <sys/types.h>
33 #include <sys/socket.h>
37 #include <arpa/inet.h>
41 #include <netinet/in.h>
47 #pragma warning(push, 0)
52 #pragma warning(disable: 4503)
54 #pragma warning(disable: 4996)
58#include <condition_variable>
66#include "ignition/transport/Export.hh"
95 template<
typename Pub>
105 const bool _verbose =
false)
109 silenceInterval(kDefSilenceInterval),
110 activityInterval(kDefActivityInterval),
111 heartbeatInterval(kDefHeartbeatInterval),
112 connectionCb(nullptr),
113 disconnectionCb(nullptr),
116 numHeartbeatsUninitialized(0),
121 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
122 this->hostInterfaces = {ignIp};
126 this->hostInterfaces = determineInterfaces();
130 WORD wVersionRequested;
134 wVersionRequested = MAKEWORD(2, 2);
136 if (WSAStartup(wVersionRequested, &wsaData) != 0)
142 for (
const auto &netIface : this->hostInterfaces)
144 auto succeed = this->RegisterNetIface(netIface);
149 if (netIface == this->hostAddr && !succeed)
151 this->RegisterNetIface(
"127.0.0.1");
152 std::cerr <<
"Did you set the environment variable IGN_IP with a "
154 <<
" [" << netIface <<
"] seems an invalid local IP "
156 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
157 this->hostAddr =
"127.0.0.1";
166 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
167 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
169 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
180 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
181 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
183 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
189 sockaddr_in localAddr;
190 memset(&localAddr, 0,
sizeof(localAddr));
191 localAddr.sin_family = AF_INET;
192 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
193 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
195 if (bind(this->sockets.at(0),
196 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
203 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
204 this->mcastAddr.sin_family = AF_INET;
205 this->mcastAddr.sin_addr.s_addr =
206 inet_addr(this->kMulticastGroup.c_str());
207 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
217 this->exitMutex.lock();
219 this->exitMutex.unlock();
222 if (this->threadReception.joinable())
223 this->threadReception.join();
231 for (
const auto &sock : this->sockets)
254 this->enabled =
true;
258 this->timeNextHeartbeat = now;
259 this->timeNextActivity = now;
262 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
278 if (!this->info.AddPublisher(_publisher))
285 this->SendMsg(
AdvType, _publisher);
312 cb = this->connectionCb;
317 pub.SetPUuid(this->pUuid);
332 for (
const auto &node : proc.second)
364 return this->info.Publishers(
_topic, _publishers);
385 if (!this->info.Publisher(
_topic, this->pUuid, _nUuid, inf))
389 this->info.DelPublisherByNode(
_topic, this->pUuid, _nUuid);
405 return this->hostAddr;
415 return this->activityInterval;
426 return this->heartbeatInterval;
436 return this->silenceInterval;
445 this->activityInterval = _ms;
454 this->heartbeatInterval = _ms;
463 this->silenceInterval = _ms;
473 this->connectionCb =
_cb;
483 this->disconnectionCb =
_cb;
497 std::cout <<
"\tActivity: " << this->activityInterval
499 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
501 std::cout <<
"\tSilence: " << this->silenceInterval
534 this->info.TopicList(_topics);
543 if (!this->initialized)
545 this->initializedCv.wait(
lk, [
this]{
return this->initialized;});
552 private:
void UpdateActivity()
558 if (now < this->timeNextActivity)
561 for (
auto it = this->
activity.
cbegin(); it != this->activity.cend();)
564 auto elapsed = now - it->second;
568 (elapsed).count() > this->silenceInterval)
577 publisher.SetPUuid(it->first);
578 this->disconnectionCb(publisher);
592 private:
void UpdateHeartbeat()
597 std::lock_guard<std::mutex>
lock(this->mutex);
599 if (now < this->timeNextHeartbeat)
603 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
606 std::map<std::string, std::vector<Pub>> nodes;
608 std::lock_guard<std::mutex>
lock(this->mutex);
611 this->info.PublishersByProc(this->pUuid, nodes);
614 for (
const auto &topic : nodes)
616 for (
const auto &node : topic.second)
621 std::lock_guard<std::mutex>
lock(this->mutex);
622 if (!this->initialized)
624 ++this->numHeartbeatsUninitialized;
625 if (this->numHeartbeatsUninitialized == 2)
629 this->initialized =
true;
632 this->initializedCv.notify_all();
637 std::chrono::milliseconds(this->heartbeatInterval);
650 private:
int NextTimeout()
const
653 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
654 auto timeUntilNextActivity = this->timeNextActivity - now;
656 int t =
static_cast<int>(
658 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
659 int t2 =
std::min(t, this->kTimeout);
664 private:
void RecvMessages()
666 bool timeToExit =
false;
670 int timeout = this->NextTimeout();
674 this->RecvDiscoveryUpdate();
680 this->UpdateHeartbeat();
681 this->UpdateActivity();
685 std::lock_guard<std::mutex>
lock(this->exitMutex);
693 private:
void RecvDiscoveryUpdate()
695 char rcvStr[Discovery::kMaxRcvStr];
698 sockaddr_in clntAddr;
699 socklen_t addrLen =
sizeof(clntAddr);
701 if ((recvfrom(this->sockets.at(0),
702 reinterpret_cast<raw_type *
>(rcvStr),
704 reinterpret_cast<sockaddr *
>(&clntAddr),
705 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
707 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
711 srcAddr = inet_ntoa(clntAddr.sin_addr);
712 srcPort = ntohs(clntAddr.sin_port);
716 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": "
720 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
727 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
735 pBody += header.HeaderLength();
738 if (this->kWireVersion != header.Version())
741 auto recvPUuid = header.PUuid();
744 if (recvPUuid == this->pUuid)
751 std::lock_guard<std::mutex>
lock(this->mutex);
753 connectCb = this->connectionCb;
754 disconnectCb = this->disconnectionCb;
757 switch (header.Type())
762 transport::AdvertiseMessage<Pub> advMsg;
763 advMsg.Unpack(pBody);
768 _fromIp != this->hostAddr))
776 std::lock_guard<std::mutex>
lock(this->mutex);
777 added = this->info.AddPublisher(advMsg.Publisher());
780 if (added && connectCb)
783 connectCb(advMsg.Publisher());
791 SubscriptionMsg subMsg;
792 subMsg.Unpack(pBody);
793 auto recvTopic = subMsg.Topic();
798 std::lock_guard<std::mutex>
lock(this->mutex);
799 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
804 if (!this->info.Publishers(recvTopic,
addresses))
808 for (
const auto &nodeInfo :
addresses[this->pUuid])
813 _fromIp != this->hostAddr))
819 this->SendMsg(
AdvType, nodeInfo);
833 std::lock_guard<std::mutex>
lock(this->mutex);
840 pub.SetPUuid(recvPUuid);
847 std::lock_guard<std::mutex>
lock(this->mutex);
848 this->info.DelPublishersByProc(recvPUuid);
856 transport::AdvertiseMessage<Pub> advMsg;
857 advMsg.Unpack(pBody);
862 _fromIp != this->hostAddr))
870 disconnectCb(advMsg.Publisher());
875 std::lock_guard<std::mutex>
lock(this->mutex);
876 this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
877 advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
884 std::cerr <<
"Unknown message type [" << header.Type() <<
"]\n";
896 private:
template<
typename T>
897 void SendMsg(
const uint8_t _type,
899 const uint16_t _flags = 0)
const
902 Header header(this->Version(), _pub.PUuid(), _type, _flags);
904 std::vector<char> buffer;
906 std::string topic = _pub.Topic();
914 transport::AdvertiseMessage<T> advMsg(header, _pub);
917 buffer.
resize(advMsg.MsgLength());
918 advMsg.Pack(
reinterpret_cast<char*
>(&buffer[0]));
919 msgLength =
static_cast<int>(advMsg.MsgLength());
925 SubscriptionMsg subMsg(header, topic);
928 buffer.
resize(subMsg.MsgLength());
929 subMsg.Pack(
reinterpret_cast<char*
>(&buffer[0]));
930 msgLength =
static_cast<int>(subMsg.MsgLength());
937 buffer.
resize(header.HeaderLength());
938 header.Pack(
reinterpret_cast<char*
>(&buffer[0]));
939 msgLength = header.HeaderLength();
943 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
944 <<
" type [" << _type <<
"]" <<
std::endl;
950 for (
const auto &sock : this->Sockets())
952 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
953 reinterpret_cast<unsigned char*
>(&buffer[0])),
955 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
956 sizeof(*(this->MulticastAddr()))) != msgLength)
958 std::cerr <<
"Exception sending a message" <<
std::endl;
966 <<
" msg [" << topic <<
"]" <<
std::endl;
972 private:
const std::vector<int> &Sockets()
const
974 return this->sockets;
979 private:
const sockaddr_in *MulticastAddr()
const
981 return &this->mcastAddr;
986 private:
bool Verbose()
const
988 return this->verbose;
993 private: uint8_t Version()
const
995 return this->kWireVersion;
1002 private:
bool RegisterNetIface(
const std::string &_ip)
1005 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1008 std::cerr <<
"Socket creation failed." <<
std::endl;
1015 struct in_addr ifAddr;
1016 ifAddr.s_addr = inet_addr(_ip.
c_str());
1017 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1018 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1020 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1025 this->sockets.push_back(sock);
1030 struct ip_mreq group;
1031 group.imr_multiaddr.s_addr =
1032 inet_addr(this->kMulticastGroup.c_str());
1033 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1034 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1035 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1037 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1048 private:
static const unsigned int kDefActivityInterval = 100;
1053 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1058 private:
static const unsigned int kDefSilenceInterval = 3000;
1061 private:
const std::string kMulticastGroup =
"224.0.0.7";
1064 private:
const int kTimeout = 250;
1067 private:
static const int kMaxRcvStr = 65536;
1071 private:
static const uint8_t kWireVersion = 8;
1077 private: std::string hostAddr;
1080 private: std::vector<std::string> hostInterfaces;
1083 private: std::string pUuid;
1088 private:
unsigned int silenceInterval;
1093 private:
unsigned int activityInterval;
1098 private:
unsigned int heartbeatInterval;
1107 private: TopicStorage<Pub> info;
1116 private:
bool verbose;
1122 private: sockaddr_in mcastAddr;
1143 private:
bool initialized;
1146 private:
unsigned int numHeartbeatsUninitialized;
1155 private:
bool enabled;
void raw_type
Definition Discovery.hh:43
A class for customizing the publication options for a topic or service advertised....
Definition AdvertiseOptions.hh:55
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition Discovery.hh:97
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition Discovery.hh:539
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition Discovery.hh:245
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition Discovery.hh:269
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition Discovery.hh:412
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition Discovery.hh:423
std::string HostAddr() const
Get the IP address of this host.
Definition Discovery.hh:402
std::map< std::string, Timestamp > activity
Definition Discovery.hh:1113
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition Discovery.hh:374
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition Discovery.hh:451
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition Discovery.hh:433
void PrintCurrentState() const
Print the current discovery state.
Definition Discovery.hh:487
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition Discovery.hh:460
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition Discovery.hh:350
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition Discovery.hh:360
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition Discovery.hh:470
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition Discovery.hh:300
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition Discovery.hh:442
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition Discovery.hh:480
virtual ~Discovery()
Destructor.
Definition Discovery.hh:214
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition Discovery.hh:530
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition Discovery.hh:103
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition Publisher.hh:38
Store address information about topics and provide convenient methods for adding new topics,...
Definition TopicStorage.hh:38
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition TopicStorage.hh:258
T duration_cast(T... args)
SrvAddresses_M addresses
Definition Node.hh:986
static const uint8_t ByeType
Definition Packet.hh:39
std::chrono::steady_clock::time_point Timestamp
Definition TransportTypes.hh:151
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::unique_lock< std::recursive_mutex > lk(this->Shared() ->mutex)
static const uint8_t UnadvType
Definition Packet.hh:37
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(* _cb)(const RequestT &_req)
Definition Node.hh:527
static const uint8_t SubType
Definition Packet.hh:36
Discovery< MessagePublisher > MsgDiscovery
Definition Discovery.hh:1160
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
Definition AdvertiseOptions.hh:44
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
Definition AdvertiseOptions.hh:41
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
std::function< void(const T &_publisher)> DiscoveryCallback
Definition TransportTypes.hh:117
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void const AdvertiseServiceOptions ReplyT const std::string & _topic
Definition Node.hh:558
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition Packet.hh:44
static const uint8_t HeartbeatType
Definition Packet.hh:38
std::map< std::string, std::vector< T > > Addresses_M
Definition TransportTypes.hh:54
Discovery< ServicePublisher > SrvDiscovery
Definition Discovery.hh:1164
static const uint8_t AdvType
Definition Packet.hh:35
if(!TopicUtils::FullyQualifiedName(this->Options().Partition(), this->Options().NameSpace(), _topic, fullyQualifiedTopic))
Definition Node.hh:939
Definition AdvertiseOptions.hh:28