Skip to content

Separated PcapLiveDevice statistics update worker under its own class. #1785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions Pcap++/header/PcapDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ namespace pcpp

namespace internal
{
/// @struct PcapStats
/// A container for pcap device statistics
struct PcapStats
{
/// Number of packets received
uint64_t packetsRecv;
/// Number of packets dropped
uint64_t packetsDrop;
/// number of packets dropped by interface (not supported on all platforms)
uint64_t packetsDropByInterface;
};

/// @class PcapHandle
/// @brief A wrapper class for pcap_t* which is the libpcap packet capture descriptor.
/// This class is used to manage the lifecycle of the pcap_t* object
Expand Down Expand Up @@ -77,6 +89,14 @@ namespace pcpp
/// @return True if the filter was removed successfully or if no filter was set, false otherwise.
bool clearFilter();

/// @brief Retrieves statistics from the pcap handle.
///
/// The function internally calls pcap_stats() to retrieve the statistics and only works on live devices.
///
/// @param stats Structure to store the statistics.
/// @return True if the statistics were retrieved successfully, false otherwise.
bool getStatistics(PcapStats& stats) const;

/// @return True if the handle is not null, false otherwise.
explicit operator bool() const noexcept
{
Expand Down Expand Up @@ -110,17 +130,7 @@ namespace pcpp
{}

public:
/// @struct PcapStats
/// A container for pcap device statistics
struct PcapStats
{
/// Number of packets received
uint64_t packetsRecv;
/// Number of packets dropped
uint64_t packetsDrop;
/// number of packets dropped by interface (not supported on all platforms)
uint64_t packetsDropByInterface;
};
using PcapStats = internal::PcapStats;

virtual ~IPcapDevice();

Expand Down
53 changes: 47 additions & 6 deletions Pcap++/header/PcapLiveDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,50 @@ namespace pcpp
bool isLoopback;
};

/// @brief A worker thread that periodically calls the provided callback with updated statistics.
class StatisticsUpdateWorker
{
public:
/// @brief Constructs and starts a worker thread that periodically calls the provided callback with updated
/// statistics.
/// @param pcapDevice A pointer to the PcapLiveDevice instance to be monitored.
/// @param onStatsUpdateCallback A callback function to be called with updated statistics.
/// @param m_cbOnStatsUpdateUserCookie A user-defined pointer that is passed to the callback function.
/// @param updateIntervalMs The interval in milliseconds between each callback invocation.
StatisticsUpdateWorker(PcapLiveDevice const& pcapDevice, OnStatsUpdateCallback onStatsUpdateCallback,
void* m_cbOnStatsUpdateUserCookie = nullptr, unsigned int updateIntervalMs = 1000);

bool isRunning() const
{
// TODO: A thread that has finished executing will be joinable, before join is called.
return m_WorkerThread.joinable();
}

/// @brief Stops the worker thread.
void stopWorker();

private:
struct ThreadData
{
PcapLiveDevice const* m_PcapDevice = nullptr;
OnStatsUpdateCallback m_cbOnStatsUpdate;
void* m_cbOnStatsUpdateUserCookie = nullptr;
unsigned int m_updateIntervalMs = 1000; // Default update interval is 1 second
};

struct SharedThreadData
{
std::atomic_bool m_stopRequested{ false };
};

/// @brief Main function for the worker thread.
/// @remarks This function is static to allow the worker class to be movable.
static void workerMain(std::shared_ptr<SharedThreadData> sharedThreadData, ThreadData threadData);

std::shared_ptr<SharedThreadData> m_sharedThreadData;
std::thread m_WorkerThread;
};

// This is a second descriptor for the same device. It is needed because of a bug
// that occurs in libpcap on Linux (on Windows using WinPcap/Npcap it works well):
// It's impossible to capture packets sent by the same descriptor
Expand All @@ -94,8 +138,9 @@ namespace pcpp
MacAddress m_MacAddress;
IPv4Address m_DefaultGateway;
std::thread m_CaptureThread;
std::thread m_StatsThread;
bool m_StatsThreadStarted;

// TODO: Cpp17 Using std::optional might be better here
std::unique_ptr<StatisticsUpdateWorker> m_StatisticsUpdateWorker;

// Should be set to true by the Caller for the Callee
std::atomic<bool> m_StopThread;
Expand All @@ -104,11 +149,8 @@ namespace pcpp

OnPacketArrivesCallback m_cbOnPacketArrives;
void* m_cbOnPacketArrivesUserCookie;
OnStatsUpdateCallback m_cbOnStatsUpdate;
void* m_cbOnStatsUpdateUserCookie;
OnPacketArrivesStopBlocking m_cbOnPacketArrivesBlockingMode;
void* m_cbOnPacketArrivesBlockingModeUserCookie;
int m_IntervalToUpdateStats;
RawPacketVector* m_CapturedPackets;
bool m_CaptureCallbackMode;
LinkLayerType m_LinkType;
Expand All @@ -128,7 +170,6 @@ namespace pcpp

// threads
void captureThreadMain();
void statsThreadMain();

static void onPacketArrives(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet);
static void onPacketArrivesNoCallback(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet);
Expand Down
21 changes: 21 additions & 0 deletions Pcap++/src/PcapDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,27 @@ namespace pcpp
{
return setFilter("");
}

bool PcapHandle::getStatistics(PcapStats& stats) const
{
if (!isValid())
{
PCPP_LOG_ERROR("Cannot get stats from invalid handle");
return false;
}

pcap_stat pcapStats;
if (pcap_stats(m_PcapDescriptor, &pcapStats) < 0)
{
PCPP_LOG_ERROR("Error getting stats. Error message is: " << getLastError());
return false;
}

stats.packetsRecv = pcapStats.ps_recv;
stats.packetsDrop = pcapStats.ps_drop;
stats.packetsDropByInterface = pcapStats.ps_ifdrop;
return true;
}
} // namespace internal

