Предисловие
В этой статье в основном изучается конфигурация log.file для flink.
log4j.properties
flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties
# Root logger: level INFO, output goes to the appender named "file" defined below.
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
# The target path is taken from the "log.file" property via ${...} substitution.
log4j.appender.file.file=${log.file}
# append=false: do not append to an existing file.
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
# Pattern: timestamp, level (padded to 5), logger name (padded to 60), NDC, message, newline.
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
- Здесь log4j.appender.file.file настраивается с использованием системного свойства log.file.
MiniCluster
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java
/**
* Starts the mini cluster, based on the configured properties.
*
* <p>Startup order, as written below: metric registry, RPC service(s), high-availability
* services, blob server, heartbeat services, ResourceManager, blob cache service,
* TaskManager(s), dispatcher REST endpoint, dispatcher, and finally the leader retrievers.
* The whole sequence runs while holding {@code lock}. If any step throws, {@code close()}
* is called, a failure of {@code close()} is attached as a suppressed exception, and the
* original exception is rethrown.
*
* @throws Exception This method passes on any exception that occurs during the startup of
* the mini cluster.
*/
public void start() throws Exception {
synchronized (lock) {
// starting twice is rejected
checkState(!running, "FlinkMiniCluster is already running");
LOG.info("Starting Flink Mini Cluster");
LOG.debug("Using configuration {}", miniClusterConfiguration);
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
// SHARED means all components use one RPC service (see the branch below)
final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
try {
initializeIOFormatClasses(configuration);
LOG.info("Starting Metrics Registry");
metricRegistry = createMetricRegistry(configuration);
this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
"localhost");
final RpcService jobManagerRpcService;
final RpcService resourceManagerRpcService;
final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
// bring up all the RPC services
LOG.info("Starting RPC Service(s)");
// we always need the 'commonRpcService' for auxiliary calls
commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
// NOTE: the cast below assumes commonRpcService is an AkkaRpcService
final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
metricRegistry.startQueryService(actorSystem, null);
if (useSingleRpcService) {
// shared mode: every component reuses commonRpcService
for (int i = 0; i < numTaskManagers; i++) {
taskManagerRpcServices[i] = commonRpcService;
}
jobManagerRpcService = commonRpcService;
resourceManagerRpcService = commonRpcService;
// the per-component fields stay null in shared mode
// (presumably so shutdown does not stop the shared service more than once -- confirm in close())
this.resourceManagerRpcService = null;
this.jobManagerRpcService = null;
this.taskManagerRpcServices = null;
}
else {
// start a new service per component, possibly with custom bind addresses
final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
for (int i = 0; i < numTaskManagers; i++) {
taskManagerRpcServices[i] = createRpcService(
configuration, rpcTimeout, true, taskManagerBindAddress);
}
// remember the dedicated services in the fields
this.jobManagerRpcService = jobManagerRpcService;
this.taskManagerRpcServices = taskManagerRpcServices;
this.resourceManagerRpcService = resourceManagerRpcService;
}
// create the high-availability services
LOG.info("Starting high-availability services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
configuration,
commonRpcService.getExecutor());
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// bring up the ResourceManager(s)
// NOTE(review): "ResourceManger" in the log message is a typo for "ResourceManager"
LOG.info("Starting ResourceManger");
resourceManagerRunner = startResourceManager(
configuration,
haServices,
heartbeatServices,
metricRegistry,
resourceManagerRpcService,
new ClusterInformation("localhost", blobServer.getPort()),
jobManagerMetricGroup);
// the blob cache connects to the blob server on the local host and the server's port
blobCacheService = new BlobCacheService(
configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
);
// bring up the TaskManager(s) for the mini cluster
// NOTE(review): "TaskManger" in the log message is a typo for "TaskManager"
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
taskManagers = startTaskManagers(
configuration,
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
numTaskManagers,
taskManagerRpcServices);
// starting the dispatcher rest endpoint
LOG.info("Starting dispatcher rest endpoint.");
// NOTE(review): 20 and Time.milliseconds(20L) are presumably retry count and retry delay -- confirm
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
20,
Time.milliseconds(20L));
final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
20,
Time.milliseconds(20L));
this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
configuration,
RestHandlerConfiguration.fromConfiguration(configuration),
resourceManagerGatewayRetriever,
blobServer.getTransientBlobService(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(
actorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
haServices.getWebMonitorLeaderElectionService(),
new ShutDownFatalErrorHandler());
// starting the endpoint is what triggers initializeHandlers(...) on it
dispatcherRestEndpoint.start();
restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
// bring up the dispatcher that launches JobManagers when jobs submitted
// NOTE(review): "JobManger" in the log message is a typo for "JobManager"
LOG.info("Starting job dispatcher(s) for JobManger");
// NOTE(review): this replaces the jobManagerMetricGroup created right after the metric
// registry; that earlier instance was already passed to startResourceManager(...).
// Confirm whether two separate metric groups are intended.
this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
dispatcher = new StandaloneDispatcher(
jobManagerRpcService,
Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
configuration,
haServices,
resourceManagerRunner.getResourceManageGateway(),
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricRegistry.getMetricQueryServicePath(),
new MemoryArchivedExecutionGraphStore(),
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
new ShutDownFatalErrorHandler(),
dispatcherRestEndpoint.getRestBaseUrl(),
historyServerArchivist);
dispatcher.start();
// the gateway retrievers are registered as listeners of the leader retrievers
resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
}
catch (Exception e) {
// cleanup everything
try {
close();
} catch (Exception ee) {
// keep the startup failure as the primary exception
e.addSuppressed(ee);
}
throw e;
}
// create a new termination future
terminationFuture = new CompletableFuture<>();
// now officially mark this as running
running = true;
LOG.info("Flink Mini Cluster started successfully");
}
}
- Здесь последовательно создаются metricRegistry, commonRpcService, jobManagerRpcService, resourceManagerRpcService, haServices, blobServer, heartbeatServices, resourceManagerRunner, blobCacheService, taskManagers, dispatcherGatewayRetriever, dispatcherRestEndpoint, dispatcher и dispatcherLeaderRetriever.
RestServerEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java
/**
 * Brings this REST server endpoint up.
 *
 * <p>Collects the handlers from {@code initializeHandlers}, registers them on a fresh router in
 * sorted order, binds the Netty server, derives the advertised base URL, completes the REST
 * address future with it, marks the endpoint as running and finally calls
 * {@code startInternal()}. Everything happens while holding {@code lock}.
 *
 * @throws Exception if we cannot start the RestServerEndpoint
 */
