Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
erjiaqing committed Nov 16, 2021
1 parent 527cd7d commit 6924534
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 40 deletions.
1 change: 1 addition & 0 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
{
mpExchangeCtx->WillSendMessage();
}
// Trigger ReportingEngine run for sending next chunk of data.
SuccessOrExit(err = InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun());
break;
case HandlerState::AwaitingReportResponse:
Expand Down
9 changes: 5 additions & 4 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
* Send ReportData to initiator
*
* @param[in] aPayload A payload that has read request data
* @param[in] aMoreChunks A flags indicating there will be more chunks for this read request
* @param[in] aMoreChunks A flags indicating there will be more chunks expected to be sent for this read request
*
* @retval #Others If fails to send report data
* @retval #CHIP_NO_ERROR On success.
Expand Down Expand Up @@ -135,7 +135,8 @@ class ReadHandler : public Messaging::ExchangeDelegate
AttributePathExpandIterator * GetAttributePathExpandIterator() { return &mAttributePathExpandIterator; }
void SetDirty()
{
mDirty = true;
mDirty = true;
// The interested path in this report is changed, make the path iterator ready to emit paths by reset it.
mAttributePathExpandIterator = AttributePathExpandIterator(mpAttributeClusterInfoList);
}
void ClearDirty() { mDirty = false; }
Expand All @@ -151,8 +152,8 @@ class ReadHandler : public Messaging::ExchangeDelegate
Initialized, ///< The handler has been initialized and is ready
GeneratingReports, ///< The handler has received either a Read or Subscribe request and is the process of generating a
///< report.
AwaitingChunkingResponse, ///< The handler just sent a report chunk and is waiting a status response.
AwaitingReportResponse, ///< The handler has sent the last report chunk to the client and is awaiting a status response.
AwaitingChunkingResponse, ///< The handler just sent a report chunked report and is awaiting a status response.
AwaitingReportResponse, ///< The handler has sent the last chunked report to the client and is awaiting a status response.
};