IPcapDevice::~IPcapDevice()
Expand Down
110 changes: 73 additions & 37 deletions Pcap++/src/PcapLiveDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,68 @@ namespace pcpp
}
}

PcapLiveDevice::StatisticsUpdateWorker::StatisticsUpdateWorker(PcapLiveDevice const& pcapDevice,
OnStatsUpdateCallback onStatsUpdateCallback,
void* m_cbOnStatsUpdateUserCookie,
unsigned int updateIntervalMs)
{
// Setup thread data
m_sharedThreadData = std::make_shared<SharedThreadData>();

ThreadData threadData;
threadData.m_PcapDevice = &pcapDevice;
threadData.m_cbOnStatsUpdate = onStatsUpdateCallback;
threadData.m_cbOnStatsUpdateUserCookie = m_cbOnStatsUpdateUserCookie;
threadData.m_updateIntervalMs = updateIntervalMs;

// Start the thread
m_WorkerThread = std::thread(&StatisticsUpdateWorker::workerMain, m_sharedThreadData, std::move(threadData));
}

void PcapLiveDevice::StatisticsUpdateWorker::stopWorker()
{
m_sharedThreadData->m_stopRequested = true;
if (m_WorkerThread.joinable())
{
m_WorkerThread.join();
}
}

void PcapLiveDevice::StatisticsUpdateWorker::workerMain(std::shared_ptr<SharedThreadData> sharedThreadData,
ThreadData threadData)
{
if (sharedThreadData == nullptr)
{
PCPP_LOG_ERROR("Shared thread data is null");
return;
}

if (threadData.m_PcapDevice == nullptr)
{
PCPP_LOG_ERROR("Pcap device is null");
return;
}

if (threadData.m_cbOnStatsUpdate == nullptr)
{
PCPP_LOG_ERROR("Statistics Callback is null");
return;
}

PCPP_LOG_DEBUG("Started statistics thread");

PcapStats stats;
auto sleepDuration = std::chrono::milliseconds(threadData.m_updateIntervalMs);
while (!sharedThreadData->m_stopRequested)
{
threadData.m_PcapDevice->getStatistics(stats);
threadData.m_cbOnStatsUpdate(stats, threadData.m_cbOnStatsUpdateUserCookie);
std::this_thread::sleep_for(sleepDuration);
}

PCPP_LOG_DEBUG("Stopped statistics thread");
}

PcapLiveDevice::PcapLiveDevice(DeviceInterfaceDetails interfaceDetails, bool calculateMTU, bool calculateMacAddress,
bool calculateDefaultGateway)
: IPcapDevice(), m_PcapSendDescriptor(nullptr), m_PcapSelectableFd(-1),
Expand Down Expand Up @@ -266,17 +328,12 @@ namespace pcpp

// init all other members
m_CaptureThreadStarted = false;
m_StatsThreadStarted = false;
m_StopThread = false;
m_CaptureThread = {};
m_StatsThread = {};
m_cbOnPacketArrives = nullptr;
m_cbOnStatsUpdate = nullptr;
m_cbOnPacketArrivesBlockingMode = nullptr;
m_cbOnPacketArrivesBlockingModeUserCookie = nullptr;
m_IntervalToUpdateStats = 0;
m_cbOnPacketArrivesUserCookie = nullptr;
m_cbOnStatsUpdateUserCookie = nullptr;
m_CaptureCallbackMode = true;
m_CapturedPackets = nullptr;
if (calculateMacAddress)
Expand Down Expand Up @@ -366,19 +423,6 @@ namespace pcpp
PCPP_LOG_DEBUG("Ended capture thread for device '" << m_InterfaceDetails.name << "'");
}

