From b265b8b1e0c26ee5cc245109f1ab3a877871e45f Mon Sep 17 00:00:00 2001 From: Benjamin Gamard Date: Wed, 11 Apr 2018 11:27:55 +0200 Subject: [PATCH] process events concurrently --- .../listener/async/AclCreatedAsyncListener.java | 2 ++ .../listener/async/AclDeletedAsyncListener.java | 2 ++ .../async/DocumentCreatedAsyncListener.java | 2 ++ .../async/DocumentDeletedAsyncListener.java | 3 ++- .../async/DocumentUpdatedAsyncListener.java | 2 ++ .../listener/async/FileDeletedAsyncListener.java | 2 ++ .../async/FileProcessingAsyncListener.java | 2 ++ .../listener/async/PasswordLostAsyncListener.java | 12 +++++++----- .../listener/async/RebuildIndexAsyncListener.java | 8 +++++--- .../async/RouteStepValidateAsyncListener.java | 14 ++++++++------ .../docs/core/model/context/AppContext.java | 4 ++-- 11 files changed, 36 insertions(+), 17 deletions(-) diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclCreatedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclCreatedAsyncListener.java index d5a9fb6c..63dabca7 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclCreatedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclCreatedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.event.AclCreatedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; @@ -24,6 +25,7 @@ public class AclCreatedAsyncListener { * @param event ACL created event */ @Subscribe + @AllowConcurrentEvents public void on(final AclCreatedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("ACL created event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclDeletedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclDeletedAsyncListener.java index e5a24425..2dd3cc06 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclDeletedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/AclDeletedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.event.AclDeletedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; @@ -24,6 +25,7 @@ public class AclDeletedAsyncListener { * @param event ACL deleted event */ @Subscribe + @AllowConcurrentEvents public void on(final AclDeletedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("ACL deleted event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java index 6c781385..036d7ece 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentCreatedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.dao.ContributorDao; import com.sismics.docs.core.event.DocumentCreatedAsyncEvent; @@ -26,6 +27,7 @@ public class DocumentCreatedAsyncListener { * @param event Document created event */ @Subscribe + @AllowConcurrentEvents public void on(final DocumentCreatedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("Document created event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentDeletedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentDeletedAsyncListener.java index 691471e6..0a7bef2f 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentDeletedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentDeletedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.event.DocumentDeletedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; @@ -22,9 +23,9 @@ public class DocumentDeletedAsyncListener { * Document deleted. * * @param event Document deleted event - * @throws Exception e */ @Subscribe + @AllowConcurrentEvents public void on(final DocumentDeletedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("Document deleted event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentUpdatedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentUpdatedAsyncListener.java index f813616f..da0eb5b7 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentUpdatedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/DocumentUpdatedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.dao.ContributorDao; import com.sismics.docs.core.dao.DocumentDao; @@ -30,6 +31,7 @@ public class DocumentUpdatedAsyncListener { * @param event Document updated event */ @Subscribe + @AllowConcurrentEvents public void on(final DocumentUpdatedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("Document updated event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java index b4ab3691..d1897c8b 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileDeletedAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.event.FileDeletedAsyncEvent; import com.sismics.docs.core.model.context.AppContext; @@ -27,6 +28,7 @@ public class FileDeletedAsyncListener { * @throws Exception e */ @Subscribe + @AllowConcurrentEvents public void on(final FileDeletedAsyncEvent event) throws Exception { if (log.isInfoEnabled()) { log.info("File deleted event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java index 1445a587..5d6d3b31 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/FileProcessingAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.dao.FileDao; import com.sismics.docs.core.dao.UserDao; @@ -45,6 +46,7 @@ public class FileProcessingAsyncListener { * @param event File created event */ @Subscribe + @AllowConcurrentEvents public void on(final FileCreatedAsyncEvent event) { if (log.isInfoEnabled()) { log.info("File created event: " + event.toString()); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/PasswordLostAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/PasswordLostAsyncListener.java index 616bc10f..37acc398 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/PasswordLostAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/PasswordLostAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.constant.Constants; import com.sismics.docs.core.dao.dto.UserDto; @@ -27,17 +28,18 @@ public class PasswordLostAsyncListener { /** * Handle events. * - * @param passwordLostEvent Event + * @param event Event */ @Subscribe - public void onPasswordLost(final PasswordLostEvent passwordLostEvent) { + @AllowConcurrentEvents + public void on(final PasswordLostEvent event) { if (log.isInfoEnabled()) { - log.info("Password lost event: " + passwordLostEvent.toString()); + log.info("Password lost event: " + event.toString()); } TransactionUtil.handle(() -> { - final UserDto user = passwordLostEvent.getUser(); - final PasswordRecovery passwordRecovery = passwordLostEvent.getPasswordRecovery(); + final UserDto user = event.getUser(); + final PasswordRecovery passwordRecovery = event.getPasswordRecovery(); // Send the password recovery email Map paramRootMap = new HashMap<>(); diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/RebuildIndexAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/RebuildIndexAsyncListener.java index 074e79a8..7bf3bff6 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/RebuildIndexAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/RebuildIndexAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.dao.DocumentDao; import com.sismics.docs.core.dao.FileDao; @@ -27,12 +28,13 @@ public class RebuildIndexAsyncListener { /** * Rebuild Lucene index. * - * @param rebuildIndexAsyncEvent Index rebuild event + * @param event Index rebuild event */ @Subscribe - public void on(final RebuildIndexAsyncEvent rebuildIndexAsyncEvent) { + @AllowConcurrentEvents + public void on(final RebuildIndexAsyncEvent event) { if (log.isInfoEnabled()) { - log.info("Rebuild index event: " + rebuildIndexAsyncEvent.toString()); + log.info("Rebuild index event: " + event.toString()); } // Fetch all documents and files diff --git a/docs-core/src/main/java/com/sismics/docs/core/listener/async/RouteStepValidateAsyncListener.java b/docs-core/src/main/java/com/sismics/docs/core/listener/async/RouteStepValidateAsyncListener.java index 10675d0d..dcfbdfe0 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/listener/async/RouteStepValidateAsyncListener.java +++ b/docs-core/src/main/java/com/sismics/docs/core/listener/async/RouteStepValidateAsyncListener.java @@ -1,5 +1,6 @@ package com.sismics.docs.core.listener.async; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.sismics.docs.core.constant.Constants; import com.sismics.docs.core.dao.dto.UserDto; @@ -26,22 +27,23 @@ public class RouteStepValidateAsyncListener { /** * Handle events. * - * @param routeStepValidateEvent Event + * @param event Event */ @Subscribe - public void onRouteStepValidate(final RouteStepValidateEvent routeStepValidateEvent) { + @AllowConcurrentEvents + public void on(final RouteStepValidateEvent event) { if (log.isInfoEnabled()) { - log.info("Route step validate event: " + routeStepValidateEvent.toString()); + log.info("Route step validate event: " + event.toString()); } TransactionUtil.handle(() -> { - final UserDto user = routeStepValidateEvent.getUser(); + final UserDto user = event.getUser(); // Send route step validated email Map paramRootMap = new HashMap<>(); paramRootMap.put("user_name", user.getUsername()); - paramRootMap.put("document_id", routeStepValidateEvent.getDocument().getId()); - paramRootMap.put("document_title", routeStepValidateEvent.getDocument().getTitle()); + paramRootMap.put("document_id", event.getDocument().getId()); + paramRootMap.put("document_title", event.getDocument().getTitle()); EmailUtil.sendEmail(Constants.EMAIL_TEMPLATE_ROUTE_STEP_VALIDATE, user, paramRootMap); }); diff --git a/docs-core/src/main/java/com/sismics/docs/core/model/context/AppContext.java b/docs-core/src/main/java/com/sismics/docs/core/model/context/AppContext.java index c340ad62..73184c01 100644 --- a/docs-core/src/main/java/com/sismics/docs/core/model/context/AppContext.java +++ b/docs-core/src/main/java/com/sismics/docs/core/model/context/AppContext.java @@ -174,7 +174,7 @@ public class AppContext { return new EventBus(); } else { ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, - 0L, TimeUnit.MILLISECONDS, + 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); asyncExecutorList.add(executor); return new AsyncEventBus(executor); @@ -206,7 +206,7 @@ public class AppContext { // Shutdown executor, don't accept any more tasks (can cause error with nested events) try { executor.shutdown(); - executor.awaitTermination(60, TimeUnit.SECONDS); + executor.awaitTermination(1, TimeUnit.MINUTES); } catch (InterruptedException e) { // NOP }