
FD sharing protocol on localhost #82

Merged
merged 53 commits into from
Jul 5, 2024

Conversation

TheMutta

@TheMutta TheMutta commented Jul 1, 2024

This update adds a new interface for streaming files and a new protocol: when both the Server and the Client instances of XLink run on the same device, the FD and its ownership are sent over directly instead of streaming the file data. This allows memory buffers to be shared without copying, i.e. zero-copy.

Automatic discovery of the protocol is set up for TCP/IP.

Note that the XLink packet layout has been altered by adding a new FD attribute, meaning that older versions of XLink won't be able to parse the packets correctly.

TheMutta added 30 commits June 11, 2024 19:49
…d is called on a protocol different from X_LINK_LOCAL_SHDMEM
@TheMutta TheMutta self-assigned this Jul 1, 2024
Collaborator

@themarpe themarpe left a comment


Let's change the server to be a "dual server" with the TCP/IP mechanism.

Instead of connect/disconnect/reconnect, let's spin up both TCP & SHDMEM via the following mechanism (C++):

  • Create two threads and return the fd's for the server sockets back to main context
  • cond_var to which completes sooner
  • close the other that did not complete yet
  • join both threads
  • continue & return which protocol

For this dual option, we might create a separate protocol entry to signify going down this path (so TCP_IP would just be TCP, SHDMEM just shdmem, and TCP_IP_OR_LOCAL_SHDMEM would be the grouped one).

For the connecting client it would be similar: if TCP_IP_OR_LOCAL_SHDMEM is picked, then the following occurs:

  • First try connecting via local shdmem
  • if fails, go TCP/IP route

So no need for the threading on the client side

@TheMutta
Author

TheMutta commented Jul 1, 2024

Instead of connect/disconnect/reconnect, let's spin up both TCP & SHDMEM via the following mechanism (C++):

  • Create two threads and return the fd's for the server sockets back to main context
  • cond_var to which completes sooner
  • close the other that did not complete yet
  • join both threads
  • continue & return which protocol

OK, that sounds doable, but I have some thoughts.
If we're on localhost and the TCP/IP connection resolves faster than the SHDMEM connection, which is definitely possible since the TCP/IP thread could be scheduled first and therefore be the first to complete, then we'd lose all the benefits of zero-copy.
Apart from that, we also risk the TCP/IP thread completing first on the server side while SHDMEM completes first on the client side, which can't be excluded on a multicore system. And on a single core, which one connects first would be entirely arbitrary. That would be detrimental.

For this dual option, we might create a separate protocol entry to signify going down this path (so TCP_IP would just be TCP, SHDMEM just shdmem, and TCP_IP_OR_LOCAL_SHDMEM would be the grouped one).

For the connecting client it would be similar: if TCP_IP_OR_LOCAL_SHDMEM is picked, then the following occurs:

  • First try connecting via local shdmem
  • if fails, go TCP/IP route

So no need for the threading on the client side

I'd prefer an approach similar to the one I put in, which would only require some minor changes to depthai and would avoid all the shenanigans that multithreading adds on a multicore system. Adding a SHDMEM connection timeout of, for example, 5 seconds would be ideal, since the SHDMEM connection can only happen after TCP/IP has connected, meaning we already have both a server and a client in place.

@themarpe
Collaborator

themarpe commented Jul 1, 2024

If we're on localhost and the TCP/IP connection resolves faster than the SHDMEM connection, which is definitely possible since the TCP/IP thread could be scheduled first and therefore be the first to complete, then we'd lose all the benefits of zero-copy.
Apart from that, we also risk the TCP/IP thread completing first on the server side while SHDMEM completes first on the client side, which can't be excluded on a multicore system. And on a single core, which one connects first would be entirely arbitrary. That would be detrimental.

So, WRT the server, it always waits for both; the timing doesn't matter, just the sequence.
The client will try SHDMEM first, and if that succeeds, then the server will wrap up and not wait for TCP/IP.


I'd prefer an approach similar to the one I put in, which would only require some minor changes to depthai and would avoid all the shenanigans that multithreading adds on a multicore system. Adding a SHDMEM connection timeout of, for example, 5 seconds would be ideal, since the SHDMEM connection can only happen after TCP/IP has connected, meaning we already have both a server and a client in place.

The above case does cover that: only one protocol at once.
Also, the SHDMEM protocol fully supports all read/write ops, right? (there is no need to have TCP alongside it, correct?)

@TheMutta
Author

TheMutta commented Jul 1, 2024

Also, the SHDMEM protocol fully supports all read/write ops, right? (there is no need to have TCP alongside it, correct?)

Yes, exactly. It's a completely independent protocol, and reads/writes work perfectly. The only use for TCP/IP is to determine whether we're connecting to localhost, and to act as a backup if the SHDMEM connection doesn't work. This means the connection is immediate if we're not connecting to localhost, and takes only the minimum time to open a socket if we want to connect through SHDMEM.

So, WRT the server, it always waits for both; the timing doesn't matter, just the sequence.
The client will try SHDMEM first, and if that succeeds, then the server will wrap up and not wait for TCP/IP.

OK sure, that seems doable. It's a bit more complicated than the current system and would function similarly. I still think the present system may be less of a hassle: with the multithreaded approach we'll have to think about cleaning up open sockets that were never connected to. In any case, it's perfectly doable.

@TheMutta
Author

TheMutta commented Jul 1, 2024

So this is what I came up with:

typedef struct {
    volatile bool lock;
    const char **devPathRead;
    const char **devPathWrite;
    XLinkProtocol_t **protocol;
    void **fd;
} args_t;

static void *tcpipServer(void *arg) {
    args_t *args = (args_t *)arg;
    if(tcpipPlatformServer(*(args->protocol), *(args->devPathRead), *(args->devPathWrite), args->fd) == X_LINK_SUCCESS) {
        args->lock = true;
    }
    return NULL;
}

static void *shdmemServer(void *arg) {
    args_t *args = (args_t *)arg;
    if(shdmemPlatformServer(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, args->fd) == X_LINK_SUCCESS) {
        args->lock = true;
        shdmemSetProtocol(*(args->protocol), *(args->devPathRead), *(args->devPathWrite));
    }
    return NULL;
}

xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite, XLinkProtocol_t *protocol, void** fd)
{
    switch (*protocol) {
        case X_LINK_TCP_IP: {
            pthread_t shdmemThread;
            pthread_t tcpipThread;

            args_t args;
            args.lock = false;
            args.devPathRead = &devPathRead;
            args.devPathWrite = &devPathWrite;
            args.protocol = &protocol;
            args.fd = fd;

            pthread_create(&shdmemThread, NULL, shdmemServer, (void*)&args);
            pthread_create(&tcpipThread, NULL, tcpipServer, (void*)&args);

            // Busy-wait until whichever server connects first flips the flag
            while(!args.lock) {
            }

            if (*protocol == X_LINK_TCP_IP) {
                pthread_join(tcpipThread, NULL);
                pthread_cancel(shdmemThread);

                return X_LINK_SUCCESS;
            } else if (*protocol == X_LINK_LOCAL_SHDMEM) {
                pthread_join(shdmemThread, NULL);
                pthread_cancel(tcpipThread);

                return X_LINK_SUCCESS;
            } else {
                return X_LINK_PLATFORM_ERROR;
            }
        }
        ....

The only issue would be cleaning up the remaining open sockets. This can only be done with cleanup handlers inside the tcpipPlatformServer and shdmemPlatformServer functions. The problem is that in this mixed C and C++ codebase the pthread cleanup macros throw out bogus errors like error: expected ‘while’ before ‘listen’, which I couldn't believe at first. It turns out pthread_cleanup_push and pthread_cleanup_pop expand to a paired block and must appear in the same lexical scope, which clashes here, so the cleanup functions can't be used.

Therefore, our multithreading approach seems more trouble than it's worth.

Collaborator

@themarpe themarpe left a comment


Don't cancel the thread, but "close" the fd and just join both threads as is. Also, it might be simpler to do this in a C++ context due to std::thread, std::mutex & std::condition_variable, etc...

Otherwise, if it's really a PITA, we can leave it as is - but the "sleep" should at least be "magic'd out" & a comment added noting, as a "future impl." possibility, that we can go to spinning both protocols up in parallel

Also, this is POSIX-only I assume; modify CMakeLists.txt to only activate it on POSIX & make it optionally disableable as well

& add to CI & tests (which will make sure that we don't compile it in for Windows as well, etc...)

Comment on lines 113 to 114
XLINK_READ_REL_SPEC_REQ,
XLINK_READ_REL_SPEC_RESP,
Collaborator


Why removed?

Also, all new events MUST go to the end of the enum, otherwise backward compatibility is broken

Author


I moved them before XLINK_LAST_REQ and XLINK_LAST_RESP, otherwise they will cause this assert to fail:

int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive) {
    //this function will be dependent whether this is a client or a Remote
    //specific actions to this peer
    mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId);

    ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ
               && event->header.type != XLINK_REQUEST_LAST
               && event->header.type < XLINK_RESP_LAST); 

Collaborator


Hmm - do we need a special type of an event?
This will likely not work due to it being shared with (fixed) Bootloader on RVC2 (eg communicating with it will cause issues).

So let's try to either:

  • Remove the need for custom xLinkEventType entry
  • reorder them to the end & fix assert (eg, to check between two ranges)

Collaborator


Side note: xLinkEventHeader_t is also a structure that should not change (or "reserved bits" should be used under bitField)

@TheMutta
Author

TheMutta commented Jul 1, 2024

Don't cancel the thread, but "close" the fd and just join both threads as is. Also, might be simpler to do this in C++ context due to thread, mutex & conditional_variable, etc...

I think I misunderstood: you mean that we'd spin up a separate thread waiting for the SHDMEM connection and, if successful, we'd just close the TCP/IP connection, create a new platform FD and switch to that protocol?

Otherwise, if it's really a PITA, we can leave it as is - but the "sleep" should at least be "magic'd out" & a comment added noting, as a "future impl." possibility, that we can go to spinning both protocols up in parallel

It's probably more of a PITA than it's worth, at least IMO. For the sleep, I think we could hide it in the shdmem protocol, with multiple retries in connect(), delayed by a fixed number of ms in between.

Also, this is POSIX-only I assume; modify CMakeLists.txt to only activate it on POSIX & make it optionally disableable as well
& add to CI & tests (which will make sure that we don't compile it in for Windows as well, etc...)

Yes, I guess a simple preprocessor directive like #if defined(__unix__) will do the trick, since the code doesn't work on any other platform anyway.

@TheMutta
Author

TheMutta commented Jul 2, 2024

Don't cancel the thread, but "close" the fd and just join both threads as is. Also, it might be simpler to do this in a C++ context due to std::thread, std::mutex & std::condition_variable, etc...

So, that has been completed. It was indeed far easier to do in C++; I just had to make some minor modifications to ensure the socket is closed properly and the unused thread is detached.

Does it look good?


xLinkPlatformErrorCode_t tcpipPlatformBootBootloader(const char *name);
int tcpipPlatformDeviceFdDown(void *fd);
int tcpipPlatformClose(void *fd);
int tcpipPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmware, size_t length);

bool tcpipIsLocalhost(const char *ip);
Collaborator


No need for this anymore

Author


Removed

Comment on lines 1236 to 1245
// Checks whether ip is localhost
bool tcpipIsLocalhost(const char *ip) {
    if(strncmp(ip, "127.0.0.1", strlen("127.0.0.1")) == 0 ||
       strncmp(ip, "0.0.0.0", strlen("0.0.0.0")) == 0) {
        return true;
    }

    return false;
}

Collaborator


No need for this anymore

Author


Removed

@@ -1163,6 +1233,16 @@ int tcpipPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmwa
return -1;
}
Collaborator


Retain tcpip_host.cpp and local_memshd.cpp as much as is, and move this code elsewhere.

The final goal is having this "special case" only under the TCP_IP_OR_LOCAL_SHDMEM protocol, where the new code calls tcpServer + cancel & memshdServer + cancel with threads (some state will have to be shared, so some changes to the tcpip & local shdmem code will be needed, but keep those minimal - that code is already super finicky & it's good that it stays that way, with this change essentially just "closing" & joining the thread whose server didn't get any connections).

Also, no detaches - just join()s - if we cannot join, we have a problem.

Collaborator


And - move the thread & vars into function scope or a struct. It should look something like this in the end:

int tcpipOrLocalShdmemServer(XLinkProtocol_t *protocol) {
    if(protocol == nullptr) return -1;

    std::mutex mtx;
    std::condition_variable cv;
    bool isTcpIpThreadFinished = false;
    bool isShdmemThreadFinished = false;
    int retTcpIp = -1, retShdmem = -1;
    void *fdTcpIp = nullptr, *fdShdmem = nullptr;

    auto thread_tcpip = std::thread([&mtx, &cv, &isTcpIpThreadFinished, &retTcpIp, &fdTcpIp](){
        auto ret = tcpipPlatformServer(..., &fdTcpIp);
        {
            std::unique_lock<std::mutex> l(mtx);
            isTcpIpThreadFinished = true;
            retTcpIp = ret;
        }
        cv.notify_one();
    });

    auto thread_shdmem = std::thread([&mtx, &cv, &isShdmemThreadFinished, &retShdmem, &fdShdmem](){
        auto ret = shdmemPlatformServer(..., &fdShdmem);
        {
            std::unique_lock<std::mutex> l(mtx);
            isShdmemThreadFinished = true;
            retShdmem = ret;
        }
        cv.notify_one();
    });

    {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&isShdmemThreadFinished, &isTcpIpThreadFinished]{ return isShdmemThreadFinished || isTcpIpThreadFinished; });

        // As soon as either one finishes, the other can be cleaned up
        // Use signals, as "accept" cannot be unblocked by "close"ing the underlying socket
        if(!isShdmemThreadFinished) pthread_kill(thread_shdmem.native_handle(), SIGINT);
        if(!isTcpIpThreadFinished) pthread_kill(thread_tcpip.native_handle(), SIGINT);
    }

    // Wait for both threads to wrap up
    if(thread_tcpip.joinable()) thread_tcpip.join();
    if(thread_shdmem.joinable()) thread_shdmem.join();

    // Assign the final protocol (once both threads finalize)
    if(retTcpIp == 0) *protocol = X_LINK_TCP_IP;
    if(retShdmem == 0) *protocol = X_LINK_LOCAL_SHDMEM;

    // If both connected, close TCP_IP
    if(retTcpIp == 0 && retShdmem == 0) {
        tcpipPlatformClose(fdTcpIp);
    }

    // Neither connection was successful
    if(retTcpIp != 0 && retShdmem != 0) return -1;

    ...
}

That is, if "signaling" works as expected with "accept". Otherwise you might have to switch to select in the servers, for which it might be easiest to modify the functions with an additional parameter taking an fd that one can "unblock" with (instead of the pthread_kill calls)

Author

@TheMutta TheMutta Jul 4, 2024


Signaling will work, but there's an issue: we don't clean up the socket instantiated with the socket() syscall. To stop the accept call, it is sufficient to use this procedure:

shutdown(sock, SHUT_RDWR);
#if defined(SO_LINGER)
// Force an immediate close on close(): l_onoff = 1, l_linger = 0
const struct linger lin = {1, 0};
setsockopt(sock, SOL_SOCKET, SO_LINGER, (const char*)&lin, sizeof(lin));
#endif
tcpip_close_socket(sock);

shutdown() blocks any further data transfer on the socket, and setting SO_LINGER with a zero linger time means close() will force the socket shut.
This has the added benefit that the thread ends gracefully, with no need for a SIGINT.
That's also why detach() was called: the thread ends normally without intervention.

Collaborator


@TheMutta

AFAIK (as per reading yesterday), accept cannot be unblocked with shutdown;
we'd have to switch to select.

If the above works, however, I'm up for that path rather than signals

That's the reason detach() was called, because the thread ends normally with no need of intervention.

let's join if the thread is joinable, rather than detach (if the thread actually finished as expected)

Author


The issue seems to be that signals cause problems: they don't work, while shutdown(), setsockopt() and close() in succession do. shutdown() isn't what closes the socket; it's the combination of setting SO_LINGER to a zero linger time and then calling close() that forces an immediate closure. shutdown() is more of a safety measure to make sure the socket isn't in the middle of sending or receiving: if there's a pending request on the socket, we don't want to cut it off mid-transfer, so we call shutdown, let it complete that request, and then just close the socket.

let's join if the thread is joinable, rather than detach (if the thread actually finished as expected)

Gotcha.

Collaborator


Noted - let's create a duplicate of the "tcpipServer" function for this, with (mostly) the same logic flow as the original function but with these changes (to avoid potential issues for the regular TCP_IP flow)

Thanks!

Author


Changes have been pushed

@themarpe
Collaborator

themarpe commented Jul 3, 2024

Also, WRT the cross-compilation - any "unix"-only things, especially whole files, can be handled either via:

  • CMake - conditionally compiled
  • ifdef'd in/out

To fix the CI

@TheMutta
Author

TheMutta commented Jul 4, 2024

Hmm - do we need a special type of an event?
This will likely not work due to it being shared with (fixed) Bootloader on RVC2 (eg communicating with it will cause issues).

So let's try to either:

  • Remove the need for custom xLinkEventType entry
  • reorder them to the end & fix assert (eg, to check between two ranges)

Hmm, I understand the issue. I think option 2 is the most convenient. That said, will the fact that the streamPacketDesc_t structure has been expanded with the fd field cause issues with the RVC2 bootloader?

…and instead new event types will go at the end of the enum
@TheMutta
Author

TheMutta commented Jul 5, 2024

I've moved the requests so that the first few remain constant and new requests can be added without causing any issues for the old ones.

The detection protocol has been fixed with threads, and the conditional compilation has been put in place.

The remaining CI failures seem to be caused by the _findfirst64i32 or ftruncate function redefinition, neither of which is in the source code.

Collaborator

@themarpe themarpe left a comment


Apart from this, LGTM if it all works well integrated

Comment on lines 51 to 53
# Local shared memory example
add_example(xlink_server_local xlink_server_local.cpp)
add_example(xlink_client_local xlink_client_local.cpp)
Collaborator


Instead of doing "if unix" in the source code, conditionally add these as examples IFF running on UNIX.
Apply this elsewhere as well where needed (where we ifdef out a whole file, it's better to make it platform-specific compilation-wise)

@themarpe
Collaborator

themarpe commented Jul 5, 2024

That said, will the fact that the streamPacketDesc_t structure has been expanded to add the fd field cause issues with the RVC2 bootloader?

No, no issues - just the xLinkEventHeader_t structure and all the downstream types (enums, etc...)

The remaining CI failures seem to be caused by the _findfirst64i32 or ftruncate function redefinition, neither of which is in the source code.

Might be a bug in the mingw unix "translation" helpers, though I wasn't able to repro on my machine. Let's leave it as is; we don't support mingw officially anyway.

@TheMutta
Author

TheMutta commented Jul 5, 2024

Merging the new shared memory zerocopy protocol with the mainline branch

@TheMutta TheMutta merged commit ddfc42c into develop_server Jul 5, 2024
10 of 16 checks passed
@TheMutta TheMutta deleted the shared-memory-on-localhost branch July 5, 2024 15:13
@TheMutta TheMutta restored the shared-memory-on-localhost branch July 5, 2024 15:14
@TheMutta TheMutta deleted the shared-memory-on-localhost branch July 12, 2024 09:28