SEBSERV-445 new SEB event store strategy with background tasks
This commit is contained in:
parent
314ce82c00
commit
e3b44cb60b
16 changed files with 384 additions and 141 deletions
|
@ -14,10 +14,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.gbl.monitoring.IndicatorValue;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
|
||||
/** A client indicator is a indicator value holder for a specific Indicator
|
||||
* on a running client connection.
|
||||
|
@ -64,14 +62,9 @@ public interface ClientIndicator extends IndicatorValue {
|
|||
/** This gets called on a value change e.g.: when a ClientEvent was received.
|
||||
* NOTE: that this is called only on the same machine (server-instance) on that the ClientEvent was received.
|
||||
*
|
||||
* @param event The ClientEvent instance */
|
||||
void notifyValueChange(ClientEvent event);
|
||||
|
||||
/** This gets called on a value change e.g.: when a ClientEvent was received.
|
||||
* NOTE: that this is called only on the same machine (server-instance) on that the ClientEvent was received.
|
||||
*
|
||||
* @param clientEventRecord The ClientEventRecord instance */
|
||||
void notifyValueChange(ClientEventRecord clientEventRecord);
|
||||
* @param textValue The text based value
|
||||
* @param numValue The value number */
|
||||
void notifyValueChange(String textValue, double numValue);
|
||||
|
||||
/** This indicates if the indicator indicates an incident. This is the case if the actual indicator value
|
||||
* is above or below the max or min value defined by the indicator threshold settings.
|
||||
|
|
|
@ -14,6 +14,7 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
|||
|
||||
/** A exam session SEB client event handling strategy implements a certain strategy to
|
||||
* store ClientEvent that are coming in within the specified endpoint in height frequency. */
|
||||
@Deprecated
|
||||
public interface EventHandlingStrategy extends Consumer<ClientEventRecord> {
|
||||
|
||||
String EVENT_CONSUMER_STRATEGY_CONFIG_PROPERTY_KEY = "sebserver.webservice.api.exam.event-handling-strategy";
|
||||
|
|
|
@ -10,7 +10,6 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session;
|
|||
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnectionData;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||
|
||||
public interface SEBClientSessionService {
|
||||
|
@ -39,7 +38,13 @@ public interface SEBClientSessionService {
|
|||
*
|
||||
* @param connectionToken the connection token
|
||||
* @param event The SEB client event data */
|
||||
void notifyClientEvent(String connectionToken, final ClientEvent event);
|
||||
void notifyClientEvent(String connectionToken, String jsonBody);
|
||||
|
||||
// /** Notify a SEB client event for live indication and storing to database.
|
||||
// *
|
||||
// * @param connectionToken the connection token
|
||||
// * @param event The SEB client event data */
|
||||
// void notifyClientEvent(String connectionToken, final ClientEvent event);
|
||||
|
||||
/** This is used to confirm SEB instructions that must be confirmed by the SEB client.
|
||||
*
|
||||
|
|
|
@ -77,7 +77,7 @@ public class AsyncBatchEventSaveStrategy implements EventHandlingStrategy {
|
|||
private final BlockingDeque<ClientEventRecord> eventQueue = new LinkedBlockingDeque<>();
|
||||
private final BlockingDeque<ClientNotification> notificationQueue = new LinkedBlockingDeque<>();
|
||||
private boolean workersRunning = false;
|
||||
private boolean enabled = false;
|
||||
private final boolean enabled = false;
|
||||
|
||||
public AsyncBatchEventSaveStrategy(
|
||||
final SEBClientNotificationService sebClientNotificationService,
|
||||
|
@ -95,7 +95,8 @@ public class AsyncBatchEventSaveStrategy implements EventHandlingStrategy {
|
|||
|
||||
@Override
|
||||
public void enable() {
|
||||
this.enabled = true;
|
||||
log.info("AsyncBatchEventSaveStrategy is deprecated");
|
||||
//this.enabled = true;
|
||||
}
|
||||
|
||||
@EventListener(SEBServerInitEvent.class)
|
||||
|
|
|
@ -39,6 +39,7 @@ public class ClientConnectionDataInternal extends ClientConnectionData {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(ClientConnectionDataInternal.class);
|
||||
|
||||
// TODO why list for type? Is it possible to restrict to one per type?
|
||||
final EnumMap<EventType, Collection<ClientIndicator>> indicatorMapping;
|
||||
|
||||
PingIntervalClientIndicator pingIndicator = null;
|
||||
|
|
|
@ -19,6 +19,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.EventHandlingStrate
|
|||
@Lazy
|
||||
@Service
|
||||
@WebServiceProfile
|
||||
@Deprecated
|
||||
public class EventHandlingStrategyFactory {
|
||||
|
||||
private final EventHandlingStrategy eventHandlingStrategy;
|
||||
|
|
|
@ -0,0 +1,278 @@
|
|||
/*
|
||||
* Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET)
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.apache.ibatis.session.ExecutorType;
|
||||
import org.apache.ibatis.session.SqlSessionFactory;
|
||||
import org.mybatis.spring.SqlSessionTemplate;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientNotification;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientNotification.NotificationType;
|
||||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Pair;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
|
||||
|
||||
@Lazy
|
||||
@Component
|
||||
@WebServiceProfile
|
||||
public class SEBClientEventBatchStore {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SEBClientEventBatchStore.class);
|
||||
|
||||
private final SEBClientNotificationService sebClientNotificationService;
|
||||
private final SqlSessionFactory sqlSessionFactory;
|
||||
private final TransactionTemplate transactionTemplate;
|
||||
private final ExamSessionCacheService examSessionCacheService;
|
||||
private final JSONMapper jsonMapper;
|
||||
|
||||
private final SqlSessionTemplate sqlSessionTemplate;
|
||||
private final ClientEventRecordMapper clientEventMapper;
|
||||
|
||||
public SEBClientEventBatchStore(
|
||||
final SEBClientNotificationService sebClientNotificationService,
|
||||
final SqlSessionFactory sqlSessionFactory,
|
||||
final PlatformTransactionManager transactionManager,
|
||||
final ExamSessionCacheService examSessionCacheService,
|
||||
final JSONMapper jsonMapper) {
|
||||
|
||||
this.sebClientNotificationService = sebClientNotificationService;
|
||||
this.sqlSessionFactory = sqlSessionFactory;
|
||||
this.transactionTemplate = new TransactionTemplate(transactionManager);
|
||||
this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
this.examSessionCacheService = examSessionCacheService;
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
||||
this.sqlSessionTemplate = new SqlSessionTemplate(
|
||||
this.sqlSessionFactory,
|
||||
ExecutorType.BATCH);
|
||||
this.clientEventMapper = this.sqlSessionTemplate.getMapper(
|
||||
ClientEventRecordMapper.class);
|
||||
}
|
||||
|
||||
private final BlockingDeque<EventData> eventDataQueue = new LinkedBlockingDeque<>();
|
||||
private final Collection<EventData> events = new ArrayList<>();
|
||||
|
||||
public void accept(final String connectionToken, final String jsonBody) {
|
||||
this.eventDataQueue.add(new EventData(
|
||||
connectionToken,
|
||||
Utils.getMillisecondsNow(),
|
||||
jsonBody));
|
||||
}
|
||||
|
||||
public void accept(final EventData eventData) {
|
||||
this.eventDataQueue.add(eventData);
|
||||
}
|
||||
|
||||
@Scheduled(
|
||||
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.task:1000}",
|
||||
initialDelay = 1000)
|
||||
public void processEvents() {
|
||||
|
||||
final long startTime = Utils.getMillisecondsNow();
|
||||
|
||||
//if (log.isDebugEnabled()) {
|
||||
final int size = this.eventDataQueue.size();
|
||||
if (size > 1000) {
|
||||
log.warn("******* There are more then 1000 SEB client logs in the waiting queue: {}", size);
|
||||
}
|
||||
//}
|
||||
|
||||
try {
|
||||
|
||||
this.events.clear();
|
||||
this.eventDataQueue.drainTo(this.events);
|
||||
|
||||
if (this.events.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
System.out.println("********** processing: " + this.events.size());
|
||||
|
||||
final List<ClientEventRecord> events = this.events
|
||||
.stream()
|
||||
.map(this::convertData)
|
||||
.map(this::storeNotifications)
|
||||
.filter(Objects::nonNull)
|
||||
.map(this::toEventRecord)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
this.transactionTemplate
|
||||
.execute(status -> {
|
||||
events.stream().forEach(this.clientEventMapper::insert);
|
||||
return null;
|
||||
});
|
||||
|
||||
this.sqlSessionTemplate.flushStatements();
|
||||
|
||||
//if (log.isTraceEnabled()) {
|
||||
log.info("****** Processing SEB events tuck: {}", Utils.getMillisecondsNow() - startTime);
|
||||
//}
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to process SEB events from eventDataQueue: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private EventData convertData(final EventData eventData) {
|
||||
if (eventData == null || eventData.jsonBody == null) {
|
||||
return eventData;
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
final ClientEvent eventModel = this.jsonMapper.readValue(
|
||||
eventData.jsonBody,
|
||||
ClientEvent.class);
|
||||
|
||||
eventData.setEvent(eventModel);
|
||||
return eventData;
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to convert SEB event JSON data to internal data for: {}", eventData);
|
||||
return eventData;
|
||||
}
|
||||
}
|
||||
|
||||
private EventData storeNotifications(final EventData eventData) {
|
||||
try {
|
||||
|
||||
if (!eventData.event.eventType.isNotificationEvent) {
|
||||
return eventData;
|
||||
}
|
||||
|
||||
System.out.println("******* storeNotifications: " + eventData);
|
||||
|
||||
final ClientConnectionDataInternal clientConnection = this.examSessionCacheService
|
||||
.getClientConnection(eventData.connectionToken);
|
||||
|
||||
final Pair<NotificationType, String> typeAndPlainText =
|
||||
ClientNotification.extractTypeAndPlainText(eventData.event.text);
|
||||
final ClientNotification notification = new ClientNotification(
|
||||
eventData.event.id,
|
||||
clientConnection.getConnectionId(),
|
||||
eventData.event.eventType,
|
||||
eventData.event.getClientTime(),
|
||||
eventData.event.getServerTime(),
|
||||
(eventData.event.numValue != null) ? eventData.event.numValue.doubleValue() : null,
|
||||
typeAndPlainText.b,
|
||||
typeAndPlainText.a);
|
||||
|
||||
switch (notification.eventType) {
|
||||
case NOTIFICATION: {
|
||||
this.sebClientNotificationService.newNotification(notification);
|
||||
break;
|
||||
}
|
||||
case NOTIFICATION_CONFIRMED: {
|
||||
this.sebClientNotificationService.confirmPendingNotification(notification);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
// skip this for further event processing
|
||||
return null;
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to verify and process notification for SEB event: {}", eventData);
|
||||
return eventData;
|
||||
}
|
||||
}
|
||||
|
||||
private ClientEventRecord toEventRecord(final EventData eventData) {
|
||||
try {
|
||||
final ClientConnectionDataInternal clientConnection = this.examSessionCacheService
|
||||
.getClientConnection(eventData.connectionToken);
|
||||
|
||||
if (clientConnection == null) {
|
||||
log.warn("Failed to retrieve ClientConnection for token {}. Skip this event",
|
||||
eventData.connectionToken);
|
||||
return null;
|
||||
}
|
||||
|
||||
// handle indicator update
|
||||
clientConnection
|
||||
.getIndicatorMapping(eventData.event.eventType)
|
||||
.forEach(indicator -> indicator.notifyValueChange(
|
||||
eventData.event.text,
|
||||
(eventData.event.numValue != null) ? eventData.event.numValue : Double.NaN));
|
||||
|
||||
return ClientEvent.toRecord(eventData.event, clientConnection.clientConnection.id);
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error(
|
||||
"Unexpected error while converting SEB event data to record for: {} Skip this event", eventData,
|
||||
e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
protected void shutdown() {
|
||||
log.info("Shutdown SEBClientEventBatchStore...");
|
||||
if (this.sqlSessionTemplate != null) {
|
||||
try {
|
||||
this.sqlSessionTemplate.destroy();
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to close and destroy the SqlSessionTemplate for this thread: {}",
|
||||
Thread.currentThread(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final static class EventData {
|
||||
final String connectionToken;
|
||||
final Long serverTime;
|
||||
final String jsonBody;
|
||||
ClientEvent event;
|
||||
|
||||
public EventData(final String connectionToken, final Long serverTime, final String jsonBody) {
|
||||
this.connectionToken = connectionToken;
|
||||
this.serverTime = serverTime;
|
||||
this.jsonBody = jsonBody;
|
||||
this.event = null;
|
||||
}
|
||||
|
||||
public EventData(final String connectionToken, final Long serverTime, final ClientEvent event) {
|
||||
this.connectionToken = connectionToken;
|
||||
this.serverTime = serverTime;
|
||||
this.jsonBody = null;
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
void setEvent(final ClientEvent event) {
|
||||
this.event = event;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -8,7 +8,6 @@
|
|||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
@ -26,14 +25,13 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
|||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.institution.SecurityKeyService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.EventHandlingStrategy;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientSessionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientVersionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.SEBClientEventBatchStore.EventData;
|
||||
|
||||
@Lazy
|
||||
@Service
|
||||
|
@ -45,7 +43,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
|
|||
private final ClientConnectionDAO clientConnectionDAO;
|
||||
private final ExamSessionService examSessionService;
|
||||
private final ExamSessionCacheService examSessionCacheService;
|
||||
private final EventHandlingStrategy eventHandlingStrategy;
|
||||
private final SEBClientEventBatchStore sebClientEventBatchStore;
|
||||
private final SEBClientInstructionService sebInstructionService;
|
||||
private final ClientIndicatorFactory clientIndicatorFactory;
|
||||
private final InternalClientConnectionDataFactory internalClientConnectionDataFactory;
|
||||
|
@ -55,7 +53,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
|
|||
public SEBClientSessionServiceImpl(
|
||||
final ClientConnectionDAO clientConnectionDAO,
|
||||
final ExamSessionService examSessionService,
|
||||
final EventHandlingStrategyFactory eventHandlingStrategyFactory,
|
||||
final SEBClientEventBatchStore sebClientEventBatchStore,
|
||||
final SEBClientInstructionService sebInstructionService,
|
||||
final ClientIndicatorFactory clientIndicatorFactory,
|
||||
final InternalClientConnectionDataFactory internalClientConnectionDataFactory,
|
||||
|
@ -65,7 +63,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
|
|||
this.clientConnectionDAO = clientConnectionDAO;
|
||||
this.examSessionService = examSessionService;
|
||||
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
||||
this.eventHandlingStrategy = eventHandlingStrategyFactory.get();
|
||||
this.sebClientEventBatchStore = sebClientEventBatchStore;
|
||||
this.sebInstructionService = sebInstructionService;
|
||||
this.clientIndicatorFactory = clientIndicatorFactory;
|
||||
this.internalClientConnectionDataFactory = internalClientConnectionDataFactory;
|
||||
|
@ -127,32 +125,8 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void notifyClientEvent(
|
||||
final String connectionToken,
|
||||
final ClientEvent event) {
|
||||
|
||||
try {
|
||||
final ClientConnectionDataInternal activeClientConnection =
|
||||
this.examSessionService.getConnectionDataInternal(connectionToken);
|
||||
|
||||
if (activeClientConnection != null) {
|
||||
|
||||
// store event
|
||||
this.eventHandlingStrategy.accept(ClientEvent.toRecord(
|
||||
event,
|
||||
activeClientConnection.getConnectionId()));
|
||||
|
||||
// handle indicator update
|
||||
activeClientConnection
|
||||
.getIndicatorMapping(event.eventType)
|
||||
.forEach(indicator -> indicator.notifyValueChange(event));
|
||||
|
||||
} else {
|
||||
log.warn("No active ClientConnection found for connectionToken: {}", connectionToken);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to process SEB client event: ", e);
|
||||
}
|
||||
public final void notifyClientEvent(final String connectionToken, final String jsonBody) {
|
||||
this.sebClientEventBatchStore.accept(connectionToken, jsonBody);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -183,22 +157,28 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
|
|||
|
||||
final boolean missingPing = connection.getMissingPing();
|
||||
final long millisecondsNow = Utils.getMillisecondsNow();
|
||||
final ClientEventRecord clientEventRecord = new ClientEventRecord(
|
||||
final String textValue = (missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal";
|
||||
final double numValue = connection.pingIndicator.getValue();
|
||||
|
||||
final EventData eventData = new EventData(
|
||||
connection.getClientConnection().connectionToken,
|
||||
millisecondsNow,
|
||||
new ClientEvent(
|
||||
null,
|
||||
connection.getConnectionId(),
|
||||
(missingPing) ? EventType.ERROR_LOG.id : EventType.INFO_LOG.id,
|
||||
(missingPing) ? EventType.ERROR_LOG : EventType.INFO_LOG,
|
||||
millisecondsNow,
|
||||
millisecondsNow,
|
||||
new BigDecimal(connection.pingIndicator.getValue()),
|
||||
(missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal");
|
||||
numValue,
|
||||
textValue));
|
||||
|
||||
// store event and and flush cache
|
||||
this.eventHandlingStrategy.accept(clientEventRecord);
|
||||
// store missing-ping or ping-back event
|
||||
this.sebClientEventBatchStore.accept(eventData);
|
||||
|
||||
// update indicators
|
||||
if (clientEventRecord.getType() != null && EventType.ERROR_LOG.id == clientEventRecord.getType()) {
|
||||
if (EventType.ERROR_LOG == eventData.event.eventType) {
|
||||
connection.getIndicatorMapping(EventType.ERROR_LOG)
|
||||
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
|
||||
.forEach(indicator -> indicator.notifyValueChange(textValue, numValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,12 +15,10 @@ import org.mybatis.dynamic.sql.SqlCriterion;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
|
||||
public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicator {
|
||||
|
||||
|
@ -38,13 +36,13 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEvent event) {
|
||||
valueChanged(event.text);
|
||||
public final void notifyValueChange(final String textValue, final double numValue) {
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(textValue)) {
|
||||
if (super.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedIndicatorValueService.incrementIndicatorValue(super.ditributedIndicatorValueRecordId);
|
||||
}
|
||||
this.currentValue = getValue() + 1d;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEventRecord clientEventRecord) {
|
||||
valueChanged(clientEventRecord.getText());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,13 +110,4 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
return result;
|
||||
}
|
||||
|
||||
private void valueChanged(final String eventText) {
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(eventText)) {
|
||||
if (super.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedIndicatorValueService.incrementIndicatorValue(super.ditributedIndicatorValueRecordId);
|
||||
}
|
||||
this.currentValue = getValue() + 1d;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ import org.mybatis.dynamic.sql.SqlCriterion;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
||||
|
@ -41,31 +40,19 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEvent event) {
|
||||
valueChanged(event.text, event.getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEventRecord clientEventRecord) {
|
||||
final BigDecimal numericValue = clientEventRecord.getNumericValue();
|
||||
if (numericValue != null) {
|
||||
valueChanged(clientEventRecord.getText(), numericValue.doubleValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void valueChanged(final String text, final double value) {
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(text)) {
|
||||
public void notifyValueChange(final String textValue, final double numValue) {
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(textValue)) {
|
||||
if (super.ditributedIndicatorValueRecordId != null) {
|
||||
if (!this.distributedIndicatorValueService.updateIndicatorValueAsync(
|
||||
this.ditributedIndicatorValueRecordId,
|
||||
Double.valueOf(value).longValue())) {
|
||||
Double.valueOf(numValue).longValue())) {
|
||||
|
||||
this.currentValue = computeValueAt(Utils.getMillisecondsNow());
|
||||
} else {
|
||||
this.currentValue = value;
|
||||
this.currentValue = numValue;
|
||||
}
|
||||
} else {
|
||||
this.currentValue = value;
|
||||
this.currentValue = numValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||
import ch.ethz.seb.sebserver.gbl.Constants;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
|
||||
@Lazy
|
||||
@Component(IndicatorType.Names.LAST_PING)
|
||||
|
@ -92,12 +90,7 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEvent event) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEventRecord clientEventRecord) {
|
||||
public void notifyValueChange(final String textValue, final double numValue) {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,6 @@ import ch.ethz.seb.sebserver.gbl.api.POSTMapper;
|
|||
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Exam;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.RunningExamInfo;
|
||||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
|
@ -358,10 +357,10 @@ public class ExamAPI_V1_Controller {
|
|||
@ResponseStatus(value = HttpStatus.NO_CONTENT)
|
||||
public void event(
|
||||
@RequestHeader(name = API.EXAM_API_SEB_CONNECTION_TOKEN, required = true) final String connectionToken,
|
||||
@RequestBody(required = true) final ClientEvent event) {
|
||||
@RequestBody(required = true) final String jsonBody) {
|
||||
|
||||
this.sebClientSessionService
|
||||
.notifyClientEvent(connectionToken, event);
|
||||
.notifyClientEvent(connectionToken, jsonBody);
|
||||
}
|
||||
|
||||
private Long getInstitutionId(final Principal principal) {
|
||||
|
|
|
@ -25,7 +25,7 @@ sebserver.webservice.clean-db-on-startup=false
|
|||
|
||||
# webservice configuration
|
||||
sebserver.init.adminaccount.gen-on-init=false
|
||||
sebserver.webservice.distributed=true
|
||||
sebserver.webservice.distributed=false
|
||||
#sebserver.webservice.master.delay.threshold=10000
|
||||
sebserver.webservice.http.external.scheme=http
|
||||
sebserver.webservice.http.external.servername=localhost
|
||||
|
|
|
@ -72,7 +72,6 @@ import ch.ethz.seb.sebserver.gbl.model.user.UserLogActivityType;
|
|||
import ch.ethz.seb.sebserver.gbl.model.user.UserMod;
|
||||
import ch.ethz.seb.sebserver.gbl.model.user.UserRole;
|
||||
import ch.ethz.seb.sebserver.gbl.monitoring.SimpleIndicatorValue;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.PendingNotificationIndication;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal;
|
||||
|
@ -406,15 +405,8 @@ public class ModelObjectJSONGenerator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEvent event) {
|
||||
public void notifyValueChange(final String textValue, final double numValue) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyValueChange(final ClientEventRecord clientEventRecord) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,6 +42,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService;
|
|||
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPITemplate;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.SEBClientEventBatchStore;
|
||||
|
||||
@Sql(scripts = { "classpath:schema-test.sql", "classpath:data-test.sql", "classpath:data-test-additional.sql" })
|
||||
public class SebConnectionTest extends ExamAPIIntegrationTester {
|
||||
|
@ -56,6 +57,8 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
private ExamDAO examDAO;
|
||||
@Autowired
|
||||
private LmsAPIService lmsAPIService;
|
||||
@Autowired
|
||||
private SEBClientEventBatchStore sebClientEventBatchStore;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
|
@ -558,6 +561,8 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
// check correct response
|
||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||
|
||||
this.sebClientEventBatchStore.processEvents();
|
||||
|
||||
// check event stored on db
|
||||
List<ClientEventRecord> events = this.clientEventRecordMapper
|
||||
.selectByExample()
|
||||
|
@ -582,6 +587,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
10000.0,
|
||||
"testEvent2");
|
||||
|
||||
this.sebClientEventBatchStore.processEvents();
|
||||
// check correct response
|
||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||
|
||||
|
@ -616,7 +622,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
"testEvent1");
|
||||
// check correct response
|
||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||
|
||||
this.sebClientEventBatchStore.processEvents();
|
||||
final List<ClientEventRecord> events = this.clientEventRecordMapper
|
||||
.selectByExample()
|
||||
.build()
|
||||
|
|
|
@ -12,15 +12,12 @@ import static org.junit.Assert.*;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.test.context.jdbc.Sql;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection.ConnectionStatus;
|
||||
|
@ -35,6 +32,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
|
|||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientSessionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.SEBClientEventBatchStore;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.AbstractLogIndicator;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.AbstractLogLevelCountIndicator;
|
||||
|
||||
|
@ -50,8 +48,12 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
@Autowired
|
||||
private SEBClientSessionService sebClientSessionService;
|
||||
@Autowired
|
||||
@Qualifier(AsyncServiceSpringConfig.EXAM_API_EXECUTOR_BEAN_NAME)
|
||||
private Executor executor;
|
||||
private SEBClientEventBatchStore sebClientEventBatchStore;
|
||||
// @Autowired
|
||||
// @Qualifier(AsyncServiceSpringConfig.EXAM_API_EXECUTOR_BEAN_NAME)
|
||||
// private Executor executor;
|
||||
@Autowired
|
||||
private JSONMapper jsonMapper;
|
||||
|
||||
@Test
|
||||
public void testCreateLogEvents() {
|
||||
|
@ -110,13 +112,15 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token1",
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token1",
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
|
@ -126,6 +130,15 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
|
||||
}
|
||||
|
||||
private String writeValueAsString(final ClientEvent event) {
|
||||
try {
|
||||
return this.jsonMapper.writeValueAsString(event);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInfoLogWithTagCountIndicator() {
|
||||
|
||||
|
@ -156,33 +169,39 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<top> some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<top> some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error")));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error")));
|
||||
waitForExecutor();
|
||||
// test reset indicator value and load it from persistent storage
|
||||
((AbstractLogLevelCountIndicator) clientIndicator).reset();
|
||||
|
@ -191,13 +210,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
}
|
||||
|
||||
private void waitForExecutor() {
|
||||
try {
|
||||
while (((ThreadPoolTaskExecutor) this.executor).getActiveCount() > 0) {
|
||||
Thread.sleep(20);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
this.sebClientEventBatchStore.processEvents();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -231,23 +244,27 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some info other"));
|
||||
writeValueAsString(
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some info other")));
|
||||
waitForExecutor();
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some info other"));
|
||||
writeValueAsString(new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0,
|
||||
"<vip> some info other")));
|
||||
waitForExecutor();
|
||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 90.0, "<battery> some info other"));
|
||||
writeValueAsString(new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 90.0,
|
||||
"<battery> some info other")));
|
||||
waitForExecutor();
|
||||
assertEquals("90", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
this.sebClientSessionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 40.0, "<battery> some info other"));
|
||||
writeValueAsString(new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 40.0,
|
||||
"<battery> some info other")));
|
||||
waitForExecutor();
|
||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
|
|
Loading…
Reference in a new issue