#412: better handle concurrent updates and async listeners

This commit is contained in:
bgamard 2020-05-17 21:00:01 +02:00
parent 0d058b9c9c
commit 520b143165
14 changed files with 93 additions and 83 deletions

View File

@ -154,6 +154,14 @@ public class FileDao {
return file; return file;
} }
public void updateContent(File file) {
EntityManager em = ThreadLocalContext.get().getEntityManager();
Query query = em.createNativeQuery("update T_FILE f set FIL_CONTENT_C = :content where f.FIL_ID_C = :id");
query.setParameter("content", file.getContent());
query.setParameter("id", file.getId());
query.executeUpdate();
}
/** /**
* Gets a file by its ID. * Gets a file by its ID.
* *

View File

@ -1,7 +1,6 @@
package com.sismics.docs.core.event; package com.sismics.docs.core.event;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.sismics.docs.core.model.jpa.Document;
/** /**
* Document created event. * Document created event.
@ -10,32 +9,22 @@ import com.sismics.docs.core.model.jpa.Document;
*/ */
public class DocumentCreatedAsyncEvent extends UserEvent { public class DocumentCreatedAsyncEvent extends UserEvent {
/** /**
* Created document. * Document ID.
*/ */
private Document document; private String documentId;
/** public String getDocumentId() {
* Getter of document. return documentId;
*
* @return the document
*/
public Document getDocument() {
return document;
} }
/** public void setDocumentId(String documentId) {
* Setter of document. this.documentId = documentId;
*
* @param document document
*/
public void setDocument(Document document) {
this.document = document;
} }
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("document", document) .add("documentId", documentId)
.toString(); .toString();
} }
} }

View File

@ -1,7 +1,6 @@
package com.sismics.docs.core.event; package com.sismics.docs.core.event;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.sismics.docs.core.model.jpa.File;
/** /**
* File deleted event. * File deleted event.
@ -10,22 +9,22 @@ import com.sismics.docs.core.model.jpa.File;
*/ */
public class FileDeletedAsyncEvent extends UserEvent { public class FileDeletedAsyncEvent extends UserEvent {
/** /**
* Deleted file. * File ID.
*/ */
private File file; private String fileId;
public File getFile() { public String getFileId() {
return file; return fileId;
} }
public void setFile(File file) { public void setFileId(String fileId) {
this.file = file; this.fileId = fileId;
} }
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("file", file) .add("fileId", fileId)
.toString(); .toString();
} }
} }

View File

@ -1,7 +1,6 @@
package com.sismics.docs.core.event; package com.sismics.docs.core.event;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.sismics.docs.core.model.jpa.File;
import java.nio.file.Path; import java.nio.file.Path;
@ -12,9 +11,9 @@ import java.nio.file.Path;
*/ */
public abstract class FileEvent extends UserEvent { public abstract class FileEvent extends UserEvent {
/** /**
* Created file. * File ID.
*/ */
private File file; private String fileId;
/** /**
* Language of the file. * Language of the file.
@ -26,12 +25,12 @@ public abstract class FileEvent extends UserEvent {
*/ */
private Path unencryptedFile; private Path unencryptedFile;
public File getFile() { public String getFileId() {
return file; return fileId;
} }
public void setFile(File file) { public void setFileId(String fileId) {
this.file = file; this.fileId = fileId;
} }
public String getLanguage() { public String getLanguage() {
@ -54,7 +53,7 @@ public abstract class FileEvent extends UserEvent {
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("file", file) .add("fileId", fileId)
.add("language", language) .add("language", language)
.toString(); .toString();
} }

View File

@ -3,9 +3,11 @@ package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.dao.ContributorDao; import com.sismics.docs.core.dao.ContributorDao;
import com.sismics.docs.core.dao.DocumentDao;
import com.sismics.docs.core.event.DocumentCreatedAsyncEvent; import com.sismics.docs.core.event.DocumentCreatedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
import com.sismics.docs.core.model.jpa.Contributor; import com.sismics.docs.core.model.jpa.Contributor;
import com.sismics.docs.core.model.jpa.Document;
import com.sismics.docs.core.util.TransactionUtil; import com.sismics.docs.core.util.TransactionUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,15 +36,22 @@ public class DocumentCreatedAsyncListener {
} }
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
// Fetch a fresh document
Document document = new DocumentDao().getById(event.getDocumentId());
if (document == null) {
// The document has been deleted since
return;
}
// Add the first contributor (the creator of the document) // Add the first contributor (the creator of the document)
ContributorDao contributorDao = new ContributorDao(); ContributorDao contributorDao = new ContributorDao();
Contributor contributor = new Contributor(); Contributor contributor = new Contributor();
contributor.setDocumentId(event.getDocument().getId()); contributor.setDocumentId(event.getDocumentId());
contributor.setUserId(event.getUserId()); contributor.setUserId(event.getUserId());
contributorDao.create(contributor); contributorDao.create(contributor);
// Update index // Update index
AppContext.getInstance().getIndexingHandler().createDocument(event.getDocument()); AppContext.getInstance().getIndexingHandler().createDocument(document);
}); });
} }
} }