public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

        log.info("Starting rest endpoint.");

        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);

        // Order the handlers so that more specific URLs are registered before generic ones, e.g.:
        //   /jobs
        //   /jobs/overview
        //   /jobs/:jobid
        //   /jobs/:jobid/config
        //   /:*
        handlers.sort(RestHandlerUrlComparator.INSTANCE);

        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> spec : handlers) {
            log.debug("Register handler {} under {}@{}.", spec.f1, spec.f0.getHttpMethod(), spec.f0.getTargetRestEndpointURL());
            registerHandler(router, spec);
        }

        final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                final RouterHandler routerHandler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (sslEngineFactory != null) {
                    ch.pipeline().addLast(
                        "ssl",
                        new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory));
                }

                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new FileUploadHandler(uploadDir))
                    .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(routerHandler.getName(), routerHandler)
                    .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };

        final NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));

        bootstrap = new ServerBootstrap();
        bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);

        // without an explicit bind address only the port is passed to bind()
        final ChannelFuture bindFuture = restBindAddress == null
            ? bootstrap.bind(restBindPort)
            : bootstrap.bind(restBindAddress, restBindPort);

        serverChannel = bindFuture.syncUninterruptibly().channel();

        final InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.localAddress();

        // a wildcard bind address cannot be advertised, fall back to the configured REST address
        final String advertisedAddress = boundAddress.getAddress().isAnyLocalAddress()
            ? this.restAddress
            : boundAddress.getAddress().getHostAddress();

        final int port = boundAddress.getPort();

        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

        final String protocol = sslEngineFactory != null ? "https://" : "http://";

        restBaseUrl = protocol + advertisedAddress + ':' + port;

        restAddressFuture.complete(restBaseUrl);

        state = State.RUNNING;

        startInternal();
    }
}
- Здесь вызывается initializeHandlers для получения списка ChannelInboundHandler; сам метод initializeHandlers реализован в подклассе DispatcherRestEndpoint.
DispatcherRestEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
/**
 * Extends the handlers of the parent class with the dispatcher specific ones.
 *
 * <p>When {@code WebOptions.SUBMIT_ENABLE} is set, the web submission extension is loaded and its
 * handlers are added; a failure to load it is only logged. The job submit handler is always
 * appended last.
 *
 * @param restAddressFuture future passed on to the parent class and to the created handlers
 * @return the parent's handler list with the dispatcher specific handlers added
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(restAddressFuture);

    // Add the Dispatcher specific handlers
    final Time requestTimeout = restConfiguration.getTimeout();

    final JobSubmitHandler submitHandler = new JobSubmitHandler(
        restAddressFuture,
        leaderRetriever,
        requestTimeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    final boolean webSubmitEnabled = clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE);

    if (!webSubmitEnabled) {
        log.info("Web-based job submission is not enabled.");
    } else {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                restAddressFuture,
                requestTimeout,
                responseHeaders,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            allHandlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            // the stack trace is only of interest on debug level
            if (!log.isDebugEnabled()) {
                log.info("Failed to load web based job submission extension. " +
                    "Probable reason: flink-runtime-web is not in the classpath.");
            } else {
                log.debug("Failed to load web based job submission extension.", e);
            }
        }
    }

    allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));

    return allHandlers;
}
- Здесь сначала вызывается initializeHandlers родительского класса, а родительским классом здесь является WebMonitorEndpoint(
он является прямым подклассом RestServerEndpoint, а DispatcherRestEndpoint, в свою очередь, наследует WebMonitorEndpoint
)
WebMonitorEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
/**
* Builds the list of REST handlers of the web monitor endpoint.
*
* <p>NOTE: this is an abridged excerpt; the lines marked {@code //......} stand for code that is
* not shown here (including the declarations of several handlers and the return statement).
*
* @param restAddressFuture future that is passed to the created handlers
*/
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
final Time timeout = restConfiguration.getTimeout();
//......
// TODO: Remove once the Yarn proxy can forward all REST verbs
handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));
handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler));
//......
// load the log and stdout file handler for the main cluster component
final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
// either file may be null; createStaticFileHandler then returns a constant text placeholder
final ChannelInboundHandler logFileHandler = createStaticFileHandler(
restAddressFuture,
timeout,
logFileLocation.logFile);
final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler(
restAddressFuture,
timeout,
logFileLocation.stdOutFile);
handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler));
handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler));
// TaskManager log and stdout file handler
// both handlers get the REST refresh interval as their cache entry duration
final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval());
final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
TaskManagerLogFileHeaders.getInstance(),
resourceManagerRetriever,
transientBlobService,
cacheEntryDuration);
final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
TaskManagerStdoutFileHeaders.getInstance(),
resourceManagerRetriever,
transientBlobService,
cacheEntryDuration);
handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
//......
}
/**
 * Creates the handler that serves a single file of the main cluster component.
 *
 * @param restAddressFuture future passed on to the file server handler
 * @param timeout timeout passed on to the file server handler
 * @param fileToServe file to serve, may be {@code null}
 * @return a {@code StaticFileServerHandler} for the file, or a {@code ConstantTextHandler}
 *         placeholder if the file is {@code null} or the file handler cannot be created
 */
