Preface
This article takes a look at Flink's SourceFunction.
Example
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo");
- Here a custom SourceFunction is added to the job through the addSource method.
SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Base interface for all stream data sources in Flink. The contract of a stream source
* is the following: When the source should start emitting elements, the {@link #run} method
* is called with a {@link SourceContext} that can be used for emitting elements.
* The run method can run for as long as necessary. The source must, however, react to an
* invocation of {@link #cancel()} by breaking out of its main loop.
*
* <h3>CheckpointedFunction Sources</h3>
*
* <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
* interface must ensure that state checkpointing, updating of internal state and emission of
* elements are not done concurrently. This is achieved by using the provided checkpointing lock
* object to protect update of state and emission of elements in a synchronized block.
*
* <p>This is the basic pattern one should follow when implementing a checkpointed source:
*
* <pre>{@code
* public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }</pre>
*
*
* <h3>Timestamps and watermarks:</h3>
* Sources may assign timestamps to elements and may manually emit watermarks.
* However, these are only interpreted if the streaming program runs on
* {@link TimeCharacteristic#EventTime}. On other time characteristics
* ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
* the watermarks from the source function are ignored.
*
* <h3>Gracefully Stopping Functions</h3>
* Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
* interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
* state and the emitted elements in a consistent state.
*
* <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
* {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
* of state updates and element emission.
*
* @param <T> The type of the elements produced by this source.
*
* @see org.apache.flink.api.common.functions.StoppableFunction
* @see org.apache.flink.streaming.api.TimeCharacteristic
*/
@Public
public interface SourceFunction<T> extends Function, Serializable {
/**
* Starts the source. Implementations can use the {@link SourceContext} emit
* elements.
*
* <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
* must lock on the checkpoint lock (using a synchronized block) before updating internal
* state and emitting elements, to make both an atomic operation:
*
* <pre>{@code
* public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }</pre>
*
* @param ctx The context to emit elements to and for accessing locks.
*/
void run(SourceContext<T> ctx) throws Exception;
/**
* Cancels the source. Most sources will have a while loop inside the
* {@link #run(SourceContext)} method. The implementation needs to ensure that the
* source will break out of that loop after this method is called.
*
* <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
* {@code false} in this method. That flag is checked in the loop condition.
*
* <p>When a source is canceled, the executing thread will also be interrupted
* (via {@link Thread#interrupt()}). The interruption happens strictly after this
* method has been called, so any interruption handler can rely on the fact that
* this method has completed. It is good practice to make any flags altered by
* this method "volatile", in order to guarantee the visibility of the effects of
* this method to any interruption handler.
*/
void cancel();
// ------------------------------------------------------------------------
// source context
// ------------------------------------------------------------------------
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
//......
}
}
- SourceFunction is the base interface for Flink stream sources. It defines the run and cancel methods, and it also defines the nested SourceContext interface.
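The run/cancel contract can be tried out without a Flink dependency. The sketch below is a plain-Java model, not Flink code: MiniSource, CountingMiniSource and CancelDemo are hypothetical names. It only mirrors the documented pattern of a volatile isRunning flag that cancel() clears, followed by an interrupt of the executing thread that arrives strictly after cancel() has returned.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for SourceFunction (not Flink's API): same run/cancel contract.
interface MiniSource<T> {
    void run(Consumer<T> out);
    void cancel();
}

final class CountingMiniSource implements MiniSource<Long> {
    // volatile so the write in cancel() is visible to the thread running run()
    private volatile boolean isRunning = true;
    final AtomicLong emitted = new AtomicLong();

    @Override
    public void run(Consumer<Long> out) {
        long next = 0;
        while (isRunning) {
            out.accept(next++);
            emitted.incrementAndGet();
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // The interrupt only arrives after cancel() returned, so isRunning is
                // already false; restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

final class CancelDemo {
    // Runs the source on its own thread, waits for `minEmitted` elements,
    // then stops it in the documented order: cancel() first, interrupt() second.
    static long runThenCancel(int minEmitted) {
        CountingMiniSource source = new CountingMiniSource();
        CountDownLatch seen = new CountDownLatch(minEmitted);
        Thread worker = new Thread(() -> source.run(x -> seen.countDown()));
        worker.start();
        try {
            if (!seen.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("source produced no data");
            }
            source.cancel();
            worker.interrupt();
            worker.join(5_000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("source did not stop");
        }
        return source.emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("emitted before cancel: " + runThenCancel(3));
    }
}
```

Because cancel() runs before the interrupt, the InterruptedException handler can assume the loop flag is already cleared, which is exactly why the Javadoc recommends making such flags volatile.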
SourceContext
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
/**
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);
/**
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system's current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
/**
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t' <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);
/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();
/**
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock
*/
Object getCheckpointLock();
/**
* This method is called by the system to shut down the context.
*/
void close();
}
- SourceContext mainly defines the interface through which a source emits data. The collect method emits an element without attaching a timestamp. If the data carries no time of its own and the program runs on TimeCharacteristic.EventTime, a TimestampAssigner can assign timestamps before any time-dependent operation; with TimeCharacteristic.IngestionTime nothing needs to be specified, because the system assigns the current time automatically. Besides collect there is collectWithTimestamp, which emits an element together with an explicit timestamp (intended for use with TimeCharacteristic.EventTime).
- SourceContext also defines emitWatermark, which is used to handle out-of-order data: a watermark with value t declares that no more elements with a timestamp t' <= t will arrive, and any such element that still shows up is considered late. It only takes effect with TimeCharacteristic.EventTime; with TimeCharacteristic.ProcessingTime the watermark is ignored, and with TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion-time watermarks.
- It also defines markAsTemporarilyIdle, which tells the system that this source will temporarily stop emitting records and watermarks for an indefinite amount of time. This is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime, and it lets downstream tasks advance their watermarks without waiting for this source while it is idle. As soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called again, the system considers the source active and producing data again.
- It also defines getCheckpointLock, which returns the checkpoint lock. A checkpointed source synchronizes on this lock so that state updates and element emission happen atomically with respect to checkpointing.
- The close method is called by the system to shut down the context.
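The timestamp and watermark rules listed above can be summarized as a small truth table. The following plain-Java sketch encodes them; Characteristic and TimestampRules are hypothetical names that model the Javadoc of SourceContext and are not part of Flink. A null result stands for "the element carries no timestamp".

```java
// Hypothetical model of the rules in the SourceContext Javadoc; not Flink code.
enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

final class TimestampRules {
    static final Long NO_TIMESTAMP = null;

    // Timestamp an element ends up with after collect(element).
    // EventTime: none yet, a TimestampAssigner must add one before time-dependent operators.
    static Long afterCollect(Characteristic c, long nowMillis) {
        return c == Characteristic.INGESTION_TIME ? Long.valueOf(nowMillis) : NO_TIMESTAMP;
    }

    // Timestamp an element ends up with after collectWithTimestamp(element, ts).
    static Long afterCollectWithTimestamp(Characteristic c, long ts, long nowMillis) {
        switch (c) {
            case PROCESSING_TIME:
                return NO_TIMESTAMP; // ignored: processing time never uses element timestamps
            case INGESTION_TIME:
                return nowMillis;    // overwritten with the system's current time
            default:
                return ts;           // EventTime: the given timestamp is used
        }
    }

    // A watermark t declares that no element with timestamp <= t should follow;
    // an element that arrives anyway is late.
    static boolean isLate(long elementTs, long currentWatermark) {
        return elementTs <= currentWatermark;
    }
}
```

Modelling the rules as pure functions makes the design goal in the Javadoc visible: the same source code can run under any time characteristic, and only the interpretation of its timestamps and watermarks changes.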
Task.run (upstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task's configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
- Task.run calls invokable.invoke(); here the invokable is a StreamTask.
StreamTask.invoke
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
* the Task's operator chain. Operators that are chained together execute synchronously in the
* same thread and hence on the same stream partition. A common case for these chains
* are successive map/flatmap/filter tasks.
*
* <p>The task chain contains one "head" operator and multiple chained operators.
* The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
* as well as for sources, iteration heads and iteration tails.
*
* <p>The Task class deals with the setup of the streams read by the head operator, and the streams
* produced by the operators at the ends of the operator chain. Note that the chain may fork and
* thus have multiple ends.
*
* <p>The life cycle of the task is set up as follows:
* <pre>{@code
* -- setInitialState -> provides state of all operators in the chain
*
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* +----> close-operators()
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
* }</pre>
*
* <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
* @param <OUT>
* @param <OP>
*/
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
//......
@Override
public final void invoke() throws Exception {
boolean disposed = false;
try {
//......
// let the task do its work
isRunning = true;
run();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
LOG.debug("Finished task {}", getName());
//......
}
finally {
// clean up everything we initialized
isRunning = false;
//......
}
}
}
- StreamTask.invoke calls the run method of its subclass; here the subclass is SourceStreamTask.
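The path from Task.run down to the user's source is a template-method chain: StreamTask.invoke is final and fixes the lifecycle, and subclasses only plug in run(). The plain-Java sketch below models just that call order; MiniStreamTask, MiniSourceStreamTask and CallChainDemo are hypothetical classes, and an IllegalStateException stands in for Flink's CancelTaskException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of StreamTask.invoke() -> SourceStreamTask.run()
// -> StreamSource.run() -> SourceFunction.run(). Only the call order is modeled.
abstract class MiniStreamTask {
    final List<String> trace = new ArrayList<>();
    volatile boolean canceled;

    // final, like StreamTask.invoke(): the lifecycle is fixed, subclasses only supply run().
    final void invoke() {
        trace.add("invoke");
        try {
            run();
            if (canceled) {
                throw new IllegalStateException("canceled"); // stands in for CancelTaskException
            }
            trace.add("finished");
        } finally {
            trace.add("cleanup"); // like the finally block that resets isRunning
        }
    }

    protected abstract void run();
}

final class MiniSourceStreamTask extends MiniStreamTask {
    private final Consumer<List<String>> headOperator;

    MiniSourceStreamTask(Consumer<List<String>> headOperator) {
        this.headOperator = headOperator;
    }

    @Override
    protected void run() {
        trace.add("SourceStreamTask.run");
        headOperator.accept(trace); // Flink: headOperator.run(getCheckpointLock(), getStreamStatusMaintainer())
    }
}

final class CallChainDemo {
    static List<String> runOnce() {
        MiniSourceStreamTask task = new MiniSourceStreamTask(trace -> {
            trace.add("StreamSource.run");
            trace.add("userFunction.run"); // RandomWordSource.run in the article's job
        });
        task.invoke();
        return task.trace;
    }
}
```

Calling CallChainDemo.runOnce() yields the sequence invoke, SourceStreamTask.run, StreamSource.run, userFunction.run, finished, cleanup.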
SourceStreamTask.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
- SourceStreamTask.run mainly calls headOperator.run, where headOperator is a StreamSource.
StreamSource.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
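- StreamSource.run first creates a SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls userFunction.run, where userFunction is RandomWordSource, i.e. the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0. If the user function returns on its own (a finite source) rather than being canceled or stopped, a final Watermark.MAX_WATERMARK is emitted to mark the end of event time, and the context is closed in every case.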
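The tail of StreamSource.run (emit a final watermark only for a source that finished by itself, always close the context) is small enough to model directly. In this plain-Java sketch FiniteSourceDemo, RecordingContext and UserSource are hypothetical names; MAX_WATERMARK uses Long.MAX_VALUE, the value carried by Flink's Watermark.MAX_WATERMARK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the end of StreamSource.run(); not Flink code.
final class FiniteSourceDemo {
    static final long MAX_WATERMARK = Long.MAX_VALUE; // value of Flink's Watermark.MAX_WATERMARK

    // Records everything the "user function" and the operator emit, in order.
    static final class RecordingContext {
        final List<String> events = new ArrayList<>();
        void collect(String s) { events.add("record:" + s); }
        void emitWatermark(long ts) { events.add("watermark:" + ts); }
        void close() { events.add("close"); }
    }

    interface UserSource {
        void run(RecordingContext ctx);
    }

    static List<String> runSource(UserSource userFunction, boolean canceledOrStopped) {
        RecordingContext ctx = new RecordingContext();
        try {
            userFunction.run(ctx);
            // A finite source returned on its own: signal "end of event time" downstream.
            if (!canceledOrStopped) {
                ctx.emitWatermark(MAX_WATERMARK);
            }
        } finally {
            ctx.close(); // closed in any case, even if run() throws
        }
        return ctx.events;
    }
}
```

For an unbounded source such as RandomWordSource, run only returns after cancel(), so the final watermark is skipped and only the close step happens.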
RandomWordSource.run
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomWordSource implements SourceFunction<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);
    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit one random word every 300 ms until cancel() clears the flag
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
- RandomWordSource.run keeps emitting data in a loop, one random word every 300 ms, until cancel() sets isRunning to false.
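With 7 words, the index expression (int) (Math.random() * 10 % words.length) always stays in range, but it is not uniform: every unit interval [k, k + 1) of Math.random() * 10 maps to index k % 7, so "The", "brown" and "fox" each come up with probability 0.2 while the remaining four words get 0.1. The plain-Java check below makes that concrete. IndexSkew is a hypothetical helper, and ThreadLocalRandom.current().nextInt(n) is shown as one standard uniform alternative, not what the original demo uses.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

final class IndexSkew {
    // Weight of each index for (int) (Math.random() * scale % n), assuming Math.random()
    // is uniform on [0, 1) and scale is a positive integer: each unit interval
    // [k, k + 1) of Math.random() * scale lands on index k % n.
    static int[] weights(int scale, int n) {
        int[] w = new int[n];
        for (int k = 0; k < scale; k++) {
            w[k % n]++;
        }
        return w;
    }

    // Uniform alternative: every index in [0, n) is equally likely.
    static int uniformIndex(int n) {
        return ThreadLocalRandom.current().nextInt(n);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(weights(10, 7))); // [2, 2, 2, 1, 1, 1, 1]
        System.out.println(uniformIndex(7));
    }
}
```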
StreamSource.LatencyMarksEmitter
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
private static class LatencyMarksEmitter<OUT> {
private final ScheduledFuture<?> latencyMarkTimer;
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
public void close() {
latencyMarkTimer.cancel(true);
}
}
- The LatencyMarksEmitter is created in StreamSource.run before userFunction.run is called, but only if latencyTrackingInterval > 0. The interval is resolved as follows: getExecutionConfig().isLatencyTrackingConfigured() checks whether the ExecutionConfig sets the value; if it does, getExecutionConfig().getLatencyTrackingInterval() is used, otherwise the value of configuration.getLong(MetricOptions.LATENCY_INTERVAL), which defaults to 2000L (this example relies on the latter, i.e. 2000 ms).
- The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task with an initial delay of 0 and a period of latencyTrackingInterval.
- The timer task's work is done in the onProcessingTime method of the ProcessingTimeCallback, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here processingTimeService is a SystemProcessingTimeService, and output is an AbstractStreamOperator.CountingOutput.
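The interval resolution described above boils down to "the job's ExecutionConfig wins, otherwise the cluster configuration, and a non-positive result disables the emitter". The plain-Java sketch below states that rule; LatencyIntervalDemo is a hypothetical class, a null Long stands for isLatencyTrackingConfigured() returning false, and the 2000 ms constant is the 1.6.2 default reported in this article. In a real job the per-job value would be set with ExecutionConfig#setLatencyTrackingInterval.

```java
// Hypothetical model of how StreamSource.run picks latencyTrackingInterval; not Flink code.
final class LatencyIntervalDemo {
    // Cluster-side default of MetricOptions.LATENCY_INTERVAL as reported for Flink 1.6.2
    static final long CLUSTER_DEFAULT_MS = 2000L;

    // executionConfigValue == null models isLatencyTrackingConfigured() == false
    static long resolve(Long executionConfigValue, long clusterConfigValue) {
        return executionConfigValue != null ? executionConfigValue : clusterConfigValue;
    }

    // A LatencyMarksEmitter is only created for a positive interval.
    static boolean createsEmitter(long interval) {
        return interval > 0;
    }
}
```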
SystemProcessingTimeService.scheduleAtFixedRate
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
long nextTimestamp = getCurrentProcessingTime() + initialDelay;
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.scheduleAtFixedRate(
new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
initialDelay,
period,
TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}
- SystemProcessingTimeService.scheduleAtFixedRate actually delegates to timerService.scheduleAtFixedRate, where timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1; the task it schedules is a RepeatedTriggerTask. If the executor rejects the task, the service returns a NeverCompleteFuture when it is quiesced and throws an IllegalStateException when it has been shut down.
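The scheduling itself is plain JDK machinery. This sketch uses only java.util.concurrent to show a single-threaded ScheduledThreadPoolExecutor running a fixed-rate task with initial delay 0, which is then stopped with cancel(true) the way LatencyMarksEmitter.close() cancels its timer. FixedRateDemo and fireAtLeast are hypothetical names; the Runnable is a stand-in for RepeatedTriggerTask.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FixedRateDemo {
    // Returns true if the fixed-rate task fired at least `times` times within 5 s
    // and the returned future could then be cancelled.
    static boolean fireAtLeast(int times, long periodMillis) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        CountDownLatch fired = new CountDownLatch(times);
        try {
            // initialDelay 0, like the 0L passed by LatencyMarksEmitter
            ScheduledFuture<?> timer = timerService.scheduleAtFixedRate(
                    fired::countDown, 0L, periodMillis, TimeUnit.MILLISECONDS);
            boolean reached = fired.await(5, TimeUnit.SECONDS);
            timer.cancel(true); // stop further executions, as in LatencyMarksEmitter.close()
            return reached && timer.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timerService.shutdownNow();
        }
    }
}
```

With a single core thread, all timer callbacks of the service run one after another on the same thread, so a slow callback delays the following ones rather than overlapping with them.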
RepeatedTriggerTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
/**
* Internal task which is repeatedly called by the processing time service.
*/
private static final class RepeatedTriggerTask implements Runnable {
private final AtomicInteger serviceStatus;
private final Object lock;
private final ProcessingTimeCallback target;
private final long period;
private final AsyncExceptionHandler exceptionHandler;
private long nextTimestamp;
private RepeatedTriggerTask(
final AtomicInteger serviceStatus,
final AsyncExceptionHandler exceptionHandler,
final Object lock,
final ProcessingTimeCallback target,
final long nextTimestamp,
final long period) {
this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
this.lock = Preconditions.checkNotNull(lock);
this.target = Preconditions.checkNotNull(target);
this.period = period;
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
this.nextTimestamp = nextTimestamp;
}
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(nextTimestamp);
}
nextTimestamp += period;
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
}
}
}
}
- RepeatedTriggerTask runs while holding the lock (the checkpoint lock handed to SystemProcessingTimeService) and calls ProcessingTimeCallback.onProcessingTime only when serviceStatus is STATUS_ALIVE. nextTimestamp starts at getCurrentProcessingTime() + initialDelay and is then advanced by period on every run.
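The timestamp bookkeeping of RepeatedTriggerTask is deterministic and can be replayed by calling run() by hand. The plain-Java sketch below is a hypothetical re-creation, not the Flink class: the status constants are chosen for the model, a LongConsumer replaces ProcessingTimeCallback, and the exception handler is omitted. It shows that the callback sees start, start + period, start + 2 * period, and that nextTimestamp keeps advancing even after the service stops being alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

// Hypothetical re-creation of RepeatedTriggerTask's bookkeeping (not the Flink class).
final class RepeatedTriggerModel implements Runnable {
    static final int STATUS_ALIVE = 0;    // constant values chosen for this model
    static final int STATUS_QUIESCED = 1;

    private final AtomicInteger serviceStatus;
    private final Object lock;
    private final LongConsumer target;
    private final long period;
    private long nextTimestamp;

    RepeatedTriggerModel(AtomicInteger serviceStatus, Object lock, LongConsumer target,
                         long nextTimestamp, long period) {
        this.serviceStatus = serviceStatus;
        this.lock = lock;
        this.target = target;
        this.nextTimestamp = nextTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        synchronized (lock) { // in Flink: the checkpoint lock
            if (serviceStatus.get() == STATUS_ALIVE) {
                target.accept(nextTimestamp);
            }
            nextTimestamp += period; // advanced on every run, alive or not
        }
    }

    // Calls run() `runs` times (the executor would do this every `period` ms) and
    // quiesces the service just before run number `quiesceAt` (0-based).
    static List<Long> fire(long start, long period, int runs, int quiesceAt) {
        AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
        List<Long> seen = new ArrayList<>();
        RepeatedTriggerModel task =
                new RepeatedTriggerModel(status, new Object(), ts -> seen.add(ts), start, period);
        for (int i = 0; i < runs; i++) {
            if (i == quiesceAt) {
                status.set(STATUS_QUIESCED);
            }
            task.run();
        }
        return seen;
    }
}
```

For example, fire(1000L, 200L, 4, 3) delivers the timestamps 1000, 1200 and 1400; the fourth run happens after quiescing and does not reach the callback.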
AbstractStreamOperator.CountingOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
this.output = output;
this.numRecordsOut = counter;
}
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
output.emitLatencyMarker(latencyMarker);
}
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
@Override
public void close() {
output.close();
}
}
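- The output that CountingOutput actually wraps here is a RecordWriterOutput. Note that only collect increments numRecordsOut; watermarks and latency markers are passed through without being counted.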
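CountingOutput is a plain decorator. The sketch below mirrors its shape with hypothetical mini types (MiniOutput, ListOutput, CountingMiniOutput are not Flink classes, and a long field stands in for the Counter metric) to show that records are counted while latency markers and watermarks pass straight through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini versions of Output and CountingOutput (not Flink's classes).
interface MiniOutput {
    void collect(String record);
    void emitLatencyMarker(long markedTime);
    void emitWatermark(long watermark);
}

// Innermost output: just remembers what was sent, standing in for RecordWriterOutput.
final class ListOutput implements MiniOutput {
    final List<String> sent = new ArrayList<>();
    public void collect(String record) { sent.add("record:" + record); }
    public void emitLatencyMarker(long markedTime) { sent.add("latency:" + markedTime); }
    public void emitWatermark(long watermark) { sent.add("watermark:" + watermark); }
}

// Decorator: only data records bump the counter, like CountingOutput's numRecordsOut.
final class CountingMiniOutput implements MiniOutput {
    private final MiniOutput output;
    long numRecordsOut; // plain field standing in for Flink's Counter metric

    CountingMiniOutput(MiniOutput output) {
        this.output = output;
    }

    @Override
    public void collect(String record) {
        numRecordsOut++;
        output.collect(record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        output.emitLatencyMarker(markedTime); // forwarded, not counted
    }

    @Override
    public void emitWatermark(long watermark) {
        output.emitWatermark(watermark); // forwarded, not counted
    }
}
```

This is why latency markers do not distort the numRecordsOut metric even though they travel through the same output chain as the records.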
RecordWriterOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
* Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
//......
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
recordWriter.randomEmit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
- Here emitLatencyMarker mainly calls randomEmit (which is actually implemented in the parent class RecordWriter) to send the LatencyMarker.
RecordWriter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
/**
* This is used to send LatencyMarks to a random target channel.
*/
public void randomEmit(T record) throws IOException, InterruptedException {
sendToTarget(record, rng.nextInt(numChannels));
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
- RecordWriter.randomEmit picks a random targetChannel via rng.nextInt(numChannels) and then sends the record to that channel through sendToTarget.
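The consequence of randomEmit is that each latency marker reaches exactly one downstream channel, and over many markers every channel gets a share. The following plain-Java sketch shows only the channel choice; RandomEmitDemo is a hypothetical class, java.util.Random stands in for the rng field of RecordWriter, and incrementing a per-channel counter stands in for sendToTarget.

```java
import java.util.Random;

// Hypothetical sketch of the channel choice in RecordWriter.randomEmit (not Flink code).
final class RandomEmitDemo {
    private final Random rng;
    private final int numChannels;
    final int[] perChannel;

    RandomEmitDemo(int numChannels, long seed) {
        this.rng = new Random(seed); // seeded so a run is reproducible
        this.numChannels = numChannels;
        this.perChannel = new int[numChannels];
    }

    // Picks one target channel uniformly at random for a single marker.
    int randomEmit() {
        int target = rng.nextInt(numChannels);
        perChannel[target]++; // stands in for sendToTarget(record, target)
        return target;
    }
}
```

Sending each marker to a single random channel keeps the overhead of latency tracking constant per marker, regardless of how many downstream channels exist.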
Task.run (downstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task's configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
- The downstream Task.run calls invokable.invoke(); here the invokable is a OneInputStreamTask.
OneInputStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}
- Task.run calls StreamTask.invoke, which in turn calls OneInputStreamTask.run. That method simply keeps calling inputProcessor.processInput() in a loop until the task stops running or processInput() returns false; the inputProcessor here is a StreamInputProcessor.
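The loop in OneInputStreamTask.run carries no logic of its own: it exits either when the task is stopped or when processInput() reports that the input is exhausted. A plain-Java sketch of that contract follows; MiniInputProcessor and MiniOneInputTask are hypothetical stand-ins for StreamInputProcessor and OneInputStreamTask, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for StreamInputProcessor: each call handles one
// record and returns false once the input is exhausted.
class MiniInputProcessor {
    private final Deque<String> input;
    final List<String> processed = new ArrayList<>();

    MiniInputProcessor(List<String> records) {
        this.input = new ArrayDeque<>(records);
    }

    boolean processInput() {
        String next = input.poll();
        if (next == null) {
            return false; // end of input, comparable to isFinished = true
        }
        processed.add(next);
        return true;
    }
}

// Hypothetical stand-in for OneInputStreamTask.run().
class MiniOneInputTask {
    volatile boolean running = true;
    private final MiniInputProcessor inputProcessor;

    MiniOneInputTask(MiniInputProcessor inputProcessor) {
        this.inputProcessor = inputProcessor;
    }

    void run() {
        // cache the field in a local variable, as the original code does
        final MiniInputProcessor inputProcessor = this.inputProcessor;
        while (running && inputProcessor.processInput()) {
            // all the work happens in processInput()
        }
    }
}
```

The `running` flag is checked first, so a stopped task does not pull any further input.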
StreamInputProcessor
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            numRecordsIn = new SimpleCounter();
        }
    }
    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }
            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();
                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }
        final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
        if (bufferOrEvent != null) {
            if (bufferOrEvent.isBuffer()) {
                currentChannel = bufferOrEvent.getChannelIndex();
                currentRecordDeserializer = recordDeserializers[currentChannel];
                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            }
            else {
                // Event received
                final AbstractEvent event = bufferOrEvent.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        }
        else {
            isFinished = true;
            if (!barrierHandler.isEmpty()) {
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            return false;
        }
    }
}
- processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to obtain the next record, and processes it only when result.isFullRecord() is true. When no deserializer is active, it fetches the next buffer via barrierHandler.getNextNonBlocked() and hands it to the deserializer of that buffer's channel.
- Processing then branches on the StreamElement type: watermark, stream status, latency marker, or an ordinary data record.
- For an ordinary data record it calls streamOperator.processElement(record) and returns true; here the streamOperator is a StreamMap.
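The type dispatch in processInput can be modeled in isolation. The sketch below assumes a simplified element hierarchy (MiniElement, MiniWatermark, MiniStreamStatus, MiniLatencyMarker and MiniRecord are hypothetical, not Flink's StreamElement subclasses) and leaves out deserialization, barriers and channels. It keeps the key behavior: control elements are handled and the loop continues, while a data record is processed under the lock and the method returns true, so each call handles at most one user record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical element kinds mirroring the four cases in processInput().
abstract class MiniElement {}
class MiniWatermark extends MiniElement { final long timestamp; MiniWatermark(long t) { timestamp = t; } }
class MiniStreamStatus extends MiniElement { final boolean idle; MiniStreamStatus(boolean i) { idle = i; } }
class MiniLatencyMarker extends MiniElement { final long markedTime; MiniLatencyMarker(long t) { markedTime = t; } }
class MiniRecord extends MiniElement { final String value; MiniRecord(String v) { value = v; } }

class MiniStreamInputProcessor {
    private final Iterator<MiniElement> input;
    private final Object lock = new Object();
    final List<String> log = new ArrayList<>();

    MiniStreamInputProcessor(List<MiniElement> elements) {
        this.input = elements.iterator();
    }

    boolean processInput() {
        while (input.hasNext()) {
            MiniElement e = input.next();
            if (e instanceof MiniWatermark) {
                log.add("watermark:" + ((MiniWatermark) e).timestamp);
                continue; // control element: keep looping
            } else if (e instanceof MiniStreamStatus) {
                log.add("status:" + (((MiniStreamStatus) e).idle ? "IDLE" : "ACTIVE"));
                continue;
            } else if (e instanceof MiniLatencyMarker) {
                synchronized (lock) {
                    log.add("latency:" + ((MiniLatencyMarker) e).markedTime);
                }
                continue;
            } else {
                synchronized (lock) {
                    log.add("record:" + ((MiniRecord) e).value);
                }
                return true; // exactly one user record per call
            }
        }
        return false; // input exhausted
    }
}
```

Driving it with `while (processor.processInput()) { }` reproduces the OneInputStreamTask loop shown earlier.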
StreamMap.processElement
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;
    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
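- Here userFunction.map(element.getValue()) performs the actual map operation, where userFunction is UpperCaseMapFunc. The result is written back into the same StreamRecord via element.replace(...) and passed on with output.collect.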
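element.replace(...) reuses the incoming record object instead of allocating a new one, keeping its timestamp while swapping in the mapped value (and its type). A hypothetical sketch of that pattern follows; MiniStreamRecord, MiniStreamMap and UpperCase are illustrative names, and java.util.function.Function stands in for Flink's MapFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Hypothetical, simplified record: replace() swaps the value in place and keeps
// the timestamp, returning the same object re-typed to the new value type.
class MiniStreamRecord<T> {
    private Object value;
    private final long timestamp;

    MiniStreamRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @SuppressWarnings("unchecked")
    T getValue() { return (T) value; }

    long getTimestamp() { return timestamp; }

    @SuppressWarnings("unchecked")
    <X> MiniStreamRecord<X> replace(X newValue) {
        this.value = newValue;
        return (MiniStreamRecord<X>) this; // no new allocation
    }
}

// Hypothetical counterpart of StreamMap: map the value, reuse the record, emit it.
class MiniStreamMap<IN, OUT> {
    private final Function<IN, OUT> userFunction;
    final List<MiniStreamRecord<OUT>> output = new ArrayList<>();

    MiniStreamMap(Function<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(MiniStreamRecord<IN> element) {
        output.add(element.replace(userFunction.apply(element.getValue())));
    }
}

// Hypothetical equivalent of UpperCaseMapFunc.
class UpperCase implements Function<String, String> {
    @Override
    public String apply(String s) {
        return s.toUpperCase(Locale.ROOT);
    }
}
```

Reusing the record is safe here because the upstream code does not touch the element again after handing it to the operator.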
summary
- SourceFunction is the base interface for all Flink stream data sources. It defines the run and cancel methods as well as the nested SourceContext interface. SourceContext mainly defines collect and collectWithTimestamp for emitting data, and emitWatermark for emitting watermarks.
- On the emitting side, the call sequence is: Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (here RandomWordSource.run). Before StreamSource.run calls userFunction.run, it checks whether latencyTrackingInterval is greater than 0. If it is, it creates a LatencyMarksEmitter, which registers a periodic timer task; the timer's ProcessingTimeCallback.onProcessingTime callback runs output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).
- As a result, the downstream receives both the user data emitted by userFunction.run and the LatencyMarkers emitted by the timer task.
- On the receiving side, the call sequence is: Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. StreamInputProcessor.processInput handles each element type differently; for user data it calls streamOperator.processElement, here StreamMap.processElement --> userFunction.map (here UpperCaseMapFunc.map).
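The emitting side summarized above can be sketched end to end in plain Java. Everything here is hypothetical and simplified: MiniSourceFunction and MiniSourceContext only mirror the run/cancel and collect/emitWatermark shape of SourceFunction and SourceContext, a ScheduledExecutorService stands in for Flink's processing-time timer service, and a latency marker is reduced to a string. It shows two points from the summary: the user source loops until cancel() flips a volatile flag, and when the interval is greater than 0 a timer emits latency markers into the same output, so the downstream sees both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical mirrors of the SourceFunction / SourceContext shape; NOT Flink's API.
interface MiniSourceContext<T> {
    void collect(T element);
    void emitWatermark(long watermark);
    Object getCheckpointLock();
}

interface MiniSourceFunction<T> {
    void run(MiniSourceContext<T> ctx) throws Exception;
    void cancel();
}

// A RandomWordSource-like source that loops until cancel() flips the flag.
class WordSource implements MiniSourceFunction<String> {
    private static final String[] WORDS = {"flink", "source", "stream"};
    private volatile boolean isRunning = true;

    @Override
    public void run(MiniSourceContext<String> ctx) throws Exception {
        int i = 0;
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) { // emit under the checkpoint lock
                ctx.collect(WORDS[i++ % WORDS.length]);
            }
            Thread.sleep(2);
        }
    }

    @Override
    public void cancel() { isRunning = false; }
}

// One output shared by user records and latency markers, guarded by one lock.
class SharedOutput implements MiniSourceContext<String> {
    private final Object lock = new Object();
    private final List<String> elements = new ArrayList<>();

    public void collect(String e) { synchronized (lock) { elements.add("record:" + e); } }
    public void emitWatermark(long wm) { synchronized (lock) { elements.add("watermark:" + wm); } }
    public Object getCheckpointLock() { return lock; }
    void emitLatencyMarker(long ts, int subtask) { synchronized (lock) { elements.add("latency:" + ts + "#" + subtask); } }
    List<String> snapshot() { synchronized (lock) { return new ArrayList<>(elements); } }
}

class MiniStreamSource {
    // If latencyIntervalMs > 0, start a periodic marker timer, then run the user
    // source on its own thread for roughly runMs and cancel it.
    static List<String> runFor(MiniSourceFunction<String> userFunction, long latencyIntervalMs, long runMs) {
        SharedOutput output = new SharedOutput();
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> latencyTimer = null;
        if (latencyIntervalMs > 0) {
            latencyTimer = timers.scheduleAtFixedRate(
                    () -> output.emitLatencyMarker(System.currentTimeMillis(), 0),
                    0L, latencyIntervalMs, TimeUnit.MILLISECONDS);
        }
        Thread worker = new Thread(() -> {
            try {
                userFunction.run(output);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        worker.start();
        try {
            Thread.sleep(runMs);
            userFunction.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (latencyTimer != null) latencyTimer.cancel(false);
            timers.shutdownNow();
        }
        return output.snapshot();
    }
}
```

Sharing one lock between the user source and the timer keeps records and markers from interleaving mid-emission, which is the same reason the SourceFunction javadoc asks sources to emit under the checkpoint lock.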