View File

@ -4,7 +4,6 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.FileDeletedAsyncEvent; import com.sismics.docs.core.event.FileDeletedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
import com.sismics.docs.core.model.jpa.File;
import com.sismics.docs.core.util.FileUtil; import com.sismics.docs.core.util.FileUtil;
import com.sismics.docs.core.util.TransactionUtil; import com.sismics.docs.core.util.TransactionUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,12 +34,11 @@ public class FileDeletedAsyncListener {
} }
// Delete the file from storage // Delete the file from storage
File file = event.getFile(); FileUtil.delete(event.getFileId());
FileUtil.delete(file);
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
// Update index // Update index
AppContext.getInstance().getIndexingHandler().deleteDocument(file.getId()); AppContext.getInstance().getIndexingHandler().deleteDocument(event.getFileId());
}); });
} }
} }

View File

@ -54,13 +54,18 @@ public class FileProcessingAsyncListener {
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
// Generate thumbnail, extract content // Generate thumbnail, extract content
processFile(event); File file = new FileDao().getActiveById(event.getFileId());
if (file == null) {
// The file has been deleted since
return;
}
processFile(event, file);
// Update index // Update index with the file updated by side effect
AppContext.getInstance().getIndexingHandler().createFile(event.getFile()); AppContext.getInstance().getIndexingHandler().createFile(file);
}); });
FileUtil.endProcessingFile(event.getFile().getId()); FileUtil.endProcessingFile(event.getFileId());
} }
/** /**
@ -77,27 +82,31 @@ public class FileProcessingAsyncListener {
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
// Generate thumbnail, extract content // Generate thumbnail, extract content
processFile(event); File file = new FileDao().getActiveById(event.getFileId());
if (file == null) {
// The file has been deleted since
return;
}
processFile(event, file);
// Update index // Update index with the file updated by side effect
AppContext.getInstance().getIndexingHandler().updateFile(event.getFile()); AppContext.getInstance().getIndexingHandler().updateFile(file);
}); });
FileUtil.endProcessingFile(event.getFile().getId()); FileUtil.endProcessingFile(event.getFileId());
} }
/** /**
* Process the file (create/update). * Process the file (create/update).
* *
* @param event File event * @param event File event
* @param file Fresh file
*/ */
private void processFile(FileEvent event) { private void processFile(FileEvent event, File file) {
// Find a format handler // Find a format handler
final File file = event.getFile();
FormatHandler formatHandler = FormatHandlerUtil.find(file.getMimeType()); FormatHandler formatHandler = FormatHandlerUtil.find(file.getMimeType());
if (formatHandler == null) { if (formatHandler == null) {
log.info("Format unhandled: " + file.getMimeType()); log.info("Format unhandled: " + file.getMimeType());
FileUtil.endProcessingFile(file.getId());
return; return;
} }
@ -106,7 +115,6 @@ public class FileProcessingAsyncListener {
User user = userDao.getById(file.getUserId()); User user = userDao.getById(file.getUserId());
if (user == null) { if (user == null) {
// The user has been deleted meanwhile // The user has been deleted meanwhile
FileUtil.endProcessingFile(file.getId());
return; return;
} }
@ -133,7 +141,7 @@ public class FileProcessingAsyncListener {
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Unable to generate thumbnails", e); log.error("Unable to generate thumbnails for: " + file, e);
} }
// Extract text content from the file // Extract text content from the file
@ -142,7 +150,7 @@ public class FileProcessingAsyncListener {
try { try {
content = formatHandler.extractContent(event.getLanguage(), event.getUnencryptedFile()); content = formatHandler.extractContent(event.getLanguage(), event.getUnencryptedFile());
} catch (Exception e) { } catch (Exception e) {
log.error("Error extracting content from: " + event.getFile(), e); log.error("Error extracting content from: " + file, e);
} }
log.info(MessageFormat.format("File content extracted in {0}ms", System.currentTimeMillis() - startTime)); log.info(MessageFormat.format("File content extracted in {0}ms", System.currentTimeMillis() - startTime));
@ -154,6 +162,6 @@ public class FileProcessingAsyncListener {
} }
file.setContent(content); file.setContent(content);
fileDao.update(file); fileDao.updateContent(file);
} }
} }

