#221: increase ThreadPoolExecutor size to 8 threads

This commit is contained in:
Benjamin Gamard 2018-04-05 12:45:04 +02:00
parent 156e67bc52
commit c0678e9a90
13 changed files with 131 additions and 119 deletions

View File

@ -1,35 +0,0 @@
package com.sismics.docs.core.event;
import com.google.common.base.MoreObjects;
import com.sismics.docs.core.model.jpa.File;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
/**
* Cleanup temporary files event.
*
* @author bgamard
*/
public class TemporaryFileCleanupAsyncEvent {
/**
* Temporary files.
*/
private List<Path> fileList;
public TemporaryFileCleanupAsyncEvent(List<Path> fileList) {
this.fileList = fileList;
}
public List<Path> getFileList() {
return fileList;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("files", fileList)
.toString();
}
}

View File

@ -1,38 +0,0 @@
package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.TemporaryFileCleanupAsyncEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
/**
* Listener to cleanup temporary files created during a request.
*
* @author bgamard
*/
public class TemporaryFileCleanupAsyncListener {
/**
* Logger.
*/
private static final Logger log = LoggerFactory.getLogger(TemporaryFileCleanupAsyncListener.class);
/**
* Cleanup temporary files.
*
* @param event Temporary file cleanup event
* @throws Exception e
*/
@Subscribe
public void on(final TemporaryFileCleanupAsyncEvent event) throws Exception {
if (log.isInfoEnabled()) {
log.info("Cleanup temporary files event: " + event.toString());
}
for (Path file : event.getFileList()) {
Files.delete(file);
}
}
}

View File

@ -7,6 +7,7 @@ import com.sismics.docs.core.dao.UserDao;
import com.sismics.docs.core.event.RebuildIndexAsyncEvent; import com.sismics.docs.core.event.RebuildIndexAsyncEvent;
import com.sismics.docs.core.listener.async.*; import com.sismics.docs.core.listener.async.*;
import com.sismics.docs.core.model.jpa.User; import com.sismics.docs.core.model.jpa.User;
import com.sismics.docs.core.service.FileService;
import com.sismics.docs.core.service.InboxService; import com.sismics.docs.core.service.InboxService;
import com.sismics.docs.core.util.PdfUtil; import com.sismics.docs.core.util.PdfUtil;
import com.sismics.docs.core.util.indexing.IndexingHandler; import com.sismics.docs.core.util.indexing.IndexingHandler;
@ -58,6 +59,11 @@ public class AppContext {
*/ */
private InboxService inboxService; private InboxService inboxService;
/**
* File service.
*/
private FileService fileService;
/** /**
* Asynchronous executors. * Asynchronous executors.
*/ */
@ -79,6 +85,11 @@ public class AppContext {
asyncEventBus.post(rebuildIndexAsyncEvent); asyncEventBus.post(rebuildIndexAsyncEvent);
} }
// Start file service
fileService = new FileService();
fileService.startAsync();
fileService.awaitRunning();
// Start inbox service // Start inbox service
inboxService = new InboxService(); inboxService = new InboxService();
inboxService.startAsync(); inboxService.startAsync();
@ -123,7 +134,6 @@ public class AppContext {
asyncEventBus.register(new DocumentUpdatedAsyncListener()); asyncEventBus.register(new DocumentUpdatedAsyncListener());
asyncEventBus.register(new DocumentDeletedAsyncListener()); asyncEventBus.register(new DocumentDeletedAsyncListener());
asyncEventBus.register(new RebuildIndexAsyncListener()); asyncEventBus.register(new RebuildIndexAsyncListener());
asyncEventBus.register(new TemporaryFileCleanupAsyncListener());
asyncEventBus.register(new AclCreatedAsyncListener()); asyncEventBus.register(new AclCreatedAsyncListener());
asyncEventBus.register(new AclDeletedAsyncListener()); asyncEventBus.register(new AclDeletedAsyncListener());
@ -154,8 +164,7 @@ public class AppContext {
if (EnvironmentUtil.isUnitTest()) { if (EnvironmentUtil.isUnitTest()) {
return new EventBus(); return new EventBus();
} else { } else {
// /!\ Don't add more threads because a cleanup event is fired at the end of each request ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()); new LinkedBlockingQueue<>());
asyncExecutorList.add(executor); asyncExecutorList.add(executor);
@ -179,6 +188,10 @@ public class AppContext {
return inboxService; return inboxService;
} }
public FileService getFileService() {
return fileService;
}
public void shutDown() { public void shutDown() {
for (ExecutorService executor : asyncExecutorList) { for (ExecutorService executor : asyncExecutorList) {
// Shutdown executor, don't accept any more tasks (can cause error with nested events) // Shutdown executor, don't accept any more tasks (can cause error with nested events)
@ -199,6 +212,10 @@ public class AppContext {
inboxService.awaitTerminated(); inboxService.awaitTerminated();
} }
if (fileService != null) {
fileService.stopAsync();
}
instance = null; instance = null;
} }
} }

View File

@ -0,0 +1,95 @@
package com.sismics.docs.core.service;
import com.google.common.util.concurrent.AbstractScheduledService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* File service.
*
* @author bgamard
*/
public class FileService extends AbstractScheduledService {
/**
* Logger.
*/
private static final Logger log = LoggerFactory.getLogger(FileService.class);
/**
* Phantom references queue.
*/
private final ReferenceQueue<Path> referenceQueue = new ReferenceQueue<>();
private final Set<TemporaryPathReference> referenceSet = new HashSet<>();
public FileService() {
}
@Override
protected void startUp() {
log.info("File service starting up");
}
@Override
protected void shutDown() {
log.info("File service shutting down");
}
@Override
protected void runOneIteration() {
try {
deleteTemporaryFiles();
} catch (Throwable e) {
log.error("Exception during file service iteration", e);
}
}
/**
* Delete unreferenced temporary files.
*/
private void deleteTemporaryFiles() throws Exception {
TemporaryPathReference ref;
while ((ref = (TemporaryPathReference) referenceQueue.poll()) != null) {
Files.delete(Paths.get(ref.path));
referenceSet.remove(ref);
}
}
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS);
}
/**
* Create a temporary file.
*
* @return New temporary file
*/
public Path createTemporaryFile() throws IOException {
Path path = Files.createTempFile("sismics_docs", null);
referenceSet.add(new TemporaryPathReference(path, referenceQueue));
return path;
}
/**
* Phantom reference to a temporary file.
*
* @author bgamard
*/
class TemporaryPathReference extends PhantomReference<Path> {
String path;
TemporaryPathReference(Path referent, ReferenceQueue<? super Path> q) {
super(referent, q);
path = referent.toAbsolutePath().toString();
}
}
}

