diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 7ec8ff570..c9f789d2a 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -182,7 +182,6 @@ srt::CUDTUnited::CUDTUnited() , m_InitLock() , m_iInstanceCount(0) , m_bGCStatus(false) - , m_ClosedSockets() { // Socket ID MUST start from a random value m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL); @@ -469,6 +468,19 @@ SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps) return ns->m_SocketID; } +// [[using locked(m_GlobControlLock)]] +void srt::CUDTUnited::swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, CUDTUnited::SwipeSocketTerm lateremove) +{ + s->core().m_bConnected = false; + s->core().m_bConnecting = false; + + m_ClosedSockets[id] = s; + if (!lateremove) + { + m_Sockets.erase(id); + } +} + int srt::CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, const CPacket& hspkt, @@ -833,8 +845,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, ns->removeFromGroup(true); } #endif - m_Sockets.erase(id); - m_ClosedSockets[id] = ns; + swipeSocket_LOCKED(id, ns, SWIPE_NOW); } return -1; @@ -2038,13 +2049,16 @@ int srt::CUDTUnited::close(CUDTSocket* s) } #endif - m_Sockets.erase(s->m_SocketID); - m_ClosedSockets[s->m_SocketID] = s; + swipeSocket_LOCKED(s->m_SocketID, s, SWIPE_NOW); HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later."); CGlobEvent::triggerEvent(); } + // Force the worker threads to exit and all active status to be cleared, + // without explicitly removing the muxer, only if the muxer is unique-owned. + killMux(s); + HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE"); // Check if the ID is still in closed sockets before you access it @@ -2642,7 +2656,9 @@ void srt::CUDTUnited::checkBrokenSockets() // close broken connections and start removal timer s->setClosed(); tbc.push_back(i->first); - m_ClosedSockets[i->first] = s; + + // NOTE: removal from m_Sockets POSTPONED. 
+ swipeSocket_LOCKED(i->first, s, SWIPE_LATER); // remove from listener's queue sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket); @@ -2681,7 +2697,7 @@ void srt::CUDTUnited::checkBrokenSockets() if (closed_ago > seconds_from(1)) { CRNode* rnode = j->second->core().m_pRNode; - if (!rnode || !rnode->m_bOnList) + if (!rnode || !rnode->isOnList()) { HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed " @@ -2690,6 +2706,11 @@ void srt::CUDTUnited::checkBrokenSockets() // HLOGC(smlog.Debug, log << "will unref socket: " << j->first); tbr.push_back(j->first); } + else + { + HLOGC(smlog.Debug, + log << "checkBrokenSockets: @" << j->second->m_SocketID << " still on the MX RcvUList - will retry..."); + } } } @@ -2720,11 +2741,11 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) // threads. If that's the case, SKIP IT THIS TIME. The // socket will be checked next time the GC rollover starts. CSNode* sn = s->core().m_pSNode; - if (sn && sn->m_iHeapLoc != -1) + if (sn && sn->isOnList()) return; CRNode* rn = s->core().m_pRNode; - if (rn && rn->m_bOnList) + if (rn && rn->isOnList()) return; #if ENABLE_BONDING @@ -2735,9 +2756,6 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) s->removeFromGroup(true); } #endif - // decrease multiplexer reference count, and remove it if necessary - const int mid = s->m_iMuxID; - { ScopedLock cg(s->m_AcceptLock); @@ -2758,8 +2776,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) CUDTSocket* as = si->second; as->breakSocket_LOCKED(); - m_ClosedSockets[*q] = as; - m_Sockets.erase(*q); + swipeSocket_LOCKED(*q, as, SWIPE_NOW); } } @@ -2784,33 +2801,46 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u); s->core().closeInternal(); + removeMux(s); HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u); delete s; - HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. 
Checking muxer."); +} - if (mid == -1) +// decrease multiplexer reference count, and remove it if necessary +// [[using locked(m_GlobControlLock)]] +void srt::CUDTUnited::removeMux(CUDTSocket* s) +{ + int mid = s->m_iMuxID; + if (mid == -1) // Ignore those already removed { - HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing."); + HLOGC(smlog.Debug, log << "removeMux: @" << s->m_SocketID << " has no muxer, ok."); return; } + // In case the socket isn't to be immediately deleted, + // the MuxID field must be updated in order to catch the above + // condition when it's called for the same socket a second time. + // --- s->m_iMuxID = -1; + // --- s->core().m_pRcvQueue = NULL; + // --- s->core().m_pSndQueue = NULL; + map<int, CMultiplexer>::iterator m; m = m_mMultiplexer.find(mid); if (m == m_mMultiplexer.end()) { - LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!"); + LOGC(smlog.Fatal, log << "IPE: For socket @" << s->m_SocketID << " MUXER id=" << mid << " NOT FOUND!"); return; } CMultiplexer& mx = m->second; mx.m_iRefCount--; - HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount); - if (0 == mx.m_iRefCount) + HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << s->m_SocketID << ", ref=" << mx.m_iRefCount); + if (mx.m_iRefCount <= 0) { - HLOGC(smlog.Debug, - log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port " - << mx.m_pChannel->bindAddressAny().hport()); + HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @" + << s->m_SocketID << " - deleting muxer bound to " + << mx.m_pChannel->bindAddressAny().str()); // The channel has no access to the queues and // it looks like the multiplexer is the master of all of them. 
// The queues must be silenced before closing the channel @@ -2821,6 +2851,54 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) mx.destroy(); m_mMultiplexer.erase(m); } + else + { + HLOGC(smlog.Debug, log << "MUXER id=" << mid << " has still " << mx.m_iRefCount << " users"); + } +} + +void srt::CUDTUnited::killMux(CUDTSocket* s) +{ + int mid = s->m_iMuxID; + if (mid == -1) // Ignore those already removed + { + HLOGC(smlog.Debug, log << "removeMux: @" << s->m_SocketID << " has no muxer, ok."); + return; + } + + // In case the socket isn't to be immediately deleted, + // the MuxID field must be updated in order to catch the above + // condition when it's called for the same socket a second time. + // --- s->m_iMuxID = -1; + // --- s->core().m_pRcvQueue = NULL; + // --- s->core().m_pSndQueue = NULL; + + map<int, CMultiplexer>::iterator m; + m = m_mMultiplexer.find(mid); + if (m == m_mMultiplexer.end()) + { + LOGC(smlog.Fatal, log << "IPE: For socket @" << s->m_SocketID << " MUXER id=" << mid << " NOT FOUND!"); + return; + } + + CMultiplexer& mx = m->second; + + // Ok, careful now. First, check if you have exactly ONE + // user left. Note that due to the mutex lock, all pending + // binders will have to wait. + + if (mx.m_iRefCount != 1) + return; + + // Ok, we are not going to remove the binder YET, this will + // have to happen normally as usual. But we need to close the + // socket right now, so we order the queue workers to quit. + mx.m_pSndQueue->setClosing(); + mx.m_pRcvQueue->setClosing(); + CGlobEvent::triggerEvent(); + mx.m_pChannel->close(); + mx.m_pSndQueue->stopWorker(); + mx.m_pRcvQueue->stopWorker(); } void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af) @@ -3307,7 +3385,11 @@ void* srt::CUDTUnited::garbageCollect(void* p) s->removeFromGroup(false); } #endif - self->m_ClosedSockets[i->first] = s; + + // NOTE: not removing the socket from m_Sockets. 
+ // This is a loop over m_Sockets and after this loop ends, + // this whole container will be cleared. + self->swipeSocket_LOCKED(i->first, s, self->SWIPE_LATER); // remove from listener's queue sockets_t::iterator ls = self->m_Sockets.find(s->m_ListenSocket); diff --git a/srtcore/api.h b/srtcore/api.h index 9ba77d23a..a5d9e3797 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -256,6 +256,15 @@ class CUDTUnited /// @return The new UDT socket ID, or INVALID_SOCK. SRTSOCKET newSocket(CUDTSocket** pps = NULL); + enum SwipeSocketTerm { SWIPE_NOW = 0, SWIPE_LATER = 1 }; + /// Removes the socket from the global socket container + /// and places it in the socket trashcan. The socket should + /// remain there until all still pending activities are + /// finished and there are no more users of this socket. + /// Note that the swiped socket is no longer dispatchable + /// by id. + void swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, SwipeSocketTerm); + /// Create (listener-side) a new socket associated with the incoming connection request. /// @param [in] listen the listening socket ID. /// @param [in] peer peer address. @@ -446,6 +455,10 @@ class CUDTUnited #endif void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL); bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls); + void removeMux(CUDTSocket* s); + + // m_GlobControlLock must NOT be locked to call this. 
+ void killMux(CUDTSocket* s); // Utility functions for updateMux void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 873b6ad6f..c2753e766 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -947,17 +947,10 @@ void srt::CUDT::open() // structures for queue if (m_pSNode == NULL) - m_pSNode = new CSNode; - m_pSNode->m_pUDT = this; - m_pSNode->m_tsTimeStamp = steady_clock::now(); - m_pSNode->m_iHeapLoc = -1; + m_pSNode = new CSNode(this, steady_clock::now()); if (m_pRNode == NULL) - m_pRNode = new CRNode; - m_pRNode->m_pUDT = this; - m_pRNode->m_tsTimeStamp = steady_clock::now(); - m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; - m_pRNode->m_bOnList = false; + m_pRNode = new CRNode(this, steady_clock::now()); // Set initial values of smoothed RTT and RTT variance. m_iSRTT = INITIAL_RTT; @@ -4860,7 +4853,6 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, m_bConnected = true; // register this socket for receiving data packets - m_pRNode->m_bOnList = true; m_pRcvQueue->setNewEntry(this); } @@ -5848,7 +5840,6 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& m_bConnected = true; // Register this socket for receiving data packets. - m_pRNode->m_bOnList = true; m_pRcvQueue->setNewEntry(this); // Save the handshake in m_ConnRes in case when needs repeating. @@ -6328,6 +6319,8 @@ bool srt::CUDT::closeInternal() m_tsRcvPeerStartTime = steady_clock::time_point(); m_bOpened = false; + m_bConnected = false; + m_bConnecting = false; return true; } diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 863148b34..7740b97cf 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -434,6 +434,34 @@ srt::CSndQueue::~CSndQueue() delete m_pSndUList; } +void srt::CSndQueue::stopWorker() +{ + // We use the decent way, so we say to the thread "please exit". + m_bClosing = true; + + // Sanity check of the function's affinity. 
+ if (srt::sync::this_thread::get_id() == m_WorkerThread.get_id()) + { + LOGC(rslog.Error, log << "IPE: SndQ:WORKER TRIES TO CLOSE ITSELF!"); + return; // do nothing else, this would cause a hangup or crash. + } + + if (m_pTimer != NULL) + { + m_pTimer->interrupt(); + } + + // Unblock CSndQueue worker thread if it is waiting. + m_pSndUList->signalInterrupt(); + + if (m_WorkerThread.joinable()) + { + HLOGC(rslog.Debug, log << "SndQueue: EXIT (forced)"); + m_WorkerThread.join(); + } +} + + int srt::CSndQueue::ioctlQuery(int type) const { return m_pChannel->ioctlQuery(type); @@ -625,9 +653,21 @@ srt::CRcvUList::~CRcvUList() {} void srt::CRcvUList::insert(const CUDT* u) { CRNode* n = u->m_pRNode; + + // The NODE is built into u, initially with m_bOnList = false. + // This should be set to true when the node is really on this list. + // Therefore check this, because the node may potentially + // already be on some list. + if (n->m_bOnList) + { + LOGC(cnlog.Error, log << u->CONID() << " being inserted into the list is already on one!"); + return; // do not add, this would cause data mess + } + n->m_tsTimeStamp = steady_clock::now(); + n->m_bOnList = true; - if (NULL == m_pUList) + if (!m_pUList) { // empty list, insert as the single node n->m_pPrev = n->m_pNext = NULL; @@ -672,6 +712,7 @@ void srt::CRcvUList::remove(const CUDT* u) } n->m_pNext = n->m_pPrev = NULL; + n->m_bOnList = false; } void srt::CRcvUList::update(const CUDT* u) @@ -1281,6 +1322,8 @@ void* srt::CRcvQueue::worker(void* param) << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway."); } cst = CONN_REJECT; + + // All sockets will be broken, there's no point in doing updateConnStatus break; } // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue. 
@@ -1306,7 +1349,6 @@ void* srt::CRcvQueue::worker(void* param) // the socket must be removed from Hash table first, then RcvUList self->m_pHash->remove(u->m_SocketID); self->m_pRcvUList->remove(u); - u->m_pRNode->m_bOnList = false; } ul = self->m_pRcvUList->m_pUList; @@ -1331,6 +1373,23 @@ void* srt::CRcvQueue::worker(void* param) // however there's still m_mBuffer in CRcvQueue for that socket to care about. } + // As this thread is going to quit now, make sure that all sockets + // have been taken out of the queue. + CRNode* ul = self->m_pRcvUList->m_pUList; + while (ul) + { + CUDT* u = ul->m_pUDT; + + HLOGC(qrlog.Debug, + log << CUDTUnited::CONID(u->m_SocketID) << " due to RcvQ:worker exit, REMOVING the socket FROM RCV QUEUE/MAP."); + // the socket must be removed from Hash table first, then RcvUList + self->m_pHash->remove(u->m_SocketID); + self->m_pRcvUList->remove(u); + + ul = self->m_pRcvUList->m_pUList; + } + + HLOGC(qrlog.Debug, log << "worker: EXIT"); THREAD_EXIT(); diff --git a/srtcore/queue.h b/srtcore/queue.h index dd68a7721..68f2e7689 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -137,10 +137,21 @@ class CUnitQueue struct CSNode { +private: CUDT* m_pUDT; // Pointer to the instance of CUDT socket sync::steady_clock::time_point m_tsTimeStamp; sync::atomic<int> m_iHeapLoc; // location on the heap, -1 means not on the heap + + friend class CSndUList; + +public: + CSNode(CUDT* u, const sync::steady_clock::time_point& time) + :m_pUDT(u), m_tsTimeStamp(time), m_iHeapLoc(-1) + { + } + + bool isOnList() const { return m_iHeapLoc != -1; } }; class CSndUList @@ -221,6 +232,7 @@ class CSndUList struct CRNode { +private: CUDT* m_pUDT; // Pointer to the instance of CUDT socket sync::steady_clock::time_point m_tsTimeStamp; // Time Stamp @@ -228,6 +240,18 @@ struct CRNode CRNode* m_pNext; // next link sync::atomic<bool> m_bOnList; // if the node is already on the list + + friend class CRcvUList; + friend class CRcvQueue; + +public: + + CRNode(CUDT* u, const 
sync::steady_clock::time_point& time) + :m_pUDT(u), m_tsTimeStamp(time), m_pPrev(NULL), m_pNext(NULL), m_bOnList(false) + { + } + + bool isOnList() const { return m_bOnList; } }; class CRcvUList @@ -438,6 +462,7 @@ class CSndQueue int ioctlQuery(int type) const; int sockoptQuery(int level, int type) const; + void stopWorker(); void setClosing() { m_bClosing = true; } private: diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index fe9027311..06ef01d28 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -700,3 +700,57 @@ TEST(ReuseAddr, ProtocolVersionFaux6) (void)srt_epoll_release(client_pollid); (void)srt_epoll_release(server_pollid); } + +TEST(ReuseAddr, QuickClose) +{ + srt::TestInit srtinit; + + client_pollid = srt_epoll_create(); + ASSERT_NE(SRT_ERROR, client_pollid); + + server_pollid = srt_epoll_create(); + ASSERT_NE(SRT_ERROR, server_pollid); + + + SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); + SRTSOCKET bindsock_2 = createListener("127.0.0.1", 5000, true); + + testAccept(bindsock_2, "127.0.0.1", 5000, true); + + std::thread s1(shutdownListener, bindsock_1); + std::thread s2(shutdownListener, bindsock_2); + + s1.join(); + s2.join(); + + SRTSOCKET endpoint = createListener("127.0.0.1", 5001, true); + + for (int i = 0; i < 10; ++i) + { + SRTSOCKET next_binder = prepareSocket(); + EXPECT_NE(next_binder, SRT_INVALID_SOCK); + + int no = 0; + EXPECT_NE(srt_setsockflag(next_binder, SRTO_REUSEADDR, &no, sizeof no), SRT_ERROR); + + bool succeed = bindSocket(next_binder, "127.0.0.1", 5000, true); + + sockaddr_any endsa = srt::CreateAddr("127.0.0.1", 5001, AF_INET); + EXPECT_NE(srt_connect(next_binder, endsa.get(), endsa.size()), SRT_INVALID_SOCK); + + SRT_EPOLL_EVENT ev[2]; + EXPECT_NE(srt_epoll_uwait(server_pollid, ev, 2, 1000), SRT_ERROR); + + SRTSOCKET accepted = srt_accept(endpoint, 0, 0); + EXPECT_NE(accepted, SRT_INVALID_SOCK); + + srt_close(next_binder); + srt_close(accepted); + + if (!succeed || accepted == 
SRT_INVALID_SOCK) + break; + } + + (void)srt_epoll_release(client_pollid); + (void)srt_epoll_release(server_pollid); +}