Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FD sharing protocol on localhost #82

Merged
merged 53 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
9f17ea0
Added X_LINK_LOCAL_SHDMEM protocol skeleton
TheMutta Jun 11, 2024
c4d4c45
Examples added.
TheMutta Jun 11, 2024
03539a3
Added checks for shdmem functions to compile only in Linux system
TheMutta Jun 12, 2024
283bbdc
Removed #warning directive.
TheMutta Jun 12, 2024
5562d45
Using correct FD for socket and not including headears on non-unix fu…
TheMutta Jun 12, 2024
3faa07c
Added XLINK_READ_FD and XLINK_WRITE_FD operations
TheMutta Jun 12, 2024
8aa655b
Removed binary files.
TheMutta Jun 12, 2024
bdbc176
Revert "Removed binary files."
TheMutta Jun 12, 2024
4cc25ab
Revert "Added XLINK_READ_FD and XLINK_WRITE_FD operations"
TheMutta Jun 12, 2024
4603388
Added XLINK_WRITE_FD_REQ with XLinkWriteFd function to send over FDs
TheMutta Jun 12, 2024
685245b
Adding include files with definitions for XLINK_WRITE_FD_REQ with XLi…
TheMutta Jun 12, 2024
43e39d6
Working examples
TheMutta Jun 12, 2024
011eab6
Fixed naming convention in the examples.
TheMutta Jun 12, 2024
706eea4
Added examples to CMakeLists.txt
TheMutta Jun 12, 2024
c3f6f3c
Added XLINK_WRITE_FD_RESP
TheMutta Jun 12, 2024
204f070
Finalized examples.
TheMutta Jun 12, 2024
4ce854b
SHDMEM protocol output reworked and finalized.
TheMutta Jun 12, 2024
c082504
Transferring fd through streamPacket
TheMutta Jun 13, 2024
0fb3d9a
Preparations for streaming files over the connection when XLinkWriteF…
TheMutta Jun 13, 2024
3a3c15a
Updated examples.
TheMutta Jun 13, 2024
ffbda08
Implemented file streaming on XLinkPlatformWriteFd
TheMutta Jun 13, 2024
43d2ced
Fixed broken CMakeLists.txt
TheMutta Jun 13, 2024
8c33a71
Automatic localhost detection and change to LOCAL_SHDMEM when using T…
TheMutta Jun 14, 2024
aa45e40
Updated examples
TheMutta Jun 14, 2024
48b6af8
Fixed typo in XLinkPlatformConnect
TheMutta Jun 19, 2024
9fb7537
Set fixed size for fd in streamPacketDesc_t at uint32_t
TheMutta Jun 19, 2024
0878b3b
Implemented XLinkWriteFdData, akin to XLinkWriteData2
TheMutta Jun 19, 2024
13a02ab
Changed fd to int32_t to allow for negative error values
TheMutta Jun 19, 2024
0a3444a
Fixed typo with dataBuffer instead of dataSize.
TheMutta Jun 19, 2024
fa9bfd9
shmem functions now returning byte count.
TheMutta Jun 20, 2024
05c8f0c
Examples enhanced for more comprehensive showcase of shdmem
TheMutta Jun 20, 2024
3c8d502
Server accepting data from all connections.
TheMutta Jun 20, 2024
7fb600a
Added extra parameters for completness
TheMutta Jun 25, 2024
f87dd85
Fixed xLinkEventType enum
TheMutta Jun 27, 2024
58a7eb1
Aded XLINK_WRITE_FD_RESP in TypeToStr in XLinkDispatcher
TheMutta Jun 27, 2024
53cb90c
Fixed writeFdEventMultipart
TheMutta Jun 27, 2024
77cbf61
Added size parameters to check wether the stream has enough space for…
TheMutta Jun 27, 2024
0e972c8
Passing FDs by value and not by reference
TheMutta Jun 28, 2024
cd96e6e
Removed binary files.
TheMutta Jun 28, 2024
e783d55
Revert "Added size parameters to check wether the stream has enough s…
TheMutta Jun 28, 2024
d35a4ef
Merge branch 'shdmem-merge' into shared-memory-on-localhost
TheMutta Jun 28, 2024
92f17cb
Added MSG_WAITALL flag to reads.
TheMutta Jun 28, 2024
7ab1388
Smart detection for shared memory protocol in TCP/IP
TheMutta Jul 1, 2024
629691f
Using multiple threads and cond var to connect to TCP/IP or SHDMEM
TheMutta Jul 2, 2024
ec429c3
Added conditional compilation for shdmemPlatform close
TheMutta Jul 2, 2024
89a4c13
Fixed flags for Windows.
TheMutta Jul 2, 2024
4e77fc1
Made certain example UNIX only
TheMutta Jul 2, 2024
741aff6
Removed tcpipIsLocalhost function from tcpip_host.cpp
TheMutta Jul 4, 2024
a0b71c4
Removed tcpipIsLocalhost function from tcpip_host.h
TheMutta Jul 4, 2024
fe25ef9
Added the custom X_LINK_TCP_IP_OR_LOCAL_SHDMEM protocol
TheMutta Jul 4, 2024
f017c1a
Merge branch 'shared-memory-on-localhost' of github.com:luxonis/XLink…
TheMutta Jul 4, 2024
5e89479
Made sure not to alter the first few event types for legacy systems, …
TheMutta Jul 5, 2024
62909dc
Conditional example compilation only for unix
TheMutta Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ add_example(xlink_server xlink_server.cpp)
add_example(xlink_server2 xlink_server2.cpp)
# Boot firmware
add_example(device_connect_reset device_connect_reset.cpp)