void PcapLiveDevice::statsThreadMain()
{
PCPP_LOG_DEBUG("Started stats thread for device '" << m_InterfaceDetails.name << "'");
while (!m_StopThread)
{
PcapStats stats;
getStatistics(stats);
m_cbOnStatsUpdate(stats, m_cbOnStatsUpdateUserCookie);
std::this_thread::sleep_for(std::chrono::seconds(m_IntervalToUpdateStats));
}
PCPP_LOG_DEBUG("Ended stats thread for device '" << m_InterfaceDetails.name << "'");
}

pcap_t* PcapLiveDevice::doOpen(const DeviceConfiguration& config)
{
char errbuf[PCAP_ERRBUF_SIZE] = { '\0' };
Expand Down Expand Up @@ -606,8 +650,6 @@ namespace pcpp
return false;
}

m_IntervalToUpdateStats = intervalInSecondsToUpdateStats;

m_CaptureCallbackMode = true;
m_cbOnPacketArrives = std::move(onPacketArrives);
m_cbOnPacketArrivesUserCookie = onPacketArrivesUserCookie;
Expand All @@ -625,12 +667,12 @@ namespace pcpp

if (onStatsUpdate != nullptr && intervalInSecondsToUpdateStats > 0)
{
m_cbOnStatsUpdate = std::move(onStatsUpdate);
m_cbOnStatsUpdateUserCookie = onStatsUpdateUserCookie;
m_StatsThread = std::thread(&pcpp::PcapLiveDevice::statsThreadMain, this);
m_StatsThreadStarted = true;
PCPP_LOG_DEBUG("Successfully created stats thread for device '"
<< m_InterfaceDetails.name << "'. Thread id: " << m_StatsThread.get_id());
// Due to passing a this pointer, the current device object shouldn't be relocated, while toe worker is
// active.
m_StatisticsUpdateWorker = std::unique_ptr<StatisticsUpdateWorker>(new StatisticsUpdateWorker(
*this, std::move(onStatsUpdate), onStatsUpdateUserCookie, intervalInSecondsToUpdateStats * 1000));

PCPP_LOG_DEBUG("Successfully created stats thread for device '" << m_InterfaceDetails.name << "'.");
}

return true;
Expand Down Expand Up @@ -684,9 +726,7 @@ namespace pcpp
}

m_cbOnPacketArrives = nullptr;
m_cbOnStatsUpdate = nullptr;
m_cbOnPacketArrivesUserCookie = nullptr;
m_cbOnStatsUpdateUserCookie = nullptr;

m_cbOnPacketArrivesBlockingMode = std::move(onPacketArrives);
m_cbOnPacketArrivesBlockingModeUserCookie = userCookie;
Expand Down Expand Up @@ -805,11 +845,12 @@ namespace pcpp
PCPP_LOG_DEBUG("Capture thread stopped for device '" << m_InterfaceDetails.name << "'");
}
PCPP_LOG_DEBUG("Capture thread stopped for device '" << m_InterfaceDetails.name << "'");
if (m_StatsThreadStarted)

if (m_StatisticsUpdateWorker != nullptr)
{
PCPP_LOG_DEBUG("Stopping stats thread, waiting for it to join...");
m_StatsThread.join();
m_StatsThreadStarted = false;
m_StatisticsUpdateWorker->stopWorker();
m_StatisticsUpdateWorker.reset();
PCPP_LOG_DEBUG("Stats thread stopped for device '" << m_InterfaceDetails.name << "'");
}

Expand All @@ -823,15 +864,10 @@ namespace pcpp

void PcapLiveDevice::getStatistics(PcapStats& stats) const
{
pcap_stat pcapStats;
if (pcap_stats(m_PcapDescriptor.get(), &pcapStats) < 0)
if (!m_PcapDescriptor.getStatistics(stats))
{
PCPP_LOG_ERROR("Error getting statistics from live device '" << m_InterfaceDetails.name << "'");
}

stats.packetsRecv = pcapStats.ps_recv;
stats.packetsDrop = pcapStats.ps_drop;
stats.packetsDropByInterface = pcapStats.ps_ifdrop;
}

bool PcapLiveDevice::doMtuCheck(int packetPayloadLength) const
Expand Down
Loading