process events concurrently

This commit is contained in:
Benjamin Gamard 2018-04-11 11:27:55 +02:00
parent 7b3c0915d8
commit b265b8b1e0
11 changed files with 36 additions and 17 deletions

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.AclCreatedAsyncEvent; import com.sismics.docs.core.event.AclCreatedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
@ -24,6 +25,7 @@ public class AclCreatedAsyncListener {
* @param event ACL created event * @param event ACL created event
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final AclCreatedAsyncEvent event) { public void on(final AclCreatedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("ACL created event: " + event.toString()); log.info("ACL created event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.AclDeletedAsyncEvent; import com.sismics.docs.core.event.AclDeletedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
@ -24,6 +25,7 @@ public class AclDeletedAsyncListener {
* @param event ACL deleted event * @param event ACL deleted event
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final AclDeletedAsyncEvent event) { public void on(final AclDeletedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("ACL deleted event: " + event.toString()); log.info("ACL deleted event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.dao.ContributorDao; import com.sismics.docs.core.dao.ContributorDao;
import com.sismics.docs.core.event.DocumentCreatedAsyncEvent; import com.sismics.docs.core.event.DocumentCreatedAsyncEvent;
@ -26,6 +27,7 @@ public class DocumentCreatedAsyncListener {
* @param event Document created event * @param event Document created event
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final DocumentCreatedAsyncEvent event) { public void on(final DocumentCreatedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Document created event: " + event.toString()); log.info("Document created event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.DocumentDeletedAsyncEvent; import com.sismics.docs.core.event.DocumentDeletedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
@ -22,9 +23,9 @@ public class DocumentDeletedAsyncListener {
* Document deleted. * Document deleted.
* *
* @param event Document deleted event * @param event Document deleted event
* @throws Exception e
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final DocumentDeletedAsyncEvent event) { public void on(final DocumentDeletedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Document deleted event: " + event.toString()); log.info("Document deleted event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.dao.ContributorDao; import com.sismics.docs.core.dao.ContributorDao;
import com.sismics.docs.core.dao.DocumentDao; import com.sismics.docs.core.dao.DocumentDao;
@ -30,6 +31,7 @@ public class DocumentUpdatedAsyncListener {
* @param event Document updated event * @param event Document updated event
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final DocumentUpdatedAsyncEvent event) { public void on(final DocumentUpdatedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Document updated event: " + event.toString()); log.info("Document updated event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.event.FileDeletedAsyncEvent; import com.sismics.docs.core.event.FileDeletedAsyncEvent;
import com.sismics.docs.core.model.context.AppContext; import com.sismics.docs.core.model.context.AppContext;
@ -27,6 +28,7 @@ public class FileDeletedAsyncListener {
* @throws Exception e * @throws Exception e
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final FileDeletedAsyncEvent event) throws Exception { public void on(final FileDeletedAsyncEvent event) throws Exception {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("File deleted event: " + event.toString()); log.info("File deleted event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.dao.FileDao; import com.sismics.docs.core.dao.FileDao;
import com.sismics.docs.core.dao.UserDao; import com.sismics.docs.core.dao.UserDao;
@ -45,6 +46,7 @@ public class FileProcessingAsyncListener {
* @param event File created event * @param event File created event
*/ */
@Subscribe @Subscribe
@AllowConcurrentEvents
public void on(final FileCreatedAsyncEvent event) { public void on(final FileCreatedAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("File created event: " + event.toString()); log.info("File created event: " + event.toString());

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.constant.Constants; import com.sismics.docs.core.constant.Constants;
import com.sismics.docs.core.dao.dto.UserDto; import com.sismics.docs.core.dao.dto.UserDto;
@ -27,17 +28,18 @@ public class PasswordLostAsyncListener {
/** /**
* Handle events. * Handle events.
* *
* @param passwordLostEvent Event * @param event Event
*/ */
@Subscribe @Subscribe
public void onPasswordLost(final PasswordLostEvent passwordLostEvent) { @AllowConcurrentEvents
public void on(final PasswordLostEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Password lost event: " + passwordLostEvent.toString()); log.info("Password lost event: " + event.toString());
} }
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
final UserDto user = passwordLostEvent.getUser(); final UserDto user = event.getUser();
final PasswordRecovery passwordRecovery = passwordLostEvent.getPasswordRecovery(); final PasswordRecovery passwordRecovery = event.getPasswordRecovery();
// Send the password recovery email // Send the password recovery email
Map<String, Object> paramRootMap = new HashMap<>(); Map<String, Object> paramRootMap = new HashMap<>();

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.dao.DocumentDao; import com.sismics.docs.core.dao.DocumentDao;
import com.sismics.docs.core.dao.FileDao; import com.sismics.docs.core.dao.FileDao;
@ -27,12 +28,13 @@ public class RebuildIndexAsyncListener {
/** /**
* Rebuild Lucene index. * Rebuild Lucene index.
* *
* @param rebuildIndexAsyncEvent Index rebuild event * @param event Index rebuild event
*/ */
@Subscribe @Subscribe
public void on(final RebuildIndexAsyncEvent rebuildIndexAsyncEvent) { @AllowConcurrentEvents
public void on(final RebuildIndexAsyncEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Rebuild index event: " + rebuildIndexAsyncEvent.toString()); log.info("Rebuild index event: " + event.toString());
} }
// Fetch all documents and files // Fetch all documents and files

View File

@ -1,5 +1,6 @@
package com.sismics.docs.core.listener.async; package com.sismics.docs.core.listener.async;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.sismics.docs.core.constant.Constants; import com.sismics.docs.core.constant.Constants;
import com.sismics.docs.core.dao.dto.UserDto; import com.sismics.docs.core.dao.dto.UserDto;
@ -26,22 +27,23 @@ public class RouteStepValidateAsyncListener {
/** /**
* Handle events. * Handle events.
* *
* @param routeStepValidateEvent Event * @param event Event
*/ */
@Subscribe @Subscribe
public void onRouteStepValidate(final RouteStepValidateEvent routeStepValidateEvent) { @AllowConcurrentEvents
public void on(final RouteStepValidateEvent event) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Route step validate event: " + routeStepValidateEvent.toString()); log.info("Route step validate event: " + event.toString());
} }
TransactionUtil.handle(() -> { TransactionUtil.handle(() -> {
final UserDto user = routeStepValidateEvent.getUser(); final UserDto user = event.getUser();
// Send route step validated email // Send route step validated email
Map<String, Object> paramRootMap = new HashMap<>(); Map<String, Object> paramRootMap = new HashMap<>();
paramRootMap.put("user_name", user.getUsername()); paramRootMap.put("user_name", user.getUsername());
paramRootMap.put("document_id", routeStepValidateEvent.getDocument().getId()); paramRootMap.put("document_id", event.getDocument().getId());
paramRootMap.put("document_title", routeStepValidateEvent.getDocument().getTitle()); paramRootMap.put("document_title", event.getDocument().getTitle());
EmailUtil.sendEmail(Constants.EMAIL_TEMPLATE_ROUTE_STEP_VALIDATE, user, paramRootMap); EmailUtil.sendEmail(Constants.EMAIL_TEMPLATE_ROUTE_STEP_VALIDATE, user, paramRootMap);
}); });

View File

@ -174,7 +174,7 @@ public class AppContext {
return new EventBus(); return new EventBus();
} else { } else {
ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,
0L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>()); new LinkedBlockingQueue<>());
asyncExecutorList.add(executor); asyncExecutorList.add(executor);
return new AsyncEventBus(executor); return new AsyncEventBus(executor);
@ -206,7 +206,7 @@ public class AppContext {
// Shutdown executor, don't accept any more tasks (can cause error with nested events) // Shutdown executor, don't accept any more tasks (can cause error with nested events)
try { try {
executor.shutdown(); executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS); executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// NOP // NOP
} }