@Nonnull
private ChannelInboundHandler createStaticFileHandler(
        CompletableFuture<String> restAddressFuture,
        Time timeout,
        File fileToServe) {

    // no file, nothing to serve
    if (fileToServe == null) {
        return new ConstantTextHandler("(file unavailable)");
    }

    try {
        return new StaticFileServerHandler<>(
            leaderRetriever,
            restAddressFuture,
            timeout,
            fileToServe);
    } catch (IOException e) {
        log.info("Cannot load log file handler.", e);
        return new ConstantTextHandler("(log file unavailable)");
    }
}
- Этот метод создаёт набор ChannelInboundHandler и добавляет их в список handlers.
- Для файловых обработчиков JobManager он сначала вызывает WebMonitorUtils.LogFileLocation.find(clusterConfiguration) для построения logFileLocation, а затем использует logFileLocation.logFile и logFileLocation.stdOutFile для построения logFileHandler и stdoutFileHandler, которые обслуживают загрузку файла журнала и файла стандартного вывода соответственно.
- Для FileHandler TaskManager TaskManagerLogFileHandler и TaskManagerStdoutFileHandler созданы для обработки загрузки файлов журнала и stdout.
JobManager FileHandler
WebMonitorUtils.LogFileLocation.find
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
/**
 * Immutable holder for the log file and the stdout file of the main cluster component.
 * Either file may be {@code null} when it could not be located.
 */
