From a5335f1c985d37ecbb681d1820f27b98b43adf4f Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 30 May 2023 09:21:53 +0200 Subject: [PATCH] SEBSERV-445 pings go to working queue just after arriving too now --- .../session/SEBClientSessionService.java | 6 - .../impl/ClientConnectionDataInternal.java | 4 +- .../impl/SEBClientEventBatchStore.java | 17 +-- .../session/impl/SEBClientPingService.java | 117 ++++++++++++++++++ .../impl/SEBClientSessionServiceImpl.java | 47 ++++--- .../impl/indicator/AbstractPingIndicator.java | 2 +- .../config/application-dev.properties | 1 + 7 files changed, 160 insertions(+), 34 deletions(-) create mode 100644 src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientPingService.java diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientSessionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientSessionService.java index 1d342868..ae10241b 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientSessionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientSessionService.java @@ -40,12 +40,6 @@ public interface SEBClientSessionService { * @param event The SEB client event data */ void notifyClientEvent(String connectionToken, String jsonBody); -// /** Notify a SEB client event for live indication and storing to database. -// * -// * @param connectionToken the connection token -// * @param event The SEB client event data */ -// void notifyClientEvent(String connectionToken, final ClientEvent event); - /** This is used to confirm SEB instructions that must be confirmed by the SEB client. * * @param connectionToken The SEB client connection token diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ClientConnectionDataInternal.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ClientConnectionDataInternal.java index 590ebe9f..a4699fef 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ClientConnectionDataInternal.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ClientConnectionDataInternal.java @@ -86,9 +86,9 @@ public class ClientConnectionDataInternal extends ClientConnectionData { } } - public final void notifyPing(final long timestamp, final int pingNumber) { + public final void notifyPing(final long timestamp) { if (this.pingIndicator != null) { - this.pingIndicator.notifyPing(timestamp, pingNumber); + this.pingIndicator.notifyPing(timestamp); } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchStore.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchStore.java index 78bb02f1..0f04e487 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchStore.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientEventBatchStore.java @@ -93,18 +93,16 @@ public class SEBClientEventBatchStore { } @Scheduled( - fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.task:1000}", + fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", initialDelay = 1000) public void processEvents() { final long startTime = Utils.getMillisecondsNow(); - //if (log.isDebugEnabled()) { final int size = this.eventDataQueue.size(); if (size > 1000) { log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size); } - //} try { @@ -115,8 +113,6 @@ public class SEBClientEventBatchStore { return; } - System.out.println("********** processing: " + this.events.size()); - final List events = this.events .stream() .map(this::convertData) @@ -134,9 +130,14 @@ public class SEBClientEventBatchStore { this.sqlSessionTemplate.flushStatements(); - //if (log.isTraceEnabled()) { - log.info("****** Processing SEB events tuck: {}", Utils.getMillisecondsNow() - startTime); - //} + if (log.isTraceEnabled()) { + log.trace("Processing {} SEB events tuck: {}", + this.events.size(), + Utils.getMillisecondsNow() - startTime); + } + // TODO just for debugging + System.out.println("***** Processing " + this.events.size() + " SEB events tuck: " + + (Utils.getMillisecondsNow() - startTime)); } catch (final Exception e) { log.error("Failed to process SEB events from eventDataQueue: ", e); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientPingService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientPingService.java new file mode 100644 index 00000000..e5b7602d --- /dev/null +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientPingService.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.ehcache.impl.internal.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Lazy; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; +import ch.ethz.seb.sebserver.gbl.util.Utils; +import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService; + +@Lazy +@Component +@WebServiceProfile +public class SEBClientPingService { + + private static final Logger log = LoggerFactory.getLogger(SEBClientPingService.class); + + private final ExamSessionCacheService examSessionCacheService; + private final SEBClientInstructionService sebClientInstructionService; + + private final Map pings = new ConcurrentHashMap<>(); + private final Map instructions = new ConcurrentHashMap<>(); + + public SEBClientPingService( + final ExamSessionCacheService examSessionCacheService, + final SEBClientInstructionService sebClientInstructionService) { + + this.examSessionCacheService = examSessionCacheService; + this.sebClientInstructionService = sebClientInstructionService; + } + + @Scheduled( + fixedDelayString = "${sebserver.webservice.api.exam.session.ping.batch.interval:500}", + initialDelay = 1000) + public void processPings() { + if (this.pings.isEmpty()) { + return; + } + + final long startTime = Utils.getMillisecondsNow(); + + final int size = this.pings.size(); + if (size > 1000) { + log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size); + } + + try { + final Map pp = new HashMap<>(this.pings); + this.pings.clear(); + + pp.entrySet() + .stream() + .forEach(entry -> processPing(entry.getKey(), entry.getValue(), startTime)); + + if (log.isTraceEnabled()) { + log.trace("****** Processing {} SEB pings tuck: {}", Utils.getMillisecondsNow() - startTime); + } + // TODO just for debugging + System.out.println("***** Processing " + size + " SEB pings tuck: " + + (Utils.getMillisecondsNow() - startTime)); + + } catch (final Exception e) { + log.error("Failed to process SEB pings from pingDataQueue: ", e); + } + } + + public String notifyPing( + final String connectionToken, + final String instructionConfirm) { + + if (instructionConfirm != null) { + this.pings.put(connectionToken, instructionConfirm); + } else if (!this.pings.containsKey(connectionToken)) { + this.pings.put(connectionToken, StringUtils.EMPTY); + } + + return this.instructions.remove(connectionToken); + } + + private void processPing( + final String connectionToken, + final String instructionConfirm, + final long timestamp) { + + final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService + .getClientConnection(connectionToken); + + if (activeClientConnection != null) { + activeClientConnection.notifyPing(timestamp); + } + + if (instructionConfirm != StringUtils.EMPTY) { + this.sebClientInstructionService.confirmInstructionDone(connectionToken, instructionConfirm); + } + + final String instructionJSON = this.sebClientInstructionService.getInstructionJSON(connectionToken); + if (instructionJSON != null) { + this.instructions.put(connectionToken, instructionJSON); + } + } + +} diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientSessionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientSessionServiceImpl.java index 7e7b182f..69fc9279 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientSessionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientSessionServiceImpl.java @@ -49,6 +49,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService { private final InternalClientConnectionDataFactory internalClientConnectionDataFactory; private final SecurityKeyService securityKeyService; private final SEBClientVersionService sebClientVersionService; + private final SEBClientPingService sebClientPingService; public SEBClientSessionServiceImpl( final ClientConnectionDAO clientConnectionDAO, @@ -58,7 +59,8 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService { final ClientIndicatorFactory clientIndicatorFactory, final InternalClientConnectionDataFactory internalClientConnectionDataFactory, final SecurityKeyService securityKeyService, - final SEBClientVersionService sebClientVersionService) { + final SEBClientVersionService sebClientVersionService, + final SEBClientPingService sebClientPingService) { this.clientConnectionDAO = clientConnectionDAO; this.examSessionService = examSessionService; @@ -69,6 +71,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService { this.internalClientConnectionDataFactory = internalClientConnectionDataFactory; this.securityKeyService = securityKeyService; this.sebClientVersionService = sebClientVersionService; + this.sebClientPingService = sebClientPingService; } @Override @@ -115,15 +118,25 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService { final int pingNumber, final String instructionConfirm) { - processPing(connectionToken, timestamp, pingNumber); - - if (instructionConfirm != null) { - this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm); - } - - return this.sebInstructionService.getInstructionJSON(connectionToken); + return this.sebClientPingService.notifyPing(connectionToken, instructionConfirm); } +// @Override +// public String notifyPing( +// final String connectionToken, +// final long timestamp, +// final int pingNumber, +// final String instructionConfirm) { +// +// processPing(connectionToken, timestamp, pingNumber); +// +// if (instructionConfirm != null) { +// this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm); +// } +// +// return this.sebInstructionService.getInstructionJSON(connectionToken); +// } + @Override public final void notifyClientEvent(final String connectionToken, final String jsonBody) { this.sebClientEventBatchStore.accept(connectionToken, jsonBody); @@ -142,15 +155,15 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService { this.internalClientConnectionDataFactory.getGroupIds(clientConnection))); } - private void processPing(final String connectionToken, final long timestamp, final int pingNumber) { - - final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService - .getClientConnection(connectionToken); - - if (activeClientConnection != null) { - activeClientConnection.notifyPing(timestamp, pingNumber); - } - } +// private void processPing(final String connectionToken, final long timestamp, final int pingNumber) { +// +// final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService +// .getClientConnection(connectionToken); +// +// if (activeClientConnection != null) { +// activeClientConnection.notifyPing(timestamp); +// } +// } private void missingPingUpdate(final ClientConnectionDataInternal connection) { if (connection.pingIndicator.changeOnIncident()) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index 14d0ed5b..b93e1aba 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -27,7 +27,7 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { return this.EMPTY_SET; } - public final void notifyPing(final long timestamp, final int pingNumber) { + public final void notifyPing(final long timestamp) { super.currentValue = timestamp; if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) { diff --git a/src/main/resources/config/application-dev.properties b/src/main/resources/config/application-dev.properties index adbcaac7..5d9a3dbe 100644 --- a/src/main/resources/config/application-dev.properties +++ b/src/main/resources/config/application-dev.properties @@ -6,6 +6,7 @@ server.address=localhost server.port=8080 server.servlet.context-path=/ server.tomcat.uri-encoding=UTF-8 +server.http2.enabled=true logging.level.ROOT=INFO logging.level.ch=INFO