From 520b1431650f055bbd68f388717ac658da8fa5f1 Mon Sep 17 00:00:00 2001 From: bgamard Date: Sun, 17 May 2020 21:00:01 +0200 Subject: [PATCH] #412: better handle concurrent updates and async listeners --- .../com/sismics/docs/core/dao/FileDao.java | 10 ++++- .../core/event/DocumentCreatedAsyncEvent.java | 29 +++++--------- .../core/event/FileDeletedAsyncEvent.java | 19 +++++----- .../sismics/docs/core/event/FileEvent.java | 19 +++++----- .../async/DocumentCreatedAsyncListener.java | 13 ++++++- .../async/FileDeletedAsyncListener.java | 6 +-- .../async/FileProcessingAsyncListener.java | 38 +++++++++++-------- .../listener/async/WebhookAsyncListener.java | 8 ++-- .../docs/core/service/InboxService.java | 2 +- .../com/sismics/docs/core/util/FileUtil.java | 14 +++---- .../core/util/action/ProcessFilesAction.java | 2 +- .../docs/rest/resource/DocumentResource.java | 6 +-- .../docs/rest/resource/FileResource.java | 6 +-- .../docs/rest/resource/UserResource.java | 4 +- 14 files changed, 93 insertions(+), 83 deletions(-) diff --git a/docs-core/src/main/java/com/sismics/docs/core/dao/FileDao.java b/docs-core/src/main/java/com/sismics/docs/core/dao/FileDao.java index 473511c9..a8ba1684 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/dao/FileDao.java +++ b/docs-core/src/main/java/com/sismics/docs/core/dao/FileDao.java @@ -153,7 +153,15 @@ public class FileDao { return file; } - + + public void updateContent(File file) { + EntityManager em = ThreadLocalContext.get().getEntityManager(); + Query query = em.createNativeQuery("update T_FILE f set FIL_CONTENT_C = :content where f.FIL_ID_C = :id"); + query.setParameter("content", file.getContent()); + query.setParameter("id", file.getId()); + query.executeUpdate(); + } + /** * Gets a file by its ID. * diff --git a/docs-core/src/main/java/com/sismics/docs/core/event/DocumentCreatedAsyncEvent.java b/docs-core/src/main/java/com/sismics/docs/core/event/DocumentCreatedAsyncEvent.java index f43575ff..e5394bbb 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/event/DocumentCreatedAsyncEvent.java +++ b/docs-core/src/main/java/com/sismics/docs/core/event/DocumentCreatedAsyncEvent.java @@ -1,7 +1,6 @@ package com.sismics.docs.core.event; import com.google.common.base.MoreObjects; -import com.sismics.docs.core.model.jpa.Document; /** * Document created event. @@ -10,32 +9,22 @@ import com.sismics.docs.core.model.jpa.Document; */ public class DocumentCreatedAsyncEvent extends UserEvent { /** - * Created document. + * Document ID. */ - private Document document; - - /** - * Getter of document. - * - * @return the document - */ - public Document getDocument() { - return document; + private String documentId; + + public String getDocumentId() { + return documentId; } - /** - * Setter of document. - * - * @param document document - */ - public void setDocument(Document document) { - this.document = document; + public void setDocumentId(String documentId) { + this.documentId = documentId; } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("document", document) - .toString(); + .add("documentId", documentId) + .toString(); } } \ No newline at end of file diff --git a/docs-core/src/main/java/com/sismics/docs/core/event/FileDeletedAsyncEvent.java b/docs-core/src/main/java/com/sismics/docs/core/event/FileDeletedAsyncEvent.java index 05835ae5..4a6e769a 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/event/FileDeletedAsyncEvent.java +++ b/docs-core/src/main/java/com/sismics/docs/core/event/FileDeletedAsyncEvent.java @@ -1,7 +1,6 @@ package com.sismics.docs.core.event; import com.google.common.base.MoreObjects; -import com.sismics.docs.core.model.jpa.File; /** * File deleted event. @@ -10,22 +9,22 @@ import com.sismics.docs.core.model.jpa.File; */ public class FileDeletedAsyncEvent extends UserEvent { /** - * Deleted file. + * File ID. */ - private File file; - - public File getFile() { - return file; + private String fileId; + + public String getFileId() { + return fileId; } - public void setFile(File file) { - this.file = file; + public void setFileId(String fileId) { + this.fileId = fileId; } - + @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("file", file) + .add("fileId", fileId) .toString(); } } \ No newline at end of file diff --git a/docs-core/src/main/java/com/sismics/docs/core/event/FileEvent.java b/docs-core/src/main/java/com/sismics/docs/core/event/FileEvent.java index 9337e84b..08caf296 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/event/FileEvent.java +++ b/docs-core/src/main/java/com/sismics/docs/core/event/FileEvent.java @@ -1,7 +1,6 @@ package com.sismics.docs.core.event; import com.google.common.base.MoreObjects; -import com.sismics.docs.core.model.jpa.File; import java.nio.file.Path; @@ -12,9 +11,9 @@ import java.nio.file.Path; */ public abstract class FileEvent extends UserEvent { /** - * Created file. + * File ID. */ - private File file; + private String fileId; /** * Language of the file. @@ -25,15 +24,15 @@ public abstract class FileEvent extends UserEvent { * Unencrypted original file. */ private Path unencryptedFile; - - public File getFile() { - return file; + + public String getFileId() { + return fileId; } - public void setFile(File file) { - this.file = file; + public void setFileId(String fileId) { + this.fileId = fileId; } - + public String getLanguage() { return language; } @@ -54,7 +53,7 @@ public abstract class FileEvent extends UserEvent { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("file", file) + .add("fileId", fileId) .add("language", language) .toString(); } diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java index 036d7ece..003eaa9e 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java @@ -3,9 +3,11 @@ package com.sismics.docs.core.listener.async; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.dao.ContributorDao; +import com.sismics.docs.core.dao.DocumentDao; import com.sismics.docs.core.event.DocumentCreatedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.jpa.Contributor; +import com.sismics.docs.core.model.jpa.Document; import com.sismics.docs.core.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +36,22 @@ public class DocumentCreatedAsyncListener { } TransactionUtil.handle(() -> { + // Fetch a fresh document + Document document = new DocumentDao().getById(event.getDocumentId()); + if (document == null) { + // The document has been deleted since + return; + } + // Add the first contributor (the creator of the document) ContributorDao contributorDao = new ContributorDao(); Contributor contributor = new Contributor(); - contributor.setDocumentId(event.getDocument().getId()); + contributor.setDocumentId(event.getDocumentId()); contributor.setUserId(event.getUserId()); contributorDao.create(contributor); // Update index - AppContext.getInstance().getIndexingHandler().createDocument(event.getDocument()); + AppContext.getInstance().getIndexingHandler().createDocument(document); }); } } diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java index d1897c8b..54faf5f8 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java @@ -4,7 +4,6 @@ import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.event.FileDeletedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; -import com.sismics.docs.core.model.jpa.File; import com.sismics.docs.core.util.FileUtil; import com.sismics.docs.core.util.TransactionUtil; import org.slf4j.Logger; @@ -35,12 +34,11 @@ public class FileDeletedAsyncListener { } // Delete the file from storage - File file = event.getFile(); - FileUtil.delete(file); + FileUtil.delete(event.getFileId()); TransactionUtil.handle(() -> { // Update index - AppContext.getInstance().getIndexingHandler().deleteDocument(file.getId()); + AppContext.getInstance().getIndexingHandler().deleteDocument(event.getFileId()); }); } } diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java index 6839ee86..092688cf 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java @@ -54,13 +54,18 @@ public class FileProcessingAsyncListener { TransactionUtil.handle(() -> { // Generate thumbnail, extract content - processFile(event); + File file = new FileDao().getActiveById(event.getFileId()); + if (file == null) { + // The file has been deleted since + return; + } + processFile(event, file); - // Update index - AppContext.getInstance().getIndexingHandler().createFile(event.getFile()); + // Update index with the file updated by side effect + AppContext.getInstance().getIndexingHandler().createFile(file); }); - FileUtil.endProcessingFile(event.getFile().getId()); + FileUtil.endProcessingFile(event.getFileId()); } /** @@ -77,27 +82,31 @@ public class FileProcessingAsyncListener { TransactionUtil.handle(() -> { // Generate thumbnail, extract content - processFile(event); + File file = new FileDao().getActiveById(event.getFileId()); + if (file == null) { + // The file has been deleted since + return; + } + processFile(event, file); - // Update index - AppContext.getInstance().getIndexingHandler().updateFile(event.getFile()); + // Update index with the file updated by side effect + AppContext.getInstance().getIndexingHandler().updateFile(file); }); - FileUtil.endProcessingFile(event.getFile().getId()); + FileUtil.endProcessingFile(event.getFileId()); } /** * Process the file (create/update). * * @param event File event + * @param file Fresh file */ - private void processFile(FileEvent event) { + private void processFile(FileEvent event, File file) { // Find a format handler - final File file = event.getFile(); FormatHandler formatHandler = FormatHandlerUtil.find(file.getMimeType()); if (formatHandler == null) { log.info("Format unhandled: " + file.getMimeType()); - FileUtil.endProcessingFile(file.getId()); return; } @@ -106,7 +115,6 @@ public class FileProcessingAsyncListener { User user = userDao.getById(file.getUserId()); if (user == null) { // The user has been deleted meanwhile - FileUtil.endProcessingFile(file.getId()); return; } @@ -133,7 +141,7 @@ public class FileProcessingAsyncListener { } } } catch (Exception e) { - log.error("Unable to generate thumbnails", e); + log.error("Unable to generate thumbnails for: " + file, e); } // Extract text content from the file @@ -142,7 +150,7 @@ public class FileProcessingAsyncListener { try { content = formatHandler.extractContent(event.getLanguage(), event.getUnencryptedFile()); } catch (Exception e) { - log.error("Error extracting content from: " + event.getFile(), e); + log.error("Error extracting content from: " + file, e); } log.info(MessageFormat.format("File content extracted in {0}ms", System.currentTimeMillis() - startTime)); @@ -154,6 +162,6 @@ public class FileProcessingAsyncListener { } file.setContent(content); - fileDao.update(file); + fileDao.updateContent(file); } } diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/WebhookAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/WebhookAsyncListener.java index 5ad81ae1..841f6ec0 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/WebhookAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/WebhookAsyncListener.java @@ -36,7 +36,7 @@ public class WebhookAsyncListener { @Subscribe @AllowConcurrentEvents public void on(final DocumentCreatedAsyncEvent event) { - triggerWebhook(WebhookEvent.DOCUMENT_CREATED, event.getDocument().getId()); + triggerWebhook(WebhookEvent.DOCUMENT_CREATED, event.getDocumentId()); } @Subscribe @@ -54,19 +54,19 @@ public class WebhookAsyncListener { @Subscribe @AllowConcurrentEvents public void on(final FileCreatedAsyncEvent event) { - triggerWebhook(WebhookEvent.FILE_CREATED, event.getFile().getId()); + triggerWebhook(WebhookEvent.FILE_CREATED, event.getFileId()); } @Subscribe @AllowConcurrentEvents public void on(final FileUpdatedAsyncEvent event) { - triggerWebhook(WebhookEvent.FILE_UPDATED, event.getFile().getId()); + triggerWebhook(WebhookEvent.FILE_UPDATED, event.getFileId()); } @Subscribe @AllowConcurrentEvents public void on(final FileDeletedAsyncEvent event) { - triggerWebhook(WebhookEvent.FILE_DELETED, event.getFile().getId()); + triggerWebhook(WebhookEvent.FILE_DELETED, event.getFileId()); } /** diff --git a/docs-core/src/main/java/com/sismics/docs/core/service/InboxService.java b/docs-core/src/main/java/com/sismics/docs/core/service/InboxService.java index ae0b7500..7cfaf995 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/service/InboxService.java +++ b/docs-core/src/main/java/com/sismics/docs/core/service/InboxService.java @@ -227,7 +227,7 @@ public class InboxService extends AbstractScheduledService { // Raise a document created event DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); documentCreatedAsyncEvent.setUserId("admin"); - documentCreatedAsyncEvent.setDocument(document); + documentCreatedAsyncEvent.setDocumentId(document.getId()); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); // Add files to the document diff --git a/docs-core/src/main/java/com/sismics/docs/core/util/FileUtil.java b/docs-core/src/main/java/com/sismics/docs/core/util/FileUtil.java index 49357e33..44227aa8 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/util/FileUtil.java +++ b/docs-core/src/main/java/com/sismics/docs/core/util/FileUtil.java @@ -76,12 +76,12 @@ public class FileUtil { /** * Remove a file from the storage filesystem. * - * @param file File to delete + * @param fileId ID of file to delete */ - public static void delete(File file) throws IOException { - Path storedFile = DirectoryUtil.getStorageDirectory().resolve(file.getId()); - Path webFile = DirectoryUtil.getStorageDirectory().resolve(file.getId() + "_web"); - Path thumbnailFile = DirectoryUtil.getStorageDirectory().resolve(file.getId() + "_thumb"); + public static void delete(String fileId) throws IOException { + Path storedFile = DirectoryUtil.getStorageDirectory().resolve(fileId); + Path webFile = DirectoryUtil.getStorageDirectory().resolve(fileId + "_web"); + Path thumbnailFile = DirectoryUtil.getStorageDirectory().resolve(fileId + "_thumb"); if (Files.exists(storedFile)) { Files.delete(storedFile); @@ -126,7 +126,7 @@ public class FileUtil { // Validate global quota String globalStorageQuotaStr = System.getenv(Constants.GLOBAL_QUOTA_ENV); if (!Strings.isNullOrEmpty(globalStorageQuotaStr)) { - long globalStorageQuota = Long.valueOf(globalStorageQuotaStr); + long globalStorageQuota = Long.parseLong(globalStorageQuotaStr); long globalStorageCurrent = userDao.getGlobalStorageCurrent(); if (globalStorageCurrent + fileSize > globalStorageQuota) { throw new IOException("QuotaReached"); @@ -190,7 +190,7 @@ public class FileUtil { FileCreatedAsyncEvent fileCreatedAsyncEvent = new FileCreatedAsyncEvent(); fileCreatedAsyncEvent.setUserId(userId); fileCreatedAsyncEvent.setLanguage(language); - fileCreatedAsyncEvent.setFile(file); + fileCreatedAsyncEvent.setFileId(file.getId()); fileCreatedAsyncEvent.setUnencryptedFile(unencryptedFile); ThreadLocalContext.get().addAsyncEvent(fileCreatedAsyncEvent); diff --git a/docs-core/src/main/java/com/sismics/docs/core/util/action/ProcessFilesAction.java b/docs-core/src/main/java/com/sismics/docs/core/util/action/ProcessFilesAction.java index 920e228f..9f98f5ad 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/util/action/ProcessFilesAction.java +++ b/docs-core/src/main/java/com/sismics/docs/core/util/action/ProcessFilesAction.java @@ -48,7 +48,7 @@ public class ProcessFilesAction implements Action { FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent(); event.setUserId("admin"); event.setLanguage(documentDto.getLanguage()); - event.setFile(file); + event.setFileId(file.getId()); event.setUnencryptedFile(unencryptedFile); ThreadLocalContext.get().addAsyncEvent(event); } diff --git a/docs-web/src/main/java/com/sismics/docs/rest/resource/DocumentResource.java b/docs-web/src/main/java/com/sismics/docs/rest/resource/DocumentResource.java index 8588980c..2bcbbe8e 100644 --- a/docs-web/src/main/java/com/sismics/docs/rest/resource/DocumentResource.java +++ b/docs-web/src/main/java/com/sismics/docs/rest/resource/DocumentResource.java @@ -727,7 +727,7 @@ public class DocumentResource extends BaseResource { // Raise a document created event DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); documentCreatedAsyncEvent.setUserId(principal.getId()); - documentCreatedAsyncEvent.setDocument(document); + documentCreatedAsyncEvent.setDocumentId(document.getId()); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); JsonObjectBuilder response = Json.createObjectBuilder() @@ -944,7 +944,7 @@ public class DocumentResource extends BaseResource { // Raise a document created event DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); documentCreatedAsyncEvent.setUserId(principal.getId()); - documentCreatedAsyncEvent.setDocument(document); + documentCreatedAsyncEvent.setDocumentId(document.getId()); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); // Add files to the document @@ -1013,7 +1013,7 @@ public class DocumentResource extends BaseResource { // Raise file deleted event FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); fileDeletedAsyncEvent.setUserId(principal.getId()); - fileDeletedAsyncEvent.setFile(file); + fileDeletedAsyncEvent.setFileId(file.getId()); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); } diff --git a/docs-web/src/main/java/com/sismics/docs/rest/resource/FileResource.java b/docs-web/src/main/java/com/sismics/docs/rest/resource/FileResource.java index 16c6040f..18d43d83 100644 --- a/docs-web/src/main/java/com/sismics/docs/rest/resource/FileResource.java +++ b/docs-web/src/main/java/com/sismics/docs/rest/resource/FileResource.java @@ -202,7 +202,7 @@ public class FileResource extends BaseResource { FileUpdatedAsyncEvent fileUpdatedAsyncEvent = new FileUpdatedAsyncEvent(); fileUpdatedAsyncEvent.setUserId(principal.getId()); fileUpdatedAsyncEvent.setLanguage(documentDto.getLanguage()); - fileUpdatedAsyncEvent.setFile(file); + fileUpdatedAsyncEvent.setFileId(file.getId()); fileUpdatedAsyncEvent.setUnencryptedFile(unencryptedFile); ThreadLocalContext.get().addAsyncEvent(fileUpdatedAsyncEvent); @@ -310,7 +310,7 @@ public class FileResource extends BaseResource { FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent(); event.setUserId(principal.getId()); event.setLanguage(documentDto.getLanguage()); - event.setFile(file); + event.setFileId(file.getId()); event.setUnencryptedFile(unencryptedFile); ThreadLocalContext.get().addAsyncEvent(event); } catch (Exception e) { @@ -548,7 +548,7 @@ public class FileResource extends BaseResource { // Raise a new file deleted event FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); fileDeletedAsyncEvent.setUserId(principal.getId()); - fileDeletedAsyncEvent.setFile(file); + fileDeletedAsyncEvent.setFileId(file.getId()); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); if (file.getDocumentId() != null) { diff --git a/docs-web/src/main/java/com/sismics/docs/rest/resource/UserResource.java b/docs-web/src/main/java/com/sismics/docs/rest/resource/UserResource.java index dd413cac..4c5c031f 100644 --- a/docs-web/src/main/java/com/sismics/docs/rest/resource/UserResource.java +++ b/docs-web/src/main/java/com/sismics/docs/rest/resource/UserResource.java @@ -482,7 +482,7 @@ public class UserResource extends BaseResource { for (File file : fileList) { FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); fileDeletedAsyncEvent.setUserId(principal.getId()); - fileDeletedAsyncEvent.setFile(file); + fileDeletedAsyncEvent.setFileId(file.getId()); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); } @@ -564,7 +564,7 @@ public class UserResource extends BaseResource { for (File file : fileList) { FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); fileDeletedAsyncEvent.setUserId(principal.getId()); - fileDeletedAsyncEvent.setFile(file); + fileDeletedAsyncEvent.setFileId(file.getId()); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); }