diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5835ec3..e1691ab 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -47,3 +47,10 @@ add_example(xlink_server xlink_server.cpp) add_example(xlink_server2 xlink_server2.cpp) # Boot firmware add_example(device_connect_reset device_connect_reset.cpp) + +# Local shared memory example +if (UNIX) + add_example(xlink_server_local xlink_server_local.cpp) + add_example(xlink_client_local xlink_client_local.cpp) +endif (UNIX) + diff --git a/examples/xlink_client_local.cpp b/examples/xlink_client_local.cpp new file mode 100644 index 0000000..604cf18 --- /dev/null +++ b/examples/xlink_client_local.cpp @@ -0,0 +1,109 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "XLink/XLink.h" +#include "XLink/XLinkPublicDefines.h" +#include "XLink/XLinkLog.h" + +const long MAXIMUM_SHM_SIZE = 4096; +const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_b"; + +XLinkGlobalHandler_t xlinkGlobalHandler = {}; + +int main(int argc, const char** argv){ + xlinkGlobalHandler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM; + + mvLogDefaultLevelSet(MVLOG_ERROR); + + printf("Initializing XLink...\n"); + auto status = XLinkInitialize(&xlinkGlobalHandler); + if(X_LINK_SUCCESS != status) { + printf("Initializing wasn't successful\n"); + return 1; + } + + XLinkHandler_t handler; + handler.devicePath = "127.0.0.1"; + handler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM; + status = XLinkConnect(&handler); + if(X_LINK_SUCCESS != status) { + printf("Connecting wasn't successful\n"); + return 1; + } + + streamPacketDesc_t *packet; + + auto s = XLinkOpenStream(0, "test", 1024); + assert(s != INVALID_STREAM_ID); + + // Read the data packet containing the FD + auto r = XLinkReadData(s, &packet); + assert(r == X_LINK_SUCCESS); + + long receivedFd = packet->fd; + if (receivedFd < 0) { + printf("Not a valid FD, data streamed through message\n"); + return 1; + } + + // Map the shared memory + void *sharedMemAddr = + mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); + if (sharedMemAddr == MAP_FAILED) { + perror("mmap"); + return 1; + } + + // Read and print the message from shared memory + printf("Message from Process A: %s\n", static_cast(sharedMemAddr)); + + const char *normalMessage = "Normal message from Process B"; + auto w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1); + assert(w == X_LINK_SUCCESS); + + const char *shmName = SHARED_MEMORY_NAME; + long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666); + if (shmFd < 0) { + perror("shm_open"); + return 1; + } + + ftruncate(shmFd, MAXIMUM_SHM_SIZE); + + void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0); + if (addr == MAP_FAILED) { + perror("mmap"); + close(shmFd); + shm_unlink(shmName); + return 1; + } + + // Write a message to the shared memory + const char *message = "Shared message from Process B!"; + memcpy(addr, message, strlen(message) + 1); + + // Send the FD through the XLinkWriteFd function + w = XLinkWriteFd(s, shmFd); + assert(w == X_LINK_SUCCESS); + + r = XLinkReadData(s, &packet); + assert(w == X_LINK_SUCCESS); + + printf("Message from Process A: %s\n", (char *)(packet->data)); + + + munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + + munmap(addr, MAXIMUM_SHM_SIZE); + close(shmFd); + unlink(shmName); + + return 0; +} diff --git a/examples/xlink_server_local.cpp b/examples/xlink_server_local.cpp new file mode 100644 index 0000000..074da73 --- /dev/null +++ b/examples/xlink_server_local.cpp @@ -0,0 +1,106 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "XLink/XLink.h" +#include "XLink/XLinkPublicDefines.h" +#include "XLink/XLinkLog.h" + +const long MAXIMUM_SHM_SIZE = 4096; +const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_a"; + +XLinkGlobalHandler_t xlinkGlobalHandler = {}; + +int main(int argc, const char** argv){ + xlinkGlobalHandler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM; + + mvLogDefaultLevelSet(MVLOG_ERROR); + + printf("Initializing XLink...\n"); + auto status = XLinkInitialize(&xlinkGlobalHandler); + if(X_LINK_SUCCESS != status) { + printf("Initializing wasn't successful\n"); + return 1; + } + + XLinkHandler_t handler; + handler.devicePath = "0.0.0.0"; + handler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM; + status = XLinkServerOnly(&handler); + if(X_LINK_SUCCESS != status) { + printf("Connecting wasn't successful\n"); + return 1; + } + + auto s = XLinkOpenStream(0, "test", 1024); + assert(s != INVALID_STREAM_ID); + + const char *shmName = SHARED_MEMORY_NAME; + long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666); + if (shmFd < 0) { + perror("shm_open"); + return 1; + } + + ftruncate(shmFd, MAXIMUM_SHM_SIZE); + + void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0); + if (addr == MAP_FAILED) { + perror("mmap"); + close(shmFd); + shm_unlink(shmName); + return 1; + } + + // Write a message to the shared memory + const char *message = "Shared message from Process A!"; + memcpy(addr, message, strlen(message) + 1); + + // Send the FD through the XLinkWriteFd function + auto w = XLinkWriteFd(s, shmFd); + assert(w == X_LINK_SUCCESS); + + streamPacketDesc_t *packet; + auto r = XLinkReadData(s, &packet); + assert(w == X_LINK_SUCCESS); + + printf("Message from Process B: %s\n", (char *)(packet->data)); + + // Read the data packet containing the FD + r = XLinkReadData(s, &packet); + assert(r == X_LINK_SUCCESS); + + long receivedFd = packet->fd; + if (receivedFd < 0) { + printf("Not a valid FD, data streamed through message\n"); + return 1; + } + + // Map the shared memory + void *sharedMemAddr = + mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0); + if (sharedMemAddr == MAP_FAILED) { + perror("mmap"); + return 1; + } + + // Read and print the message from shared memory + printf("Message from Process B: %s\n", static_cast(sharedMemAddr)); + + const char *normalMessage = "Normal message from Process A"; + w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1); + assert(w == X_LINK_SUCCESS); + + munmap(sharedMemAddr, MAXIMUM_SHM_SIZE); + + munmap(addr, MAXIMUM_SHM_SIZE); + close(shmFd); + unlink(shmName); + + return 0; +} diff --git a/include/XLink/XLink.h b/include/XLink/XLink.h index 09c3ba5..1f10bd9 100644 --- a/include/XLink/XLink.h +++ b/include/XLink/XLink.h @@ -304,6 +304,18 @@ XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer, in XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer, int size, XLinkTimespec* outTSend); +/** + * @brief Sends a package to initiate the writing of a file descriptor + * @warning Actual size of the written data is ALIGN_UP(size, 64) + * @param[in] streamId - stream link Id obtained from XLinkOpenStream call + * @param[in] buffer - FD to be transmitted + * @return Status code of the operation: X_LINK_SUCCESS (0) for success + */ +XLinkError_t XLinkWriteFd(streamId_t const streamId, const long fd); +XLinkError_t XLinkWriteFd_(streamId_t streamId, const long fd, XLinkTimespec* outTSend); +XLinkError_t XLinkWriteFdData(streamId_t streamId, const long fd, int fdSize, const uint8_t* dataBuffer, int dataSize); + + /** * @brief Sends a package to initiate the writing of data to a remote stream * @warning Actual size of the written data is ALIGN_UP(size, 64) diff --git a/include/XLink/XLinkPlatform.h b/include/XLink/XLinkPlatform.h index 6fbc82b..dbef2dc 100644 --- a/include/XLink/XLinkPlatform.h +++ b/include/XLink/XLinkPlatform.h @@ -34,6 +34,7 @@ typedef enum { X_LINK_PLATFORM_DRIVER_NOT_LOADED = -128, X_LINK_PLATFORM_USB_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_USB_VSC, X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_TCP_IP, + X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_LOCAL_SHDMEM, X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_PCIE, } xLinkPlatformErrorCode_t; @@ -64,10 +65,10 @@ xLinkPlatformErrorCode_t XLinkPlatformFindArrayOfDevicesNames( xLinkPlatformErrorCode_t XLinkPlatformBootRemote(const deviceDesc_t* deviceDesc, const char* binaryPath); xLinkPlatformErrorCode_t XLinkPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmware, size_t length); xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const char* devPathWrite, - XLinkProtocol_t protocol, void** fd); + XLinkProtocol_t *protocol, void** fd); xLinkPlatformErrorCode_t XLinkPlatformBootBootloader(const char* name, XLinkProtocol_t protocol); xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite, - XLinkProtocol_t protocol, void** fd); + XLinkProtocol_t *protocol, void** fd); UsbSpeed_t get_usb_speed(); const char* get_mx_serial(); @@ -86,7 +87,8 @@ xLinkPlatformErrorCode_t XLinkPlatformCloseRemote(xLinkDeviceHandle_t* deviceHan // ------------------------------------ int XLinkPlatformWrite(xLinkDeviceHandle_t *deviceHandle, void *data, int size); -int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size); +int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void *data2, int size2); +int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size, long *fd); void* XLinkPlatformAllocateData(uint32_t size, uint32_t alignment); void XLinkPlatformDeallocateData(void *ptr, uint32_t size, uint32_t alignment); diff --git a/include/XLink/XLinkPrivateDefines.h b/include/XLink/XLinkPrivateDefines.h index 777c699..9b9e92f 100644 --- a/include/XLink/XLinkPrivateDefines.h +++ b/include/XLink/XLinkPrivateDefines.h @@ -89,7 +89,8 @@ typedef enum XLINK_CLOSE_STREAM_REQ, XLINK_PING_REQ, XLINK_RESET_REQ, - XLINK_REQUEST_LAST, + + XLINK_STATIC_REQUEST_LAST, //note that is important to separate request and response XLINK_WRITE_RESP, XLINK_READ_RESP, @@ -98,20 +99,28 @@ typedef enum XLINK_CLOSE_STREAM_RESP, XLINK_PING_RESP, XLINK_RESET_RESP, - XLINK_RESP_LAST, + + XLINK_STATIC_RESP_LAST, /*X_LINK_IPC related events*/ IPC_WRITE_REQ, IPC_READ_REQ, IPC_CREATE_STREAM_REQ, IPC_CLOSE_STREAM_REQ, + // IPC_WRITE_RESP, IPC_READ_RESP, IPC_CREATE_STREAM_RESP, IPC_CLOSE_STREAM_RESP, + XLINK_READ_REL_SPEC_REQ, + XLINK_WRITE_FD_REQ, // only for the shared mem protocol + XLINK_REQUEST_LAST, + XLINK_READ_REL_SPEC_RESP, + XLINK_WRITE_FD_RESP, // only for the shared mem protocol + XLINK_RESP_LAST, } xLinkEventType_t; typedef enum diff --git a/include/XLink/XLinkPublicDefines.h b/include/XLink/XLinkPublicDefines.h index 7aa1dae..4c2a52c 100644 --- a/include/XLink/XLinkPublicDefines.h +++ b/include/XLink/XLinkPublicDefines.h @@ -54,6 +54,7 @@ typedef enum{ X_LINK_NOT_IMPLEMENTED, X_LINK_INIT_USB_ERROR, X_LINK_INIT_TCP_IP_ERROR, + X_LINK_INIT_LOCAL_SHDMEM_ERROR, X_LINK_INIT_PCIE_ERROR, } XLinkError_t; @@ -63,6 +64,8 @@ typedef enum{ X_LINK_PCIE, X_LINK_IPC, X_LINK_TCP_IP, + X_LINK_LOCAL_SHDMEM, + X_LINK_TCP_IP_OR_LOCAL_SHDMEM, X_LINK_NMB_OF_PROTOCOLS, X_LINK_ANY_PROTOCOL } XLinkProtocol_t; @@ -138,6 +141,7 @@ typedef struct streamPacketDesc_t { uint8_t* data; uint32_t length; + int32_t fd; // file descriptor XLinkTimespec tRemoteSent; /// remote timestamp of when the packet was sent. Related to remote clock. Note: not directly related to local clock XLinkTimespec tReceived; /// local timestamp of when the packet was received. Related to local monotonic clock } streamPacketDesc_t; diff --git a/src/pc/PlatformData.c b/src/pc/PlatformData.c index 874c2ae..8432d73 100644 --- a/src/pc/PlatformData.c +++ b/src/pc/PlatformData.c @@ -14,6 +14,7 @@ #include "usb_host.h" #include "pcie_host.h" #include "tcpip_host.h" +#include "local_memshd.h" #include "PlatformDeviceFd.h" #include "inttypes.h" @@ -35,6 +36,9 @@ #include #include #include +#include +#include +#include #endif #ifdef USE_LINK_JTAG @@ -91,12 +95,78 @@ int XLinkPlatformWrite(xLinkDeviceHandle_t *deviceHandle, void *data, int size) case X_LINK_TCP_IP: return tcpipPlatformWrite(deviceHandle->xLinkFD, data, size); +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformWrite(deviceHandle->xLinkFD, data, size); +#endif + + default: + return X_LINK_PLATFORM_INVALID_PARAMETERS; + } +} + +int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void *data2, int size2) +{ + if(!XLinkIsProtocolInitialized(deviceHandle->protocol)) { + return X_LINK_PLATFORM_DRIVER_NOT_LOADED+deviceHandle->protocol; + } + + switch (deviceHandle->protocol) { +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformWriteFd(deviceHandle->xLinkFD, fd, data2, size2); + + case X_LINK_USB_VSC: + case X_LINK_USB_CDC: + case X_LINK_PCIE: + case X_LINK_TCP_IP: + { + if (fd <= 0) { + return X_LINK_ERROR; + } + + // Determine file size through fstat + struct stat fileStats; + fstat(fd, &fileStats); + int size = fileStats.st_size; + + // mmap the fine in memory + void *addr = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0); + if (addr == MAP_FAILED) { + mvLog(MVLOG_ERROR, "Failed to mmap file to stream it over\n"); + return X_LINK_ERROR; + } + + // Use the respective write function to copy and send the message + int result = X_LINK_ERROR; + switch(deviceHandle->protocol) { + case X_LINK_USB_VSC: + case X_LINK_USB_CDC: + result = usbPlatformWrite(deviceHandle->xLinkFD, addr, size); + break; + case X_LINK_PCIE: + result = pciePlatformWrite(deviceHandle->xLinkFD, addr, size); + break; + case X_LINK_TCP_IP: + result = tcpipPlatformWrite(deviceHandle->xLinkFD, addr, size); + break; + default: + result = X_LINK_PLATFORM_INVALID_PARAMETERS; + break; + } + + // Unmap file + munmap(addr, size); + + return result; + } +#endif default: return X_LINK_PLATFORM_INVALID_PARAMETERS; } } -int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size) +int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size, long *fd) { if(!XLinkIsProtocolInitialized(deviceHandle->protocol)) { return X_LINK_PLATFORM_DRIVER_NOT_LOADED+deviceHandle->protocol; @@ -112,6 +182,11 @@ int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size) case X_LINK_TCP_IP: return tcpipPlatformRead(deviceHandle->xLinkFD, data, size); + +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformRead(deviceHandle->xLinkFD, data, size, fd); +#endif default: return X_LINK_PLATFORM_INVALID_PARAMETERS; diff --git a/src/pc/PlatformDeviceControl.c b/src/pc/PlatformDeviceControl.c index 855edc9..6bd45fd 100644 --- a/src/pc/PlatformDeviceControl.c +++ b/src/pc/PlatformDeviceControl.c @@ -10,6 +10,8 @@ #include "usb_host.h" #include "pcie_host.h" #include "tcpip_host.h" +#include "local_memshd.h" +#include "tcpip_memshd.h" #include "XLinkStringUtils.h" #include "PlatformDeviceFd.h" @@ -95,6 +97,15 @@ xLinkPlatformErrorCode_t XLinkPlatformInit(XLinkGlobalHandler_t* globalHandler) xlinkSetProtocolInitialized(X_LINK_TCP_IP, 0); } +#if defined(__unix__) + // Initialize the shared memory protocol if necessary + if (shdmem_initialize() != 0) { + xlinkSetProtocolInitialized(X_LINK_LOCAL_SHDMEM, 0); + } +#endif + + xlinkSetProtocolInitialized(X_LINK_TCP_IP_OR_LOCAL_SHDMEM, 1); + return X_LINK_PLATFORM_SUCCESS; } @@ -165,12 +176,13 @@ xLinkPlatformErrorCode_t XLinkPlatformBootFirmware(const deviceDesc_t* deviceDes } -xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const char* devPathWrite, XLinkProtocol_t protocol, void** fd) +xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const char* devPathWrite, XLinkProtocol_t *protocol, void** fd) { - if(!XLinkIsProtocolInitialized(protocol)) { - return X_LINK_PLATFORM_DRIVER_NOT_LOADED+protocol; + if(!XLinkIsProtocolInitialized(*protocol)) { + return X_LINK_PLATFORM_DRIVER_NOT_LOADED+*protocol; } - switch (protocol) { + + switch (*protocol) { case X_LINK_USB_VSC: case X_LINK_USB_CDC: return usbPlatformConnect(devPathRead, devPathWrite, fd); @@ -180,17 +192,33 @@ xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const cha case X_LINK_TCP_IP: return tcpipPlatformConnect(devPathRead, devPathWrite, fd); + + case X_LINK_TCP_IP_OR_LOCAL_SHDMEM: + return tcpipOrLocalShdmemPlatformConnect(protocol, devPathRead, devPathWrite, fd); + +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformConnect(devPathRead, devPathWrite, fd); +#endif default: return X_LINK_PLATFORM_INVALID_PARAMETERS; } } -xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite, XLinkProtocol_t protocol, void** fd) +xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite, XLinkProtocol_t *protocol, void** fd) { - switch (protocol) { + switch (*protocol) { case X_LINK_TCP_IP: - return tcpipPlatformServer(devPathRead, devPathWrite, fd); + return tcpipPlatformServer(devPathRead, devPathWrite, fd, NULL); + + case X_LINK_TCP_IP_OR_LOCAL_SHDMEM: + return tcpipOrLocalShdmemPlatformServer(protocol, devPathRead, devPathWrite, fd); + +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformServer(devPathRead, devPathWrite, fd, NULL); +#endif default: return X_LINK_PLATFORM_INVALID_PARAMETERS; @@ -239,6 +267,11 @@ xLinkPlatformErrorCode_t XLinkPlatformCloseRemote(xLinkDeviceHandle_t* deviceHan case X_LINK_TCP_IP: return tcpipPlatformClose(deviceHandle->xLinkFD); + +#if defined(__unix__) + case X_LINK_LOCAL_SHDMEM: + return shdmemPlatformClose(deviceHandle->xLinkFD); +#endif default: return X_LINK_PLATFORM_INVALID_PARAMETERS; diff --git a/src/pc/PlatformDeviceSearch.c b/src/pc/PlatformDeviceSearch.c index c15133b..575c142 100644 --- a/src/pc/PlatformDeviceSearch.c +++ b/src/pc/PlatformDeviceSearch.c @@ -149,6 +149,7 @@ char* XLinkPlatformErrorToStr(const xLinkPlatformErrorCode_t errorCode) { case X_LINK_PLATFORM_TIMEOUT: return "X_LINK_PLATFORM_TIMEOUT"; case X_LINK_PLATFORM_USB_DRIVER_NOT_LOADED: return "X_LINK_PLATFORM_USB_DRIVER_NOT_LOADED"; case X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED: return "X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED"; + case X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED: return "X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED"; case X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED: return "X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED"; case X_LINK_PLATFORM_INVALID_PARAMETERS: return "X_LINK_PLATFORM_INVALID_PARAMETERS"; default: return ""; diff --git a/src/pc/protocols/local_memshd.cpp b/src/pc/protocols/local_memshd.cpp new file mode 100644 index 0000000..4de49d7 --- /dev/null +++ b/src/pc/protocols/local_memshd.cpp @@ -0,0 +1,218 @@ +/** + * @file local_memshd.c + * @brief Shared memory helper definitions +*/ + +#include "local_memshd.h" +#include "../PlatformDeviceFd.h" + +#define MVLOG_UNIT_NAME local_memshd +#include "XLinkLog.h" + +#if defined(__unix__) +#include +#include +#include +#include +#include +#include +#include +#include + +int shdmem_initialize() { + mvLog(MVLOG_DEBUG, "Shared memory initialized\n"); + return X_LINK_SUCCESS; +} + +int shdmemPlatformConnect(const char *devPathRead, const char *devPathWrite, void **desc) { + const char *socketPath = devPathWrite; + + mvLog(MVLOG_DEBUG, "Shared memory connect invoked with socket path %s\n", socketPath); + + int socketFd = socket(AF_UNIX, SOCK_STREAM, 0); + if (socketFd < 0) { + mvLog(MVLOG_FATAL, "Socket creation failed"); + return X_LINK_ERROR; + } + + struct sockaddr_un sockAddr; + memset(&sockAddr, 0, sizeof(sockAddr)); + sockAddr.sun_family = AF_UNIX; + strcpy(sockAddr.sun_path, socketPath); + + if (connect(socketFd, (struct sockaddr *)&sockAddr, sizeof(sockAddr)) < 0) { + mvLog(MVLOG_FATAL, "Socket connection failed"); + return X_LINK_ERROR; + } + + // Store the socket and create a "unique" key instead + // (as file descriptors are reused and can cause a clash with lookups between scheduler and link) + *desc = createPlatformDeviceFdKey((void*) (uintptr_t) socketFd); + + return X_LINK_SUCCESS; +} + +int shdmemPlatformServer(const char *devPathRead, const char *devPathWrite, void **desc, long *sockFd) { + const char *socketPath = devPathWrite; + mvLog(MVLOG_DEBUG, "Shared memory server invoked with socket path %s\n", socketPath); + + int socketFd = socket(AF_UNIX, SOCK_STREAM, 0); + if (socketFd < 0) { + mvLog(MVLOG_FATAL, "Socket creation failed"); + return X_LINK_ERROR; + } + + if (sockFd != nullptr) *sockFd = socketFd; + + struct sockaddr_un addrUn; + memset(&addrUn, 0, sizeof(addrUn)); + addrUn.sun_family = AF_UNIX; + strcpy(addrUn.sun_path, socketPath); + unlink(socketPath); + + if (bind(socketFd, (struct sockaddr *)&addrUn, sizeof(addrUn)) < 0) { + mvLog(MVLOG_FATAL, "Socket bind failed"); + return X_LINK_ERROR; + } + + listen(socketFd, 1); + mvLog(MVLOG_DEBUG, "Waiting for a connection...\n"); + int clientFd = accept(socketFd, NULL, NULL); + close(socketFd); + if (sockFd != nullptr) *sockFd = -1; + if (clientFd < 0) { + mvLog(MVLOG_FATAL, "Socket accept failed"); + return X_LINK_ERROR; + } + + + // Store the socket and create a "unique" key instead + // (as file descriptors are reused and can cause a clash with lookups between scheduler and link) + *desc = createPlatformDeviceFdKey((void*) (uintptr_t) clientFd); + + printf("XLink SHDMEM connected\n"); + return X_LINK_SUCCESS; + +} + +int shdmemPlatformClose(void **desc) { + long socketFd = 0; + if(getPlatformDeviceFdFromKey(desc, (void**)&socketFd)) { + mvLog(MVLOG_DEBUG, "Failed\n"); + return X_LINK_ERROR; + } + + close(socketFd); + + return X_LINK_SUCCESS; +} + +int shdmemPlatformRead(void *desc, void *data, int size, long *fd) { + long socketFd = 0; + if(getPlatformDeviceFdFromKey(desc, (void**)&socketFd)) { + mvLog(MVLOG_DEBUG, "Failed\n"); + return X_LINK_ERROR; + } + + struct msghdr msg = {}; + struct iovec iov; + iov.iov_base = data; + iov.iov_len = size; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))]; + msg.msg_control = ancillaryElementBuffer; + msg.msg_controllen = sizeof(ancillaryElementBuffer); + + int bytes; + if(bytes = recvmsg(socketFd, &msg, MSG_WAITALL) < 0) { + mvLog(MVLOG_ERROR, "Failed to recieve message"); + return X_LINK_ERROR; + } + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + if (cmsg && cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + long recvFd = *((long*)CMSG_DATA(cmsg)); + mvLog(MVLOG_DEBUG, "We received ad FD: %d\n", recvFd); + + + /* We have recieved a FD */ + *fd = recvFd; + } + + return bytes; +} + +int shdmemPlatformWrite(void *desc, void *data, int size) { + long socketFd = 0; + if(getPlatformDeviceFdFromKey(desc, (void**)&socketFd)) { + mvLog(MVLOG_ERROR, "Failed to get the socket FD\n"); + return X_LINK_ERROR; + } + + struct msghdr msg = {}; + struct iovec iov; + iov.iov_base = data; + iov.iov_len = size; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + int bytes; + if(bytes = sendmsg(socketFd, &msg, 0) < 0) { + mvLog(MVLOG_ERROR, "Failed to send message\n"); + return X_LINK_ERROR; + } + + return bytes; +} + +int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) { + long socketFd = 0; + if(getPlatformDeviceFdFromKey(desc, (void**)&socketFd)) { + mvLog(MVLOG_ERROR, "Failed to get the socket FD\n"); + return X_LINK_ERROR; + } + + struct msghdr msg = {}; + struct iovec iov; + char buf[1] = {0}; // Buffer for single byte of data to send + if (data2 != NULL && size2 > 0) { + iov.iov_base = data2; + iov.iov_len = size2; + } else { + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + } + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + if (fd >= 0) { + char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))]; + msg.msg_control = ancillaryElementBuffer; + msg.msg_controllen = sizeof(ancillaryElementBuffer); + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(long)); + + *((long*)CMSG_DATA(cmsg)) = fd; + } + + int bytes; + if(bytes = sendmsg(socketFd, &msg, 0) < 0) { + mvLog(MVLOG_ERROR, "Failed to send message"); + return X_LINK_ERROR; + } + + return bytes; +} + +int shdmemSetProtocol(XLinkProtocol_t *protocol, const char* devPathRead, const char* devPathWrite) { + devPathWrite = devPathRead = SHDMEM_DEFAULT_SOCKET; + *protocol = X_LINK_LOCAL_SHDMEM; + return X_LINK_SUCCESS; +} + +#endif /* !defined(__unix__) */ diff --git a/src/pc/protocols/local_memshd.h b/src/pc/protocols/local_memshd.h new file mode 100644 index 0000000..b3f58e4 --- /dev/null +++ b/src/pc/protocols/local_memshd.h @@ -0,0 +1,44 @@ +/** + * @file local_memshd.h + * @brief Shared memory helper public header +*/ + +#ifndef LOCAL_MEMSHD_H +#define LOCAL_MEMSHD_H + +/* **************************************************************************/ +/* Include Files */ +/* **************************************************************************/ +#include "XLinkPlatform.h" +#include "XLinkPublicDefines.h" + + +#if defined(__unix__) + +#define SHDMEM_DEFAULT_SOCKET "/tmp/xlink.sock" + +#ifdef __cplusplus +extern "C" { +#endif +/** + * @brief Initializes Shared Memory protocol +*/ +int shdmem_initialize(); + +int shdmemPlatformConnect(const char *devPathRead, const char *devPathWrite, void **desc); +int shdmemPlatformServer(const char *devPathRead, const char *devPathWrite, void **desc, long *sockFd); +int shdmemPlatformClose(void **desc); + +int shdmemPlatformRead(void *desc, void *data, int size, long *fd); +int shdmemPlatformWrite(void *desc, void *data, int size); +int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2); + +int shdmemSetProtocol(XLinkProtocol_t *protocol, const char* devPathRead, const char* devPathWrite); + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(__unix__) */ + +#endif /* LOCAL_MEMSHD_H */ diff --git a/src/pc/protocols/tcpip_host.cpp b/src/pc/protocols/tcpip_host.cpp index 8190d06..032e99f 100644 --- a/src/pc/protocols/tcpip_host.cpp +++ b/src/pc/protocols/tcpip_host.cpp @@ -934,7 +934,7 @@ int tcpipPlatformWrite(void *fdKey, void *data, int size) } // TODO add IPv6 to tcpipPlatformConnect() -int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void **fd) +int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void **fd, long *sockFd) { TCPIP_SOCKET sock = socket(AF_INET, SOCK_STREAM, 0); if(sock < 0) @@ -943,6 +943,8 @@ int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void tcpip_close_socket(sock); return X_LINK_PLATFORM_ERROR; } + + if (sockFd != nullptr) *sockFd = sock; int reuse_addr = 1; int sc; @@ -998,9 +1000,10 @@ int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void #endif socklen_portable len = (socklen_portable) sizeof(client); - int connfd = accept(sock, (struct sockaddr*) &client, &len); + int connfd = accept(sock, (struct sockaddr*)&client, &len); // Regardless of return, close the listening socket tcpip_close_socket(sock); + if (sockFd != nullptr) *sockFd = -1; // Then check if connection was accepted succesfully if(connfd < 0) { @@ -1163,7 +1166,6 @@ int tcpipPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmwa return -1; } - // Discovery Service static std::thread serviceThread; static std::mutex serviceMutex; diff --git a/src/pc/protocols/tcpip_host.h b/src/pc/protocols/tcpip_host.h index da12d4e..3f86bfa 100644 --- a/src/pc/protocols/tcpip_host.h +++ b/src/pc/protocols/tcpip_host.h @@ -96,7 +96,7 @@ xLinkPlatformErrorCode_t tcpip_boot_bootloader(const char* name); int tcpipPlatformRead(void *fd, void *data, int size); int tcpipPlatformWrite(void *fd, void *data, int size); int tcpipPlatformConnect(const char *devPathRead, const char *devPathWrite, void **fd); -int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void **fd); +int tcpipPlatformServer(const char *devPathRead, const char *devPathWrite, void **fd, long *sockFd); xLinkPlatformErrorCode_t tcpipPlatformBootBootloader(const char *name); int tcpipPlatformDeviceFdDown(void *fd); int tcpipPlatformClose(void *fd); @@ -112,4 +112,4 @@ bool tcpip_is_running_discovery_service(); } #endif -#endif /* TCPIP_HOST_H */ \ No newline at end of file +#endif /* TCPIP_HOST_H */ diff --git a/src/pc/protocols/tcpip_memshd.cpp b/src/pc/protocols/tcpip_memshd.cpp new file mode 100644 index 0000000..370235d --- /dev/null +++ b/src/pc/protocols/tcpip_memshd.cpp @@ -0,0 +1,136 @@ +#include "tcpip_host.h" +#include "local_memshd.h" +#include "tcpip_memshd.h" + +#include + +#include +#include +#include + +#include + +#if defined(__unix__) + +int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) +{ + std::mutex connectionMutex; + std::condition_variable cv; + + bool isShdmemThreadFinished = false; + bool isTcpIpThreadFinished = false; + + int retTcpIp = -1, retShdmem = -1; + void *fdTcpIp = nullptr, *fdShdmem = nullptr; + long tcpIpSockFd = -1, shdmemSockFd = -1; + + auto threadTcpip = std::thread([&connectionMutex, + &cv, + &isTcpIpThreadFinished, + &retTcpIp, + &devPathRead, + &devPathWrite, + &fdTcpIp, + &tcpIpSockFd]() { + auto ret = tcpipPlatformServer(devPathRead, devPathWrite, &fdTcpIp, &tcpIpSockFd); + { + std::unique_lock l(connectionMutex); + retTcpIp = ret; + isTcpIpThreadFinished = true; + } + cv.notify_one(); + }); + + auto threadShdmem = std::thread([&connectionMutex, + &cv, + &isShdmemThreadFinished, + &retShdmem, + &fdShdmem, + &shdmemSockFd](){ + auto ret = shdmemPlatformServer(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, &fdShdmem, &shdmemSockFd); + { + std::unique_lock l(connectionMutex); + retShdmem = ret; + isShdmemThreadFinished = true; + } + cv.notify_one(); + }); + + { + std::unique_lock lock(connectionMutex); + cv.wait(lock, [&isShdmemThreadFinished, &isTcpIpThreadFinished]{ return isShdmemThreadFinished || isTcpIpThreadFinished; }); + + } + + // As soon as either one finishes, the other can be cleaned + // Use signals, as "accept" cannot be unblocked by "close"ing the underlying socket + if(!isShdmemThreadFinished) { + if(shdmemSockFd >= 0) { + shutdown(shdmemSockFd, SHUT_RDWR); + #if defined(SO_LINGER) + const int set = 0; + setsockopt(shdmemSockFd, SOL_SOCKET, SO_LINGER, (const char*)&set, sizeof(set)); + #endif + close(shdmemSockFd); + } + + } + + if(!isTcpIpThreadFinished) { + if(tcpIpSockFd >= 0) { + shutdown(tcpIpSockFd, SHUT_RDWR); + #if defined(SO_LINGER) + const int set = 0; + setsockopt(tcpIpSockFd, SOL_SOCKET, SO_LINGER, (const char*)&set, sizeof(set)); + #endif + close(tcpIpSockFd); + } + } + + // Wait for both threads to wrap up + if(threadTcpip.joinable()) threadTcpip.join(); + if(threadShdmem.joinable()) threadShdmem.join(); + + // Asign the final protocol (once both threads finalize) + if(retTcpIp == 0) { + *fd = fdTcpIp; + *protocol = X_LINK_TCP_IP; + } + + if(retShdmem == X_LINK_SUCCESS) { + *fd = fdShdmem; + shdmemSetProtocol(protocol, devPathRead, devPathWrite); + } + + // if both connected, close TCP_IP + if(retTcpIp == 0 && retShdmem == X_LINK_SUCCESS) { + tcpipPlatformClose(fdTcpIp); + } + + return X_LINK_SUCCESS; +} + +int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) { + if(shdmemPlatformConnect(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, fd) == X_LINK_SUCCESS) { + return shdmemSetProtocol(protocol, devPathRead, devPathWrite); + } + + *protocol = X_LINK_TCP_IP; + return tcpipPlatformConnect(devPathRead, devPathWrite, fd); +} + +#else + +int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) +{ + *protocol = X_LINK_TCP_IP; + return tcpipPlatformServer(devPathRead, devPathWrite, fd, nullptr); +} + +int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) +{ + *protocol = X_LINK_TCP_IP; + return tcpipPlatformConnect(devPathRead, devPathWrite, fd); +} + +#endif diff --git a/src/pc/protocols/tcpip_memshd.h b/src/pc/protocols/tcpip_memshd.h new file mode 100644 index 0000000..f382f84 --- /dev/null +++ b/src/pc/protocols/tcpip_memshd.h @@ -0,0 +1,15 @@ +#pragma once + +#include "XLinkPlatform.h" +#include "XLinkPublicDefines.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd); +int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd); + +#ifdef __cplusplus +} +#endif diff --git a/src/shared/XLinkData.c b/src/shared/XLinkData.c index bd0bd49..7904d41 100644 --- a/src/shared/XLinkData.c +++ b/src/shared/XLinkData.c @@ -139,6 +139,62 @@ XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer, return X_LINK_SUCCESS; } +XLinkError_t XLinkWriteFd(streamId_t const streamId, const long fd) +{ + return XLinkWriteFd_(streamId, fd, NULL); +} + +XLinkError_t XLinkWriteFd_(streamId_t streamId, const long fd, XLinkTimespec* outTSend) +{ + float opTime = 0.0f; + xLinkDesc_t* link = NULL; + XLINK_RET_IF(getLinkByStreamId(streamId, &link)); + streamId_t streamIdOnly = EXTRACT_STREAM_ID(streamId); + + xLinkEvent_t event = {0}; + XLINK_INIT_EVENT(event, streamIdOnly, XLINK_WRITE_FD_REQ, + sizeof(long),(void*)fd, link->deviceHandle); + + event.data2 = (void*)NULL; + event.data2Size = -1; + + XLINK_RET_IF(addEventWithPerf_(&event, &opTime, XLINK_NO_RW_TIMEOUT, outTSend)); + + if( glHandler->profEnable) { + glHandler->profilingData.totalWriteBytes += sizeof(long); + glHandler->profilingData.totalWriteTime += opTime; + } + link->profilingData.totalWriteBytes += sizeof(long); + link->profilingData.totalWriteTime += sizeof(long); + + return X_LINK_SUCCESS; +} + +XLinkError_t XLinkWriteFdData(streamId_t streamId, const long fd, int fdSize, const uint8_t* dataBuffer, int dataSize) +{ + ASSERT_XLINK(dataBuffer); + + float opTime = 0; + xLinkDesc_t* link = NULL; + XLINK_RET_IF(getLinkByStreamId(streamId, &link)); + streamId = EXTRACT_STREAM_ID(streamId); + + int totalSize = dataSize; + xLinkEvent_t event = {0}; + XLINK_INIT_EVENT(event, streamId, XLINK_WRITE_FD_REQ, totalSize, (void*)fd, link->deviceHandle); + event.data2 = (void*)dataBuffer; + event.data2Size = dataSize; + + XLINK_RET_IF(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT)); + + if( glHandler->profEnable) { + glHandler->profilingData.totalWriteBytes += totalSize; + glHandler->profilingData.totalWriteTime += opTime; + } + + return X_LINK_SUCCESS; +} + XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer, int size) { diff --git a/src/shared/XLinkDevice.c b/src/shared/XLinkDevice.c index 657c58c..4bf8956 100644 --- a/src/shared/XLinkDevice.c +++ b/src/shared/XLinkDevice.c @@ -188,7 +188,7 @@ XLinkError_t XLinkServerOnly(XLinkHandler_t* handler) link->deviceHandle.protocol = handler->protocol; int connectStatus = XLinkPlatformServer(handler->devicePath2, handler->devicePath, - link->deviceHandle.protocol, &link->deviceHandle.xLinkFD); + &link->deviceHandle.protocol, &link->deviceHandle.xLinkFD); if (connectStatus < 0) { /** @@ -277,8 +277,8 @@ XLinkError_t XLinkConnect(XLinkHandler_t* handler) link->deviceHandle.protocol = handler->protocol; int connectStatus = XLinkPlatformConnect(handler->devicePath2, handler->devicePath, - link->deviceHandle.protocol, &link->deviceHandle.xLinkFD); - + &link->deviceHandle.protocol, &link->deviceHandle.xLinkFD); + if (connectStatus < 0) { /** * Connection may be unsuccessful at some amount of first tries. @@ -291,7 +291,7 @@ XLinkError_t XLinkConnect(XLinkHandler_t* handler) // Return an informative error return parsePlatformError(connectStatus); } - + XLINK_RET_ERR_IF( DispatcherStart(link) != X_LINK_SUCCESS, X_LINK_TIMEOUT); @@ -646,6 +646,8 @@ XLinkError_t parsePlatformError(xLinkPlatformErrorCode_t rc) { return X_LINK_INIT_USB_ERROR; case X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED: return X_LINK_INIT_TCP_IP_ERROR; + case X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED: + return X_LINK_INIT_LOCAL_SHDMEM_ERROR; case X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED: return X_LINK_INIT_PCIE_ERROR; case X_LINK_PLATFORM_ERROR: @@ -675,6 +677,7 @@ const char* XLinkErrorToStr(XLinkError_t val) { case X_LINK_NOT_IMPLEMENTED: return "X_LINK_NOT_IMPLEMENTED"; case X_LINK_INIT_USB_ERROR: return "X_LINK_INIT_USB_ERROR"; case X_LINK_INIT_TCP_IP_ERROR: return "X_LINK_INIT_TCP_IP_ERROR"; + case X_LINK_INIT_LOCAL_SHDMEM_ERROR: return "X_LINK_INIT_LOCAL_SHDMEM_ERROR"; case X_LINK_INIT_PCIE_ERROR: return "X_LINK_INIT_PCIE_ERROR"; default: return "INVALID_ENUM_VALUE"; @@ -693,6 +696,8 @@ const char* XLinkProtocolToStr(XLinkProtocol_t val) { case X_LINK_PCIE: return "X_LINK_PCIE"; case X_LINK_IPC: return "X_LINK_IPC"; case X_LINK_TCP_IP: return "X_LINK_TCP_IP"; + case X_LINK_LOCAL_SHDMEM: return "X_LINK_LOCAL_SHDMEM"; + case X_LINK_TCP_IP_OR_LOCAL_SHDMEM: return "X_LINK_TCP_IP_OR_LOCAL_SHDMEM"; case X_LINK_NMB_OF_PROTOCOLS: return "X_LINK_NMB_OF_PROTOCOLS"; case X_LINK_ANY_PROTOCOL: return "X_LINK_ANY_PROTOCOL"; default: @@ -753,4 +758,4 @@ const char* XLinkPCIEBootloaderToStr(XLinkPCIEBootloader val) { // ------------------------------------ // Helpers implementation. End. -// ------------------------------------ \ No newline at end of file +// ------------------------------------ diff --git a/src/shared/XLinkDispatcher.c b/src/shared/XLinkDispatcher.c index c7b672a..01aec98 100644 --- a/src/shared/XLinkDispatcher.c +++ b/src/shared/XLinkDispatcher.c @@ -504,21 +504,26 @@ char* TypeToStr(int type) case XLINK_WRITE_REQ: return "XLINK_WRITE_REQ"; case XLINK_READ_REQ: return "XLINK_READ_REQ"; case XLINK_READ_REL_REQ: return "XLINK_READ_REL_REQ"; - case XLINK_READ_REL_SPEC_REQ: return "XLINK_READ_REL_SPEC_REQ"; case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ"; case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ"; case XLINK_PING_REQ: return "XLINK_PING_REQ"; case XLINK_RESET_REQ: return "XLINK_RESET_REQ"; - case XLINK_REQUEST_LAST: return "XLINK_REQUEST_LAST"; - case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP"; + case XLINK_STATIC_REQUEST_LAST: return "XLINK_STATIC_REQUEST_LAST"; case XLINK_READ_RESP: return "XLINK_READ_RESP"; + case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP"; case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP"; - case XLINK_READ_REL_SPEC_RESP: return "XLINK_READ_REL_SPEC_RESP"; case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP"; case XLINK_CLOSE_STREAM_RESP: return "XLINK_CLOSE_STREAM_RESP"; case XLINK_PING_RESP: return "XLINK_PING_RESP"; case XLINK_RESET_RESP: return "XLINK_RESET_RESP"; + case XLINK_STATIC_RESP_LAST: return "XLINK_STATIC_RESP_LAST"; + case XLINK_READ_REL_SPEC_REQ: return "XLINK_READ_REL_SPEC_REQ"; + case XLINK_WRITE_FD_REQ: return "XLINK_WRITE_FD_REQ"; + case XLINK_REQUEST_LAST: return "XLINK_REQUEST_LAST"; + case XLINK_READ_REL_SPEC_RESP: return "XLINK_READ_REL_SPEC_RESP"; + case XLINK_WRITE_FD_RESP: return "XLINK_WRITE_FD_REQ"; case XLINK_RESP_LAST: return "XLINK_RESP_LAST"; + default: break; } @@ -796,7 +801,9 @@ static void* eventSchedulerRun(void* ctx) static int isEventTypeRequest(xLinkEventPriv_t* event) { - return event->packet.header.type < XLINK_REQUEST_LAST; + return (event->packet.header.type < XLINK_STATIC_REQUEST_LAST || + (event->packet.header.type >= XLINK_READ_REL_SPEC_REQ && + event->packet.header.type < XLINK_REQUEST_LAST)); } static void postAndMarkEventServed(xLinkEventPriv_t *event) @@ -897,7 +904,8 @@ static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState if (curr->lQueue.q[i].isServed == EVENT_PENDING && header->id == evHeader->id && - header->type == evHeader->type - XLINK_REQUEST_LAST -1) + ((header->type == evHeader->type - XLINK_STATIC_REQUEST_LAST - 1) || + (header->type == evHeader->type - XLINK_REQUEST_LAST - 1 + XLINK_READ_REL_SPEC_REQ))) { mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n", TypeToStr(header->type)); diff --git a/src/shared/XLinkDispatcherImpl.c b/src/shared/XLinkDispatcherImpl.c index cc83bd4..3696942 100644 --- a/src/shared/XLinkDispatcherImpl.c +++ b/src/shared/XLinkDispatcherImpl.c @@ -31,7 +31,7 @@ static streamPacketDesc_t* movePacketFromStream(streamDesc_t *stream); static streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream); static int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize); static int releaseSpecificPacketFromStream(streamDesc_t* stream, uint32_t* releasedSize, uint8_t* data); -static int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size, XLinkTimespec trsend, XLinkTimespec treceive); +static int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size, long fd, XLinkTimespec trsend, XLinkTimespec treceive); static int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive); @@ -146,6 +146,110 @@ int writeEventMultipart(xLinkDeviceHandle_t* deviceHandle, void* data, int total return writtenByteCount; } +int writeFdEventMultipart(xLinkDeviceHandle_t* deviceHandle, long fd, int totalSize, void* data2, int data2Size) +{ + // Regular, single-part case + if(data2 == NULL || data2Size <= 0) { + return XLinkPlatformWriteFd(deviceHandle, fd, NULL, -1); + } + + // Multipart case + int errorCode = 0; + void *dataToWrite[] = {data2, NULL}; + int sizeToWrite[] = {data2Size, 0}; + + int writtenByteCount = 0, toWrite = 0, rc = 0; + + int totalSizeToWrite = 0; + + int pktlen = 0; + + // restriction on the output data size + // mitigates kernel crash on RPI when USB is used + const int xlinkPacketSizeMultiply = deviceHandle->protocol == X_LINK_USB_VSC ? 1024 : 1; //for usb3, usb2 is 512 + uint8_t swapSpaceScratchBufferVsc[1024 + 64]; + uint8_t swapSpaceScratchBuffer[1 + 64]; + uint8_t* swapSpace = swapSpaceScratchBuffer + ALIGN_UP((((uintptr_t)swapSpaceScratchBuffer) % 64), 64); + if(deviceHandle->protocol == X_LINK_USB_VSC) { + swapSpace = swapSpaceScratchBufferVsc + ALIGN_UP((((uintptr_t)swapSpaceScratchBufferVsc) % 64), 64); + } + + // the amount of bytes written from split transfer for "next" packet + int previousSplitWriteSize = 0; + for (int i = 0;; i++) { + void *currentPacket = dataToWrite[i]; + int currentPacketSize = sizeToWrite[i]; + if (currentPacket == NULL) break; + if (currentPacketSize == 0) break; + // printf("currentPacket %p size %d \n", currentPacket, currentPacketSize); + void *nextPacket = dataToWrite[i + 1]; + int nextPacketSize = sizeToWrite[i + 1]; + bool shouldSplitData = false; + + if (nextPacket != NULL && nextPacketSize > 0) { + totalSizeToWrite += currentPacketSize - (currentPacketSize % xlinkPacketSizeMultiply); + if(currentPacketSize % xlinkPacketSizeMultiply) { + shouldSplitData = true; + } + } else { + totalSizeToWrite += currentPacketSize; + } + + // printf("writtenByteCount %d %d\n",writtenByteCount , totalSizeToWrite); + int byteCountRelativeOffset = writtenByteCount; + while (writtenByteCount < totalSizeToWrite) { + toWrite = (pktlen && (totalSizeToWrite - writtenByteCount) > pktlen) + ? pktlen + : (totalSizeToWrite - writtenByteCount); + + rc = XLinkPlatformWriteFd(deviceHandle, fd, &((char *)currentPacket)[writtenByteCount - byteCountRelativeOffset + previousSplitWriteSize], toWrite); + fd = -1; + + if (rc < 0) + { + errorCode = rc; + goto function_epilogue; + } + writtenByteCount += toWrite; + } + if (shouldSplitData) { + int remainingToWriteCurrent = currentPacketSize - (totalSizeToWrite - byteCountRelativeOffset); + // printf("remainingToWriteCurrent %d \n", remainingToWriteCurrent); + if(remainingToWriteCurrent < 0 || remainingToWriteCurrent > xlinkPacketSizeMultiply) ASSERT_XLINK(0); + int remainingToWriteNext = nextPacketSize > xlinkPacketSizeMultiply - remainingToWriteCurrent ? xlinkPacketSizeMultiply - remainingToWriteCurrent : nextPacketSize; + // printf("remainingToWriteNext %d \n", remainingToWriteNext); + if(remainingToWriteNext < 0 || remainingToWriteNext > xlinkPacketSizeMultiply) ASSERT_XLINK(0); + + if (remainingToWriteCurrent) { + memcpy(swapSpace, &((char *)currentPacket)[writtenByteCount - byteCountRelativeOffset + previousSplitWriteSize], remainingToWriteCurrent); + if(remainingToWriteNext) { + memcpy(swapSpace + remainingToWriteCurrent, nextPacket, remainingToWriteNext); + } + toWrite = remainingToWriteCurrent + remainingToWriteNext; + if(toWrite > xlinkPacketSizeMultiply) ASSERT_XLINK(0); + rc = XLinkPlatformWriteFd(deviceHandle, fd, swapSpace, toWrite); + fd = -1; + if (rc < 0) + { + errorCode = rc; + goto function_epilogue; + } + writtenByteCount += toWrite; + totalSizeToWrite += remainingToWriteCurrent; + // printf("%s wrote %d \n", __FUNCTION__, rc); + + previousSplitWriteSize = remainingToWriteNext; + } + } else { + previousSplitWriteSize = 0; + } + } + +function_epilogue: + if (errorCode) return errorCode; + return writtenByteCount; +} + //adds a new event with parameters and returns event id int dispatcherEventSend(xLinkEvent_t *event, XLinkTimespec* sendTime) { @@ -173,6 +277,12 @@ int dispatcherEventSend(xLinkEvent_t *event, XLinkTimespec* sendTime) mvLog(MVLOG_ERROR,"Write failed %d\n", rc); return rc; } + } else if (event->header.type == XLINK_WRITE_FD_REQ) { + rc = writeFdEventMultipart(&event->deviceHandle, (long)event->data, event->header.size, event->data2, event->data2Size); + if(rc < 0) { + mvLog(MVLOG_ERROR,"Write failed %d\n", rc); + return rc; + } } return 0; @@ -180,8 +290,10 @@ int dispatcherEventSend(xLinkEvent_t *event, XLinkTimespec* sendTime) int dispatcherEventReceive(xLinkEvent_t* event){ // static xLinkEvent_t prevEvent = {0}; + long fd = -1; int rc = XLinkPlatformRead(&event->deviceHandle, - &event->header, sizeof(event->header)); + &event->header, sizeof(event->header), &fd); + (void)fd; XLinkTimespec treceive; getMonotonicTimestamp(&treceive); @@ -221,6 +333,7 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response, mvLog(MVLOG_DEBUG, "%s\n",TypeToStr(event->header.type)); switch (event->header.type){ case XLINK_WRITE_REQ: + case XLINK_WRITE_FD_REQ: { //in case local tries to write after it issues close (writeSize is zero) stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId); @@ -357,7 +470,8 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response, mvLog(MVLOG_DEBUG,"XLINK_PING_REQ - do nothing\n"); break; } - case XLINK_WRITE_RESP: + case XLINK_WRITE_RESP: + case XLINK_WRITE_FD_RESP: case XLINK_READ_RESP: case XLINK_READ_REL_RESP: case XLINK_READ_REL_SPEC_RESP: @@ -392,6 +506,25 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response switch (event->header.type) { + case XLINK_WRITE_FD_REQ: + { + //let remote write immediately as we have a local buffer for the data + response->header.type = XLINK_WRITE_FD_RESP; + response->header.size = event->header.size; + response->header.streamId = event->header.streamId; + response->deviceHandle = event->deviceHandle; + XLINK_EVENT_ACKNOWLEDGE(response); + + // we got some data. We should unblock a blocked read + int xxx = DispatcherUnblockEvent(-1, + XLINK_READ_REQ, + response->header.streamId, + event->deviceHandle.xLinkFD); + (void) xxx; + mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n", + (int)response->header.streamId, (int)xxx); + } + break; case XLINK_WRITE_REQ: { //let remote write immediately as we have a local buffer for the data @@ -555,6 +688,8 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response // need to send the response, serve the event and then reset break; case XLINK_WRITE_RESP: + break; + case XLINK_WRITE_FD_RESP: break; case XLINK_READ_RESP: break; @@ -717,6 +852,7 @@ streamPacketDesc_t* movePacketFromStream(streamDesc_t* stream) } ret->data = NULL; ret->length = 0; + ret->fd = -1; // copy fields of first unused packet *ret = stream->packets[stream->firstPacketUnused]; @@ -808,11 +944,12 @@ int releaseSpecificPacketFromStream(streamDesc_t* stream, uint32_t* releasedSize return 0; } -int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size, XLinkTimespec trsend, XLinkTimespec treceive) { +int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size, long fd, XLinkTimespec trsend, XLinkTimespec treceive) { if (stream->availablePackets + stream->blockedPackets < XLINK_MAX_PACKETS_PER_STREAM) { stream->packets[stream->firstPacketFree].data = buffer; stream->packets[stream->firstPacketFree].length = size; + stream->packets[stream->firstPacketFree].fd = fd; stream->packets[stream->firstPacketFree].tRemoteSent = trsend; stream->packets[stream->firstPacketFree].tReceived = treceive; CIRCULAR_INCREMENT(stream->firstPacketFree, XLINK_MAX_PACKETS_PER_STREAM); @@ -827,12 +964,15 @@ int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive) { //specific actions to this peer mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId); - ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ + ASSERT_XLINK((event->header.type >= XLINK_WRITE_REQ + && event->header.type != XLINK_STATIC_REQUEST_LAST + && event->header.type < XLINK_STATIC_RESP_LAST) || + (event->header.type >= XLINK_READ_REL_SPEC_REQ && event->header.type != XLINK_REQUEST_LAST - && event->header.type < XLINK_RESP_LAST); + && event->header.type < XLINK_RESP_LAST)); // Then read the data buffer, which is contained only in the XLINK_WRITE_REQ event - if(event->header.type != XLINK_WRITE_REQ) { + if(event->header.type != XLINK_WRITE_REQ && event->header.type != XLINK_WRITE_FD_REQ) { return 0; } @@ -848,12 +988,13 @@ int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive) { XLINK_OUT_WITH_LOG_IF(buffer == NULL, mvLog(MVLOG_FATAL,"out of memory to receive data of size = %zu\n", event->header.size)); - const int sc = XLinkPlatformRead(&event->deviceHandle, buffer, event->header.size); + long fd = -1; + const int sc = XLinkPlatformRead(&event->deviceHandle, buffer, event->header.size, &fd); XLINK_OUT_WITH_LOG_IF(sc < 0, mvLog(MVLOG_ERROR,"%s() Read failed %d\n", __func__, sc)); event->data = buffer; uint64_t tsec = event->header.tsecLsb | ((uint64_t)event->header.tsecMsb << 32); - XLINK_OUT_WITH_LOG_IF(addNewPacketToStream(stream, buffer, event->header.size, (XLinkTimespec){tsec, event->header.tnsec}, treceive), + XLINK_OUT_WITH_LOG_IF(addNewPacketToStream(stream, buffer, event->header.size, fd, (XLinkTimespec){tsec, event->header.tnsec}, treceive), mvLog(MVLOG_WARN,"No more place in stream. release packet\n")); rc = 0;