static void OnRefreshSubscribeTimerSyncCallback(System::Layer * apSystemLayer, void * apAppState);
Expand Down
70 changes: 43 additions & 27 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ namespace app {
namespace reporting {
CHIP_ERROR Engine::Init()
{
mMoreChunkedMessages = false;
mNumReportsInFlight = 0;
mCurReadHandlerIdx = 0;
mNumReportsInFlight = 0;
mCurReadHandlerIdx = 0;
return CHIP_NO_ERROR;
}

void Engine::Shutdown()
{
mMoreChunkedMessages = false;
mNumReportsInFlight = 0;
mCurReadHandlerIdx = 0;
mNumReportsInFlight = 0;
mCurReadHandlerIdx = 0;
InteractionModelEngine::GetInstance()->ReleaseClusterInfoList(mpGlobalDirtySet);
mpGlobalDirtySet = nullptr;
}
Expand Down Expand Up @@ -91,27 +89,30 @@ Engine::RetrieveClusterData(FabricIndex aAccessingFabricIndex, AttributeReportIB
}

CHIP_ERROR Engine::BuildSingleReportDataAttributeReportIBs(ReportDataMessage::Builder & aReportDataBuilder,
ReadHandler * apReadHandler)
ReadHandler * apReadHandler, bool * apHasMoreChunks)
{
CHIP_ERROR err = CHIP_NO_ERROR;
bool attributeDataWritten = false;
bool hasMoreChunks = true;
TLV::TLVWriter backup;
aReportDataBuilder.Checkpoint(backup);
auto attributeReportIBs = aReportDataBuilder.CreateAttributeReportIBs();
SuccessOrExit(err = aReportDataBuilder.GetError());

{
ConcreteAttributePath path;
mMoreChunkedMessages = true;
for (; apReadHandler->GetAttributePathExpandIterator()->Get(path); apReadHandler->GetAttributePathExpandIterator()->Next())
ConcreteAttributePath readPath;

// For each path included in the interested path of the read handler...
for (; apReadHandler->GetAttributePathExpandIterator()->Get(readPath);
apReadHandler->GetAttributePathExpandIterator()->Next())
{
if (!apReadHandler->IsInitialReport())
{
bool concretePathDirty = false;
// TODO: Optimize this implementation by making the iterator only emit intersected paths.
for (auto dirtyPath = mpGlobalDirtySet; dirtyPath != nullptr; dirtyPath = dirtyPath->mpNext)
{
if (dirtyPath->IsPathIncluded(path))
if (dirtyPath->IsPathIncluded(readPath))
{
concretePathDirty = true;
break;
Expand All @@ -124,19 +125,23 @@ CHIP_ERROR Engine::BuildSingleReportDataAttributeReportIBs(ReportDataMessage::Bu
continue;
}
}
// If we are processing a read request, or the initial report of a subscription, just regard all paths are dirty.

TLV::TLVWriter attributeBackup;
attributeReportIBs.Checkpoint(attributeBackup);
// Retrieve data for this cluster instance and clear its dirty flag.
err = RetrieveClusterData(apReadHandler->GetFabricIndex(), attributeReportIBs, path);
err = RetrieveClusterData(apReadHandler->GetFabricIndex(), attributeReportIBs, readPath);
if (err != CHIP_NO_ERROR)
{
// We met a error during writing reports, one common case is we are running out of buffer, rollback the
// attributeReportIB to avoid any possible dirty data written.
attributeReportIBs.Rollback(attributeBackup);
}
SuccessOrExit(err);
attributeDataWritten = true;
}
mMoreChunkedMessages = false;
// We just visited all paths interested by this read handler and did not abort in the middle of iteration, there are no more
// chunks for this report.
hasMoreChunks = false;
}
exit:
if ((err == CHIP_ERROR_BUFFER_TOO_SMALL) || (err == CHIP_ERROR_NO_MEMORY))
Expand All @@ -155,11 +160,16 @@ CHIP_ERROR Engine::BuildSingleReportDataAttributeReportIBs(ReportDataMessage::Bu
{
aReportDataBuilder.Rollback(backup);
}
else if (apHasMoreChunks != nullptr)
{
*apHasMoreChunks = hasMoreChunks;
}

return err;
}

CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder & aReportDataBuilder, ReadHandler * apReadHandler)
CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder & aReportDataBuilder, ReadHandler * apReadHandler,
bool * apHasMoreChunks)
{
CHIP_ERROR err = CHIP_NO_ERROR;
size_t eventCount = 0;
Expand All @@ -170,6 +180,7 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder
EventNumber * eventNumberList = apReadHandler->GetVendedEventNumberList();
EventManagement & eventManager = EventManagement::GetInstance();
EventReports::Builder EventReports;
bool hasMoreChunks = false;

aReportDataBuilder.Checkpoint(backup);

Expand Down Expand Up @@ -214,7 +225,7 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder
// priority level.
err = CHIP_NO_ERROR;
apReadHandler->MoveToNextScheduledDirtyPriority();
mMoreChunkedMessages = false;
hasMoreChunks = false;
}
else if ((err == CHIP_ERROR_BUFFER_TOO_SMALL) || (err == CHIP_ERROR_NO_MEMORY))
{
Expand All @@ -238,7 +249,7 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder
err = CHIP_NO_ERROR;
break;
}
mMoreChunkedMessages = true;
hasMoreChunks = true;
}
else
{
Expand All @@ -259,6 +270,10 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder
{
aReportDataBuilder.Rollback(backup);
}
else if (apHasMoreChunks != nullptr)
{
*apHasMoreChunks = hasMoreChunks;
}
return err;
}

Expand All @@ -269,6 +284,7 @@ CHIP_ERROR Engine::BuildAndSendSingleReportData(ReadHandler * apReadHandler)
ReportDataMessage::Builder reportDataBuilder;
chip::System::PacketBufferHandle bufHandle = System::PacketBufferHandle::New(chip::app::kMaxSecureSduLengthBytes);
uint16_t reservedSize = 0;
bool hasMoreChunks = false;

VerifyOrExit(!bufHandle.IsNull(), err = CHIP_ERROR_NO_MEMORY);

Expand All @@ -279,8 +295,8 @@ CHIP_ERROR Engine::BuildAndSendSingleReportData(ReadHandler * apReadHandler)

reportDataWriter.Init(std::move(bufHandle));

// Some platforms, the actual buffer capacity might be larger than kMaxSecureSduLengthBytes, then we may write more data in one
// chunk than expected. Use the ReserveBuffer to limit the use of buffer.
// Always limit the size of the generated packet to fit within kMaxSecureSduLengthBytes regardless of the available buffer
// capacity.
reportDataWriter.ReserveBuffer(reservedSize);

// Create a report data.
Expand All @@ -296,16 +312,16 @@ CHIP_ERROR Engine::BuildAndSendSingleReportData(ReadHandler * apReadHandler)

SuccessOrExit(err = reportDataWriter.ReserveBuffer(Engine::kReservedSizeForMoreChunksFlag));

err = BuildSingleReportDataAttributeReportIBs(reportDataBuilder, apReadHandler);
err = BuildSingleReportDataAttributeReportIBs(reportDataBuilder, apReadHandler, &hasMoreChunks);
SuccessOrExit(err);

err = BuildSingleReportDataEventReports(reportDataBuilder, apReadHandler);
err = BuildSingleReportDataEventReports(reportDataBuilder, apReadHandler, &hasMoreChunks);
SuccessOrExit(err);

SuccessOrExit(err = reportDataWriter.UnreserveBuffer(Engine::kReservedSizeForMoreChunksFlag));
if (mMoreChunkedMessages)
if (hasMoreChunks)
{
reportDataBuilder.MoreChunkedMessages(mMoreChunkedMessages);
reportDataBuilder.MoreChunkedMessages(hasMoreChunks);
}

reportDataBuilder.EndOfReportDataMessage();
Expand All @@ -332,12 +348,12 @@ CHIP_ERROR Engine::BuildAndSendSingleReportData(ReadHandler * apReadHandler)
#endif // CHIP_CONFIG_IM_ENABLE_SCHEMA_CHECK

ChipLogDetail(DataManagement, "<RE> Sending report (payload has %" PRIu32 " bytes)...", reportDataWriter.GetLengthWritten());
err = SendReport(apReadHandler, std::move(bufHandle));
err = SendReport(apReadHandler, std::move(bufHandle), hasMoreChunks);
VerifyOrExit(err == CHIP_NO_ERROR,
ChipLogError(DataManagement, "<RE> Error sending out report data with %" CHIP_ERROR_FORMAT "!", err.Format()));

ChipLogDetail(DataManagement, "<RE> ReportsInFlight = %" PRIu32 " with readHandler %" PRIu32 ", RE has %s", mNumReportsInFlight,
mCurReadHandlerIdx, mMoreChunkedMessages ? "more messages" : "no more messages");
mCurReadHandlerIdx, hasMoreChunks ? "more messages" : "no more messages");