# Local shared memory example
add_example(xlink_server_local xlink_server_local.cpp)
add_example(xlink_client_local xlink_client_local.cpp)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing "if unix" in source code, conditionally add these as examples IFF running on UNIX
Apply this elsewhere as well where needed (where we ifdef out a whole file - is better to make it "platform specific compiliation wise")


119 changes: 119 additions & 0 deletions examples/xlink_client_local.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#if defined(__unix__)

#include <cstring>
#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cassert>

#include "XLink/XLink.h"
#include "XLink/XLinkPublicDefines.h"
#include "XLink/XLinkLog.h"

const long MAXIMUM_SHM_SIZE = 4096;
const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_b";

XLinkGlobalHandler_t xlinkGlobalHandler = {};

int main(int argc, const char** argv){
xlinkGlobalHandler.protocol = X_LINK_TCP_IP;

mvLogDefaultLevelSet(MVLOG_ERROR);

printf("Initializing XLink...\n");
auto status = XLinkInitialize(&xlinkGlobalHandler);
if(X_LINK_SUCCESS != status) {
printf("Initializing wasn't successful\n");
return 1;
}

XLinkHandler_t handler;
handler.devicePath = "127.0.0.1";
handler.protocol = X_LINK_TCP_IP;
status = XLinkConnect(&handler);
if(X_LINK_SUCCESS != status) {
printf("Connecting wasn't successful\n");
return 1;
}

streamPacketDesc_t *packet;

auto s = XLinkOpenStream(0, "test", 1024);
assert(s != INVALID_STREAM_ID);

// Read the data packet containing the FD
auto r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}

// Read and print the message from shared memory
printf("Message from Process A: %s\n", static_cast<char *>(sharedMemAddr));

const char *normalMessage = "Normal message from Process B";
auto w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
if (shmFd < 0) {
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
const char *message = "Shared message from Process B!";
memcpy(addr, message, strlen(message) + 1);

// Send the FD through the XLinkWriteFd function
w = XLinkWriteFd(s, shmFd);
assert(w == X_LINK_SUCCESS);

r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);

printf("Message from Process A: %s\n", (char *)(packet->data));


munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
unlink(shmName);

return 0;
}

#else

int main() {
return -1;
}

#endif
116 changes: 116 additions & 0 deletions examples/xlink_server_local.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#if defined(__unix__)

#include <cstring>
#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cassert>

#include "XLink/XLink.h"
#include "XLink/XLinkPublicDefines.h"
#include "XLink/XLinkLog.h"

