-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[multistage] partial operator chain execution #9711
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks mostly good. although the encoding part seems less than elegant but i don't have a better solution.
|
||
if (!_readyToConstruct) { | ||
return TransferableBlockUtils.getNoOpTransferableBlock(); | ||
} | ||
|
||
return produceAggregatedBlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering if we can wrapped around these with a BaseTransferrableOperator than handles metadata block (NOOP, ERROR, not sure if possible for EOS)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
se comment below, it isn't straightforward to make this clean (there isn't even a clean way to get the child block because you might have to know whether to read from left/right in case of a join or whether to read at all in the case that the operator is currently a 0->1 operator)
if (transferableBlock.isNoOpBlock()) { | ||
continue; | ||
} else if (transferableBlock.isEndOfStreamBlock()) { | ||
return resultDataBlocks; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these should be after null check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, we shouldn't be adding noops/eos to the results - right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in transferableBlock.isNoOpBlock()
it checks metadataBlock.getType() == MetadataBlock.MetadataBlockType.NOOP
. however metadataBlock can be null. which will throw NPE, no?
should we have a null checker? transferableBlock.getDataBlock() != null ?
previously the null check is not necessary b/c it only look at the BaseDataBlock.Type _type
member variable which cannot be null in TransferableBlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, looking at the code I don't think TransferableBlock#getDataBlock
can ever return null
- if the field is null, it either builds the block or throws an exception, otherwise it returns the field.
I'll remove this check altogether since I think it's just misleading. I still think we shouldn't be adding to the result table on metadata blocks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
...t-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
Show resolved
Hide resolved
cdff0ce
to
e9be537
Compare
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
Show resolved
Hide resolved
// if type is null, then we're reading a legacy block where we didn't encode any | ||
// data. assume that it is an EOS block if there's no exceptions and an ERROR block | ||
// otherwise | ||
return type == null | ||
? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR) | ||
: MetadataBlockType.valueOf(type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we dont need to consider backward compatibility here. --> if the type is null, that means it is a legacy block. you will throw when deserializing anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was very careful to make sure that it won't throw when deserializing (see the tests). is there any reason why we don't need to consider backwards compatibility? if I don't then I can be less hacky in the serialization format!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please let me know if my understanding is correct. if the metadata block transferred over the wire is of previous version. then using the current version of the code it cannot reconstruct a metadata block back from the byteBuffer
(as it will not be encoded using jackson).
in that case we will never reach a situation where the byteBuffer is decodable, and type is null. correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the metadata block transferred over the wire is of previous version. then using the current version of the code it cannot reconstruct a metadata block back from the byteBuffer
that's not correct - it can decode the byteBuffer, the only difference is that it will read it with an empty _variableBytesData
, which will mean the JSON contents will be empty. (see the MetadataBlockTest
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah... ok. yeah in that case we should keep this. thanks for the explanation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks good. got some minor comments
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
Show resolved
Hide resolved
...t-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
Outdated
Show resolved
Hide resolved
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
Outdated
Show resolved
Hide resolved
...t-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
Outdated
Show resolved
Hide resolved
// all the mailboxes we opened returned null but were not yet closed - early terminate | ||
// with a noop block. Otherwise, we have exhausted all data from all mailboxes and can | ||
// return EOS | ||
return openMailboxCount > 0 && (openMailboxCount != eosCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition is a bit hard for me to validate. can't we just do openMailboxCount > 0
?
IIUC, the last one is only for when you exactly close a mailbox afterwards and save another call to the getNextBlock() only to return an EOS, yes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this condition isn't necessary, it's technically an optimization to avoid needing another call. I'll remove it (at first I thought it was necessary, but it was actually a different bug that I was figuring out)
if (transferableBlock.isNoOpBlock()) { | ||
continue; | ||
} else if (transferableBlock.isEndOfStreamBlock()) { | ||
return resultDataBlocks; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in transferableBlock.isNoOpBlock()
it checks metadataBlock.getType() == MetadataBlock.MetadataBlockType.NOOP
. however metadataBlock can be null. which will throw NPE, no?
should we have a null checker? transferableBlock.getDataBlock() != null ?
previously the null check is not necessary b/c it only look at the BaseDataBlock.Type _type
member variable which cannot be null in TransferableBlock
@walterddr just wanted to make sure you don't merge until tests pass, looks like the last commit actually introduced some regressions... double checking that now |
Codecov Report
@@ Coverage Diff @@
## master #9711 +/- ##
=============================================
+ Coverage 28.06% 70.05% +41.99%
- Complexity 53 4980 +4927
=============================================
Files 1939 1951 +12
Lines 104163 104561 +398
Branches 15792 15836 +44
=============================================
+ Hits 29231 73252 +44021
+ Misses 72068 26180 -45888
- Partials 2864 5129 +2265
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
3ef8734
to
2134a21
Compare
had to rebase after #9729 and #9676 - @walterddr the changes are in |
…9753) This is a follow-up to #9711 and follows the design outlined in [this design doc](https://docs.google.com/document/d/1XAMHAlhFbINvX-kK1ANlzbRz4_RkS0map4qhqs1yDtE/edit#heading=h.de4smgkh3bzk). This PR implements a round robin operator chain scheduling algorithm and sets up the interface for future PRs that will implement more advanced scheduling. As of this PR, we can be guaranteed that all queries will make progress (see the change in `pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java`, you can now run it under situations with only 2 cores available) but the algorithm is still very hungry for CPU (queries with nothing in their mailbox will still be scheduled).
See the design doc for a big picture view.
This is the first PR in a series of PRs to improve our execution model. It implements "partial execution" of operator chains by allowing them to return a "noop"
MetadataBlock
in the scenario where there is either no data to process or no data to output.This PR is a non-functional change because the
WorkerQueryExecutor
doesn't actually take advantage of the partial execution ability - it just callsoperator#nextBlock
whenever it processes a noop block.PR Review Guide
This PR is broken into 3 functional commits, which I recommend you review in order:
BaseDataBlock
to encode a JSON object with additional metadata. It's a little hacky, but in the grand scheme of things it's very localized and allows us to have a lot of flexibility in how we use the metadata blocks going forward and maintains backwards compatibility with the existing code. Specifically, this is used to introduce aNOOP
metadata block type that will be used to signal to the future scheduler that the task has completed the processing that it can do at the moment.MailboxReceiveOperator
when it has nothing available in its mailboxes and (b) stateful operators such as Sort/HashJoin that process a single block without producing anything (they need to process all blocks before producing).MetadataBlock
- specifically making sure that it is backwards compatible with the existing code.Testing
We don't currently have any tests for the operators in the multistage engine, so it was tough to add that into this PR. I will follow this one up with one dedicated to testing operators.