exit:
if (err != CHIP_NO_ERROR)
Expand Down Expand Up @@ -457,13 +473,13 @@ void Engine::UpdateReadHandlerDirty(ReadHandler & aReadHandler)
}
}

CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload)
CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload, bool aHasMoreChunks)
{
CHIP_ERROR err = CHIP_NO_ERROR;

// We can only have 1 report in flight for any given read - increment and break out.
mNumReportsInFlight++;
err = apReadHandler->SendReportData(std::move(aPayload), mMoreChunkedMessages);
err = apReadHandler->SendReportData(std::move(aPayload), aHasMoreChunks);
return err;
}

Expand Down
14 changes: 5 additions & 9 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ class Engine
*/
CHIP_ERROR BuildAndSendSingleReportData(ReadHandler * apReadHandler);

CHIP_ERROR BuildSingleReportDataAttributeReportIBs(ReportDataMessage::Builder & reportDataBuilder, ReadHandler * apReadHandler);
CHIP_ERROR BuildSingleReportDataEventReports(ReportDataMessage::Builder & reportDataBuilder, ReadHandler * apReadHandler);
CHIP_ERROR BuildSingleReportDataAttributeReportIBs(ReportDataMessage::Builder & reportDataBuilder, ReadHandler * apReadHandler,
bool * apHasMoreChunks);
CHIP_ERROR BuildSingleReportDataEventReports(ReportDataMessage::Builder & reportDataBuilder, ReadHandler * apReadHandler,
bool * apHasMoreChunks);
CHIP_ERROR RetrieveClusterData(FabricIndex aAccessingFabricIndex, AttributeReportIBs::Builder & aAttributeReportIBs,
const ConcreteAttributePath & aClusterInfo);
EventNumber CountEvents(ReadHandler * apReadHandler, EventNumber * apInitialEvents);
Expand All @@ -108,20 +110,14 @@ class Engine
* Send Report via ReadHandler
*
*/
CHIP_ERROR SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload);
CHIP_ERROR SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload, bool aHasMoreChunks);

/**
* Generate and send the report data request when there exists subscription or read request
*
*/
static void Run(System::Layer * aSystemLayer, void * apAppState);

/**
* Boolean to show if more chunk message on the way
*
*/
bool mMoreChunkedMessages = false;

/**
* Boolean to indicate if ScheduleRun is pending. This flag is used to prevent calling ScheduleRun multiple times
* within the same execution context to avoid applying too much pressure on platforms that use small, fixed size event queues.
Expand Down
1 change: 1 addition & 0 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ void TestReadInteraction::TestReadChunking(nlTestSuite * apSuite, void * apConte

InteractionModelEngine::GetInstance()->GetReportingEngine().Run();
InteractionModelEngine::GetInstance()->GetReportingEngine().Run();

NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 6);
NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, !delegate.mReadError);
Expand Down

0 comments on commit 6924534

Please sign in to comment.