const long MAXIMUM_SHM_SIZE = 4096;
const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_a";

XLinkGlobalHandler_t xlinkGlobalHandler = {};

int main(int argc, const char** argv){
xlinkGlobalHandler.protocol = X_LINK_TCP_IP;

mvLogDefaultLevelSet(MVLOG_ERROR);

printf("Initializing XLink...\n");
auto status = XLinkInitialize(&xlinkGlobalHandler);
if(X_LINK_SUCCESS != status) {
printf("Initializing wasn't successful\n");
return 1;
}

XLinkHandler_t handler;
handler.devicePath = "0.0.0.0";
handler.protocol = X_LINK_TCP_IP;
status = XLinkServerOnly(&handler);
if(X_LINK_SUCCESS != status) {
printf("Connecting wasn't successful\n");
return 1;
}

auto s = XLinkOpenStream(0, "test", 1024);
assert(s != INVALID_STREAM_ID);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
if (shmFd < 0) {
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
const char *message = "Shared message from Process A!";
memcpy(addr, message, strlen(message) + 1);

// Send the FD through the XLinkWriteFd function
auto w = XLinkWriteFd(s, shmFd);
assert(w == X_LINK_SUCCESS);

streamPacketDesc_t *packet;
auto r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);

printf("Message from Process B: %s\n", (char *)(packet->data));

// Read the data packet containing the FD
r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}

// Read and print the message from shared memory
printf("Message from Process B: %s\n", static_cast<char *>(sharedMemAddr));

const char *normalMessage = "Normal message from Process A";
w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
unlink(shmName);

return 0;
}

#else

int main() {
return -1;
}

#endif
12 changes: 12 additions & 0 deletions include/XLink/XLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer, in

XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer, int size, XLinkTimespec* outTSend);

/**
* @brief Sends a package to initiate the writing of a file descriptor
* @warning Actual size of the written data is ALIGN_UP(size, 64)
* @param[in] streamId - stream link Id obtained from XLinkOpenStream call
* @param[in] buffer - FD to be transmitted
* @return Status code of the operation: X_LINK_SUCCESS (0) for success
*/
XLinkError_t XLinkWriteFd(streamId_t const streamId, const long fd);
XLinkError_t XLinkWriteFd_(streamId_t streamId, const long fd, XLinkTimespec* outTSend);
XLinkError_t XLinkWriteFdData(streamId_t streamId, const long fd, int fdSize, const uint8_t* dataBuffer, int dataSize);


/**
* @brief Sends a package to initiate the writing of data to a remote stream
* @warning Actual size of the written data is ALIGN_UP(size, 64)
Expand Down
8 changes: 5 additions & 3 deletions include/XLink/XLinkPlatform.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef enum {
X_LINK_PLATFORM_DRIVER_NOT_LOADED = -128,
X_LINK_PLATFORM_USB_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_USB_VSC,
X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_TCP_IP,
X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_LOCAL_SHDMEM,
X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_PCIE,
} xLinkPlatformErrorCode_t;

Expand Down Expand Up @@ -64,10 +65,10 @@ xLinkPlatformErrorCode_t XLinkPlatformFindArrayOfDevicesNames(
xLinkPlatformErrorCode_t XLinkPlatformBootRemote(const deviceDesc_t* deviceDesc, const char* binaryPath);
xLinkPlatformErrorCode_t XLinkPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmware, size_t length);
xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const char* devPathWrite,
XLinkProtocol_t protocol, void** fd);
XLinkProtocol_t *protocol, void** fd);
xLinkPlatformErrorCode_t XLinkPlatformBootBootloader(const char* name, XLinkProtocol_t protocol);
xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite,
XLinkProtocol_t protocol, void** fd);
XLinkProtocol_t *protocol, void** fd);

UsbSpeed_t get_usb_speed();
const char* get_mx_serial();
Expand All @@ -86,7 +87,8 @@ xLinkPlatformErrorCode_t XLinkPlatformCloseRemote(xLinkDeviceHandle_t* deviceHan
// ------------------------------------

int XLinkPlatformWrite(xLinkDeviceHandle_t *deviceHandle, void *data, int size);
int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size);
int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void *data2, int size2);
int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size, long *fd);

void* XLinkPlatformAllocateData(uint32_t size, uint32_t alignment);
void XLinkPlatformDeallocateData(void *ptr, uint32_t size, uint32_t alignment);
Expand Down
11 changes: 9 additions & 2 deletions include/XLink/XLinkPrivateDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,42 @@ typedef enum
{
/*USB-X_LINK_PCIE related events*/
XLINK_WRITE_REQ,
XLINK_WRITE_FD_REQ, // only for the shared mem protocol
XLINK_READ_REQ,
XLINK_READ_REL_REQ,
XLINK_READ_REL_SPEC_REQ,
XLINK_CREATE_STREAM_REQ,
XLINK_CLOSE_STREAM_REQ,
XLINK_PING_REQ,
XLINK_RESET_REQ,