public static class LogFileLocation {

    public final File logFile;
    public final File stdOutFile;

    private LogFileLocation(File logFile, File stdOutFile) {
        this.logFile = logFile;
        this.stdOutFile = stdOutFile;
    }

    /**
     * Locates the log and stdout files.
     *
     * <p>The log file path is read from the {@code log.file} Java system property and, if that
     * is not set, from {@code WebOptions.LOG_PATH} in the given configuration. The stdout path
     * is the log path with its last three characters replaced by {@code out}.
     *
     * @param config configuration used as fallback for the log file path
     * @return the located files; both are {@code null} if no usable path is configured
     */
    public static LogFileLocation find(Configuration config) {
        final String logEnv = "log.file";
        final String propertyValue = System.getProperty(logEnv);

        final String logFilePath;
        if (propertyValue != null) {
            logFilePath = propertyValue;
        } else {
            LOG.warn("Log file environment variable '{}' is not set.", logEnv);
            logFilePath = config.getString(WebOptions.LOG_PATH);
        }

        // a usable path needs at least four characters, see the stdout path derivation below
        final boolean configured = logFilePath != null && logFilePath.length() >= 4;
        if (!configured) {
            LOG.warn("JobManager log files are unavailable in the web dashboard. " +
                "Log file location not found in environment variable '{}' or configuration key '{}'.",
                logEnv, WebOptions.LOG_PATH);
            return new LogFileLocation(null, null);
        }

        // swap the trailing three characters for "out"
        final String outFilePath = logFilePath.substring(0, logFilePath.length() - 3) + "out";

        LOG.info("Determined location of main cluster component log file: {}", logFilePath);
        LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);

