From 456b6094f79ecdefcd83981064140958532ff44b Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 08:43:19 +0200 Subject: [PATCH 1/6] try with 4 event batch workers --- .../impl/SEBClientEventBatchService.java | 47 ++++++++++++++++++- .../api/exam/SebConnectionTest.java | 6 +-- .../services/ClientEventServiceTest.java | 2 +- 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index 0a3fe8ac..2e9e3e81 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -95,11 +95,47 @@ public class SEBClientEventBatchService { @Scheduled( fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", initialDelay = 100) - public void processEvents() { + public void worker1() { + processEvents("worker1"); + } + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 300) + public void worker2() { + processEvents("worker2"); + } + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 600) + public void worker3() { + processEvents("worker3"); + } + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 900) + public void worker4() { + processEvents("worker4"); + } + + public void processOneTime() { + processEvents("One Time Call"); + } + + private void processEvents(final String workerName) { + + long start = 0L; + if (log.isDebugEnabled()) { + start = Utils.getMillisecondsNow(); + } final int size = this.eventDataQueue.size(); if (size > 1000) { - log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}", size); + log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}, worker: {}", + size, + workerName); } if (size == 0) { @@ -132,6 +168,13 @@ public class SEBClientEventBatchService { this.sqlSessionTemplate.flushStatements(); + if (log.isDebugEnabled()) { + log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", + workerName, + size, + start - Utils.getMillisecondsNow()); + } + } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e); } diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java index f7baae0f..88b30db6 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/api/exam/SebConnectionTest.java @@ -561,7 +561,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); // check event stored on db List events = this.clientEventRecordMapper @@ -587,7 +587,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { 10000.0, "testEvent2"); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); @@ -622,7 +622,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester { "testEvent1"); // check correct response assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); final List events = this.clientEventRecordMapper .selectByExample() .build() diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java index 075099b6..d2002e38 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/integration/services/ClientEventServiceTest.java @@ -210,7 +210,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester { } private void waitForExecutor() { - this.sebClientEventBatchStore.processEvents(); + this.sebClientEventBatchStore.processOneTime(); } @Test From f29aa52dd07b1655726c970ca44c9722c86e80a2 Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 08:54:47 +0200 Subject: [PATCH 2/6] only one worker again --- .../impl/SEBClientEventBatchService.java | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index 2e9e3e81..d1bb9181 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -93,32 +93,32 @@ public class SEBClientEventBatchService { } @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:500}", initialDelay = 100) public void worker1() { processEvents("worker1"); } - @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", - initialDelay = 300) - public void worker2() { - processEvents("worker2"); - } - - @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", - initialDelay = 600) - public void worker3() { - processEvents("worker3"); - } - - @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", - initialDelay = 900) - public void worker4() { - processEvents("worker4"); - } +// @Scheduled( +// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", +// initialDelay = 300) +// public void worker2() { +// processEvents("worker2"); +// } +// +// @Scheduled( +// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", +// initialDelay = 600) +// public void worker3() { +// processEvents("worker3"); +// } +// +// @Scheduled( +// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", +// initialDelay = 900) +// public void worker4() { +// processEvents("worker4"); +// } public void processOneTime() { processEvents("One Time Call"); @@ -243,7 +243,7 @@ public class SEBClientEventBatchService { return null; } catch (final Exception e) { - log.error("Failed to verify and process notification for SEB event: {}", eventData); + log.error("Failed to verify and process notification for SEB event: {}", eventData, e); return eventData; } } From 4924c7f12545bd110bce8c2ad8fb28172cf89886 Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 09:01:18 +0200 Subject: [PATCH 3/6] fix concurrency error --- .../impl/SEBClientEventBatchService.java | 68 +++++++++++-------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index d1bb9181..3a3852b0 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -79,7 +79,7 @@ public class SEBClientEventBatchService { } private final BlockingDeque eventDataQueue = new LinkedBlockingDeque<>(); - private final Collection events = new ArrayList<>(); + //private final Collection events = new ArrayList<>(); public void accept(final String connectionToken, final String jsonBody) { this.eventDataQueue.add(new EventData( @@ -92,39 +92,47 @@ public class SEBClientEventBatchService { this.eventDataQueue.add(eventData); } + private final Collection events1 = new ArrayList<>(); + @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:500}", + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", initialDelay = 100) public void worker1() { - processEvents("worker1"); + processEvents("worker1", this.events1); } -// @Scheduled( -// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", -// initialDelay = 300) -// public void worker2() { -// processEvents("worker2"); -// } -// -// @Scheduled( -// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", -// initialDelay = 600) -// public void worker3() { -// processEvents("worker3"); -// } -// -// @Scheduled( -// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", -// initialDelay = 900) -// public void worker4() { -// processEvents("worker4"); -// } + private final Collection events2 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 300) + public void worker2() { + processEvents("worker2", this.events2); + } + + private final Collection events3 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 600) + public void worker3() { + processEvents("worker3", this.events3); + } + + private final Collection events4 = new ArrayList<>(); + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", + initialDelay = 900) + public void worker4() { + processEvents("worker4", this.events4); + } public void processOneTime() { - processEvents("One Time Call"); + processEvents("One Time Call", new ArrayList<>()); } - private void processEvents(final String workerName) { + private void processEvents(final String workerName, final Collection events) { long start = 0L; if (log.isDebugEnabled()) { @@ -144,14 +152,14 @@ public class SEBClientEventBatchService { try { - this.events.clear(); - this.eventDataQueue.drainTo(this.events); + events.clear(); + this.eventDataQueue.drainTo(events); - if (this.events.isEmpty()) { + if (events.isEmpty()) { return; } - final List events = this.events + final List records = events .stream() .map(this::convertData) .map(this::storeNotifications) @@ -162,7 +170,7 @@ public class SEBClientEventBatchService { this.transactionTemplate .execute(status -> { - events.stream().forEach(this.clientEventMapper::insert); + records.stream().forEach(this.clientEventMapper::insert); return null; }); From 8322f07cf7a6e35118d21c13571e5f38978aad34 Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 09:06:02 +0200 Subject: [PATCH 4/6] with logging --- .../impl/SEBClientEventBatchService.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index 3a3852b0..f0ec5211 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -135,9 +135,9 @@ public class SEBClientEventBatchService { private void processEvents(final String workerName, final Collection events) { long start = 0L; - if (log.isDebugEnabled()) { - start = Utils.getMillisecondsNow(); - } + //if (log.isDebugEnabled()) { + start = Utils.getMillisecondsNow(); + //} final int size = this.eventDataQueue.size(); if (size > 1000) { @@ -176,12 +176,12 @@ public class SEBClientEventBatchService { this.sqlSessionTemplate.flushStatements(); - if (log.isDebugEnabled()) { - log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", - workerName, - size, - start - Utils.getMillisecondsNow()); - } + //if (log.isDebugEnabled()) { + log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", + workerName, + size, + start - Utils.getMillisecondsNow()); + //} } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e); From 886f457438a0ab2c53e657a5b66a6ff738735787 Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 09:12:01 +0200 Subject: [PATCH 5/6] with logging --- .../session/impl/SEBClientEventBatchService.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index f0ec5211..1a2d8f76 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -176,12 +176,16 @@ public class SEBClientEventBatchService { this.sqlSessionTemplate.flushStatements(); - //if (log.isDebugEnabled()) { - log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", + if (log.isDebugEnabled()) { + log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", + workerName, + size, + start - Utils.getMillisecondsNow()); + } + log.info("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", workerName, size, start - Utils.getMillisecondsNow()); - //} } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e); From 6354da0f49b56225f204fd653ba1d7218250982b Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 6 Jun 2023 09:18:45 +0200 Subject: [PATCH 6/6] seems to work well with 4 workers... consider to add to 1.6 --- .../session/impl/SEBClientEventBatchService.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java index 1a2d8f76..3a3852b0 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchService.java @@ -135,9 +135,9 @@ public class SEBClientEventBatchService { private void processEvents(final String workerName, final Collection events) { long start = 0L; - //if (log.isDebugEnabled()) { - start = Utils.getMillisecondsNow(); - //} + if (log.isDebugEnabled()) { + start = Utils.getMillisecondsNow(); + } final int size = this.eventDataQueue.size(); if (size > 1000) { @@ -182,10 +182,6 @@ public class SEBClientEventBatchService { size, start - Utils.getMillisecondsNow()); } - log.info("SEBClientEventBatchService worker {} processes batch of size {} in {} ms", - workerName, - size, - start - Utils.getMillisecondsNow()); } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e);