try with 4 event batch workers

This commit is contained in:
anhefti 2023-06-06 08:43:19 +02:00
parent c06bca1f86
commit 456b6094f7
3 changed files with 49 additions and 6 deletions

View file

@ -95,11 +95,47 @@ public class SEBClientEventBatchService {
@Scheduled( @Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 100) initialDelay = 100)
public void processEvents() { public void worker1() {
processEvents("worker1");
}
@Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 300)
public void worker2() {
processEvents("worker2");
}
@Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 600)
public void worker3() {
processEvents("worker3");
}
@Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 900)
public void worker4() {
processEvents("worker4");
}
public void processOneTime() {
processEvents("One Time Call");
}
private void processEvents(final String workerName) {
long start = 0L;
if (log.isDebugEnabled()) {
start = Utils.getMillisecondsNow();
}
final int size = this.eventDataQueue.size(); final int size = this.eventDataQueue.size();
if (size > 1000) { if (size > 1000) {
log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}", size); log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}, worker: {}",
size,
workerName);
} }
if (size == 0) { if (size == 0) {
@ -132,6 +168,13 @@ public class SEBClientEventBatchService {
this.sqlSessionTemplate.flushStatements(); this.sqlSessionTemplate.flushStatements();
if (log.isDebugEnabled()) {
log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms",
workerName,
size,
start - Utils.getMillisecondsNow());
}
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to process SEB events from eventDataQueue: ", e); log.error("Failed to process SEB events from eventDataQueue: ", e);
} }

View file

@ -561,7 +561,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
// check correct response // check correct response
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
this.sebClientEventBatchStore.processEvents(); this.sebClientEventBatchStore.processOneTime();
// check event stored on db // check event stored on db
List<ClientEventRecord> events = this.clientEventRecordMapper List<ClientEventRecord> events = this.clientEventRecordMapper
@ -587,7 +587,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
10000.0, 10000.0,
"testEvent2"); "testEvent2");
this.sebClientEventBatchStore.processEvents(); this.sebClientEventBatchStore.processOneTime();
// check correct response // check correct response
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
@ -622,7 +622,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
"testEvent1"); "testEvent1");
// check correct response // check correct response
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus()); assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
this.sebClientEventBatchStore.processEvents(); this.sebClientEventBatchStore.processOneTime();
final List<ClientEventRecord> events = this.clientEventRecordMapper final List<ClientEventRecord> events = this.clientEventRecordMapper
.selectByExample() .selectByExample()
.build() .build()

View file

@ -210,7 +210,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
} }
private void waitForExecutor() { private void waitForExecutor() {
this.sebClientEventBatchStore.processEvents(); this.sebClientEventBatchStore.processOneTime();
} }
@Test @Test