publicfinalvoidinvoke()throws Exception { // Allow invoking method 'invoke' without having to call 'restore' before it. if (!isRunning) { LOG.debug("Restoring during invoke will be called."); restoreInternal(); }
// final check to exit early before starting to run ensureNotCanceled();
scheduleBufferDebloater();
// let the task do its work getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart(); runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted ensureNotCanceled();
while (isNextLoopPossible()) { // The blocking `processMail` call will not return until default action is available. processMail(localMailbox, false); if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction( mailboxController); // lock is acquired inside default action as needed } } }
privatebooleanisNextLoopPossible() { // 'Suspended' can be false only when 'mailboxLoopRunning' is true. return !suspended; }
然后就进入到 while (isNextLoopPossible()) 循环了,循环中调用了 processMail,在这个方法中对 mail 进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
privatebooleanprocessMail(TaskMailbox mailbox, boolean singleStep)throws Exception { // Doing this check is an optimization to only have a volatile read in the expected hot // path, locks are only // acquired after this point. booleanisBatchAvailable= mailbox.createBatch();
// Take mails in a non-blockingly and execute them. booleanprocessed= isBatchAvailable && processMailsNonBlocking(singleStep); if (singleStep) { return processed; }
// If the default action is currently not available, we can run a blocking mailbox execution // until the default action becomes available again. processed |= processMailsWhenDefaultActionUnavailable();
while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) { if (processedMails++ == 0) { maybePauseIdleTimer(); } runMail(maybeMail.get()); if (singleStep) { break; } } if (processedMails > 0) { maybeRestartIdleTimer(); returntrue; } else { returnfalse; } }
privatevoidrunMail(Mail mail)throws Exception { mailboxMetricsControl.getMailCounter().inc(); mail.run(); if (!suspended) { // start latency measurement on first mail that is not suspending mailbox execution, // i.e., on first non-poison mail, otherwise latency measurement is not started to avoid // overhead if (!mailboxMetricsControl.isLatencyMeasurementStarted() && mailboxMetricsControl.isLatencyMeasurementSetup()) { mailboxMetricsControl.startLatencyMeasurement(); } } }
protectedvoidprocessInput(MailboxDefaultAction.Controller controller)throws Exception { DataInputStatusstatus= inputProcessor.processInput(); switch (status) { case MORE_AVAILABLE: if (taskIsAvailable()) { return; } break; case NOTHING_AVAILABLE: break; case END_OF_RECOVERY: thrownewIllegalStateException("We should not receive this event here."); case STOPPED: endData(StopMode.NO_DRAIN); return; case END_OF_DATA: endData(StopMode.DRAIN); notifyEndOfData(); return; case END_OF_INPUT: // Suspend the mailbox processor, it would be resumed in afterInvoke and finished // after all records processed by the downstream tasks. We also suspend the default // actions to avoid repeat executing the empty default operation (namely process // records). controller.suspendDefaultAction(); mailboxProcessor.suspend(); return; }
... }
当 status 是 MORE_AVAILABLE,表示还有更多数据可用立即处理,判断当前任务可用就立即返回。当 status 是 END_OF_INPUT 时,表示所有的输入都结束了,这时就会暂停循环事件的调用。
if (noUnfinishedInputGates) { result.complete( triggerCheckpointAsyncInMailbox( checkpointMetaData, checkpointOptions)); } else { result.complete( triggerUnfinishedChannelsCheckpoint( checkpointMetaData, checkpointOptions)); } } catch (Exception ex) { // Report the failure both via the Future result but also to the mailbox result.completeExceptionally(ex); throw ex; } }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions); return result; }