diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index 0a3fe8ac..3a3852b0 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -79,7 +79,7 @@ public class SEBClientEventBatchService { } private final BlockingDeque eventDataQueue = new LinkedBlockingDeque<>(); - private final Collection events = new ArrayList<>(); + //private final Collection events = new ArrayList<>(); public void accept(final String connectionToken, final String jsonBody) { this.eventDataQueue.add(new EventData( @@ -92,14 +92,58 @@ public class SEBClientEventBatchService { this.eventDataQueue.add(eventData); } + private final Collection events1 = new ArrayList<>(); + @Scheduled( fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", initialDelay = 100) - public void processEvents() { + public void worker1() { + processEvents("worker1", this.events1); + } + + private final Collection events2 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 300) + public void worker2() { + processEvents("worker2", this.events2); + } + + private final Collection events3 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 600) + public void worker3() { + processEvents("worker3", this.events3); + } + + private final Collection events4 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 900) + public void worker4() { + processEvents("worker4", this.events4); + } + + public void processOneTime() { + processEvents("One Time Call", new ArrayList<>()); + } + + private void processEvents(final String workerName, final Collection events) { + + long start = 0L; + if (log.isDebugEnabled()) { + start = Utils.getMillisecondsNow(); + } final int size = this.eventDataQueue.size(); if (size > 1000) { - log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}", size); + log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}, worker: {}", + size, + workerName); } if (size == 0) { @@ -108,14 +152,14 @@ public class SEBClientEventBatchService { try { - this.events.clear(); - this.eventDataQueue.drainTo(this.events); + events.clear(); + this.eventDataQueue.drainTo(events); - if (this.events.isEmpty()) { + if (events.isEmpty()) { return; } - final List events = this.events + final List records = events .stream() .map(this::convertData) .map(this::storeNotifications) @@ -126,12 +170,19 @@ public class SEBClientEventBatchService { this.transactionTemplate .execute(status -> { - events.stream().forEach(this.clientEventMapper::insert); + records.stream().forEach(this.clientEventMapper::insert); return null; }); this.sqlSessionTemplate.flushStatements(); + if (log.isDebugEnabled()) { + log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", + workerName, + size, + start - Utils.getMillisecondsNow()); + } + } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e); } @@ -200,7 +251,7 @@ public class SEBClientEventBatchService { return null; } catch (final Exception e) { - log.error("Failed to verify and process notification for SEB event: {}", eventData); + log.error("Failed to verify and process notification for SEB event: {}", eventData, e); return eventData; } } diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java index f7baae0f..88b30db6 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java @@ -561,7 +561,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); // check event stored on db List events = this.clientEventRecordMapper @@ -587,7 +587,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { 10000.0, "testEvent2"); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); @@ -622,7 +622,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { "testEvent1"); // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); final List events = this.clientEventRecordMapper .selectByExample() .build() diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java index 075099b6..d2002e38 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java @@ -210,7 +210,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester { } private void waitForExecutor() { - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); } @Test