        final File resolvedLogFile = resolveFileLocation(logFilePath);
        final File resolvedOutFile = resolveFileLocation(outFilePath);
        return new LogFileLocation(resolvedLogFile, resolvedOutFile);
    }

    /**
     * Checks that the given path points to an existing, readable file.
     *
     * @param logFilePath path to check
     * @return the file, or {@code null} if it does not exist or cannot be read
     */
    private static File resolveFileLocation(String logFilePath) {
        final File candidate = new File(logFilePath);
        if (candidate.exists() && candidate.canRead()) {
            return candidate;
        }
        return null;
    }
}
- Здесь сначала читается системное свойство log.file; если оно не задано, выводится предупреждение (
Log file environment variable 'log.file' is not set.
) и путь читается из конфигурации flink по ключу WebOptions.LOG_PATH (
web.log.path
); если значения нет и там или logFilePath.length() меньше 4, выводится предупреждение (JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.
) и возвращается LogFileLocation с двумя значениями null.
- Требование, чтобы длина logFilePath была не меньше 4, связано главным образом с тем, что outFilePath строится как logFilePath.substring(0, logFilePath.length() - 3).concat("out"); затем logFilePath и outFilePath проверяются методом resolveFileLocation, и из результатов создаётся возвращаемый LogFileLocation.
StaticFileServerHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
/**
* Simple file server handler that serves requests to web frontend's static files, such as
* HTML, CSS, or JS files.
*
* <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
* example.</p>
*
* <p>NOTE: this is an abridged excerpt; the line marked {@code //......} stands for members that
* are not shown here (among them {@code respondToRequest} and {@code sendError}).
*/
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
/** Timezone in which this server answers its "if-modified" requests. */
private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
/** Date format for HTTP. */
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
/** By default, we allow files to be cached for 5 minutes. */
private static final int HTTP_CACHE_SECONDS = 300;
// ------------------------------------------------------------------------
/** The path in which the static documents are. */
private final File rootPath;
/**
* Creates the handler.
*
* @param retriever gateway retriever passed to the parent class
* @param localJobManagerAddressFuture address future passed to the parent class
* @param timeout timeout passed to the parent class
* @param rootPath root path to serve from, must not be null; stored in canonical form
* @throws IOException if the canonical form of {@code rootPath} cannot be determined
*/
public StaticFileServerHandler(
GatewayRetriever<? extends T> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
File rootPath) throws IOException {
super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap());
this.rootPath = checkNotNull(rootPath).getCanonicalFile();
}
// ------------------------------------------------------------------------
// Responses to requests
// ------------------------------------------------------------------------
/**
* Maps the routed path to the path handed to {@code respondToRequest}: a path ending in "/"
* gets "index.html" appended, "/jobmanager/log" and "/jobmanager/stdout" become the empty
* string, and any other path is used unchanged.
*/
@Override
protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
final HttpRequest request = routedRequest.getRequest();
final String requestPath;
// make sure we request the "index.html" in case there is a directory request
if (routedRequest.getPath().endsWith("/")) {
requestPath = routedRequest.getPath() + "index.html";
}
// in case the files being accessed are logs or stdout files, find appropriate paths.
// (the request path is emptied; presumably rootPath itself is the file to serve then -- confirm in respondToRequest)
else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
requestPath = "";
} else {
requestPath = routedRequest.getPath();
}
respondToRequest(channelHandlerContext, request, requestPath);
}
//......
/**
* Logs the exception and answers with INTERNAL_SERVER_ERROR, but only while the channel is
* still active; otherwise the exception is ignored.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (ctx.channel().isActive()) {
logger.error("Caught exception", cause);
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
}
- Для /jobmanager/log и /jobmanager/stdout он сбрасывает requestPath в пустую строку, а затем вызывает respondToRequest, который отдаёт файл на основе rootPath.
TaskManager FileHandler
TaskManagerLogFileHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java
/**
 * REST handler that serves the log file of a {@link TaskExecutor}.
 *
 * <p>The only thing this class adds to its parent is the kind of file it asks the
 * ResourceManager for: {@code FileType.LOG}.
 */
public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    /**
     * Creates the handler; all arguments are passed unchanged to the parent class.
     */
    public TaskManagerLogFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(
            localAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            untypedResponseMessageHeaders,
            resourceManagerGatewayRetriever,
            transientBlobService,
            cacheEntryDuration);
    }

    /**
     * Asks the ResourceManager to have the given TaskManager upload its log file.
     *
     * @param resourceManagerGateway gateway of the ResourceManager
     * @param taskManagerResourceId id of the TaskManager whose log file is requested
     * @return future with the key of the uploaded file
     */
    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(
            taskManagerResourceId,
            FileType.LOG,
            timeout);
    }
}
- Его requestFileUpload вызывает ResourceManager.requestTaskManagerFileUpload, а переданный FileType — FileType.LOG
TaskManagerStdoutFileHandler.requestFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java
/**
 * REST handler that serves the stdout file of a {@link TaskExecutor}.
 *
 * <p>The only thing this class adds to its parent is the kind of file it asks the
 * ResourceManager for: {@code FileType.STDOUT}.
 */
