Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Disconnect dead sockets from the multiplexer to free unused resources #1829

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
130 changes: 106 additions & 24 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ srt::CUDTUnited::CUDTUnited()
, m_InitLock()
, m_iInstanceCount(0)
, m_bGCStatus(false)
, m_ClosedSockets()
{
// Socket ID MUST start from a random value
m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
Expand Down Expand Up @@ -469,6 +468,19 @@ SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
return ns->m_SocketID;
}

// [[using locked(m_GlobControlLock)]]
void srt::CUDTUnited::swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, CUDTUnited::SwipeSocketTerm lateremove)
{
s->core().m_bConnected = false;
s->core().m_bConnecting = false;

m_ClosedSockets[id] = s;
if (!lateremove)
{
m_Sockets.erase(id);
}
}

int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
const sockaddr_any& peer,
const CPacket& hspkt,
Expand Down Expand Up @@ -833,8 +845,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
ns->removeFromGroup(true);
}
#endif
m_Sockets.erase(id);
m_ClosedSockets[id] = ns;
swipeSocket_LOCKED(id, ns, SWIPE_NOW);
}

return -1;
Expand Down Expand Up @@ -2038,13 +2049,16 @@ int srt::CUDTUnited::close(CUDTSocket* s)
}
#endif

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets[s->m_SocketID] = s;
swipeSocket_LOCKED(s->m_SocketID, s, SWIPE_NOW);
HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");

CGlobEvent::triggerEvent();
}

// Force the worker threads to exit and all active status cleared,
// without explicitly removing the muxer, only if the muxer is unique-owned.
killMux(s);

HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");

// Check if the ID is still in closed sockets before you access it
Expand Down Expand Up @@ -2642,7 +2656,9 @@ void srt::CUDTUnited::checkBrokenSockets()
// close broken connections and start removal timer
s->setClosed();
tbc.push_back(i->first);
m_ClosedSockets[i->first] = s;

// NOTE: removal from m_SocketID POSTPONED.
swipeSocket_LOCKED(i->first, s, SWIPE_LATER);

// remove from listener's queue
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
Expand Down Expand Up @@ -2681,7 +2697,7 @@ void srt::CUDTUnited::checkBrokenSockets()
if (closed_ago > seconds_from(1))
{
CRNode* rnode = j->second->core().m_pRNode;
if (!rnode || !rnode->m_bOnList)
if (!rnode || !rnode->isOnList())
{
HLOGC(smlog.Debug,
log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
Expand All @@ -2690,6 +2706,11 @@ void srt::CUDTUnited::checkBrokenSockets()
// HLOGC(smlog.Debug, log << "will unref socket: " << j->first);
tbr.push_back(j->first);
}
else
{
HLOGC(smlog.Debug,
log << "checkBrokenSockets: @" << j->second->m_SocketID << " still on the MX RcvUList - will retry...");
}
}
}

Expand Down Expand Up @@ -2720,11 +2741,11 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
// threads. If that's the case, SKIP IT THIS TIME. The
// socket will be checked next time the GC rollover starts.
CSNode* sn = s->core().m_pSNode;
if (sn && sn->m_iHeapLoc != -1)
if (sn && sn->isOnList())
return;

CRNode* rn = s->core().m_pRNode;
if (rn && rn->m_bOnList)
if (rn && rn->isOnList())
return;

#if ENABLE_BONDING
Expand All @@ -2735,9 +2756,6 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
s->removeFromGroup(true);
}
#endif
// decrease multiplexer reference count, and remove it if necessary
const int mid = s->m_iMuxID;

{
ScopedLock cg(s->m_AcceptLock);

Expand All @@ -2758,8 +2776,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
CUDTSocket* as = si->second;

as->breakSocket_LOCKED();
m_ClosedSockets[*q] = as;
m_Sockets.erase(*q);
swipeSocket_LOCKED(*q, as, SWIPE_NOW);
}
}

Expand All @@ -2784,33 +2801,46 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
s->core().closeInternal();
removeMux(s);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
}

if (mid == -1)
// decrease multiplexer reference count, and remove it if necessary
// [[using locked(m_GlobControlLock)]]
void srt::CUDTUnited::removeMux(CUDTSocket* s)
{
int mid = s->m_iMuxID;
if (mid == -1) // Ignore those already removed
{
HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing.");
HLOGC(smlog.Debug, log << "removeMux: @" << s->m_SocketID << " has no muxer, ok.");
return;
}

// In case when the socket isn't to be immediately deleted
// the MuxID field must be updated in order to catch the above
// condition when it's called for the same socket second time.
// --- s->m_iMuxID = -1;
// --- s->core().m_pRcvQueue = NULL;
// --- s->core().m_pSndQueue = NULL;

map<int, CMultiplexer>::iterator m;
m = m_mMultiplexer.find(mid);
if (m == m_mMultiplexer.end())
{
LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
LOGC(smlog.Fatal, log << "IPE: For socket @" << s->m_SocketID << " MUXER id=" << mid << " NOT FOUND!");
return;
}

CMultiplexer& mx = m->second;

mx.m_iRefCount--;
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
if (0 == mx.m_iRefCount)
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << s->m_SocketID << ", ref=" << mx.m_iRefCount);
if (mx.m_iRefCount <= 0)
{
HLOGC(smlog.Debug,
log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
<< s->m_SocketID << " - deleting muxer bound to "
<< mx.m_pChannel->bindAddressAny().str());
// The channel has no access to the queues and
// it looks like the multiplexer is the master of all of them.
// The queues must be silenced before closing the channel
Expand All @@ -2821,6 +2851,54 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
mx.destroy();
m_mMultiplexer.erase(m);
}
else
{
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " has still " << mx.m_iRefCount << " users");
}
}

