    while (record.hasRemaining()) {
        // full buffer, partial record
        finishUnicastBufferBuilder(targetSubpartition);
        buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
    }

    if (buffer.isFull()) {
        // full buffer, full record
        finishUnicastBufferBuilder(targetSubpartition);
    }

    // partial buffer, full record
}
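To make the three cases in this loop more concrete (full buffer / partial record, full buffer / full record, partial buffer / full record), here is a minimal, self-contained sketch of copying one serialized record across several fixed-size buffers. Plain `ByteBuffer`s stand in for Flink's `BufferBuilder`, and all names are hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RecordSpanningSketch {

    // Copy one serialized record into as many fixed-size buffers as needed.
    static List<ByteBuffer> writeRecord(ByteBuffer record, int bufferSize) {
        List<ByteBuffer> finished = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(bufferSize);
        while (record.hasRemaining()) {
            // copy as much of the record as fits into the current buffer
            int n = Math.min(record.remaining(), current.remaining());
            for (int i = 0; i < n; i++) {
                current.put(record.get());
            }
            if (!current.hasRemaining() && record.hasRemaining()) {
                // full buffer, partial record: finish it and start a new buffer
                current.flip();
                finished.add(current);
                current = ByteBuffer.allocate(bufferSize);
            }
        }
        // full (or partial) buffer, full record: finish the last buffer
        current.flip();
        finished.add(current);
        return finished;
    }

    public static void main(String[] args) {
        ByteBuffer record = ByteBuffer.wrap(new byte[100]);
        // 100 bytes into 32-byte buffers -> 4 buffers (32 + 32 + 32 + 4)
        System.out.println(writeRecord(record, 32).size());
    }
}
```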
Here the record is written into the buffer. If the buffer does not have enough space, a new buffer is requested from the LocalBufferPool, and writing continues once it has been obtained. The concrete request process is shown below.
if (!availableMemorySegments.isEmpty()) {
    segment = availableMemorySegments.poll();
} else if (isRequestedSizeReached()) {
    // Only when the buffer request reaches the upper limit(i.e. current pool size),
    // requests an overdraft buffer.
    segment = requestOverdraftMemorySegmentFromGlobal();
}

if (segment == null) {
    return null;
}

if (targetChannel != UNKNOWN_CHANNEL) {
    if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
        unavailableSubpartitionsCount++;
    }
}
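The idea behind the overdraft branch is: hand out pooled segments first, and only when the regular quota (the current pool size) is exhausted allow a limited number of extra "overdraft" segments. Below is a minimal sketch of that policy, assuming plain byte arrays in place of MemorySegments and hypothetical names throughout; it is not Flink's actual LocalBufferPool.

```java
import java.util.ArrayDeque;

public class OverdraftPoolSketch {

    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();
    private final int poolSize;      // regular upper limit of the local pool
    private final int maxOverdraft;  // extra segments tolerated beyond the limit
    private final int segmentSize;
    private int requested;           // segments currently handed out to callers

    OverdraftPoolSketch(int poolSize, int maxOverdraft, int segmentSize) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
        this.segmentSize = segmentSize;
        for (int i = 0; i < poolSize; i++) {
            availableSegments.add(new byte[segmentSize]);
        }
    }

    // Prefer a pooled segment; only when the pool is drained and the request
    // count has reached the pool size, fall back to an overdraft allocation.
    synchronized byte[] requestSegment() {
        byte[] segment = null;
        if (!availableSegments.isEmpty()) {
            segment = availableSegments.poll();
        } else if (requested >= poolSize && requested < poolSize + maxOverdraft) {
            segment = new byte[segmentSize]; // "overdraft" taken from the global pool
        }
        if (segment != null) {
            requested++;
        }
        return segment; // null: the caller must wait until a segment is recycled
    }

    synchronized void recycle(byte[] segment) {
        requested--;
        if (availableSegments.size() < poolSize) {
            availableSegments.add(segment);
        }
        // otherwise the segment goes back to the global pool (dropped here)
    }
}
```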
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            RecordDeserializer.DeserializationResult result;
            try {
                result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            } catch (IOException e) {
                throw new IOException(
                        String.format("Can't get next record for channel %s", lastChannel), e);
            }
            if (result.isBufferConsumed()) {
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                final boolean breakBatchEmitting =
                        processElement(deserializationDelegate.getInstance(), output);
                if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
                    continue;
                }
                return DataInputStatus.MORE_AVAILABLE;
            }
        }

        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // return to the mailbox after receiving a checkpoint barrier to avoid processing of
            // data after the barrier before checkpoint is performed for unaligned checkpoint
            // mode
            if (bufferOrEvent.get().isBuffer()) {
                processBuffer(bufferOrEvent.get());
            } else {
                DataInputStatus status = processEvent(bufferOrEvent.get(), output);
                if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
                    continue;
                }
                return status;
            }
        } else {
            if (checkpointedInputGate.isFinished()) {
                checkState(
                        checkpointedInputGate.getAvailableFuture().isDone(),
                        "Finished BarrierHandler should be available");
                return DataInputStatus.END_OF_INPUT;
            }
            return DataInputStatus.NOTHING_AVAILABLE;
        }
    }
}
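The key point of `getNextRecord()` is that "buffer consumed" and "full record" are independent: one record may span several network buffers, and one buffer may contain several records. Here is a minimal sketch of such a spanning deserializer, using a 0-byte delimiter instead of Flink's length-prefixed format; all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class SpanningDeserializerSketch {

    enum Result { PARTIAL_RECORD, FULL_RECORD, FULL_RECORD_BUFFER_CONSUMED }

    private ByteBuffer current;                                                // buffer being read
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // record in progress
    private byte[] lastRecord;                                                 // last completed record

    void setNextBuffer(ByteBuffer buffer) {
        this.current = buffer;
    }

    byte[] getLastRecord() {
        return lastRecord;
    }

    // Read bytes until a record delimiter (0) or until the buffer is drained.
    Result getNextRecord() {
        while (current.hasRemaining()) {
            byte b = current.get();
            if (b == 0) {
                lastRecord = pending.toByteArray();
                pending.reset();
                return current.hasRemaining()
                        ? Result.FULL_RECORD
                        : Result.FULL_RECORD_BUFFER_CONSUMED;
            }
            pending.write(b);
        }
        // buffer drained but the record is not complete yet: keep the partial bytes
        return Result.PARTIAL_RECORD;
    }

    public static void main(String[] args) {
        SpanningDeserializerSketch deserializer = new SpanningDeserializerSketch();
        // two records, "hello" and "world", with the first one split across buffers
        ByteBuffer[] buffers = {
            ByteBuffer.wrap(new byte[] {'h', 'e', 'l'}),
            ByteBuffer.wrap(new byte[] {'l', 'o', 0, 'w', 'o', 'r', 'l', 'd', 0})
        };
        for (ByteBuffer buffer : buffers) {
            deserializer.setNextBuffer(buffer);
            Result result;
            while ((result = deserializer.getNextRecord()) != Result.PARTIAL_RECORD) {
                System.out.println(new String(deserializer.getLastRecord())); // hello, world
                if (result == Result.FULL_RECORD_BUFFER_CONSUMED) {
                    break;
                }
            }
        }
    }
}
```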
public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
    checkError();

    if (!toBeConsumedBuffers.isEmpty()) {
        return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
    }

    ResultSubpartitionView subpartitionView = this.subpartitionView;
    if (subpartitionView == null) {
        // There is a possible race condition between writing a EndOfPartitionEvent (1) and
        // flushing (3) the Local channel on the sender side, and reading EndOfPartitionEvent (2)
        // and processing flush notification (4). When they happen in that order (1 - 2 - 3 - 4),
        // flush notification can re-enqueue LocalInputChannel after (or during) it was released
        // during reading the EndOfPartitionEvent (2).
        if (isReleased) {
            return Optional.empty();
        }

        // this can happen if the request for the partition was triggered asynchronously
        // by the time trigger
        // would be good to avoid that, by guaranteeing that the requestPartition() and
        // getNextBuffer() always come from the same thread
        // we could do that by letting the timer insert a special "requesting channel" into the
        // input gate's queue
        subpartitionView = checkAndWaitForSubpartitionView();
    }

    BufferAndBacklog next = subpartitionView.getNextBuffer();
    // ignore the empty buffer directly
    while (next != null && next.buffer().readableBytes() == 0) {
        next.buffer().recycleBuffer();
        next = subpartitionView.getNextBuffer();
        numBuffersIn.inc();
    }

    if (next == null) {
        if (subpartitionView.isReleased()) {
            throw new CancelTaskException(
                    "Consumed partition " + subpartitionView + " has been released.");
        } else {
            return Optional.empty();
        }
    }
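The empty-buffer handling in the middle of this method can be isolated into a tiny sketch: keep polling, recycle zero-length buffers immediately, and return the first buffer that carries actual payload. The `Buffer` class and queue below are hypothetical stand-ins, not Flink types.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class PollSkipEmptySketch {

    // Stand-in for a network buffer whose memory must be given back to a pool.
    static final class Buffer {
        final byte[] data;
        Buffer(byte[] data) { this.data = data; }
        int readableBytes() { return data.length; }
        void recycle() { /* return the memory segment to its pool */ }
    }

    // Poll the next non-empty buffer, recycling empty ones on the way,
    // mirroring the while-loop in getNextBuffer() above.
    static Optional<Buffer> pollNextNonEmpty(Queue<Buffer> view) {
        Buffer next = view.poll();
        while (next != null && next.readableBytes() == 0) {
            next.recycle();
            next = view.poll();
        }
        return Optional.ofNullable(next);
    }

    public static void main(String[] args) {
        Queue<Buffer> view = new ArrayDeque<>();
        view.add(new Buffer(new byte[0]));   // empty buffer, will be skipped
        view.add(new Buffer(new byte[16]));  // first real payload
        System.out.println(pollNextNonEmpty(view).get().readableBytes()); // 16
    }
}
```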