Разговор о параллельном выполнении flink

Flink

последовательность

В этой статье в основном изучается параллельное выполнение flink.

пример

Operator Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
  • Операторы, источники данных и приемники данных могут вызывать метод setParallelism() для установки параллелизма.

Execution Environment Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
  • В ExecutionEnvironment вы можете использовать setParallelism для установки параллелизма по умолчанию для операторов, источников данных и приемников данных; если операторы, источники данных и приемники данных имеют собственный набор параллелизма, они переопределяют параллелизм, заданный ExecutionEnvironment.

Client Level

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

или

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
  • Используя клиент CLI, вы можете указать параллелизм при вызове командной строки с помощью -p или указать параллелизм в параметре Client.run при вызове Java/Scala.

System Level

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
  • Параллелизм по умолчанию на системном уровне можно указать для всех сред выполнения с помощью элемента конфигурации parallelism.default в файле flink-conf.yaml.

ExecutionEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
	//......

	private final ExecutionConfig config = new ExecutionConfig();

	/**
	 * Sets the parallelism for operations executed through this environment.
	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
	 * x parallel instances.
	 *
	 * <p>This method overrides the default parallelism for this environment.
	 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
	 * contexts (CPU cores / threads). When executing the program via the command line client
	 * from a JAR file, the default parallelism is the one configured for that setup.
	 *
	 * @param parallelism The parallelism
	 */
	public void setParallelism(int parallelism) {
		config.setParallelism(parallelism);
	}

	@Internal
	public Plan createProgramPlan(String jobName, boolean clearSinks) {
		if (this.sinks.isEmpty()) {
			if (wasExecuted) {
				throw new RuntimeException("No new data sinks have been defined since the " +
						"last execution. The last execution refers to the latest call to " +
						"'execute()', 'count()', 'collect()', or 'print()'.");
			} else {
				throw new RuntimeException("No data sinks have been created yet. " +
						"A program needs at least one sink that consumes data. " +
						"Examples are writing the data set or printing it.");
			}
		}

		if (jobName == null) {
			jobName = getDefaultName();
		}

		OperatorTranslation translator = new OperatorTranslation();
		Plan plan = translator.translateToPlan(this.sinks, jobName);

		if (getParallelism() > 0) {
			plan.setDefaultParallelism(getParallelism());
		}
		plan.setExecutionConfig(getConfig());

		// Check plan for GenericTypeInfo's and register the types at the serializers.
		if (!config.isAutoTypeRegistrationDisabled()) {
			plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

				private final Set<Class<?>> registeredTypes = new HashSet<>();
				private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

				@Override
				public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
					if (!visitedOperators.add(visitable)) {
						return false;
					}
					OperatorInformation<?> opInfo = visitable.getOperatorInfo();
					Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
					return true;
				}

				@Override
				public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
			});
		}

		try {
			registerCachedFilesWithPlan(plan);
		} catch (Exception e) {
			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
		}

		// clear all the sinks such that the next execution does not redo everything
		if (clearSinks) {
			this.sinks.clear();
			wasExecuted = true;
		}

		// All types are registered now. Print information.
		int registeredTypes = config.getRegisteredKryoTypes().size() +
				config.getRegisteredPojoTypes().size() +
				config.getRegisteredTypesWithKryoSerializerClasses().size() +
				config.getRegisteredTypesWithKryoSerializers().size();
		int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
				config.getDefaultKryoSerializerClasses().size();
		LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

		if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
			LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
		}
		if (config.isForceKryoEnabled()) {
			LOG.info("Using KryoSerializer for serializing POJOs");
		}
		if (config.isForceAvroEnabled()) {
			LOG.info("Using AvroSerializer for serializing POJOs");
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
			LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
			LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
			LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
			LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
			LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

			// print information about static code analysis
			LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
		}

		return plan;
	}

	//......
}
  • ExecutionEnvironment предоставляет метод setParallelism, который задает параллелизм для ExecutionConfig; наконец, метод createProgramPlan считывает параллелизм ExecutionConfig после создания плана и устанавливает defaultParallelism для плана.

LocalEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

@Public
public class LocalEnvironment extends ExecutionEnvironment {

	//......

	public JobExecutionResult execute(String jobName) throws Exception {
		if (executor == null) {
			startNewSession();
		}

		Plan p = createProgramPlan(jobName);

		// Session management is disabled, revert this commit to enable
		//p.setJobId(jobID);
		//p.setSessionTimeout(sessionTimeout);

		JobExecutionResult result = executor.executePlan(p);

		this.lastJobExecutionResult = result;
		return result;
	}

	//......
}
  • Выполнение LocalEnvironment вызывает executePlan LocalExecutor.

LocalExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java

public class LocalExecutor extends PlanExecutor {
	
	//......

	@Override
	public JobExecutionResult executePlan(Plan plan) throws Exception {
		if (plan == null) {
			throw new IllegalArgumentException("The plan may not be null.");
		}

		synchronized (this.lock) {

			// check if we start a session dedicated for this execution
			final boolean shutDownAtEnd;

			if (jobExecutorService == null) {
				shutDownAtEnd = true;

				// configure the number of local slots equal to the parallelism of the local plan
				if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
					int maxParallelism = plan.getMaximumParallelism();
					if (maxParallelism > 0) {
						this.taskManagerNumSlots = maxParallelism;
					}
				}

				// start the cluster for us
				start();
			}
			else {
				// we use the existing session
				shutDownAtEnd = false;
			}

			try {
				// TODO: Set job's default parallelism to max number of slots
				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
				final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
				plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

				Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
				OptimizedPlan op = pc.compile(plan);

				JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
				JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

				return jobExecutorService.executeJobBlocking(jobGraph);
			}
			finally {
				if (shutDownAtEnd) {
					stop();
				}
			}
		}
	}

	//......
}
  • Метод executePlan LocalExecutor также устанавливает defaultParallelism для плана в соответствии с slotsPerTaskManager и numTaskManagers.

RemoteEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java

@Public
public class RemoteEnvironment extends ExecutionEnvironment {

	//......

	public JobExecutionResult execute(String jobName) throws Exception {
		PlanExecutor executor = getExecutor();

		Plan p = createProgramPlan(jobName);

		// Session management is disabled, revert this commit to enable
		//p.setJobId(jobID);
		//p.setSessionTimeout(sessionTimeout);

		JobExecutionResult result = executor.executePlan(p);

		this.lastJobExecutionResult = result;
		return result;
	}

	//......
}
  • Выполнение RemoteEnvironment вызывает план выполнения RemoteExecutor.

RemoteExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java

public class RemoteExecutor extends PlanExecutor {

	private final Object lock = new Object();

	private final List<URL> jarFiles;

	private final List<URL> globalClasspaths;

	private final Configuration clientConfiguration;

	private ClusterClient<?> client;

	//......

	@Override
	public JobExecutionResult executePlan(Plan plan) throws Exception {
		if (plan == null) {
			throw new IllegalArgumentException("The plan may not be null.");
		}

		JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
		return executePlanWithJars(p);
	}

	public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
		if (program == null) {
			throw new IllegalArgumentException("The job may not be null.");
		}

		synchronized (this.lock) {
			// check if we start a session dedicated for this execution
			final boolean shutDownAtEnd;

			if (client == null) {
				shutDownAtEnd = true;
				// start the executor for us
				start();
			}
			else {
				// we use the existing session
				shutDownAtEnd = false;
			}

			try {
				return client.run(program, defaultParallelism).getJobExecutionResult();
			}
			finally {
				if (shutDownAtEnd) {
					stop();
				}
			}
		}
	}

	//......
}
  • executePlan RemoteExecutor вызывает метод executePlanWithJars, который вызывает запуск ClusterClient и указывает в параметре defaultParallelism

