// create a new execution graph, if none exists so far
final DefaultExecutionGraph executionGraph =
        new DefaultExecutionGraph(
                jobInformation,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                executionHistorySizeLimit,
                classLoader,
                blobWriter,
                partitionGroupReleaseStrategyFactory,
                shuffleMaster,
                partitionTracker,
                executionDeploymentListener,
                executionStateUpdateListener,
                initializationTimestamp,
                vertexAttemptNumberStore,
                vertexParallelismStore,
                isDynamicGraph,
                executionJobVertexFactory,
                jobGraph.getJobStatusHooks(),
                markPartitionFinishedStrategy,
                taskDeploymentDescriptorFactory,
                jobStatusChangedListeners,
                executionPlanSchedulingContext);
try {
    executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));
} catch (Throwable t) {
    log.warn("Cannot create plan for job", t);
    // give the graph an empty plan
    executionGraph.setPlan(new JobPlanInfo.Plan("", "", "", new ArrayList<>()));
}
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
public List<JobVertex> getVerticesSortedTopologicallyFromSources()
        throws InvalidProgramException {
    // early out on empty lists
    if (this.taskVertices.isEmpty()) {
        return Collections.emptyList();
    }
    // start by finding the vertices with no input edges
    // and the ones with disconnected inputs (that refer to some standalone data set)
    {
        Iterator<JobVertex> iter = remaining.iterator();
        while (iter.hasNext()) {
            JobVertex vertex = iter.next();
            if (vertex.isInputVertex()) {
                sorted.add(vertex);
                iter.remove();
            }
        }
    }
    int startNodePos = 0;
    // traverse from the nodes that were added until we have found all elements
    while (!remaining.isEmpty()) {
        // first check if we have more candidates to start traversing from. If not, then the
        // graph is cyclic, which is not permitted
        if (startNodePos >= sorted.size()) {
            throw new InvalidProgramException("The job graph is cyclic.");
        }
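The method above is a variant of Kahn's algorithm: seed the result with the input vertices, then repeatedly release vertices whose inputs are already sorted, and report a cycle when no candidates remain. Here is a minimal, self-contained sketch of the same idea over plain string IDs; the Map-based graph and the names are illustrative assumptions, not Flink's JobVertex API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopologicalSortSketch {

    static List<String> sortFromSources(Map<String, List<String>> outEdges) {
        // count incoming edges per vertex (vertices with none are the "input vertices")
        Map<String, Integer> inDegree = new HashMap<>();
        outEdges.keySet().forEach(v -> inDegree.putIfAbsent(v, 0));
        outEdges.values().forEach(targets ->
                targets.forEach(t -> inDegree.merge(t, 1, Integer::sum)));

        // seed the traversal with all vertices that have no input edges
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, d) -> { if (d == 0) ready.add(v); });

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            sorted.add(v);
            // releasing v may leave downstream vertices with all inputs sorted
            for (String next : outEdges.getOrDefault(v, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // vertices that were never released are stuck behind a cycle
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The job graph is cyclic.");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = new HashMap<>();
        dag.put("source", List.of("map"));
        dag.put("map", List.of("sink"));
        dag.put("sink", List.of());
        System.out.println(sortFromSources(dag)); // [source, map, sink]
    }
}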
LOG.debug(
        "Attaching {} topologically sorted vertices to existing job graph with {} "
                + "vertices and {} intermediate results.",
        verticesToAttach.size(),
        tasks.size(),
        intermediateResults.size());
attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
if (!isDynamic) {
    initializeJobVertices(verticesToAttach);
}
// the topology must be assigned before the failoverStrategy is notified of new vertices
executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv =
        executionJobVertexFactory.createExecutionJobVertex(
                this, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
    throw new JobException(
            String.format(
                    "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
}
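Note how putIfAbsent doubles as the duplicate check: it writes only when the ID is still unused and otherwise returns the value registered earlier, which is the signal to fail. A tiny sketch of the pattern with hypothetical string IDs (the same check recurs below for intermediate data sets):

import java.util.HashMap;
import java.util.Map;

final class PutIfAbsentSketch {
    public static void main(String[] args) {
        Map<String, String> tasks = new HashMap<>();
        // first registration: the key is free, so putIfAbsent stores the value and returns null
        String previous = tasks.putIfAbsent("vertex-1", "ejv-A");
        System.out.println(previous); // null
        // duplicate registration: the map is left unchanged and the old value comes back
        previous = tasks.putIfAbsent("vertex-1", "ejv-B");
        System.out.println(previous); // ejv-A
        System.out.println(tasks.get("vertex-1")); // still ejv-A
    }
}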
            IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
            // Variable used in lambda expression should be final or effectively final
            final int finalPartitionNum = partitionNum;
            for (int i = start; i < end; i++) {
                IndexRange subpartitionRange =
                        computeConsumedSubpartitionRange(
                                i,
                                numConsumers,
                                () -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
                                isDynamicGraph,
                                false,
                                false);
                executionVertexInputInfos.add(
                        new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
            }
        }
    }
    return new JobVertexInputInfo(executionVertexInputInfos);
}
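For intuition about the ranges being built here, the sketch below divides numSubpartitions evenly across numConsumers and returns an inclusive range per consumer, in the spirit of IndexRange. The helper name and the even-split policy are assumptions for illustration, not the exact computeConsumedSubpartitionRange logic.

final class SubpartitionRangeSketch {
    /** Returns {firstSubpartition, lastSubpartition} (inclusive) for one consumer. */
    static int[] consumedRange(int consumerIndex, int numConsumers, int numSubpartitions) {
        int start = consumerIndex * numSubpartitions / numConsumers;
        int end = (consumerIndex + 1) * numSubpartitions / numConsumers; // exclusive bound
        return new int[] {start, end - 1};
    }

    public static void main(String[] args) {
        // 7 subpartitions split across 3 consumers -> [0, 1], [2, 3], [4, 6]
        for (int c = 0; c < 3; c++) {
            int[] r = consumedRange(c, 3, 7);
            System.out.println("consumer " + c + " -> [" + r[0] + ", " + r[1] + "]");
        }
    }
}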
for (IntermediateResult res : ejv.getProducedDataSets()) {
    IntermediateResult previousDataSet =
            this.intermediateResults.putIfAbsent(res.getId(), res);
    if (previousDataSet != null) {
        throw new JobException(
                String.format(
                        "Encountered two intermediate data sets with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
    }
}
// create all task vertices
for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
    ExecutionVertex vertex =
            createExecutionVertex(
                    this,
                    i,
                    producedDataSets,
                    timeout,
                    createTimestamp,
                    executionHistorySizeLimit,
                    initialAttemptCounts.getAttemptCount(i));

    this.taskVertices[i] = vertex;
}
// sanity check for the double referencing between intermediate result partitions and
// execution vertices
for (IntermediateResult ir : this.producedDataSets) {
    if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
        throw new RuntimeException(
                "The intermediate result's partitions were not correctly assigned.");
    }
}
// set up the input splits, if the vertex has any
try {
    @SuppressWarnings("unchecked")
    InputSplitSource<InputSplit> splitSource =
            (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();