if (jobGraph.isDynamic()) { // 支持动态扩缩容场景,为动态图设置并行度 setVertexParallelismsForDynamicGraphIfNecessary(); }
// Note that we set all the non-chainable outputs configuration here because the // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job // vertices and partition-reuse final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs = newHashMap<>(); // 设置不能链化的输出边 setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext); setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
// Wait for the serialization of operator coordinators and stream config. // 序列化操作符协调器和流配置 serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, jobVertexBuildContext);
privatevoidsetChaining() { // we separate out the sources that run as inputs to another operator (chained inputs) // from the sources that needs to run as the main (head) operator. final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(); final Collection<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream() .sorted(Comparator.comparing(Map.Entry::getKey)) .map(Map.Entry::getValue) .collect(Collectors.toList());
// iterate over a copy of the values, because this map gets concurrently modified for (OperatorChainInfo info : initialEntryPoints) { createChain( info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, chainEntryPoints, true, serializationExecutor, jobVertexBuildContext, null); } }
// check that we do not have a union operation, because unions currently only work // through the network/byte-channel stack. // we check that by testing that each "type" (which means input position) is used only once // 检查是否为 Union 操作,Union 操作不能链化 for (StreamEdge inEdge : downStreamVertex.getInEdges()) { if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) { returnfalse; } } returntrue; }
publicstatic List<StreamEdge> createChain( final Integer currentNodeId, finalint chainIndex, final OperatorChainInfo chainInfo, final Map<Integer, OperatorChainInfo> chainEntryPoints, finalboolean canCreateNewChain, final Executor serializationExecutor, final JobVertexBuildContext jobVertexBuildContext, final@Nullable Consumer<Integer> visitedStreamNodeConsumer) {
......
// 拆分可链化边和不可链化边 for (StreamEdge outEdge : currentNode.getOutEdges()) { if (isChainable(outEdge, streamGraph)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } }
// 处理可链化边 for (StreamEdge chainable : chainableOutputs) { StreamNodetargetNode= streamGraph.getStreamNode(chainable.getTargetId()); AttributetargetNodeAttribute= targetNode.getAttribute(); if (isNoOutputUntilEndOfInput) { if (targetNodeAttribute != null) { targetNodeAttribute.setNoOutputUntilEndOfInput(true); } } transitiveOutEdges.addAll( createChain( chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints, canCreateNewChain, serializationExecutor, jobVertexBuildContext, visitedStreamNodeConsumer)); // Mark upstream nodes in the same chain as outputBlocking if (targetNodeAttribute != null && targetNodeAttribute.isNoOutputUntilEndOfInput()) { currentNodeAttribute.setNoOutputUntilEndOfInput(true); } }
// 处理不可链化边 for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); // Used to control whether a new chain can be created, this value is true in the // full graph generation algorithm and false in the progressive generation // algorithm. In the future, this variable can be a boolean type function to adapt // to more adaptive scenarios. if (canCreateNewChain) { createChain( nonChainable.getTargetId(), 1, // operators start at position 1 because 0 is for chained source // inputs chainEntryPoints.computeIfAbsent( nonChainable.getTargetId(), (k) -> chainInfo.newChain(nonChainable.getTargetId())), chainEntryPoints, canCreateNewChain, serializationExecutor, jobVertexBuildContext, visitedStreamNodeConsumer); } }