Файл загружен
Принцип этого процессора заключается в получении объектов HttpObject и их обработке в соответствии с HttpRequest и HttpContent.Содержимое файла доставляется сообщением HttpContent.
Затем прочитайте фрагмент за фрагментом в HttpContent, и размер фрагмента может быть установлен при инициализации HttpServerCodec. Отправьте каждый фрагмент в httpDecoder и сделайте копию. Когда объект LastHttpContent будет прочитан, это означает, что загрузка завершена. Вы можете записать кэшированный файл в httpDecoder на диск через HttpDataFactory, а затем удалить кэшированный объект HttpContent.
@Slf4j
public class HttUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
public HttUploadHandler() {
super(false);
}
private static final HttpDataFactory factory = new DefaultHttpDataFactory(true);
private static final String FILE_UPLOAD = "/data/";
private static final String URI = "/upload";
private HttpPostRequestDecoder httpDecoder;
HttpRequest request;
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject httpObject)
throws Exception {
if (httpObject instanceof HttpRequest) {
request = (HttpRequest) httpObject;
if (request.uri().startsWith(URI) && request.method().equals(HttpMethod.POST)) {
httpDecoder = new HttpPostRequestDecoder(factory, request);
httpDecoder.setDiscardThreshold(0);
} else {
//传递给下一个Handler
ctx.fireChannelRead(httpObject);
}
}
if (httpObject instanceof HttpContent) {
if (httpDecoder != null) {
final HttpContent chunk = (HttpContent) httpObject;
httpDecoder.offer(chunk);
if (chunk instanceof LastHttpContent) {
writeChunk(ctx);
//关闭httpDecoder
httpDecoder.destroy();
httpDecoder = null;
}
ReferenceCountUtil.release(httpObject);
} else {
ctx.fireChannelRead(httpObject);
}
}
}
private void writeChunk(ChannelHandlerContext ctx) throws IOException {
while (httpDecoder.hasNext()) {
InterfaceHttpData data = httpDecoder.next();
if (data != null && HttpDataType.FileUpload.equals(data.getHttpDataType())) {
final FileUpload fileUpload = (FileUpload) data;
final File file = new File(FILE_UPLOAD + fileUpload.getFilename());
log.info("upload file: {}", file);
try (FileChannel inputChannel = new FileInputStream(fileUpload.getFile()).getChannel();
FileChannel outputChannel = new FileOutputStream(file).getChannel()) {
outputChannel.transferFrom(inputChannel, 0, inputChannel.size());
ResponseUtil.response(ctx, request, new GeneralResponse(HttpResponseStatus.OK, "SUCCESS", null));
}
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("{}", cause);
ctx.channel().close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (httpDecoder != null) {
httpDecoder.cleanFiles();
}
}
}
Загрузка файла
Обратитесь к официальной демонстрации:GitHub.com/Нетти/Нетти…
Внесены изменения:
- Для более эффективной передачи больших данных в примере используется кодировщик ChunkedWriteHandler, который обеспечивает способ записи файлов без копирования памяти.
- Контролируйте процесс загрузки файла через ChannelProgressiveFutureListener.
// 新增ChunkedHandler,主要作用是支持异步发送大的码流(例如大文件传输),但是不占用过多的内存,防止发生java内存溢出错误
ch.pipeline().addLast(new ChunkedWriteHandler());
// 用于下载文件
ch.pipeline().addLast(new HttpDownloadHandler());
@Slf4j
public class HttpDownloadHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public HttpDownloadHandler() {
super(false);
}
private String filePath = "/data/body.csv";
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
String uri = request.uri();
if (uri.startsWith("/download") && request.method().equals(HttpMethod.GET)) {
GeneralResponse generalResponse = null;
File file = new File(filePath);
try {
final RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", file.getName()));
ctx.write(response);
ChannelFuture sendFileFuture = ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationComplete(ChannelProgressiveFuture future)
throws Exception {
log.info("file {} transfer complete.", file.getName());
raf.close();
}
@Override
public void operationProgressed(ChannelProgressiveFuture future,
long progress, long total) throws Exception {
if (total < 0) {
log.warn("file {} transfer progress: {}", file.getName(), progress);
} else {
log.debug("file {} transfer progress: {}/{}", file.getName(), progress, total);
}
}
});
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} catch (FileNotFoundException e) {
log.warn("file {} not found", file.getPath());
generalResponse = new GeneralResponse(HttpResponseStatus.NOT_FOUND, String.format("file %s not found", file.getPath()), null);
ResponseUtil.response(ctx, request, generalResponse);
} catch (IOException e) {
log.warn("file {} has a IOException: {}", file.getName(), e.getMessage());
generalResponse = new GeneralResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, String.format("读取 file %s 发生异常", filePath), null);
ResponseUtil.response(ctx, request, generalResponse);
}
} else {
ctx.fireChannelRead(request);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
log.warn("{}", e);
ctx.close();
}
}
Яма, обнаруженная при загрузке файлов
так какRandomAccessFile
Это файловый ресурс, поэтому я обычно закрываю файловый ресурс в конце, используя Java7.try-with-resources
синтаксис, поэтому проблема возникает, потому чтоctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
асинхронно, когда я закрываюRandomAccessFile
Если файл не был передан, загрузка файла будет остановлена.
Код размещен в:GitHub.com/more think/От…