View File

@ -1,7 +1,7 @@
package com.sismics.docs.core.util; package com.sismics.docs.core.util;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.sismics.util.context.ThreadLocalContext; import com.sismics.docs.core.model.context.AppContext;
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jce.provider.BouncyCastleProvider;
import javax.crypto.Cipher; import javax.crypto.Cipher;
@ -74,7 +74,7 @@ public class EncryptionUtil {
return file; return file;
} }
Path tmpFile = ThreadLocalContext.get().createTemporaryFile(); Path tmpFile = AppContext.getInstance().getFileService().createTemporaryFile();
try (InputStream is = Files.newInputStream(file)) { try (InputStream is = Files.newInputStream(file)) {
Files.copy(new CipherInputStream(is, getCipher(privateKey, Cipher.DECRYPT_MODE)), tmpFile, StandardCopyOption.REPLACE_EXISTING); Files.copy(new CipherInputStream(is, getCipher(privateKey, Cipher.DECRYPT_MODE)), tmpFile, StandardCopyOption.REPLACE_EXISTING);
} }

View File

@ -9,6 +9,7 @@ import com.sismics.docs.core.dao.FileDao;
import com.sismics.docs.core.dao.UserDao; import com.sismics.docs.core.dao.UserDao;
import com.sismics.docs.core.event.DocumentUpdatedAsyncEvent; import com.sismics.docs.core.event.DocumentUpdatedAsyncEvent;
import com.sismics.docs.core.event.FileCreatedAsyncEvent; import com.sismics.docs.core.event.FileCreatedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext;
import com.sismics.docs.core.model.jpa.File; import com.sismics.docs.core.model.jpa.File;
import com.sismics.docs.core.model.jpa.User; import com.sismics.docs.core.model.jpa.User;
import com.sismics.util.ImageDeskew; import com.sismics.util.ImageDeskew;
@ -55,7 +56,7 @@ public class FileUtil {
ImageDeskew imageDeskew = new ImageDeskew(resizedImage); ImageDeskew imageDeskew = new ImageDeskew(resizedImage);
BufferedImage deskewedImage = Scalr.rotate(resizedImage, - imageDeskew.getSkewAngle(), Scalr.OP_ANTIALIAS, Scalr.OP_GRAYSCALE); BufferedImage deskewedImage = Scalr.rotate(resizedImage, - imageDeskew.getSkewAngle(), Scalr.OP_ANTIALIAS, Scalr.OP_GRAYSCALE);
resizedImage.flush(); resizedImage.flush();
Path tmpFile = ThreadLocalContext.get().createTemporaryFile(); Path tmpFile = AppContext.getInstance().getFileService().createTemporaryFile();
ImageIO.write(deskewedImage, "tiff", tmpFile.toFile()); ImageIO.write(deskewedImage, "tiff", tmpFile.toFile());
List<String> result = Lists.newLinkedList(Arrays.asList("tesseract", tmpFile.toAbsolutePath().toString(), "stdout", "-l", language)); List<String> result = Lists.newLinkedList(Arrays.asList("tesseract", tmpFile.toAbsolutePath().toString(), "stdout", "-l", language));

View File

@ -1,7 +1,7 @@
package com.sismics.docs.core.util.format; package com.sismics.docs.core.util.format;
import com.google.common.io.Closer; import com.google.common.io.Closer;
import com.sismics.util.context.ThreadLocalContext; import com.sismics.docs.core.model.context.AppContext;
import com.sismics.util.mime.MimeType; import com.sismics.util.mime.MimeType;
import fr.opensagres.poi.xwpf.converter.pdf.PdfConverter; import fr.opensagres.poi.xwpf.converter.pdf.PdfConverter;
import fr.opensagres.poi.xwpf.converter.pdf.PdfOptions; import fr.opensagres.poi.xwpf.converter.pdf.PdfOptions;
@ -58,7 +58,7 @@ public class DocxFormatHandler implements FormatHandler {
*/ */
private Path getGeneratedPdf(Path file) throws Exception { private Path getGeneratedPdf(Path file) throws Exception {
if (temporaryPdfFile == null) { if (temporaryPdfFile == null) {
temporaryPdfFile = ThreadLocalContext.get().createTemporaryFile(); temporaryPdfFile = AppContext.getInstance().getFileService().createTemporaryFile();
try (InputStream inputStream = Files.newInputStream(file); try (InputStream inputStream = Files.newInputStream(file);
OutputStream outputStream = Files.newOutputStream(temporaryPdfFile)) { OutputStream outputStream = Files.newOutputStream(temporaryPdfFile)) {
XWPFDocument document = new XWPFDocument(inputStream); XWPFDocument document = new XWPFDocument(inputStream);

View File

@ -1,7 +1,7 @@
package com.sismics.docs.core.util.format; package com.sismics.docs.core.util.format;
import com.google.common.io.Closer; import com.google.common.io.Closer;
import com.sismics.util.context.ThreadLocalContext; import com.sismics.docs.core.model.context.AppContext;
import com.sismics.util.mime.MimeType; import com.sismics.util.mime.MimeType;
import fr.opensagres.odfdom.converter.pdf.PdfConverter; import fr.opensagres.odfdom.converter.pdf.PdfConverter;
import fr.opensagres.odfdom.converter.pdf.PdfOptions; import fr.opensagres.odfdom.converter.pdf.PdfOptions;
@ -58,7 +58,7 @@ public class OdtFormatHandler implements FormatHandler {
*/ */
private Path getGeneratedPdf(Path file) throws Exception { private Path getGeneratedPdf(Path file) throws Exception {
if (temporaryPdfFile == null) { if (temporaryPdfFile == null) {
temporaryPdfFile = ThreadLocalContext.get().createTemporaryFile(); temporaryPdfFile = AppContext.getInstance().getFileService().createTemporaryFile();
try (InputStream inputStream = Files.newInputStream(file); try (InputStream inputStream = Files.newInputStream(file);
OutputStream outputStream = Files.newOutputStream(temporaryPdfFile)) { OutputStream outputStream = Files.newOutputStream(temporaryPdfFile)) {
OdfTextDocument document = OdfTextDocument.loadDocument(inputStream); OdfTextDocument document = OdfTextDocument.loadDocument(inputStream);

View File

@ -4,7 +4,7 @@ import com.google.common.base.Charsets;
import com.google.common.io.Closer; import com.google.common.io.Closer;
import com.lowagie.text.*; import com.lowagie.text.*;
import com.lowagie.text.pdf.PdfWriter; import com.lowagie.text.pdf.PdfWriter;
import com.sismics.util.context.ThreadLocalContext; import com.sismics.docs.core.model.context.AppContext;
import com.sismics.util.mime.MimeType; import com.sismics.util.mime.MimeType;
import org.apache.pdfbox.io.MemoryUsageSetting; import org.apache.pdfbox.io.MemoryUsageSetting;
import org.apache.pdfbox.pdmodel.PDDocument; import org.apache.pdfbox.pdmodel.PDDocument;
@ -28,7 +28,7 @@ public class TextPlainFormatHandler implements FormatHandler {
@Override @Override
public BufferedImage generateThumbnail(Path file) throws Exception { public BufferedImage generateThumbnail(Path file) throws Exception {
Document output = new Document(PageSize.A4, 40, 40, 40, 40); Document output = new Document(PageSize.A4, 40, 40, 40, 40);
Path tempFile = ThreadLocalContext.get().createTemporaryFile(); Path tempFile = AppContext.getInstance().getFileService().createTemporaryFile();
OutputStream pdfOutputStream = Files.newOutputStream(tempFile); OutputStream pdfOutputStream = Files.newOutputStream(tempFile);
PdfWriter.getInstance(output, pdfOutputStream); PdfWriter.getInstance(output, pdfOutputStream);

View File

@ -6,9 +6,9 @@ import com.sismics.docs.core.constant.ConfigType;
import com.sismics.docs.core.constant.Constants; import com.sismics.docs.core.constant.Constants;
import com.sismics.docs.core.dao.ConfigDao; import com.sismics.docs.core.dao.ConfigDao;
import com.sismics.docs.core.dao.dto.UserDto; import com.sismics.docs.core.dao.dto.UserDto;
import com.sismics.docs.core.model.context.AppContext;
import com.sismics.docs.core.model.jpa.Config; import com.sismics.docs.core.model.jpa.Config;
import com.sismics.docs.core.util.ConfigUtil; import com.sismics.docs.core.util.ConfigUtil;
import com.sismics.util.context.ThreadLocalContext;
import freemarker.template.Configuration; import freemarker.template.Configuration;
import freemarker.template.DefaultObjectWrapperBuilder; import freemarker.template.DefaultObjectWrapperBuilder;
import freemarker.template.Template; import freemarker.template.Template;
@ -196,7 +196,7 @@ public class EmailUtil {
if (Part.ATTACHMENT.equalsIgnoreCase(disposition)) { if (Part.ATTACHMENT.equalsIgnoreCase(disposition)) {
FileContent fileContent = new FileContent(); FileContent fileContent = new FileContent();
fileContent.name = subPart.getFileName(); fileContent.name = subPart.getFileName();
fileContent.file = ThreadLocalContext.get().createTemporaryFile(); fileContent.file = AppContext.getInstance().getFileService().createTemporaryFile();
Files.copy(subPart.getInputStream(), fileContent.file, StandardCopyOption.REPLACE_EXISTING); Files.copy(subPart.getInputStream(), fileContent.file, StandardCopyOption.REPLACE_EXISTING);
fileContent.size = Files.size(fileContent.file); fileContent.size = Files.size(fileContent.file);
mailContent.fileContentList.add(fileContent); mailContent.fileContentList.add(fileContent);
@ -219,7 +219,7 @@ public class EmailUtil {
} }
} else if (content instanceof InputStream) { } else if (content instanceof InputStream) {
FileContent fileContent = new FileContent(); FileContent fileContent = new FileContent();
fileContent.file = ThreadLocalContext.get().createTemporaryFile(); fileContent.file = AppContext.getInstance().getFileService().createTemporaryFile();
Files.copy((InputStream) content, fileContent.file, StandardCopyOption.REPLACE_EXISTING); Files.copy((InputStream) content, fileContent.file, StandardCopyOption.REPLACE_EXISTING);
fileContent.size = Files.size(fileContent.file); fileContent.size = Files.size(fileContent.file);
mailContent.fileContentList.add(fileContent); mailContent.fileContentList.add(fileContent);

View File

@ -1,13 +1,9 @@
package com.sismics.util.context; package com.sismics.util.context;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.sismics.docs.core.event.TemporaryFileCleanupAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -32,12 +28,6 @@ public class ThreadLocalContext {
*/ */
private List<Object> asyncEventList = Lists.newArrayList(); private List<Object> asyncEventList = Lists.newArrayList();
/**
* List of temporary files created during this request.
* They are deleted at the end of each request.
*/
private List<Path> temporaryFileList = Lists.newArrayList();
/** /**
* Private constructor. * Private constructor.
*/ */
@ -98,17 +88,6 @@ public class ThreadLocalContext {
asyncEventList.add(asyncEvent); asyncEventList.add(asyncEvent);
} }
/**
* Create a temporary file linked to the request.
*
* @return New temporary file
*/
public Path createTemporaryFile() throws IOException {
Path path = Files.createTempFile("sismics_docs", null);
temporaryFileList.add(path);
return path;
}
/** /**
* Fire all pending async events. * Fire all pending async events.
*/ */
@ -119,13 +98,5 @@ public class ThreadLocalContext {
iterator.remove(); iterator.remove();
AppContext.getInstance().getAsyncEventBus().post(asyncEvent); AppContext.getInstance().getAsyncEventBus().post(asyncEvent);
} }
if (!temporaryFileList.isEmpty()) {
// Some files were created during this request, add a cleanup event to the queue
// It works because we are using a one thread executor
AppContext.getInstance().getAsyncEventBus().post(
new TemporaryFileCleanupAsyncEvent(Lists.newArrayList(temporaryFileList)));
temporaryFileList.clear();
}
} }
} }

View File

@ -844,7 +844,7 @@ public class DocumentResource extends BaseResource {
// Save the file to a temporary file // Save the file to a temporary file
java.nio.file.Path unencryptedFile; java.nio.file.Path unencryptedFile;
try { try {
unencryptedFile = ThreadLocalContext.get().createTemporaryFile(); unencryptedFile = AppContext.getInstance().getFileService().createTemporaryFile();
Files.copy(fileBodyPart.getValueAs(InputStream.class), unencryptedFile, StandardCopyOption.REPLACE_EXISTING); Files.copy(fileBodyPart.getValueAs(InputStream.class), unencryptedFile, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) { } catch (IOException e) {
throw new ServerException("StreamError", "Error reading the input file", e); throw new ServerException("StreamError", "Error reading the input file", e);

View File

@ -12,6 +12,7 @@ import com.sismics.docs.core.dao.dto.DocumentDto;
import com.sismics.docs.core.event.DocumentUpdatedAsyncEvent; import com.sismics.docs.core.event.DocumentUpdatedAsyncEvent;
import com.sismics.docs.core.event.FileDeletedAsyncEvent; import com.sismics.docs.core.event.FileDeletedAsyncEvent;
import com.sismics.docs.core.event.FileUpdatedAsyncEvent; import com.sismics.docs.core.event.FileUpdatedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext;
import com.sismics.docs.core.model.jpa.File; import com.sismics.docs.core.model.jpa.File;
import com.sismics.docs.core.model.jpa.User; import com.sismics.docs.core.model.jpa.User;
import com.sismics.docs.core.util.DirectoryUtil; import com.sismics.docs.core.util.DirectoryUtil;
@ -111,7 +112,7 @@ public class FileResource extends BaseResource {
java.nio.file.Path unencryptedFile; java.nio.file.Path unencryptedFile;
long fileSize; long fileSize;
try { try {
unencryptedFile = ThreadLocalContext.get().createTemporaryFile(); unencryptedFile = AppContext.getInstance().getFileService().createTemporaryFile();
Files.copy(fileBodyPart.getValueAs(InputStream.class), unencryptedFile, StandardCopyOption.REPLACE_EXISTING); Files.copy(fileBodyPart.getValueAs(InputStream.class), unencryptedFile, StandardCopyOption.REPLACE_EXISTING);
fileSize = Files.size(unencryptedFile); fileSize = Files.size(unencryptedFile);
} catch (IOException e) { } catch (IOException e) {