public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    /**
     * Creates the handler; all arguments are passed unchanged to the parent class.
     */
    public TaskManagerStdoutFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(
            localAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            untypedResponseMessageHeaders,
            resourceManagerGatewayRetriever,
            transientBlobService,
            cacheEntryDuration);
    }

    /**
     * Asks the ResourceManager to have the given TaskManager upload its stdout file.
     *
     * @param resourceManagerGateway gateway of the ResourceManager
     * @param taskManagerResourceId id of the TaskManager whose stdout file is requested
     * @return future with the key of the uploaded file
     */
    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(
            taskManagerResourceId,
            FileType.STDOUT,
            timeout);
    }
}
- Его requestFileUpload вызывает ResourceManager.requestTaskManagerFileUpload, а переданный FileType — FileType.STDOUT.
ResourceManager.requestTaskManagerFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java
/**
 * Forwards a file upload request to the TaskExecutor registered under the given id.
 *
 * @param taskManagerId id of the TaskExecutor that should upload the file
 * @param fileType kind of file to upload
 * @param timeout timeout passed on to the TaskExecutor call
 * @return the future returned by the TaskExecutor, or a future failed with an
 *         {@code UnknownTaskExecutorException} if no TaskExecutor is registered under the id
 */
@Override
public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) {
    log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId);

    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);

    if (registration != null) {
        return registration.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
    }

    // nobody is registered under this id
    log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId);
    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
- Метод requestTaskManagerFileUpload класса ResourceManager реализован через TaskExecutor.requestFileUpload
TaskExecutor.requestFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
/**
 * Uploads the log or stdout file of this TaskExecutor to the transient blob service.
 *
 * @param fileType kind of file to upload
 * @param timeout not used by this method
 * @return future with the key of the uploaded file; it is failed with a {@code FlinkException}
 *         if no path is configured for the file type, the file does not exist, or the upload
 *         fails with an {@code IOException}
 */
@Override
public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
    log.debug("Request file {} upload.", fileType);

    final String filePath;

    switch (fileType) {
        case LOG:
            filePath = taskManagerConfiguration.getTaskManagerLogPath();
            break;
        case STDOUT:
            filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
            break;
        default:
            filePath = null;
    }

    // no path configured for this file type
    if (filePath == null || filePath.isEmpty()) {
        log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
    }

    final File requestedFile = new File(filePath);

    if (!requestedFile.exists()) {
        log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
    }

    final TransientBlobCache blobCache = blobCacheService.getTransientBlobService();
    final TransientBlobKey blobKey;

    try (FileInputStream in = new FileInputStream(requestedFile)) {
        blobKey = blobCache.putTransient(in);
    } catch (IOException e) {
        log.debug("Could not upload file {}.", fileType, e);
        return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
    }

    return CompletableFuture.completedFuture(blobKey);
}
- Метод requestFileUpload класса TaskExecutor определяет filePath по типу файла: для типа LOG берётся taskManagerConfiguration.getTaskManagerLogPath(), для типа STDOUT — taskManagerConfiguration.getTaskManagerStdoutPath(); затем файл передаётся в хранилище временных blob-объектов.
TaskManagerRunner.startTaskManager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
/**
 * Builds a {@code TaskExecutor} from the given configuration and services.
 *
 * <p>The configuration, the resource id, the RPC service and the high-availability services
 * are checked for {@code null} up front.
 *
 * @param configuration Flink configuration the services and the TaskManager configuration are derived from
 * @param resourceID id of the TaskManager
 * @param rpcService RPC service; its address and executor are used for the TaskManager services
 * @param highAvailabilityServices passed on to the TaskExecutor
 * @param heartbeatServices passed on to the TaskExecutor
 * @param metricRegistry registry the TaskManager metric group is created in
 * @param blobCacheService passed on to the TaskExecutor
 * @param localCommunicationOnly passed on to the TaskManager services configuration
 * @param fatalErrorHandler passed on to the TaskExecutor
 * @return the newly created TaskExecutor
 * @throws Exception if any of the creation steps fails
 */