void srt::CUDTUnited::killMux(CUDTSocket* s)
{
int mid = s->m_iMuxID;
if (mid == -1) // Ignore those already removed
{
HLOGC(smlog.Debug, log << "removeMux: @" << s->m_SocketID << " has no muxer, ok.");
return;
}

// In case when the socket isn't to be immediately deleted
// the MuxID field must be updated in order to catch the above
// condition when it's called for the same socket second time.
// --- s->m_iMuxID = -1;
// --- s->core().m_pRcvQueue = NULL;
// --- s->core().m_pSndQueue = NULL;

map<int, CMultiplexer>::iterator m;
m = m_mMultiplexer.find(mid);
if (m == m_mMultiplexer.end())
{
LOGC(smlog.Fatal, log << "IPE: For socket @" << s->m_SocketID << " MUXER id=" << mid << " NOT FOUND!");
return;
}

CMultiplexer& mx = m->second;

// Ok, careful now. First, check if you have exactly ONE
// user left. Note that due to the mutex lock, all pending
// binders will have to wait.

if (mx.m_iRefCount != 1)
return;

// Ok, we are not going to remove the binder YET, this will
// have to happen normally as usual. But we need to close the
// socket right now, so we order the queue workers to quit.
mx.m_pSndQueue->setClosing();
mx.m_pRcvQueue->setClosing();
CGlobEvent::triggerEvent();
mx.m_pChannel->close();
mx.m_pSndQueue->stopWorker();
mx.m_pRcvQueue->stopWorker();
}

void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
Expand Down Expand Up @@ -3307,7 +3385,11 @@ void* srt::CUDTUnited::garbageCollect(void* p)
s->removeFromGroup(false);
}
#endif
self->m_ClosedSockets[i->first] = s;

// NOTE: not removing the socket from m_Sockets.
// This is a loop over m_Sockets and after this loop ends,
// this whole container will be cleared.
self->swipeSocket_LOCKED(i->first, s, self->SWIPE_LATER);

// remove from listener's queue
sockets_t::iterator ls = self->m_Sockets.find(s->m_ListenSocket);
Expand Down
13 changes: 13 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ class CUDTUnited
/// @return The new UDT socket ID, or INVALID_SOCK.
SRTSOCKET newSocket(CUDTSocket** pps = NULL);

enum SwipeSocketTerm { SWIPE_NOW = 0, SWIPE_LATER = 1 };
/// Removes the socket from the global socket container
/// and place it in the socket trashcan. The socket should
/// remain there until all still pending activities are
/// finished and there are no more users of this socket.
/// Note that the swiped socket is no longer dispatchable
/// by id.
void swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, SwipeSocketTerm);

/// Create (listener-side) a new socket associated with the incoming connection request.
/// @param [in] listen the listening socket ID.
/// @param [in] peer peer address.
Expand Down Expand Up @@ -446,6 +455,10 @@ class CUDTUnited
#endif
void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);
void removeMux(CUDTSocket* s);

// m_GlobControlLock must be NOT locked to call this.
void killMux(CUDTSocket* s);

// Utility functions for updateMux
void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af);
Expand Down
15 changes: 4 additions & 11 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -947,17 +947,10 @@ void srt::CUDT::open()

// structures for queue
if (m_pSNode == NULL)
m_pSNode = new CSNode;
m_pSNode->m_pUDT = this;
m_pSNode->m_tsTimeStamp = steady_clock::now();
m_pSNode->m_iHeapLoc = -1;
m_pSNode = new CSNode(this, steady_clock::now());

if (m_pRNode == NULL)
m_pRNode = new CRNode;
m_pRNode->m_pUDT = this;
m_pRNode->m_tsTimeStamp = steady_clock::now();
m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
m_pRNode->m_bOnList = false;
m_pRNode = new CRNode(this, steady_clock::now());

// Set initial values of smoothed RTT and RTT variance.
m_iSRTT = INITIAL_RTT;
Expand Down Expand Up @@ -4860,7 +4853,6 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
m_bConnected = true;

// register this socket for receiving data packets
m_pRNode->m_bOnList = true;
m_pRcvQueue->setNewEntry(this);
}

Expand Down Expand Up @@ -5848,7 +5840,6 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
m_bConnected = true;

// Register this socket for receiving data packets.
m_pRNode->m_bOnList = true;
m_pRcvQueue->setNewEntry(this);

// Save the handshake in m_ConnRes in case when needs repeating.
Expand Down Expand Up @@ -6328,6 +6319,8 @@ bool srt::CUDT::closeInternal()
m_tsRcvPeerStartTime = steady_clock::time_point();

m_bOpened = false;
m_bConnected = false;
m_bConnecting = false;

return true;
}
Expand Down
Loading