View File

@ -36,7 +36,7 @@ public class WebhookAsyncListener {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
public void on(final DocumentCreatedAsyncEvent event) { public void on(final DocumentCreatedAsyncEvent event) {
triggerWebhook(WebhookEvent.DOCUMENT_CREATED, event.getDocument().getId()); triggerWebhook(WebhookEvent.DOCUMENT_CREATED, event.getDocumentId());
} }
@Subscribe @Subscribe
@ -54,19 +54,19 @@ public class WebhookAsyncListener {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
public void on(final FileCreatedAsyncEvent event) { public void on(final FileCreatedAsyncEvent event) {
triggerWebhook(WebhookEvent.FILE_CREATED, event.getFile().getId()); triggerWebhook(WebhookEvent.FILE_CREATED, event.getFileId());
} }
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
public void on(final FileUpdatedAsyncEvent event) { public void on(final FileUpdatedAsyncEvent event) {
triggerWebhook(WebhookEvent.FILE_UPDATED, event.getFile().getId()); triggerWebhook(WebhookEvent.FILE_UPDATED, event.getFileId());
} }
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
public void on(final FileDeletedAsyncEvent event) { public void on(final FileDeletedAsyncEvent event) {
triggerWebhook(WebhookEvent.FILE_DELETED, event.getFile().getId()); triggerWebhook(WebhookEvent.FILE_DELETED, event.getFileId());
} }
/** /**

View File

@ -227,7 +227,7 @@ public class InboxService extends AbstractScheduledService {
// Raise a document created event // Raise a document created event
DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent();
documentCreatedAsyncEvent.setUserId("admin"); documentCreatedAsyncEvent.setUserId("admin");
documentCreatedAsyncEvent.setDocument(document); documentCreatedAsyncEvent.setDocumentId(document.getId());
ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent);
// Add files to the document // Add files to the document

View File

@ -76,12 +76,12 @@ public class FileUtil {
/** /**
* Remove a file from the storage filesystem. * Remove a file from the storage filesystem.
* *
* @param file File to delete * @param fileId ID of file to delete
*/ */
public static void delete(File file) throws IOException { public static void delete(String fileId) throws IOException {
Path storedFile = DirectoryUtil.getStorageDirectory().resolve(file.getId()); Path storedFile = DirectoryUtil.getStorageDirectory().resolve(fileId);
Path webFile = DirectoryUtil.getStorageDirectory().resolve(file.getId() + "_web"); Path webFile = DirectoryUtil.getStorageDirectory().resolve(fileId + "_web");
Path thumbnailFile = DirectoryUtil.getStorageDirectory().resolve(file.getId() + "_thumb"); Path thumbnailFile = DirectoryUtil.getStorageDirectory().resolve(fileId + "_thumb");
if (Files.exists(storedFile)) { if (Files.exists(storedFile)) {
Files.delete(storedFile); Files.delete(storedFile);
@ -126,7 +126,7 @@ public class FileUtil {
// Validate global quota // Validate global quota
String globalStorageQuotaStr = System.getenv(Constants.GLOBAL_QUOTA_ENV); String globalStorageQuotaStr = System.getenv(Constants.GLOBAL_QUOTA_ENV);
if (!Strings.isNullOrEmpty(globalStorageQuotaStr)) { if (!Strings.isNullOrEmpty(globalStorageQuotaStr)) {
long globalStorageQuota = Long.valueOf(globalStorageQuotaStr); long globalStorageQuota = Long.parseLong(globalStorageQuotaStr);
long globalStorageCurrent = userDao.getGlobalStorageCurrent(); long globalStorageCurrent = userDao.getGlobalStorageCurrent();
if (globalStorageCurrent + fileSize > globalStorageQuota) { if (globalStorageCurrent + fileSize > globalStorageQuota) {
throw new IOException("QuotaReached"); throw new IOException("QuotaReached");
@ -190,7 +190,7 @@ public class FileUtil {
FileCreatedAsyncEvent fileCreatedAsyncEvent = new FileCreatedAsyncEvent(); FileCreatedAsyncEvent fileCreatedAsyncEvent = new FileCreatedAsyncEvent();
fileCreatedAsyncEvent.setUserId(userId); fileCreatedAsyncEvent.setUserId(userId);
fileCreatedAsyncEvent.setLanguage(language); fileCreatedAsyncEvent.setLanguage(language);
fileCreatedAsyncEvent.setFile(file); fileCreatedAsyncEvent.setFileId(file.getId());
fileCreatedAsyncEvent.setUnencryptedFile(unencryptedFile); fileCreatedAsyncEvent.setUnencryptedFile(unencryptedFile);
ThreadLocalContext.get().addAsyncEvent(fileCreatedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileCreatedAsyncEvent);

View File

@ -48,7 +48,7 @@ public class ProcessFilesAction implements Action {
FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent(); FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent();
event.setUserId("admin"); event.setUserId("admin");
event.setLanguage(documentDto.getLanguage()); event.setLanguage(documentDto.getLanguage());
event.setFile(file); event.setFileId(file.getId());
event.setUnencryptedFile(unencryptedFile); event.setUnencryptedFile(unencryptedFile);
ThreadLocalContext.get().addAsyncEvent(event); ThreadLocalContext.get().addAsyncEvent(event);
} }

View File

@ -727,7 +727,7 @@ public class DocumentResource extends BaseResource {
// Raise a document created event // Raise a document created event
DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent();
documentCreatedAsyncEvent.setUserId(principal.getId()); documentCreatedAsyncEvent.setUserId(principal.getId());
documentCreatedAsyncEvent.setDocument(document); documentCreatedAsyncEvent.setDocumentId(document.getId());
ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent);
JsonObjectBuilder response = Json.createObjectBuilder() JsonObjectBuilder response = Json.createObjectBuilder()
@ -944,7 +944,7 @@ public class DocumentResource extends BaseResource {
// Raise a document created event // Raise a document created event
DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent(); DocumentCreatedAsyncEvent documentCreatedAsyncEvent = new DocumentCreatedAsyncEvent();
documentCreatedAsyncEvent.setUserId(principal.getId()); documentCreatedAsyncEvent.setUserId(principal.getId());
documentCreatedAsyncEvent.setDocument(document); documentCreatedAsyncEvent.setDocumentId(document.getId());
ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(documentCreatedAsyncEvent);
// Add files to the document // Add files to the document
@ -1013,7 +1013,7 @@ public class DocumentResource extends BaseResource {
// Raise file deleted event // Raise file deleted event
FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent();
fileDeletedAsyncEvent.setUserId(principal.getId()); fileDeletedAsyncEvent.setUserId(principal.getId());
fileDeletedAsyncEvent.setFile(file); fileDeletedAsyncEvent.setFileId(file.getId());
ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent);
} }

View File

@ -202,7 +202,7 @@ public class FileResource extends BaseResource {
FileUpdatedAsyncEvent fileUpdatedAsyncEvent = new FileUpdatedAsyncEvent(); FileUpdatedAsyncEvent fileUpdatedAsyncEvent = new FileUpdatedAsyncEvent();
fileUpdatedAsyncEvent.setUserId(principal.getId()); fileUpdatedAsyncEvent.setUserId(principal.getId());
fileUpdatedAsyncEvent.setLanguage(documentDto.getLanguage()); fileUpdatedAsyncEvent.setLanguage(documentDto.getLanguage());
fileUpdatedAsyncEvent.setFile(file); fileUpdatedAsyncEvent.setFileId(file.getId());
fileUpdatedAsyncEvent.setUnencryptedFile(unencryptedFile); fileUpdatedAsyncEvent.setUnencryptedFile(unencryptedFile);
ThreadLocalContext.get().addAsyncEvent(fileUpdatedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileUpdatedAsyncEvent);
@ -310,7 +310,7 @@ public class FileResource extends BaseResource {
FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent(); FileUpdatedAsyncEvent event = new FileUpdatedAsyncEvent();
event.setUserId(principal.getId()); event.setUserId(principal.getId());
event.setLanguage(documentDto.getLanguage()); event.setLanguage(documentDto.getLanguage());
event.setFile(file); event.setFileId(file.getId());
event.setUnencryptedFile(unencryptedFile); event.setUnencryptedFile(unencryptedFile);
ThreadLocalContext.get().addAsyncEvent(event); ThreadLocalContext.get().addAsyncEvent(event);
} catch (Exception e) { } catch (Exception e) {
@ -548,7 +548,7 @@ public class FileResource extends BaseResource {
// Raise a new file deleted event // Raise a new file deleted event
FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent();
fileDeletedAsyncEvent.setUserId(principal.getId()); fileDeletedAsyncEvent.setUserId(principal.getId());
fileDeletedAsyncEvent.setFile(file); fileDeletedAsyncEvent.setFileId(file.getId());
ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent);
if (file.getDocumentId() != null) { if (file.getDocumentId() != null) {

View File

@ -482,7 +482,7 @@ public class UserResource extends BaseResource {
for (File file : fileList) { for (File file : fileList) {
FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent();
fileDeletedAsyncEvent.setUserId(principal.getId()); fileDeletedAsyncEvent.setUserId(principal.getId());
fileDeletedAsyncEvent.setFile(file); fileDeletedAsyncEvent.setFileId(file.getId());
ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent);
} }
@ -564,7 +564,7 @@ public class UserResource extends BaseResource {
for (File file : fileList) { for (File file : fileList) {
FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent(); FileDeletedAsyncEvent fileDeletedAsyncEvent = new FileDeletedAsyncEvent();
fileDeletedAsyncEvent.setUserId(principal.getId()); fileDeletedAsyncEvent.setUserId(principal.getId());
fileDeletedAsyncEvent.setFile(file); fileDeletedAsyncEvent.setFileId(file.getId());
ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent); ThreadLocalContext.get().addAsyncEvent(fileDeletedAsyncEvent);
} }