SEBSERV-445 pings go to working queue just after arriving too now
This commit is contained in:
		
							parent
							
								
									e3b44cb60b
								
							
						
					
					
						commit
						a5335f1c98
					
				
					 7 changed files with 160 additions and 34 deletions
				
			
		| 
						 | 
					@ -40,12 +40,6 @@ public interface SEBClientSessionService {
 | 
				
			||||||
     * @param event The SEB client event data */
 | 
					     * @param event The SEB client event data */
 | 
				
			||||||
    void notifyClientEvent(String connectionToken, String jsonBody);
 | 
					    void notifyClientEvent(String connectionToken, String jsonBody);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//    /** Notify a SEB client event for live indication and storing to database.
 | 
					 | 
				
			||||||
//     *
 | 
					 | 
				
			||||||
//     * @param connectionToken the connection token
 | 
					 | 
				
			||||||
//     * @param event The SEB client event data */
 | 
					 | 
				
			||||||
//    void notifyClientEvent(String connectionToken, final ClientEvent event);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    /** This is used to confirm SEB instructions that must be confirmed by the SEB client.
 | 
					    /** This is used to confirm SEB instructions that must be confirmed by the SEB client.
 | 
				
			||||||
     *
 | 
					     *
 | 
				
			||||||
     * @param connectionToken The SEB client connection token
 | 
					     * @param connectionToken The SEB client connection token
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -86,9 +86,9 @@ public class ClientConnectionDataInternal extends ClientConnectionData {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public final void notifyPing(final long timestamp, final int pingNumber) {
 | 
					    public final void notifyPing(final long timestamp) {
 | 
				
			||||||
        if (this.pingIndicator != null) {
 | 
					        if (this.pingIndicator != null) {
 | 
				
			||||||
            this.pingIndicator.notifyPing(timestamp, pingNumber);
 | 
					            this.pingIndicator.notifyPing(timestamp);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -93,18 +93,16 @@ public class SEBClientEventBatchStore {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Scheduled(
 | 
					    @Scheduled(
 | 
				
			||||||
            fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.task:1000}",
 | 
					            fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
 | 
				
			||||||
            initialDelay = 1000)
 | 
					            initialDelay = 1000)
 | 
				
			||||||
    public void processEvents() {
 | 
					    public void processEvents() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        final long startTime = Utils.getMillisecondsNow();
 | 
					        final long startTime = Utils.getMillisecondsNow();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //if (log.isDebugEnabled()) {
 | 
					 | 
				
			||||||
        final int size = this.eventDataQueue.size();
 | 
					        final int size = this.eventDataQueue.size();
 | 
				
			||||||
        if (size > 1000) {
 | 
					        if (size > 1000) {
 | 
				
			||||||
            log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size);
 | 
					            log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        //}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -115,8 +113,6 @@ public class SEBClientEventBatchStore {
 | 
				
			||||||
                return;
 | 
					                return;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            System.out.println("********** processing: " + this.events.size());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            final List<ClientEventRecord> events = this.events
 | 
					            final List<ClientEventRecord> events = this.events
 | 
				
			||||||
                    .stream()
 | 
					                    .stream()
 | 
				
			||||||
                    .map(this::convertData)
 | 
					                    .map(this::convertData)
 | 
				
			||||||
| 
						 | 
					@ -134,9 +130,14 @@ public class SEBClientEventBatchStore {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            this.sqlSessionTemplate.flushStatements();
 | 
					            this.sqlSessionTemplate.flushStatements();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            //if (log.isTraceEnabled()) {
 | 
					            if (log.isTraceEnabled()) {
 | 
				
			||||||
            log.info("****** Processing SEB events tuck: {}", Utils.getMillisecondsNow() - startTime);
 | 
					                log.trace("Processing {} SEB events tuck: {}",
 | 
				
			||||||
            //}
 | 
					                        this.events.size(),
 | 
				
			||||||
 | 
					                        Utils.getMillisecondsNow() - startTime);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            // TODO just for debugging
 | 
				
			||||||
 | 
					            System.out.println("***** Processing " + this.events.size() + " SEB events tuck: "
 | 
				
			||||||
 | 
					                    + (Utils.getMillisecondsNow() - startTime));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        } catch (final Exception e) {
 | 
					        } catch (final Exception e) {
 | 
				
			||||||
            log.error("Failed to process SEB events from eventDataQueue: ", e);
 | 
					            log.error("Failed to process SEB events from eventDataQueue: ", e);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,117 @@
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET)
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This Source Code Form is subject to the terms of the Mozilla Public
 | 
				
			||||||
 | 
					 * License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
				
			||||||
 | 
					 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.HashMap;
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.apache.commons.lang3.StringUtils;
 | 
				
			||||||
 | 
					import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
 | 
				
			||||||
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
 | 
					import org.springframework.context.annotation.Lazy;
 | 
				
			||||||
 | 
					import org.springframework.scheduling.annotation.Scheduled;
 | 
				
			||||||
 | 
					import org.springframework.stereotype.Component;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
 | 
				
			||||||
 | 
					import ch.ethz.seb.sebserver.gbl.util.Utils;
 | 
				
			||||||
 | 
					import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Lazy
 | 
				
			||||||
 | 
					@Component
 | 
				
			||||||
 | 
					@WebServiceProfile
 | 
				
			||||||
 | 
					public class SEBClientPingService {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private static final Logger log = LoggerFactory.getLogger(SEBClientPingService.class);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final ExamSessionCacheService examSessionCacheService;
 | 
				
			||||||
 | 
					    private final SEBClientInstructionService sebClientInstructionService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final Map<String, String> pings = new ConcurrentHashMap<>();
 | 
				
			||||||
 | 
					    private final Map<String, String> instructions = new ConcurrentHashMap<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public SEBClientPingService(
 | 
				
			||||||
 | 
					            final ExamSessionCacheService examSessionCacheService,
 | 
				
			||||||
 | 
					            final SEBClientInstructionService sebClientInstructionService) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        this.examSessionCacheService = examSessionCacheService;
 | 
				
			||||||
 | 
					        this.sebClientInstructionService = sebClientInstructionService;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Scheduled(
 | 
				
			||||||
 | 
					            fixedDelayString = "${sebserver.webservice.api.exam.session.ping.batch.interval:500}",
 | 
				
			||||||
 | 
					            initialDelay = 1000)
 | 
				
			||||||
 | 
					    public void processPings() {
 | 
				
			||||||
 | 
					        if (this.pings.isEmpty()) {
 | 
				
			||||||
 | 
					            return;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final long startTime = Utils.getMillisecondsNow();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final int size = this.pings.size();
 | 
				
			||||||
 | 
					        if (size > 1000) {
 | 
				
			||||||
 | 
					            log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        try {
 | 
				
			||||||
 | 
					            final Map<String, String> pp = new HashMap<>(this.pings);
 | 
				
			||||||
 | 
					            this.pings.clear();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            pp.entrySet()
 | 
				
			||||||
 | 
					                    .stream()
 | 
				
			||||||
 | 
					                    .forEach(entry -> processPing(entry.getKey(), entry.getValue(), startTime));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if (log.isTraceEnabled()) {
 | 
				
			||||||
 | 
					                log.trace("****** Processing {} SEB pings tuck: {}", Utils.getMillisecondsNow() - startTime);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            // TODO just for debugging
 | 
				
			||||||
 | 
					            System.out.println("***** Processing " + size + " SEB pings tuck: "
 | 
				
			||||||
 | 
					                    + (Utils.getMillisecondsNow() - startTime));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        } catch (final Exception e) {
 | 
				
			||||||
 | 
					            log.error("Failed to process SEB pings from pingDataQueue: ", e);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public String notifyPing(
 | 
				
			||||||
 | 
					            final String connectionToken,
 | 
				
			||||||
 | 
					            final String instructionConfirm) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (instructionConfirm != null) {
 | 
				
			||||||
 | 
					            this.pings.put(connectionToken, instructionConfirm);
 | 
				
			||||||
 | 
					        } else if (!this.pings.containsKey(connectionToken)) {
 | 
				
			||||||
 | 
					            this.pings.put(connectionToken, StringUtils.EMPTY);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return this.instructions.remove(connectionToken);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void processPing(
 | 
				
			||||||
 | 
					            final String connectionToken,
 | 
				
			||||||
 | 
					            final String instructionConfirm,
 | 
				
			||||||
 | 
					            final long timestamp) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
 | 
				
			||||||
 | 
					                .getClientConnection(connectionToken);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (activeClientConnection != null) {
 | 
				
			||||||
 | 
					            activeClientConnection.notifyPing(timestamp);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (instructionConfirm != StringUtils.EMPTY) {
 | 
				
			||||||
 | 
					            this.sebClientInstructionService.confirmInstructionDone(connectionToken, instructionConfirm);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final String instructionJSON = this.sebClientInstructionService.getInstructionJSON(connectionToken);
 | 
				
			||||||
 | 
					        if (instructionJSON != null) {
 | 
				
			||||||
 | 
					            this.instructions.put(connectionToken, instructionJSON);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -49,6 +49,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
 | 
				
			||||||
    private final InternalClientConnectionDataFactory internalClientConnectionDataFactory;
 | 
					    private final InternalClientConnectionDataFactory internalClientConnectionDataFactory;
 | 
				
			||||||
    private final SecurityKeyService securityKeyService;
 | 
					    private final SecurityKeyService securityKeyService;
 | 
				
			||||||
    private final SEBClientVersionService sebClientVersionService;
 | 
					    private final SEBClientVersionService sebClientVersionService;
 | 
				
			||||||
 | 
					    private final SEBClientPingService sebClientPingService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public SEBClientSessionServiceImpl(
 | 
					    public SEBClientSessionServiceImpl(
 | 
				
			||||||
            final ClientConnectionDAO clientConnectionDAO,
 | 
					            final ClientConnectionDAO clientConnectionDAO,
 | 
				
			||||||
| 
						 | 
					@ -58,7 +59,8 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
 | 
				
			||||||
            final ClientIndicatorFactory clientIndicatorFactory,
 | 
					            final ClientIndicatorFactory clientIndicatorFactory,
 | 
				
			||||||
            final InternalClientConnectionDataFactory internalClientConnectionDataFactory,
 | 
					            final InternalClientConnectionDataFactory internalClientConnectionDataFactory,
 | 
				
			||||||
            final SecurityKeyService securityKeyService,
 | 
					            final SecurityKeyService securityKeyService,
 | 
				
			||||||
            final SEBClientVersionService sebClientVersionService) {
 | 
					            final SEBClientVersionService sebClientVersionService,
 | 
				
			||||||
 | 
					            final SEBClientPingService sebClientPingService) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        this.clientConnectionDAO = clientConnectionDAO;
 | 
					        this.clientConnectionDAO = clientConnectionDAO;
 | 
				
			||||||
        this.examSessionService = examSessionService;
 | 
					        this.examSessionService = examSessionService;
 | 
				
			||||||
| 
						 | 
					@ -69,6 +71,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
 | 
				
			||||||
        this.internalClientConnectionDataFactory = internalClientConnectionDataFactory;
 | 
					        this.internalClientConnectionDataFactory = internalClientConnectionDataFactory;
 | 
				
			||||||
        this.securityKeyService = securityKeyService;
 | 
					        this.securityKeyService = securityKeyService;
 | 
				
			||||||
        this.sebClientVersionService = sebClientVersionService;
 | 
					        this.sebClientVersionService = sebClientVersionService;
 | 
				
			||||||
 | 
					        this.sebClientPingService = sebClientPingService;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
| 
						 | 
					@ -115,15 +118,25 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
 | 
				
			||||||
            final int pingNumber,
 | 
					            final int pingNumber,
 | 
				
			||||||
            final String instructionConfirm) {
 | 
					            final String instructionConfirm) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        processPing(connectionToken, timestamp, pingNumber);
 | 
					        return this.sebClientPingService.notifyPing(connectionToken, instructionConfirm);
 | 
				
			||||||
 | 
					 | 
				
			||||||
        if (instructionConfirm != null) {
 | 
					 | 
				
			||||||
            this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return this.sebInstructionService.getInstructionJSON(connectionToken);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//    @Override
 | 
				
			||||||
 | 
					//    public String notifyPing(
 | 
				
			||||||
 | 
					//            final String connectionToken,
 | 
				
			||||||
 | 
					//            final long timestamp,
 | 
				
			||||||
 | 
					//            final int pingNumber,
 | 
				
			||||||
 | 
					//            final String instructionConfirm) {
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//        processPing(connectionToken, timestamp, pingNumber);
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//        if (instructionConfirm != null) {
 | 
				
			||||||
 | 
					//            this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm);
 | 
				
			||||||
 | 
					//        }
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//        return this.sebInstructionService.getInstructionJSON(connectionToken);
 | 
				
			||||||
 | 
					//    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public final void notifyClientEvent(final String connectionToken, final String jsonBody) {
 | 
					    public final void notifyClientEvent(final String connectionToken, final String jsonBody) {
 | 
				
			||||||
        this.sebClientEventBatchStore.accept(connectionToken, jsonBody);
 | 
					        this.sebClientEventBatchStore.accept(connectionToken, jsonBody);
 | 
				
			||||||
| 
						 | 
					@ -142,15 +155,15 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
 | 
				
			||||||
                this.internalClientConnectionDataFactory.getGroupIds(clientConnection)));
 | 
					                this.internalClientConnectionDataFactory.getGroupIds(clientConnection)));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void processPing(final String connectionToken, final long timestamp, final int pingNumber) {
 | 
					//    private void processPing(final String connectionToken, final long timestamp, final int pingNumber) {
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
        final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
 | 
					//        final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
 | 
				
			||||||
                .getClientConnection(connectionToken);
 | 
					//                .getClientConnection(connectionToken);
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
        if (activeClientConnection != null) {
 | 
					//        if (activeClientConnection != null) {
 | 
				
			||||||
            activeClientConnection.notifyPing(timestamp, pingNumber);
 | 
					//            activeClientConnection.notifyPing(timestamp);
 | 
				
			||||||
        }
 | 
					//        }
 | 
				
			||||||
    }
 | 
					//    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void missingPingUpdate(final ClientConnectionDataInternal connection) {
 | 
					    private void missingPingUpdate(final ClientConnectionDataInternal connection) {
 | 
				
			||||||
        if (connection.pingIndicator.changeOnIncident()) {
 | 
					        if (connection.pingIndicator.changeOnIncident()) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -27,7 +27,7 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
 | 
				
			||||||
        return this.EMPTY_SET;
 | 
					        return this.EMPTY_SET;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public final void notifyPing(final long timestamp, final int pingNumber) {
 | 
					    public final void notifyPing(final long timestamp) {
 | 
				
			||||||
        super.currentValue = timestamp;
 | 
					        super.currentValue = timestamp;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) {
 | 
					        if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,6 +6,7 @@ server.address=localhost
 | 
				
			||||||
server.port=8080
 | 
					server.port=8080
 | 
				
			||||||
server.servlet.context-path=/
 | 
					server.servlet.context-path=/
 | 
				
			||||||
server.tomcat.uri-encoding=UTF-8
 | 
					server.tomcat.uri-encoding=UTF-8
 | 
				
			||||||
 | 
					server.http2.enabled=true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
logging.level.ROOT=INFO
 | 
					logging.level.ROOT=INFO
 | 
				
			||||||
logging.level.ch=INFO
 | 
					logging.level.ch=INFO
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue