-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FD sharing protocol on localhost #82
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets change the server to be "dual server" with the TCP/IP mechanism.
Instead of connect/disconnect/reconnect, lets spin up both TCP & SHDMEM, by the following mechanism (C++):
- Create two threads and return the fd's for the server sockets back to main context
- cond_var to which completes sooner
- close the other that did not complete yet
- join both threads
- continue & return which protocol
For this dual option, we might create a separate protocol entry to signify going down this path. (so TCP_IP would just be TCP, SHDMEM just shdmem and TCP_IP_OR_LOCAL_SHDMEM would be the grouped one)
For the connecting client, it would be similar, if TCP_IP_OR_LOCAL_SHDMEM
is picked, then following occurs:
- First try connecting via local shdmem
- if fails, go TCP/IP route
So no need for the threading on the client side
Ok that sounds doable, but i have some thoughts.
I'd prefer a similar approach to this, and like the one i put in, which would only require some minor changes to depthai and would avoid all the shenaningans that multithreading adds to a multicore system. Maybe adding a timeout for SHDMEM connection of, for example, 5 seconds, would be perfect, since also the SHDMEM connection can happen only after TCP/IP has been connected, meaning we have both a server and a client already in place. |
So, WRT the server, it always waits for both - the the timing doesn't matter, just the sequence.
Above case does cover that - only one protocol at once. |
Yes exactly, it's a completely indipendent protocol, and read/writes work perfectly. The only use for TCP/IP is to, well, determine if we're connecting to localhost and as a backup if SHDMEM connection doesn't work, meaning the connection is immediate if we're not connecting to localhost, and just takes the minimum time to open a socket if we want to connect through SHDMEM.
Ok sure, that seems doable. It's a bit more complicated than the current system, and would function similarly. I still think the present system may be less of a hassle, with the multithreaded example we'll have to think about cleanup of open sockets that have not been connected to. In any case, perfectly doable. |
So this is what i came up with:
The only issue would be cleaning up the remaining open sockets. This can only be done with cleanup handlers inside the tcpipPlatformServer and the shdmemPlatformServer functions. The fact is, being a mixed C and C++ codebase, this absolutely makes the pthreads function throw out bogus errors like Therefore, our multithreading approach seems more trouble than it's worth. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't cancel the thread, but "close" the fd and just join both threads as is. Also, might be simpler to do this in C++ context due to thread, mutex & conditional_variable, etc...
Otherwise if really PITA, we can leave as is - but the "sleep" should atleast be "magic'd out" & add to comments as "future impl." posiblity, that we can go to spinning both protocols up in parallel
Also, this is Posix only I assume, modify CMakeLists.txt to only activate if Posix & be optionally disable-able as well
& add to CI & tests (which will make sure that we don't compile it in for Windows as well, etc...)
include/XLink/XLinkPrivateDefines.h
Outdated
XLINK_READ_REL_SPEC_REQ, | ||
XLINK_READ_REL_SPEC_RESP, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why removed?
Also, all new events MUST GO to the end of enum, otherwise backward compatibility is broken
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved them before XLINK_LAST_REQ and XLINK_LAST_RESP, otherwise they will cause this assert to fail:
int handleIncomingEvent(xLinkEvent_t* event, XLinkTimespec treceive) {
//this function will be dependent whether this is a client or a Remote
//specific actions to this peer
mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId);
ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ
&& event->header.type != XLINK_REQUEST_LAST
&& event->header.type < XLINK_RESP_LAST);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm - do we need a special type of an event?
This will likely not work due to it being shared with (fixed) Bootloader on RVC2 (eg communicating with it will cause issues).
So lets try to either:
- Remove the need for custom xLinkEventType entry
- reorder them to the end & fix assert (eg, to check between two ranges)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: xLinkEventHeader_t
is also a structure that should not change (or "reserved bits" should be used under bitField)
I think i misunderstood, you mean that we'd spin up a separate thread waiting for the SHDMEM connection and, if successful, we just close the TCP/IP connection, create a new platform FD and switch to that protocol?
It probably is more of a PITA that it's worth, at least imo. For the sleep, i think we could hide it in the shdmem protocol, with multiple retries delayed by a fixed amounts of ms to wait in between in connect().
Yes, i guess a simple preprocessor directive like |
So, that has been completed. It was indeed far easier to do in C++, i just had to add some minor modification to make sure the socket is closed properly and the unused thread is detached. Does it look good? |
include/XLink/XLinkPrivateDefines.h
Outdated
XLINK_READ_REL_SPEC_REQ, | ||
XLINK_READ_REL_SPEC_RESP, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm - do we need a special type of an event?
This will likely not work due to it being shared with (fixed) Bootloader on RVC2 (eg communicating with it will cause issues).
So lets try to either:
- Remove the need for custom xLinkEventType entry
- reorder them to the end & fix assert (eg, to check between two ranges)
include/XLink/XLinkPrivateDefines.h
Outdated
XLINK_READ_REL_SPEC_REQ, | ||
XLINK_READ_REL_SPEC_RESP, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: xLinkEventHeader_t
is also a structure that should not change (or "reserved bits" should be used under bitField)
src/pc/protocols/tcpip_host.h
Outdated
xLinkPlatformErrorCode_t tcpipPlatformBootBootloader(const char *name); | ||
int tcpipPlatformDeviceFdDown(void *fd); | ||
int tcpipPlatformClose(void *fd); | ||
int tcpipPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmware, size_t length); | ||
|
||
bool tcpipIsLocalhost(const char *ip); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
src/pc/protocols/tcpip_host.cpp
Outdated
// Checks whether ip is localhost | ||
bool tcpipIsLocalhost(const char *ip) { | ||
if(strncmp(ip, "127.0.0.1", strlen("127.0.0.1")) == 0 || | ||
strncmp(ip, "0.0.0.0", strlen("0.0.0.0")) == 0) { | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -1163,6 +1233,16 @@ int tcpipPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmwa | |||
return -1; | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retain tcpip_host.cpp
and local_memshd.cpp
as much as is, and move this code elsewhere.
Final goal being able to have this "special case" only under TCP_IP_OR_LOCAL_SHDMEM
protocol, and the new code calls tpcServer + cancel & memshdServer + cancel with threads (some state will have to be shared, so some changes to the tcpip & local shdmem will have to be done, but let those be rather minimal - that code is already super finicky & its good that it remains that way, with this change essentially just "closing" & joining the thread whose server didn't got any connections)
Also, no detaches - just joins() - if we cannot join, we have a problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And - move the thread & vars into func scope or struct. should look something like this in the end:
tcpipOrLocalShdmemServer(*protocol) {
if(protocol == nullptr) return;
std::mutex mtx;
std::condition_variable cv;
bool isTcpIpThreadFinished = false;
bool isShdmemThreadFinished = false;
int retTcpIp = -1, retShdmem = -1;
void *fdTcpIp = nullptr, *fdShdmem = nullptr;
auto thread_tcpip = std::thread([&mtx, &cv, &isTcpIpThreadFinished, &retTcpIp, &fdTcpIp](){
auto ret = tcpipPlatformServer(..., &fdTcpIp);
{
std::unique_lock<std::mutex> l(mtx);
isTcpIpThreadFinished = true;
retTcpIp = ret;
}
cv.notify_one();
});
auto thread_shdmem = std::thread([&mtx, &cv, &isShdmemThreadFinished, &retSdhmem, &fdShdmem](){
auto ret = shdmemPlatformServer(..., &fdShdmem);
{
std::unique_lock<std::mutex> l(mtx);
isShdmemThreadFinished = true;
retSdhmem = ret;
}
cv.notify_one();
});
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&isShdmemThreadFinished, &isTcpIpThreadFinished]{ return isShdmemThreadFinished || isTcpIpThreadFinished; });
// As soon as either one finishes, the other can be cleaned
// Use signals, as "accept" cannot be unblocked by "close"ing the underlying socket
if(!isShdmemThreadFinished) pthread_kill(thread_shdmem.native(), SIGINT);
if(!isTcpIpThreadFinished) pthread_kill(thread_tcpip.native(), SIGINT);
}
// Wait for both threads to wrap up
if(thread_tcpip.joinable()) thread_tcpip.join();
if(thread_shdmem.joinable()) thread_shdmem.join();
// Asign the final protocol (once both threads finalize)
if(retTcpIp == 0) *protocol = X_LINK_TCP_IP;
if(retShdmem == 0) *protocol = X_LINK_LOCAL_SHDMEM;
// if both connected, close TCP_IP
if(retTcpIp == 0 && retShdmem == 0) {
tcpipPlatformClose(fdTcpIp);
}
// None of the connections were successful
if(retTcpIp || retShdmem) return -1;
...
}
That is if "signaling" works as expected with "accept". Otherwise you might have to switch to select
in the servers, for which might be easiest to modify funcs with additional parameter with a fd that one can "unblock" (instead the pthread_kill calls)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signaling will work, but there's an issue: we don't clean up the socket instantiated with the socket() syscall. To stop the accept call, it is sufficent to use this procedure:
shutdown(sock, SHUT_RDWR);
#if defined(SO_LINGER)
const int set = 0;
setsockopt(sock, SOL_SOCKET, SO_LINGER, (const char*)&set, sizeof(set));
#endif
tcpip_close_socket(sock);
Since shutdown blocks any data transfer on the socket and setting SO_LINGER to zero means that when calling close() the socket will just be forced closed.
This has the added benefit of the thread ending gracefully and not needing a SIGINT.
That's the reason detach() was called, because the thread ends normally with no need of intervention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK (as per reading yesterday), accept
cannot be unblocked with shutdown
We'd have to switch to select
If above works however, I'm up for that path - rather than signals
That's the reason detach() was called, because the thread ends normally with no need of intervention.
lets join if thread is joinable - rather than detach (if thread actually finished as expected)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue seems to be that signals cause issues, they don't work, while the shutdown(),setsockopt() and close() methods in succession do work. While shutdown() isn't the one closing the socket, it's instead the combination of setting SO_LINGER to 0 and then calling close() which forces an immediate closure of the socket, it's sort of a security measure to make sure the socket is neither sending or receiving data and is in the middle of something (if there's a pending connection request and the socket, for example, we don't want it to block in the middle, we can call shutdown and let it complete that request and then just close the socket).
lets join if thread is joinable - rather than detach (if thread actually finished as expected)
Gotcha.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted - lets create a duplicate of "tcpipServer" function for this, where we have this (mostly) same logic flow as the original function, but with these changes (to not cause potential issues for the regular TCP_IP flow)
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes have been pushed
Also, WRT the crosscompilation - any "unix" only things, especially if whole files, can be added either to:
To fix the CI |
Hmm, I understand the issue. I think option 2 is the most convinient. That said, will the fact that the streamPacketDesc_t structure has been expanded to add the fd field cause issues with the RVC2 bootloader? |
…and instead new event types will go at the end of the enum
I've moved the requests in a way that the first few will remain constant and the new requests can be added without causing any issues for the new ones. The detection protocol has been fixed with threads, and the conditional compilation has been put in place. The remaining fail CI's seem to be because of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apart form this LGTM if all works well integrated
examples/CMakeLists.txt
Outdated
# Local shared memory example | ||
add_example(xlink_server_local xlink_server_local.cpp) | ||
add_example(xlink_client_local xlink_client_local.cpp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of doing "if unix" in source code, conditionally add these as examples IFF running on UNIX
Apply this elsewhere as well where needed (where we ifdef out a whole file - is better to make it "platform specific compiliation wise")
No, no issues - just the xLinkEventHeader_t structure and all the downstream types (enums, etc...)
Might be a bug in the mingw unix "translation" helpers, though I wasn't able to repro on my machine. Lets leave as is, we don't support mingw officially anyway |
Merging the new shared memory zerocopy protocol with the mainline branch |
This update creates a new interface to stream over files and a new protocol that allows for hosts where both the Server and the Client instances of XLink are running on the same device to simply send over the FD and its ownership instead of just streaming the file data over. This allows for sharing of memory buffers without the need for copying, i.e. zerocopy.
An automatic discovery for the protocol is setup for TCP/IP.
It is to be noted that the packet size for XLink has been altered by adding a new FD attribute, meaning that older versions of XLink won't be able to parse the packets correctly.