public static TaskExecutor startTaskManager(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

    final InetAddress rpcAddress = InetAddress.getByName(rpcService.getAddress());

    final TaskManagerServicesConfiguration servicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
            configuration,
            rpcAddress,
            localCommunicationOnly);

    final TaskManagerServices services = TaskManagerServices.fromConfiguration(
        servicesConfiguration,
        resourceID,
        rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
        EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
        EnvironmentInformation.getMaxJvmHeapMemory());

    final TaskManagerMetricGroup metricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
        metricRegistry,
        services.getTaskManagerLocation(),
        services.getNetworkEnvironment());

    // this is where the log and stdout paths of the TaskManager get resolved
    final TaskManagerConfiguration executorConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

    return new TaskExecutor(
        rpcService,
        executorConfiguration,
        highAvailabilityServices,
        services,
        heartbeatServices,
        metricGroup,
        blobCacheService,
        fatalErrorHandler);
}
- TaskManagerRunner.startTaskManager строит taskManagerConfiguration через TaskManagerConfiguration.fromConfiguration(configuration)
TaskManagerConfiguration.fromConfiguration
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
/**
* Creates a {@code TaskManagerConfiguration} from the given Flink configuration.
*
* <p>NOTE: this is an abridged excerpt; the line marked {@code //......} stands for code that
* is not shown here and that defines several of the variables passed to the constructor below.
*
* @param configuration Flink configuration to read the settings from
* @return the TaskManager configuration, including the log path and the derived stdout path
*/
public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
// -1 is treated like "not set" and replaced by a single slot
if (numberSlots == -1) {
numberSlots = 1;
}
//......
// the "log.file" system property is passed as the default for the configured log path
final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
final String taskManagerStdoutPath;
if (taskManagerLogPath != null) {
// derive the stdout path by replacing everything from the last '.' with ".out"
// NOTE(review): lastIndexOf is applied to the whole path, so a '.' in a directory name
// is matched when the file name itself contains none
final int extension = taskManagerLogPath.lastIndexOf('.');
if (extension > 0) {
taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
} else {
// no '.' (or only a leading one): no stdout path can be derived
taskManagerStdoutPath = null;
}
} else {
taskManagerStdoutPath = null;
}
return new TaskManagerConfiguration(
numberSlots,
tmpDirPaths,
timeout,
finiteRegistrationDuration,
initialRegistrationPause,
maxRegistrationPause,
refusedRegistrationPause,
configuration,
exitOnOom,
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
alwaysParentFirstLoaderPatterns,
taskManagerLogPath,
taskManagerStdoutPath);
}
- В TaskManagerConfiguration.fromConfiguration сначала по ключу ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (
taskmanager.log.path
) из конфигурации flink читается taskManagerLogPath; если значение не задано, берётся системное свойство log.file. Если полученный taskManagerLogPath не равен null, его расширение заменяется на .out, чтобы получить taskManagerStdoutPath.
Резюме
- Файловый аппендер (FileAppender) настраивается в файле log4j.properties flink; путь к файлу журнала задаётся системным свойством log.file.
- MiniCluster при запуске создаёт DispatcherRestEndpoint, метод start которого с помощью initializeHandlers инициализирует набор обработчиков. Для файловых обработчиков JobManager вызывается WebMonitorUtils.LogFileLocation.find(clusterConfiguration), который сначала читает системное свойство log.file, а если оно не задано — значение WebOptions.LOG_PATH из конфигурации flink (
web.log.path
); затем по logFileLocation.logFile и logFileLocation.stdOutFile создаются два StaticFileServerHandler.
- Для файловых обработчиков TaskManager создаются TaskManagerLogFileHandler и TaskManagerStdoutFileHandler, которые обслуживают загрузку файла журнала и файла стандартного вывода соответственно. Оба вызывают метод ResourceManager.requestTaskManagerFileUpload, но с разным типом файла: один — LOG, другой — STDOUT. Метод ResourceManager.requestTaskManagerFileUpload, в свою очередь, передаёт файл через TaskExecutor.requestFileUpload. TaskManagerRunner.startTaskManager при создании TaskExecutor строит TaskManagerConfiguration, который сначала читает ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (
taskmanager.log.path
), а если значение не задано, берёт системное свойство log.file.