XLINK_REQUEST_LAST,
//note that is important to separate request and response
XLINK_WRITE_RESP,
XLINK_WRITE_FD_RESP, // only for the shared mem protocol
XLINK_READ_RESP,
XLINK_READ_REL_RESP,
XLINK_READ_REL_SPEC_RESP,
XLINK_CREATE_STREAM_RESP,
XLINK_CLOSE_STREAM_RESP,
XLINK_PING_RESP,
XLINK_RESET_RESP,

XLINK_RESP_LAST,

/*X_LINK_IPC related events*/
IPC_WRITE_REQ,
IPC_WRITE_FD_REQ,
IPC_READ_REQ,
IPC_CREATE_STREAM_REQ,
IPC_CLOSE_STREAM_REQ,

//
IPC_WRITE_RESP,
IPC_WRITE_FD_RESP,
IPC_READ_RESP,
IPC_CREATE_STREAM_RESP,
IPC_CLOSE_STREAM_RESP,
XLINK_READ_REL_SPEC_REQ,
XLINK_READ_REL_SPEC_RESP,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removed?

Also, all new events MUST GO to the end of enum, otherwise backward compatibility is broken

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved them before XLINK_LAST_REQ and XLINK_LAST_RESP, otherwise they will cause this assert to fail:

int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive) {
    //this function will be dependent whether this is a client or a Remote
    //specific actions to this peer
    mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId);

    ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ
               && event->header.type != XLINK_REQUEST_LAST
               && event->header.type < XLINK_RESP_LAST); 

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm - do we need a special type of an event?
This will likely not work due to it being shared with (fixed) Bootloader on RVC2 (eg communicating with it will cause issues).

So lets try to either:

  • Remove the need for custom xLinkEventType entry
  • reorder them to the end & fix assert (eg, to check between two ranges)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: xLinkEventHeader_t is also a structure that should not change (or "reserved bits" should be used under bitField)

} xLinkEventType_t;

typedef enum
Expand Down
3 changes: 3 additions & 0 deletions include/XLink/XLinkPublicDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum{
X_LINK_NOT_IMPLEMENTED,
X_LINK_INIT_USB_ERROR,
X_LINK_INIT_TCP_IP_ERROR,
X_LINK_INIT_LOCAL_SHDMEM_ERROR,
X_LINK_INIT_PCIE_ERROR,
} XLinkError_t;

Expand All @@ -63,6 +64,7 @@ typedef enum{
X_LINK_PCIE,
X_LINK_IPC,
X_LINK_TCP_IP,
X_LINK_LOCAL_SHDMEM,
X_LINK_NMB_OF_PROTOCOLS,
X_LINK_ANY_PROTOCOL
} XLinkProtocol_t;
Expand Down Expand Up @@ -138,6 +140,7 @@ typedef struct streamPacketDesc_t
{
uint8_t* data;
uint32_t length;
int32_t fd; // file descriptor
XLinkTimespec tRemoteSent; /// remote timestamp of when the packet was sent. Related to remote clock. Note: not directly related to local clock
XLinkTimespec tReceived; /// local timestamp of when the packet was received. Related to local monotonic clock
} streamPacketDesc_t;
Expand Down
Loading
Loading