Skip to content

Commit

Permalink
* Upgrade to libcluon v0.0.90
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Berger <christian.berger@gu.se>
  • Loading branch information
chrberger committed May 9, 2018
1 parent 0c61a90 commit 59cb19a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ project(cluon-rec2fuse)

################################################################################
# Defining libcluon.
set(CLUON_COMPLETE cluon-complete-v0.0.89.hpp)
set(CLUON_COMPLETE cluon-complete-v0.0.90.hpp)

################################################################################
# This project requires C++14 or newer.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ docker run --rm -ti -v $PWD/myrecording.rec:/opt/input.rec \
--device=/dev/fuse \
-v /etc/passwd:/etc/passwd:ro \
-v /etc/group:/etc/group \
chrberger/cluon-rec2fuse-amd64:v0.0.89 \
chrberger/cluon-rec2fuse-amd64:v0.0.90 \
/bin/sh -c "chown $UID:$UID /opt/output && \
su -s /bin/sh $USER -c 'cluon-rec2fuse --rec=/opt/input.rec --odvd=/opt/odvd -f /opt/output' \
&& tail -f /dev/null"
Expand Down
86 changes: 78 additions & 8 deletions src/cluon-complete-v0.0.89.hpp → src/cluon-complete-v0.0.90.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This is an auto-generated header-only single-file distribution of libcluon.
// Date: Mon, 07 May 2018 12:05:07 +0200
// Version: 0.0.89
// Date: Tue, 08 May 2018 19:13:56 +0200
// Version: 0.0.90
//
//
// Implementation of N4562 std::experimental::any (merged into C++17) for C++11 compilers.
Expand Down Expand Up @@ -5160,9 +5160,16 @@ class LIBCLUON_API UDPSender {
*/
std::pair<ssize_t, int32_t> send(std::string &&data) const noexcept;

public:
/**
* @return Port that this UDP sender will use for sending or 0 if no information available.
*/
uint16_t getSendFromPort() const noexcept;

private:
mutable std::mutex m_socketMutex{};
int32_t m_socket{-1};
uint16_t m_portToSentFrom{0};
struct sockaddr_in m_sendToAddress {};
};
} // namespace cluon
Expand Down Expand Up @@ -5206,6 +5213,7 @@ class LIBCLUON_API UDPSender {
#include <chrono>
#include <functional>
#include <mutex>
#include <set>
#include <string>
#include <thread>

Expand Down Expand Up @@ -5261,10 +5269,12 @@ class LIBCLUON_API UDPReceiver {
* @param receiveFromAddress Numerical IPv4 address to receive UDP packets from.
* @param receiveFromPort Port to receive UDP packets from.
* @param delegate Functional (noexcept) to handle received bytes; parameters are received data, sender, timestamp.
* @param localSendFromPort Port that an application is using to send data. This port (> 0) is ignored when data is received.
*/
UDPReceiver(const std::string &receiveFromAddress,
uint16_t receiveFromPort,
std::function<void(std::string &&, std::string &&, std::chrono::system_clock::time_point &&)> delegate) noexcept;
std::function<void(std::string &&, std::string &&, std::chrono::system_clock::time_point &&)> delegate,
uint16_t localSendFromPort = 0) noexcept;
~UDPReceiver() noexcept;

/**
Expand All @@ -5287,6 +5297,8 @@ class LIBCLUON_API UDPReceiver {
private:
int32_t m_socket{-1};
bool m_isBlockingSocket{true};
std::set<unsigned long> m_listOfLocalIPAddresses{};
uint16_t m_localSendFromPort;
struct sockaddr_in m_receiveFromAddress {};
struct ip_mreq m_mreq {};
bool m_isMulticast{false};
Expand Down Expand Up @@ -7683,7 +7695,8 @@ namespace cluon {
/**
This class provides an interface to an OpenDaVINCI v4 session. An OpenDaVINCI
v4 session allows the automatic exchange of time-stamped Envelopes carrying
user-defined messages usually using UDP multicast.
user-defined messages usually using UDP multicast. A running OD4Session will not
receive the bytes that itself has sent to other microservices.

There are two ways to participate in an OpenDaVINCI session. Variant A is simply
calling a user-supplied lambda whenever a new Envelope is received:
Expand Down Expand Up @@ -8930,6 +8943,7 @@ inline TerminateHandler::TerminateHandler() noexcept {
#else
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#endif
// clang-format on
Expand Down Expand Up @@ -8970,6 +8984,23 @@ inline UDPSender::UDPSender(const std::string &sendToAddress, uint16_t sendToPor

m_socket = ::socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);

#ifndef WIN32
// Bind to random address/port but store sender port.
struct sockaddr_in sendFromAddress;
std::memset(&sendFromAddress, 0, sizeof(sendFromAddress));
sendFromAddress.sin_family = AF_INET;
sendFromAddress.sin_port = 0; // Randomly choose a port to bind.
if (0 == ::bind(m_socket, reinterpret_cast<struct sockaddr *>(&sendFromAddress), sizeof(sendFromAddress))) { // NOLINT
struct sockaddr tmpAddr;
socklen_t length = sizeof(tmpAddr);
if (0 == ::getsockname(m_socket, &tmpAddr, &length)) {
struct sockaddr_in tmpAddrIn;
std::memcpy(&tmpAddrIn, &tmpAddr, sizeof(tmpAddrIn)); // NOLINT
m_portToSentFrom = ntohs(tmpAddrIn.sin_port);
}
}
#endif

#ifdef WIN32
if (m_socket < 0) {
std::cerr << "[cluon::UDPSender] Error while creating socket: " << WSAGetLastError() << std::endl;
Expand All @@ -8993,6 +9024,10 @@ inline UDPSender::~UDPSender() noexcept {
m_socket = -1;
}

inline uint16_t UDPSender::getSendFromPort() const noexcept {
return m_portToSentFrom;
}

inline std::pair<ssize_t, int32_t> UDPSender::send(std::string &&data) const noexcept {
if (-1 == m_socket) {
return {-1, EBADF};
Expand Down Expand Up @@ -9053,6 +9088,11 @@ inline std::pair<ssize_t, int32_t> UDPSender::send(std::string &&data) const noe
#include <fcntl.h>
#include <unistd.h>
#endif

#ifdef __linux__
#include <ifaddrs.h>
#include <netdb.h>
#endif
// clang-format on

#include <cstring>
Expand All @@ -9068,8 +9108,10 @@ namespace cluon {

inline UDPReceiver::UDPReceiver(const std::string &receiveFromAddress,
uint16_t receiveFromPort,
std::function<void(std::string &&, std::string &&, std::chrono::system_clock::time_point &&)> delegate) noexcept
: m_receiveFromAddress()
std::function<void(std::string &&, std::string &&, std::chrono::system_clock::time_point &&)> delegate,
uint16_t localSendFromPort) noexcept
: m_localSendFromPort(localSendFromPort)
, m_receiveFromAddress()
, m_mreq()
, m_readFromSocketThread()
, m_delegate(std::move(delegate)) {
Expand Down Expand Up @@ -9209,6 +9251,25 @@ inline UDPReceiver::UDPReceiver(const std::string &receiveFromAddress,
}
}

#ifdef __linux__
// Fill list of local IP address to avoid sending data to ourselves.
if (!(m_socket < 0)) {
struct ifaddrs *interfaceAddress;
if (0 == ::getifaddrs(&interfaceAddress)) {
for (struct ifaddrs *it = interfaceAddress; nullptr != it; it = it->ifa_next) {
if ( (nullptr != it->ifa_addr) && (it->ifa_addr->sa_family == AF_INET) ) {
if (0 == ::getnameinfo(it->ifa_addr, sizeof(struct sockaddr_in), nullptr, 0, nullptr, 0, NI_NUMERICHOST)) {
std::memcpy(&tmpSocketAddress, it->ifa_addr, sizeof(tmpSocketAddress));
const unsigned long LOCAL_IP = tmpSocketAddress.sin_addr.s_addr;
m_listOfLocalIPAddresses.insert(LOCAL_IP);
}
}
}
::freeifaddrs(interfaceAddress);
}
}
#endif

if (!(m_socket < 0)) {
// Constructing the receiving thread could fail.
try {
Expand Down Expand Up @@ -9401,10 +9462,19 @@ inline void UDPReceiver::readFromSocket() noexcept {
&((reinterpret_cast<struct sockaddr_in *>(&remote))->sin_addr), // NOLINT
remoteAddress.data(),
remoteAddress.max_size());
const unsigned long RECVFROM_IP{reinterpret_cast<struct sockaddr_in *>(&remote)->sin_addr.s_addr}; // NOLINT
const uint16_t RECVFROM_PORT{ntohs(reinterpret_cast<struct sockaddr_in *>(&remote)->sin_port)}; // NOLINT

// Create a pipeline entry to be processed concurrently.
// Check if the bytes actually came from us.
bool sentFromUs{false};
{
auto pos = m_listOfLocalIPAddresses.find(RECVFROM_IP);
const bool sentFromLocalIP = (pos != m_listOfLocalIPAddresses.end() && (*pos == RECVFROM_IP));
sentFromUs = sentFromLocalIP && (m_localSendFromPort == RECVFROM_PORT);
}

// Create a pipeline entry to be processed concurrently.
if (!sentFromUs) {
PipelineEntry pe;
pe.m_data = std::string(buffer.data(), static_cast<size_t>(bytesRead));
pe.m_from = std::string(remoteAddress.data()) + ':' + std::to_string(RECVFROM_PORT);
Expand Down Expand Up @@ -12676,7 +12746,7 @@ inline OD4Session::OD4Session(uint16_t CID, std::function<void(cluon::data::Enve
m_receiver = std::make_unique<cluon::UDPReceiver>(
"225.0.0." + std::to_string(CID), 12175, [this](std::string &&data, std::string &&from, std::chrono::system_clock::time_point &&timepoint) {
this->callback(std::move(data), std::move(from), std::move(timepoint));
});
}, m_sender.getSendFromPort() /* passing our local send from port to the UDPReceiver to filter out our own bytes */);
}

inline void OD4Session::timeTrigger(float freq, std::function<bool()> delegate) noexcept {
Expand Down

0 comments on commit 59cb19a

Please sign in to comment.