ClusterClient

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java

public abstract class ClusterClient<T> {
	//......

	public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
		return run(program, parallelism, SavepointRestoreSettings.none());
	}

	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
			throws CompilerException, ProgramInvocationException {
		ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
		if (classLoader == null) {
			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
		}

		OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
	}

	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
			throws CompilerException, ProgramInvocationException {
		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
	}

	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
		Logger log = LoggerFactory.getLogger(ClusterClient.class);

		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
			log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
			p.setDefaultParallelism(parallelism);
		}
		log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

		return compiler.compile(p);
	}

	//......
}
  • Параллелизм в методе запуска ClusterClient будет применяться к Plan, когда parallelism > 0 и p.getDefaultParallelism()

DataStreamSource

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java

@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

	boolean isParallel;

	public DataStreamSource(StreamExecutionEnvironment environment,
			TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
			boolean isParallel, String sourceName) {
		super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

		this.isParallel = isParallel;
		if (!isParallel) {
			setParallelism(1);
		}
	}

	public DataStreamSource(SingleOutputStreamOperator<T> operator) {
		super(operator.environment, operator.getTransformation());
		this.isParallel = true;
	}

	@Override
	public DataStreamSource<T> setParallelism(int parallelism) {
		if (parallelism != 1 && !isParallel) {
			throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
		} else {
			super.setParallelism(parallelism);
			return this;
		}
	}
}
  • DataStreamSource наследует SingleOutputStreamOperator, который предоставляет метод setParallelism, и, наконец, вызывает setParallelism родительского класса SingleOutputStreamOperator.

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
	//......

	/**
	 * Sets the parallelism for this operator.
	 *
	 * @param parallelism
	 *            The parallelism for this operator.
	 * @return The operator with set parallelism.
	 */
	public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
		Preconditions.checkArgument(canBeParallel() || parallelism == 1,
				"The parallelism of non parallel operator must be 1.");

		transformation.setParallelism(parallelism);

		return this;
	}

	//......
}
  • Наконец, setParallelism SingleOutputStreamOperator применяется к StreamTransformation.

DataStreamSink

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java

@Public
public class DataStreamSink<T> {

	private final SinkTransformation<T> transformation;

	//......

	/**
	 * Sets the parallelism for this sink. The degree must be higher than zero.
	 *
	 * @param parallelism The parallelism for this sink.
	 * @return The sink with set parallelism.
	 */
	public DataStreamSink<T> setParallelism(int parallelism) {
		transformation.setParallelism(parallelism);
		return this;
	}

	//......
}
  • DataStreamSink предоставляет метод setParallelism и, наконец, воздействует на SinkTransformation.

резюме

  • Flink может установить несколько уровней параллелизма, включая уровень оператора, уровень среды выполнения, уровень клиента и уровень системы.
  • Укажите параллелизм по умолчанию на системном уровне для всех сред выполнения через элемент конфигурации parallelism.default в flink-conf.yaml; в ExecutionEnvironment вы можете установить параллелизм по умолчанию для операторов, источников данных и приемников данных через setParallelism; если операторы, данные источники, если у приемников данных есть собственный набор параллелизма, они переопределяют параллелизм, установленный ExecutionEnvironment.
  • Метод setParallelism, предоставляемый ExecutionEnvironment, используется для указания параллелизма для ExecutionConfig (如果使用CLI client,可以在命令行调用是用-p来指定,或者Java/Scala调用时在Client.run的参数中指定parallelism;LocalEnvironment及RemoteEnvironment设置的parallelism最后都是设置到Plan中); DataStreamSource наследует SingleOutputStreamOperator, который предоставляет метод setParallelism, который, наконец, вызывает setParallelism родительского класса SingleOutputStreamOperator; setParallelism SingleOutputStreamOperator, наконец, действует на StreamTransformation; DataStreamSink предоставляет метод setParallelism, который, наконец, действует на SinkTransformation

doc