SEBSERV-250 service for all indicators with separated table and update
This commit is contained in:
parent
bf0c5cf07f
commit
9fbc5bdbc1
33 changed files with 625 additions and 634 deletions
|
@ -34,13 +34,15 @@ public final class Indicator implements Entity {
|
|||
public static final String FILTER_ATTR_EXAM_ID = "examId";
|
||||
|
||||
public enum IndicatorType {
|
||||
LAST_PING(Names.LAST_PING, false, true, true, false, false),
|
||||
ERROR_COUNT(Names.ERROR_COUNT, false, true, false, true, false),
|
||||
WARN_COUNT(Names.WARN_COUNT, false, true, false, true, false),
|
||||
INFO_COUNT(Names.INFO_COUNT, false, true, false, true, false),
|
||||
BATTERY_STATUS(Names.BATTERY_STATUS, true, true, true, true, true),
|
||||
WLAN_STATUS(Names.WLAN_STATUS, true, true, true, true, true);
|
||||
NONE(0, "UNKNOWN", false, false, false, false, false),
|
||||
LAST_PING(1, Names.LAST_PING, false, true, true, false, false),
|
||||
ERROR_COUNT(2, Names.ERROR_COUNT, false, true, false, true, false),
|
||||
WARN_COUNT(3, Names.WARN_COUNT, false, true, false, true, false),
|
||||
INFO_COUNT(4, Names.INFO_COUNT, false, true, false, true, false),
|
||||
BATTERY_STATUS(5, Names.BATTERY_STATUS, true, true, true, true, true),
|
||||
WLAN_STATUS(6, Names.WLAN_STATUS, true, true, true, true, true);
|
||||
|
||||
public final int id;
|
||||
public final String name;
|
||||
public final boolean inverse;
|
||||
public final boolean integerValue;
|
||||
|
@ -49,6 +51,7 @@ public final class Indicator implements Entity {
|
|||
public final boolean tagsReadonly;
|
||||
|
||||
IndicatorType(
|
||||
final int id,
|
||||
final String name,
|
||||
final boolean inverse,
|
||||
final boolean integerValue,
|
||||
|
@ -56,6 +59,7 @@ public final class Indicator implements Entity {
|
|||
final boolean tags,
|
||||
final boolean tagsReadonly) {
|
||||
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.inverse = inverse;
|
||||
this.integerValue = integerValue;
|
||||
|
|
|
@ -95,7 +95,7 @@ public class ClientConnectionData {
|
|||
while (i1.hasNext()) {
|
||||
final IndicatorValue iv1 = i1.next();
|
||||
final IndicatorValue iv2 = i2.next();
|
||||
if (iv1.getType() != iv2.getType() || Math.abs(iv1.getValue() - iv2.getValue()) > 0.1) {
|
||||
if (iv1.getIndicatorId() != iv2.getIndicatorId() || Math.abs(iv1.getValue() - iv2.getValue()) > 0.1) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,22 +22,16 @@ public interface IndicatorValue extends IndicatorValueHolder {
|
|||
@JsonProperty(SimpleIndicatorValue.ATTR_INDICATOR_ID)
|
||||
Long getIndicatorId();
|
||||
|
||||
/** Use this to get the type of indicator this value was computed from.
|
||||
*
|
||||
* @return the type of indicator this value was computed from. */
|
||||
@JsonProperty(SimpleIndicatorValue.ATTR_INDICATOR_TYPE)
|
||||
IndicatorType getType();
|
||||
|
||||
/** Use this to get the display value of the value of given IndicatorValue.
|
||||
* Since the internal value is a double this gets the correct display value for the IndicatorType
|
||||
*
|
||||
* @param indicatorValue The indicator value instance
|
||||
* @return the display value of the given IndicatorValue */
|
||||
static String getDisplayValue(final IndicatorValue indicatorValue) {
|
||||
static String getDisplayValue(final IndicatorValue indicatorValue, final IndicatorType type) {
|
||||
if (Double.isNaN(indicatorValue.getValue())) {
|
||||
return Constants.EMPTY_NOTE;
|
||||
}
|
||||
if (indicatorValue.getType().integerValue) {
|
||||
if (type.integerValue) {
|
||||
return String.valueOf((int) indicatorValue.getValue());
|
||||
} else {
|
||||
return String.valueOf(indicatorValue.getValue());
|
||||
|
|
|
@ -11,25 +11,19 @@ package ch.ethz.seb.sebserver.gbl.model.session;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
|
||||
public final class SimpleIndicatorValue implements IndicatorValue {
|
||||
|
||||
@JsonProperty(ATTR_INDICATOR_ID)
|
||||
public final Long indicatorId;
|
||||
@JsonProperty(ATTR_INDICATOR_TYPE)
|
||||
public final IndicatorType type;
|
||||
@JsonProperty(ATTR_INDICATOR_VALUE)
|
||||
public final double value;
|
||||
|
||||
@JsonCreator
|
||||
public SimpleIndicatorValue(
|
||||
@JsonProperty(ATTR_INDICATOR_ID) final Long indicatorId,
|
||||
@JsonProperty(ATTR_INDICATOR_TYPE) final IndicatorType type,
|
||||
@JsonProperty(ATTR_INDICATOR_VALUE) final double value) {
|
||||
|
||||
this.indicatorId = indicatorId;
|
||||
this.type = type;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
|
@ -38,11 +32,6 @@ public final class SimpleIndicatorValue implements IndicatorValue {
|
|||
return this.indicatorId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicatorType getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getValue() {
|
||||
return this.value;
|
||||
|
@ -53,8 +42,6 @@ public final class SimpleIndicatorValue implements IndicatorValue {
|
|||
final StringBuilder builder = new StringBuilder();
|
||||
builder.append("SimpleIndicatorValue [indicatorId=");
|
||||
builder.append(this.indicatorId);
|
||||
builder.append(", type=");
|
||||
builder.append(this.type);
|
||||
builder.append(", value=");
|
||||
builder.append(this.value);
|
||||
builder.append("]");
|
||||
|
|
|
@ -196,7 +196,7 @@ public class ClientConnectionDetails {
|
|||
.forEach(indValue -> {
|
||||
final IndicatorData indData = this.indicatorMapping.get(indValue.getIndicatorId());
|
||||
final double value = indValue.getValue();
|
||||
final String displayValue = IndicatorValue.getDisplayValue(indValue);
|
||||
final String displayValue = IndicatorValue.getDisplayValue(indValue, indData.indicator.type);
|
||||
|
||||
if (!this.connectionData.clientConnection.status.clientActiveStatus) {
|
||||
|
||||
|
|
|
@ -602,12 +602,14 @@ public final class ClientConnectionTable {
|
|||
if (!this.connectionData.clientConnection.status.clientActiveStatus) {
|
||||
final String value = (indicatorData.indicator.type.showOnlyInActiveState)
|
||||
? Constants.EMPTY_NOTE
|
||||
: IndicatorValue.getDisplayValue(indicatorValue);
|
||||
: IndicatorValue.getDisplayValue(indicatorValue, indicatorData.indicator.type);
|
||||
tableItem.setText(indicatorData.tableIndex, value);
|
||||
tableItem.setBackground(indicatorData.tableIndex, indicatorData.defaultColor);
|
||||
tableItem.setForeground(indicatorData.tableIndex, indicatorData.defaultTextColor);
|
||||
} else {
|
||||
tableItem.setText(indicatorData.tableIndex, IndicatorValue.getDisplayValue(indicatorValue));
|
||||
tableItem.setText(indicatorData.tableIndex, IndicatorValue.getDisplayValue(
|
||||
indicatorValue,
|
||||
indicatorData.indicator.type));
|
||||
final int weight = this.indicatorWeights[indicatorData.index];
|
||||
if (weight >= 0 && weight < indicatorData.thresholdColor.length) {
|
||||
final ThresholdColor thresholdColor = indicatorData.thresholdColor[weight];
|
||||
|
|
|
@ -60,7 +60,7 @@ public class WebserviceInfo {
|
|||
private final boolean isDistributed;
|
||||
private final String webserviceUUID;
|
||||
|
||||
private final long distributedPingUpdateInterval;
|
||||
private final long distributedUpdateInterval;
|
||||
|
||||
private Map<String, String> lmsExternalAddressAlias;
|
||||
|
||||
|
@ -76,8 +76,8 @@ public class WebserviceInfo {
|
|||
this.discoveryEndpoint = environment.getRequiredProperty(WEB_SERVICE_EXAM_API_DISCOVERY_ENDPOINT_KEY);
|
||||
this.contextPath = environment.getProperty(WEB_SERVICE_CONTEXT_PATH, "");
|
||||
|
||||
this.distributedPingUpdateInterval = environment.getProperty(
|
||||
"sebserver.webservice.distributed.pingUpdate",
|
||||
this.distributedUpdateInterval = environment.getProperty(
|
||||
"sebserver.webservice.distributed.updateInterval",
|
||||
Long.class,
|
||||
3000L);
|
||||
|
||||
|
@ -167,8 +167,8 @@ public class WebserviceInfo {
|
|||
return this.serverURLPrefix + this.discoveryEndpoint;
|
||||
}
|
||||
|
||||
public long getDistributedPingUpdateInterval() {
|
||||
return this.distributedPingUpdateInterval;
|
||||
public long getDistributedUpdateInterval() {
|
||||
return this.distributedUpdateInterval;
|
||||
}
|
||||
|
||||
public String getLocalHostName() {
|
||||
|
|
|
@ -146,10 +146,6 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
|
|||
SEBServerInit.INIT_LOGGER.info("----> *********************************************************");
|
||||
SEBServerInit.INIT_LOGGER.info("----> *** Webservice successfully started up! ***");
|
||||
SEBServerInit.INIT_LOGGER.info("----> *********************************************************");
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("----> log4j2.formatMsgNoLookups = {}",
|
||||
this.environment.getProperty("log4j2.formatMsgNoLookups", "none"));
|
||||
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.apache.ibatis.annotations.ConstructorArgs;
|
|||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.ResultType;
|
||||
import org.apache.ibatis.annotations.SelectProvider;
|
||||
import org.apache.ibatis.annotations.Update;
|
||||
import org.apache.ibatis.annotations.UpdateProvider;
|
||||
import org.apache.ibatis.type.JdbcType;
|
||||
import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter;
|
||||
|
@ -28,11 +29,11 @@ import org.mybatis.dynamic.sql.update.UpdateDSL;
|
|||
import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
|
||||
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientIndicatorRecordDynamicSqlSupport;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.ClientIndicatorType;
|
||||
|
||||
@Mapper
|
||||
public interface ClientPingMapper {
|
||||
public interface ClientIndicatorValueMapper {
|
||||
|
||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||
@ConstructorArgs({ @Arg(column = "value", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
|
||||
|
@ -50,14 +51,14 @@ public interface ClientPingMapper {
|
|||
int update(UpdateStatementProvider updateStatement);
|
||||
|
||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||
@ResultType(ClientLastPingRecord.class)
|
||||
@ResultType(ClientIndicatorValueRecord.class)
|
||||
@ConstructorArgs({
|
||||
@Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
|
||||
@Arg(column = "value", javaType = Long.class, jdbcType = JdbcType.BIGINT)
|
||||
})
|
||||
Collection<ClientLastPingRecord> selectMany(SelectStatementProvider select);
|
||||
Collection<ClientIndicatorValueRecord> selectMany(SelectStatementProvider select);
|
||||
|
||||
default Long selectPingTimeByPrimaryKey(final Long id_) {
|
||||
default Long selectValueByPrimaryKey(final Long id_) {
|
||||
return SelectDSL.selectWithMapper(
|
||||
this::selectPingTime,
|
||||
value.as("value"))
|
||||
|
@ -67,18 +68,29 @@ public interface ClientPingMapper {
|
|||
.execute();
|
||||
}
|
||||
|
||||
default Long indicatorRecordIdByConnectionId(final Long connectionId, final IndicatorType indicatorType) {
|
||||
return SelectDSL.selectDistinctWithMapper(
|
||||
this::selectPK,
|
||||
id.as("id"))
|
||||
.from(clientIndicatorRecord)
|
||||
.where(clientConnectionId, isEqualTo(connectionId))
|
||||
.and(type, isEqualTo(indicatorType.id))
|
||||
.build()
|
||||
.execute();
|
||||
}
|
||||
|
||||
default Long pingRecordIdByConnectionId(final Long connectionId) {
|
||||
return SelectDSL.selectDistinctWithMapper(
|
||||
this::selectPK,
|
||||
id.as("id"))
|
||||
.from(clientIndicatorRecord)
|
||||
.where(clientConnectionId, isEqualTo(connectionId))
|
||||
.and(type, isEqualTo(ClientIndicatorType.LAST_PING.id))
|
||||
.and(type, isEqualTo(IndicatorType.LAST_PING.id))
|
||||
.build()
|
||||
.execute();
|
||||
}
|
||||
|
||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientLastPingRecord>>> selectByExample() {
|
||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientIndicatorValueRecord>>> selectByExample() {
|
||||
|
||||
return SelectDSL.selectWithMapper(
|
||||
this::selectMany,
|
||||
|
@ -87,25 +99,31 @@ public interface ClientPingMapper {
|
|||
.from(ClientIndicatorRecordDynamicSqlSupport.clientIndicatorRecord);
|
||||
}
|
||||
|
||||
default int updatePingTime(final Long _id, final Long pingTime) {
|
||||
@Update("UPDATE client_indicator SET value = value + 1 WHERE id =#{pk}")
|
||||
int incrementIndicatorValue(final Long pk);
|
||||
|
||||
@Update("UPDATE client_indicator SET value = value - 1 WHERE id =#{pk}")
|
||||
int decrementIndicatorValue(final Long pk);
|
||||
|
||||
default int updateIndicatorValue(final Long pk, final Long v) {
|
||||
return UpdateDSL.updateWithMapper(this::update, clientIndicatorRecord)
|
||||
.set(value).equalTo(pingTime)
|
||||
.where(id, isEqualTo(_id))
|
||||
.set(value).equalTo(v)
|
||||
.where(id, isEqualTo(pk))
|
||||
.build()
|
||||
.execute();
|
||||
}
|
||||
|
||||
final class ClientLastPingRecord {
|
||||
final class ClientIndicatorValueRecord {
|
||||
|
||||
public final Long id;
|
||||
public final Long lastPingTime;
|
||||
public final Long indicatorValue;
|
||||
|
||||
public ClientLastPingRecord(
|
||||
public ClientIndicatorValueRecord(
|
||||
final Long id,
|
||||
final Long value) {
|
||||
|
||||
this.id = id;
|
||||
this.lastPingTime = value;
|
||||
this.indicatorValue = value;
|
||||
}
|
||||
}
|
||||
|
|
@ -10,12 +10,14 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session;
|
|||
|
||||
import java.util.Set;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.IndicatorValue;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.ClientIndicatorType;
|
||||
|
||||
/** A client indicator is a indicator value holder for a specific Indicator
|
||||
* on a running client connection.
|
||||
|
@ -32,10 +34,9 @@ public interface ClientIndicator extends IndicatorValue {
|
|||
* @param cachingEnabled defines whether indicator value caching is enabled or not. */
|
||||
void init(Indicator indicatorDefinition, Long connectionId, boolean active, boolean cachingEnabled);
|
||||
|
||||
/** Get the client indicator type
|
||||
*
|
||||
* @return the client indicator type */
|
||||
ClientIndicatorType indicatorType();
|
||||
/** get the indicator type */
|
||||
@JsonIgnore
|
||||
IndicatorType getType();
|
||||
|
||||
/** Get the exam identifier of the client connection of this ClientIndicator
|
||||
*
|
||||
|
|
|
@ -48,7 +48,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
|
|||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedIndicatorValueService;
|
||||
import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException;
|
||||
|
||||
@Lazy
|
||||
|
@ -74,7 +74,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
private final SEBClientInstructionService sebInstructionService;
|
||||
private final SEBClientNotificationService sebClientNotificationService;
|
||||
private final ExamAdminService examAdminService;
|
||||
private final DistributedPingService distributedPingCache;
|
||||
// TODO get rid of this dependency and use application events for signaling client connection state changes
|
||||
private final DistributedIndicatorValueService distributedPingCache;
|
||||
private final Executor indicatorUpdateExecutor;
|
||||
private final boolean isDistributedSetup;
|
||||
|
||||
|
@ -85,7 +86,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
final SEBClientInstructionService sebInstructionService,
|
||||
final SEBClientNotificationService sebClientNotificationService,
|
||||
final ExamAdminService examAdminService,
|
||||
final DistributedPingService distributedPingCache,
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
@Qualifier(AsyncServiceSpringConfig.EXAM_API_EXECUTOR_BEAN_NAME) final Executor indicatorUpdateExecutor) {
|
||||
|
||||
this.examSessionService = examSessionService;
|
||||
|
@ -495,7 +496,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
// delete stored ping if this is a distributed setup
|
||||
if (this.isDistributedSetup) {
|
||||
this.distributedPingCache
|
||||
.deletePingIndicator(updatedClientConnection.id);
|
||||
.deleteIndicatorValues(updatedClientConnection.id);
|
||||
}
|
||||
|
||||
reloadConnectionCache(connectionToken);
|
||||
|
@ -549,7 +550,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
// delete stored ping if this is a distributed setup
|
||||
if (this.isDistributedSetup) {
|
||||
this.distributedPingCache
|
||||
.deletePingIndicator(updatedClientConnection.id);
|
||||
.deleteIndicatorValues(updatedClientConnection.id);
|
||||
}
|
||||
|
||||
reloadConnectionCache(connectionToken);
|
||||
|
@ -637,7 +638,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
activeClientConnection.getConnectionId()));
|
||||
|
||||
this.indicatorUpdateExecutor
|
||||
.execute(() -> updateIndicator(connectionToken, event, activeClientConnection));
|
||||
.execute(() -> handleEvent(connectionToken, event, activeClientConnection));
|
||||
|
||||
} else {
|
||||
log.warn("No active ClientConnection found for connectionToken: {}", connectionToken);
|
||||
|
@ -647,7 +648,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
}
|
||||
}
|
||||
|
||||
private void updateIndicator(
|
||||
private void handleEvent(
|
||||
final String connectionToken,
|
||||
final ClientEvent event,
|
||||
final ClientConnectionDataInternal activeClientConnection) {
|
||||
|
@ -827,17 +828,18 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
|
||||
// store event and and flush cache
|
||||
this.eventHandlingStrategy.accept(clientEventRecord);
|
||||
|
||||
// update indicators
|
||||
if (clientEventRecord.getType() != null && EventType.ERROR_LOG.id == clientEventRecord.getType()) {
|
||||
connection.getIndicatorMapping(EventType.ERROR_LOG)
|
||||
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
|
||||
}
|
||||
|
||||
if (this.isDistributedSetup) {
|
||||
// mark for update and flush the cache
|
||||
this.clientConnectionDAO.save(connection.clientConnection);
|
||||
this.examSessionCacheService.evictClientConnection(
|
||||
connection.clientConnection.connectionToken);
|
||||
} else {
|
||||
// update indicators
|
||||
if (clientEventRecord.getType() != null && EventType.ERROR_LOG.id == clientEventRecord.getType()) {
|
||||
connection.getIndicatorMapping(EventType.ERROR_LOG)
|
||||
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -8,22 +8,35 @@
|
|||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
||||
|
||||
import org.joda.time.DateTimeUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
|
||||
|
||||
public abstract class AbstractClientIndicator implements ClientIndicator {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractClientIndicator.class);
|
||||
|
||||
protected final DistributedIndicatorValueService distributedPingCache;
|
||||
|
||||
protected Long indicatorId;
|
||||
protected Long examId;
|
||||
protected Long connectionId;
|
||||
protected boolean cachingEnabled;
|
||||
protected boolean active = true;
|
||||
|
||||
protected boolean valueInitializes = false;
|
||||
protected Long ditributedIndicatorValueRecordId = null;
|
||||
|
||||
protected boolean initialized = false;
|
||||
protected double currentValue = Double.NaN;
|
||||
|
||||
public AbstractClientIndicator(final DistributedIndicatorValueService distributedPingCache) {
|
||||
super();
|
||||
this.distributedPingCache = distributedPingCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(
|
||||
final Indicator indicatorDefinition,
|
||||
|
@ -40,6 +53,40 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
|
|||
this.connectionId = connectionId;
|
||||
this.active = active;
|
||||
this.cachingEnabled = cachingEnabled;
|
||||
|
||||
if (!this.cachingEnabled && this.active) {
|
||||
try {
|
||||
this.ditributedIndicatorValueRecordId = this.distributedPingCache.initIndicatorForConnection(
|
||||
connectionId,
|
||||
getType(),
|
||||
initValue());
|
||||
} catch (final Exception e) {
|
||||
tryRecoverIndicatorRecord();
|
||||
}
|
||||
}
|
||||
|
||||
this.currentValue = computeValueAt(Utils.getMillisecondsNow());
|
||||
this.initialized = true;
|
||||
}
|
||||
|
||||
protected long initValue() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected void tryRecoverIndicatorRecord() {
|
||||
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn("*** Missing indicator value record for connection: {}. Try to recover...", this.connectionId);
|
||||
}
|
||||
|
||||
try {
|
||||
this.ditributedIndicatorValueRecordId = this.distributedPingCache.initIndicatorForConnection(
|
||||
this.connectionId,
|
||||
getType(),
|
||||
initValue());
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to recover indicator value record for connection: {}", this.connectionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,22 +105,11 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
|
|||
}
|
||||
|
||||
public void reset() {
|
||||
this.currentValue = Double.NaN;
|
||||
this.valueInitializes = false;
|
||||
this.currentValue = computeValueAt(Utils.getMillisecondsNow());
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getValue() {
|
||||
final long now = DateTimeUtils.currentTimeMillis();
|
||||
if (!this.valueInitializes) {
|
||||
this.currentValue = computeValueAt(now);
|
||||
this.valueInitializes = true;
|
||||
}
|
||||
|
||||
if (!this.cachingEnabled && this.active) {
|
||||
this.currentValue = computeValueAt(now);
|
||||
}
|
||||
|
||||
return this.currentValue;
|
||||
}
|
||||
|
||||
|
|
|
@ -30,10 +30,11 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
|
|||
protected final List<Integer> eventTypeIds;
|
||||
protected String[] tags;
|
||||
|
||||
protected long lastDistributedUpdate = 0L;
|
||||
|
||||
protected AbstractLogIndicator(final EventType... eventTypes) {
|
||||
protected AbstractLogIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final EventType... eventTypes) {
|
||||
|
||||
super(distributedPingCache);
|
||||
this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes));
|
||||
this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes)
|
||||
.map(et -> et.id)
|
||||
|
@ -50,6 +51,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
|
|||
|
||||
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
|
||||
|
||||
// init tags
|
||||
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
|
||||
this.tags = null;
|
||||
} else {
|
||||
|
@ -79,12 +81,4 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
|
|||
return this.observed;
|
||||
}
|
||||
|
||||
protected boolean loadFromPersistent(final long timestamp) {
|
||||
if (!super.valueInitializes) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return timestamp - this.lastDistributedUpdate > DISTRIBUTED_LOG_UPDATE_INTERVAL;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,10 +29,11 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
protected final ClientEventRecordMapper clientEventRecordMapper;
|
||||
|
||||
protected AbstractLogLevelCountIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper,
|
||||
final EventType... eventTypes) {
|
||||
|
||||
super(eventTypes);
|
||||
super(distributedPingCache, eventTypes);
|
||||
this.clientEventRecordMapper = clientEventRecordMapper;
|
||||
}
|
||||
|
||||
|
@ -47,9 +48,10 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
}
|
||||
|
||||
private void valueChanged(final String eventText) {
|
||||
if (this.tags == null || this.tags.length == 0) {
|
||||
this.currentValue = getValue() + 1d;
|
||||
} else if (hasTag(eventText)) {
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(eventText)) {
|
||||
if (super.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedPingCache.incrementIndicatorValue(super.ditributedIndicatorValueRecordId);
|
||||
}
|
||||
this.currentValue = getValue() + 1d;
|
||||
}
|
||||
}
|
||||
|
@ -57,15 +59,9 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
@Override
|
||||
public double computeValueAt(final long timestamp) {
|
||||
|
||||
if (!loadFromPersistent(timestamp)) {
|
||||
return super.currentValue;
|
||||
}
|
||||
|
||||
// TODO do this within a better reactive way like ping updates
|
||||
|
||||
try {
|
||||
|
||||
final Long errors = this.clientEventRecordMapper
|
||||
final Long numberOfLogs = this.clientEventRecordMapper
|
||||
.countByExample()
|
||||
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId))
|
||||
.and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds))
|
||||
|
@ -77,13 +73,16 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
|
|||
.build()
|
||||
.execute();
|
||||
|
||||
return errors.doubleValue();
|
||||
// update active indicator value record on persistent when caching is not enabled
|
||||
if (!this.cachingEnabled && this.active && this.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedPingCache.updateIndicatorValue(this.connectionId, numberOfLogs.longValue());
|
||||
}
|
||||
|
||||
return numberOfLogs.doubleValue();
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to get indicator count from persistent storage: ", e);
|
||||
return super.currentValue;
|
||||
} finally {
|
||||
super.lastDistributedUpdate = timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,10 +32,11 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
protected final ClientEventRecordMapper clientEventRecordMapper;
|
||||
|
||||
protected AbstractLogNumberIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper,
|
||||
final EventType... eventTypes) {
|
||||
|
||||
super(eventTypes);
|
||||
super(distributedPingCache, eventTypes);
|
||||
this.clientEventRecordMapper = clientEventRecordMapper;
|
||||
}
|
||||
|
||||
|
@ -53,9 +54,15 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
}
|
||||
|
||||
private void valueChanged(final String text, final double value) {
|
||||
if (this.tags == null || this.tags.length == 0) {
|
||||
this.currentValue = value;
|
||||
} else if (hasTag(text)) {
|
||||
|
||||
this.currentValue = getValue() + 1d;
|
||||
|
||||
if (this.tags == null || this.tags.length == 0 || hasTag(text)) {
|
||||
if (super.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedPingCache.updateIndicatorValueAsync(
|
||||
this.ditributedIndicatorValueRecordId,
|
||||
Double.valueOf(value).longValue());
|
||||
}
|
||||
this.currentValue = value;
|
||||
}
|
||||
}
|
||||
|
@ -63,12 +70,6 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
@Override
|
||||
public double computeValueAt(final long timestamp) {
|
||||
|
||||
if (!loadFromPersistent(timestamp)) {
|
||||
return super.currentValue;
|
||||
}
|
||||
|
||||
// TODO do this within a better reactive way like ping updates
|
||||
|
||||
try {
|
||||
|
||||
final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample()
|
||||
|
@ -89,6 +90,12 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
|
||||
final BigDecimal numericValue = execute.get(execute.size() - 1).getNumericValue();
|
||||
if (numericValue != null) {
|
||||
|
||||
// update active indicator value record on persistent when caching is not enabled
|
||||
if (!this.cachingEnabled && this.active && this.ditributedIndicatorValueRecordId != null) {
|
||||
this.distributedPingCache.updateIndicatorValue(this.connectionId, numericValue.longValue());
|
||||
}
|
||||
|
||||
return numericValue.doubleValue();
|
||||
} else {
|
||||
return super.currentValue;
|
||||
|
@ -97,8 +104,6 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
|
|||
} catch (final Exception e) {
|
||||
log.error("Failed to get indicator number from persistent storage: {}", e.getMessage());
|
||||
return this.currentValue;
|
||||
} finally {
|
||||
super.lastDistributedUpdate = timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,25 +12,15 @@ import java.util.Collections;
|
|||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingService.PingUpdate;
|
||||
|
||||
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class);
|
||||
|
||||
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
|
||||
|
||||
protected final DistributedPingService distributedPingCache;
|
||||
protected PingUpdate pingUpdate = null;
|
||||
|
||||
protected AbstractPingIndicator(final DistributedPingService distributedPingCache) {
|
||||
super();
|
||||
this.distributedPingCache = distributedPingCache;
|
||||
protected AbstractPingIndicator(final DistributedIndicatorValueService distributedPingCache) {
|
||||
super(distributedPingCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -41,14 +31,6 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
|||
final boolean cachingEnabled) {
|
||||
|
||||
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
|
||||
|
||||
if (!this.cachingEnabled && this.active) {
|
||||
try {
|
||||
this.pingUpdate = this.distributedPingCache.createPingUpdate(connectionId);
|
||||
} catch (final Exception e) {
|
||||
this.pingUpdate = this.distributedPingCache.createPingUpdate(connectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final void notifyPing(final long timestamp, final int pingNumber) {
|
||||
|
@ -56,30 +38,14 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
|||
|
||||
if (!this.cachingEnabled) {
|
||||
|
||||
if (this.pingUpdate == null) {
|
||||
tryRecoverPingRecord();
|
||||
if (this.pingUpdate == null) {
|
||||
if (super.ditributedIndicatorValueRecordId == null) {
|
||||
tryRecoverIndicatorRecord();
|
||||
if (this.ditributedIndicatorValueRecordId == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.distributedPingCache.updatePingAsync(this.pingUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
private void tryRecoverPingRecord() {
|
||||
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId);
|
||||
}
|
||||
|
||||
try {
|
||||
this.pingUpdate = this.distributedPingCache.createPingUpdate(this.connectionId);
|
||||
if (this.pingUpdate == null) {
|
||||
this.pingUpdate = this.distributedPingCache.createPingUpdate(this.connectionId);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to recover ping record for connection: {}", this.connectionId, e);
|
||||
this.distributedPingCache.updatePingAsync(this.ditributedIndicatorValueRecordId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,11 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecord
|
|||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
public class BatteryStatusIndicator extends AbstractLogNumberIndicator {
|
||||
|
||||
protected BatteryStatusIndicator(final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
super(clientEventRecordMapper, EventType.INFO_LOG);
|
||||
protected BatteryStatusIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
|
||||
super(distributedPingCache, clientEventRecordMapper, EventType.INFO_LOG);
|
||||
super.tags = new String[] { API.LOG_EVENT_TAG_BATTERY_STATUS };
|
||||
}
|
||||
|
||||
|
@ -45,9 +48,4 @@ public class BatteryStatusIndicator extends AbstractLogNumberIndicator {
|
|||
return IndicatorType.BATTERY_STATUS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.BATTERY_STATUS;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
||||
|
||||
|
||||
public enum ClientIndicatorType {
|
||||
UNKNOWN(0),
|
||||
LAST_PING(1),
|
||||
ERROR_LOG_COUNT(2),
|
||||
WARN_LOG_COUNT(3),
|
||||
INFO_LOG_COUNT(4),
|
||||
WLAN_STATUS(5),
|
||||
BATTERY_STATUS(5),
|
||||
|
||||
|
||||
;
|
||||
|
||||
public final int id;
|
||||
|
||||
ClientIndicatorType(final int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public static ClientIndicatorType byId(final int id) {
|
||||
for (final ClientIndicatorType status : ClientIndicatorType.values()) {
|
||||
if (status.id == id) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
return UNKNOWN;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,362 @@
|
|||
/*
|
||||
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
||||
|
||||
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
|
||||
import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import ch.ethz.seb.sebserver.SEBServerInit;
|
||||
import ch.ethz.seb.sebserver.SEBServerInitEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientIndicatorValueMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientIndicatorValueMapper.ClientIndicatorValueRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientIndicatorRecordDynamicSqlSupport;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientIndicatorRecordMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientIndicatorRecord;
|
||||
|
||||
@Lazy
|
||||
@Component
|
||||
@WebServiceProfile
|
||||
/** This service is only needed within a distributed setup where more then one webservice works
|
||||
* simultaneously within one SEB Server and one persistent storage.
|
||||
* </p>
|
||||
* This service handles the SEB client indicator updates within such a setup and implements functionality to
|
||||
* efficiently store and load indicator values from and to shared store.
|
||||
* </p>
|
||||
* The update from the persistent store is done done periodically within a batch while the indicator value writes
|
||||
* are done individually per SEB client when they arrive. The update can be done within a dedicated task executor with
|
||||
* minimal task
|
||||
* queue to do not overflow other executor services when it comes to a leak on storing lot of ping times for example.
|
||||
* In this case some ping time updates will be just dropped and not go to the persistent store until the leak
|
||||
* is resolved. */
|
||||
public class DistributedIndicatorValueService implements DisposableBean {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DistributedIndicatorValueService.class);
|
||||
|
||||
private final Executor indicatorValueUpdateExecutor;
|
||||
private final ClientIndicatorRecordMapper clientIndicatorRecordMapper;
|
||||
private final ClientIndicatorValueMapper clientIndicatorValueMapper;
|
||||
private long updateTolerance;
|
||||
|
||||
private ScheduledFuture<?> taskRef;
|
||||
private final Map<Long, Long> indicatorValueCache = new ConcurrentHashMap<>();
|
||||
private long lastUpdate = 0L;
|
||||
|
||||
public DistributedIndicatorValueService(
|
||||
@Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor pingUpdateExecutor,
|
||||
final ClientIndicatorRecordMapper clientIndicatorRecordMapper,
|
||||
final ClientIndicatorValueMapper clientIndicatorValueMapper) {
|
||||
|
||||
this.indicatorValueUpdateExecutor = pingUpdateExecutor;
|
||||
this.clientIndicatorRecordMapper = clientIndicatorRecordMapper;
|
||||
this.clientIndicatorValueMapper = clientIndicatorValueMapper;
|
||||
}
|
||||
|
||||
/** Initializes the service by attaching it to the scheduler for periodical update.
|
||||
* If the webservice is not initialized within a distributed setup, this will do nothing
|
||||
*
|
||||
* @param initEvent the SEB Server webservice init event */
|
||||
@EventListener(SEBServerInitEvent.class)
|
||||
public void init(final SEBServerInitEvent initEvent) {
|
||||
final ApplicationContext applicationContext = initEvent.webserviceInit.getApplicationContext();
|
||||
final WebserviceInfo webserviceInfo = applicationContext.getBean(WebserviceInfo.class);
|
||||
if (webserviceInfo.isDistributed()) {
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------>");
|
||||
SEBServerInit.INIT_LOGGER.info("------> Activate distributed indicator value service:");
|
||||
|
||||
final TaskScheduler taskScheduler = applicationContext.getBean(TaskScheduler.class);
|
||||
final long distributedUpdateInterval = webserviceInfo.getDistributedUpdateInterval();
|
||||
this.updateTolerance = distributedUpdateInterval * 2 / 3;
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> with distributedUpdateInterval: {}",
|
||||
distributedUpdateInterval);
|
||||
SEBServerInit.INIT_LOGGER.info("------> with taskScheduler: {}", taskScheduler);
|
||||
|
||||
try {
|
||||
|
||||
this.taskRef = taskScheduler.scheduleAtFixedRate(
|
||||
this::updateIndicatorValueCache,
|
||||
distributedUpdateInterval);
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> distributed indicator value service successfully initialized!");
|
||||
|
||||
} catch (final Exception e) {
|
||||
SEBServerInit.INIT_LOGGER.error("------> Failed to initialize distributed indicator value service:", e);
|
||||
log.error("Failed to initialize distributed indicator value cache update task");
|
||||
this.taskRef = null;
|
||||
}
|
||||
} else {
|
||||
this.taskRef = null;
|
||||
}
|
||||
}
|
||||
|
||||
/** This initializes a SEB client indicator on the persistent storage for a given SEB client
|
||||
* connection identifier and of given IndicatorType.
|
||||
* If there is already such an indicator for the specified SEB client connection identifier and type,
|
||||
* this returns the id of the existing one.
|
||||
*
|
||||
* @param connectionId SEB client connection identifier
|
||||
* @param type indicator type
|
||||
* @param value the initial indicator value
|
||||
* @return SEB client indicator value identifier (PK) */
|
||||
@Transactional
|
||||
public Long initIndicatorForConnection(
|
||||
final Long connectionId,
|
||||
final IndicatorType type,
|
||||
final Long value) {
|
||||
|
||||
try {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.trace("*** Initialize indicator value record for SEB connection: {}", connectionId);
|
||||
}
|
||||
|
||||
final Long recordId = this.clientIndicatorValueMapper
|
||||
.indicatorRecordIdByConnectionId(connectionId, type);
|
||||
|
||||
if (recordId == null) {
|
||||
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
|
||||
null, connectionId, type.id, value, null);
|
||||
|
||||
this.clientIndicatorRecordMapper.insert(clientEventRecord);
|
||||
|
||||
try {
|
||||
// This also double-check by trying again. If we have more then one entry here
|
||||
// this will throw an exception that causes a rollback
|
||||
return this.clientIndicatorValueMapper
|
||||
.indicatorRecordIdByConnectionId(connectionId, type);
|
||||
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.warn(
|
||||
"Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent",
|
||||
connectionId, type);
|
||||
|
||||
// force rollback
|
||||
throw new RuntimeException("Detected multiple client indicator value entries");
|
||||
}
|
||||
}
|
||||
|
||||
return recordId;
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.error("Failed to initialize indicator value for connection -> {}", connectionId, e);
|
||||
|
||||
// force rollback
|
||||
throw new RuntimeException("Failed to initialize indicator value for connection -> " + connectionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Deletes a existing SEB client indicator value record for a given SEB client connection identifier
|
||||
* on the persistent storage.
|
||||
*
|
||||
* @param connectionId SEB client connection identifier */
|
||||
@Transactional
|
||||
public void deleteIndicatorValues(final Long connectionId) {
|
||||
try {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("*** Delete indicator value record for SEB connection: {}", connectionId);
|
||||
}
|
||||
|
||||
final Collection<ClientIndicatorValueRecord> records = this.clientIndicatorValueMapper
|
||||
.selectByExample()
|
||||
.where(ClientIndicatorRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
|
||||
.build()
|
||||
.execute();
|
||||
|
||||
if (records == null || records.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final List<Long> toDelete = records.stream().map(rec -> {
|
||||
this.indicatorValueCache.remove(rec.id);
|
||||
return rec.id;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
this.clientIndicatorRecordMapper
|
||||
.deleteByExample()
|
||||
.where(ClientIndicatorRecordDynamicSqlSupport.id, isIn(toDelete))
|
||||
.build()
|
||||
.execute();
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to delete indicator value for connection -> {}", connectionId, e);
|
||||
try {
|
||||
log.info(
|
||||
"Because of failed indicator value record deletion, "
|
||||
+ "flushing the indicator value cache to ensure no dead connections remain in the cache");
|
||||
this.indicatorValueCache.clear();
|
||||
} catch (final Exception ee) {
|
||||
log.error("Failed to force flushing the indicator value cache: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Use this to get the last indicator value with a given indicator identifier (PK)
|
||||
* This fist tries to get the indicator value from internal cache. If not present, tries to get
|
||||
* the indicator value from persistent storage and put it to the cache.
|
||||
*
|
||||
* @param indicatorPK The indicator record id (PK).
|
||||
* @return The actual (last) indicator value. */
|
||||
public Long getIndicatorValue(final Long indicatorPK) {
|
||||
try {
|
||||
|
||||
Long value = this.indicatorValueCache.get(indicatorPK);
|
||||
if (value == null) {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("*** Get and cache ping time: {}", indicatorPK);
|
||||
}
|
||||
|
||||
value = this.clientIndicatorValueMapper.selectValueByPrimaryKey(indicatorPK);
|
||||
}
|
||||
|
||||
return value;
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to get last indicator value from storage: {}", e.getMessage());
|
||||
return 0L;
|
||||
}
|
||||
}
|
||||
|
||||
/** Updates the internal indicator value cache by loading all actual SEB client indicators from persistent storage
|
||||
* and put it in the cache.
|
||||
* This is internally periodically scheduled by the task scheduler but also implements an execution drop if
|
||||
* the last update was less then 2/3 of the schedule interval ago. This is to prevent task queue overflows
|
||||
* and wait with update when there is a persistent storage leak or a lot of network latency. */
|
||||
private void updateIndicatorValueCache() {
|
||||
if (this.indicatorValueCache.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long millisecondsNow = Utils.getMillisecondsNow();
|
||||
if (millisecondsNow - this.lastUpdate < this.updateTolerance) {
|
||||
log.warn("Skip indicator value update schedule because the last one was less then 2 seconds ago");
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.trace("*** Update distributed indicator value cache: {}", this.indicatorValueCache);
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
final Map<Long, Long> mapping = this.clientIndicatorValueMapper
|
||||
.selectByExample()
|
||||
.build()
|
||||
.execute()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(entry -> entry.id, entry -> entry.indicatorValue));
|
||||
|
||||
if (mapping != null) {
|
||||
this.indicatorValueCache.clear();
|
||||
this.indicatorValueCache.putAll(mapping);
|
||||
this.lastUpdate = millisecondsNow;
|
||||
}
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to update distributed indicator value cache: {}", this.indicatorValueCache,
|
||||
e);
|
||||
}
|
||||
|
||||
this.lastUpdate = millisecondsNow;
|
||||
}
|
||||
|
||||
/** Update last ping time on persistent storage asynchronously within a defines thread pool with no
|
||||
* waiting queue to skip further ping updates if all update threads are busy **/
|
||||
void updatePingAsync(final Long pingRecord) {
|
||||
try {
|
||||
this.indicatorValueUpdateExecutor
|
||||
.execute(() -> this.clientIndicatorValueMapper.updateIndicatorValue(
|
||||
pingRecord,
|
||||
Utils.getMillisecondsNow()));
|
||||
} catch (final Exception e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.warn("Failed to schedule ping task: {}" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Update indicator value on persistent storage asynchronously within a defined thread pool with no
|
||||
* waiting queue to skip further indicator value updates if all update threads are busy **/
|
||||
void updateIndicatorValueAsync(final Long pk, final Long value) {
|
||||
try {
|
||||
this.indicatorValueUpdateExecutor
|
||||
.execute(() -> this.clientIndicatorValueMapper.updateIndicatorValue(pk, value));
|
||||
} catch (final Exception e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.warn("Failed to schedule indicator update task: {}" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Update an indicator value within a transaction */
|
||||
@Transactional
|
||||
void updateIndicatorValue(final Long pk, final Long value) {
|
||||
try {
|
||||
this.clientIndicatorValueMapper.updateIndicatorValue(pk, value);
|
||||
} catch (final Exception e) {
|
||||
log.warn("Failed to update indicator value: {}" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/** Simply increment a given indicator value */
|
||||
void incrementIndicatorValue(final Long pk) {
|
||||
try {
|
||||
this.clientIndicatorValueMapper.incrementIndicatorValue(pk);
|
||||
} catch (final Exception e) {
|
||||
log.warn("Failed to increment indicator value: {}" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (this.taskRef != null) {
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("----> Shout down distributed indicator service...");
|
||||
|
||||
try {
|
||||
final boolean cancel = this.taskRef.cancel(true);
|
||||
if (!cancel) {
|
||||
log.warn("Failed to cancel distributed indicator cache update task");
|
||||
}
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("----> Distributed indicator service down");
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to cancel distributed indicator cache update task: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,344 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
||||
|
||||
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.joda.time.DateTimeUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import ch.ethz.seb.sebserver.SEBServerInit;
|
||||
import ch.ethz.seb.sebserver.SEBServerInitEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientPingMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientPingMapper.ClientLastPingRecord;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientIndicatorRecordDynamicSqlSupport;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientIndicatorRecordMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientIndicatorRecord;
|
||||
|
||||
@Lazy
|
||||
@Component
|
||||
@WebServiceProfile
|
||||
/** This service is only needed within a distributed setup where more then one webservice works
|
||||
* simultaneously within one SEB Server and one persistent storage.
|
||||
* </p>
|
||||
* This service handles the SEB client ping updates within such a setup and implements functionality to
|
||||
* efficiently store and load ping time indicators form and to shared store.
|
||||
* </p>
|
||||
* The update from the persistent store is done periodically within a batch while the ping time writes
|
||||
* are done individually per SEB client when they arrive but within a dedicated task executor with minimal task
|
||||
* queue to do not overflow other executor services when it comes to a leak on storing lot of ping times.
|
||||
* In this case some ping time updates will be just dropped and not go to the persistent store until the leak
|
||||
* is resolved.
|
||||
* </p>
|
||||
* Note that the ping time update and read operations are also not within a transaction for performance reasons
|
||||
* and because it is not a big deal to loose one ore two ping updates for a SEB client. */
|
||||
public class DistributedPingService implements DisposableBean {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DistributedPingService.class);
|
||||
|
||||
private final Executor pingUpdateExecutor;
|
||||
private final ClientIndicatorRecordMapper clientIndicatorRecordMapper;
|
||||
private final ClientPingMapper clientPingMapper;
|
||||
private long pingUpdateTolerance;
|
||||
|
||||
private ScheduledFuture<?> taskRef;
|
||||
private final Map<Long, Long> pingCache = new ConcurrentHashMap<>();
|
||||
private long lastUpdate = 0L;
|
||||
|
||||
public DistributedPingService(
|
||||
@Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor pingUpdateExecutor,
|
||||
final ClientIndicatorRecordMapper clientIndicatorRecordMapper,
|
||||
final ClientPingMapper clientPingMapper) {
|
||||
|
||||
this.pingUpdateExecutor = pingUpdateExecutor;
|
||||
this.clientIndicatorRecordMapper = clientIndicatorRecordMapper;
|
||||
this.clientPingMapper = clientPingMapper;
|
||||
}
|
||||
|
||||
/** Initializes the service by attaching it to the scheduler for periodical update.
|
||||
* If the webservice is not initialized within a distributed setup, this will do nothing
|
||||
*
|
||||
* @param initEvent the SEB Server webservice init event */
|
||||
@EventListener(SEBServerInitEvent.class)
|
||||
public void init(final SEBServerInitEvent initEvent) {
|
||||
final ApplicationContext applicationContext = initEvent.webserviceInit.getApplicationContext();
|
||||
final WebserviceInfo webserviceInfo = applicationContext.getBean(WebserviceInfo.class);
|
||||
if (webserviceInfo.isDistributed()) {
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------>");
|
||||
SEBServerInit.INIT_LOGGER.info("------> Activate distributed ping service:");
|
||||
|
||||
final TaskScheduler taskScheduler = applicationContext.getBean(TaskScheduler.class);
|
||||
final long distributedPingUpdateInterval = webserviceInfo.getDistributedPingUpdateInterval();
|
||||
this.pingUpdateTolerance = distributedPingUpdateInterval * 2 / 3;
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> with distributedPingUpdateInterval: {}",
|
||||
distributedPingUpdateInterval);
|
||||
SEBServerInit.INIT_LOGGER.info("------> with taskScheduler: {}", taskScheduler);
|
||||
|
||||
try {
|
||||
this.taskRef = taskScheduler.scheduleAtFixedRate(
|
||||
this::updatePingCache,
|
||||
distributedPingUpdateInterval);
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> distributed ping service successfully initialized!");
|
||||
|
||||
} catch (final Exception e) {
|
||||
SEBServerInit.INIT_LOGGER.error("------> Failed to initialize distributed ping service:", e);
|
||||
log.error("Failed to initialize distributed ping cache update task");
|
||||
this.taskRef = null;
|
||||
}
|
||||
} else {
|
||||
this.taskRef = null;
|
||||
}
|
||||
}
|
||||
|
||||
/** This initializes a SEB client ping indicator on the persistent storage for a given SEB client
|
||||
* connection identifier.
|
||||
* If there is already such a ping indicator for the specified SEB client connection identifier, returns
|
||||
* the id of the existing one.
|
||||
*
|
||||
* @param connectionId SEB client connection identifier
|
||||
* @return SEB client ping indicator identifier (PK) */
|
||||
@Transactional
|
||||
public Long initPingForConnection(final Long connectionId) {
|
||||
try {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.trace("*** Initialize ping record for SEB connection: {}", connectionId);
|
||||
}
|
||||
|
||||
final Long recordId = this.clientPingMapper
|
||||
.pingRecordIdByConnectionId(connectionId);
|
||||
|
||||
if (recordId == null) {
|
||||
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
|
||||
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
|
||||
null, connectionId, ClientIndicatorType.LAST_PING.id, millisecondsNow, null);
|
||||
|
||||
this.clientIndicatorRecordMapper.insert(clientEventRecord);
|
||||
|
||||
try {
|
||||
// This also double-check by trying again. If we have more then one entry here
|
||||
// this will throw an exception that causes a rollback
|
||||
return this.clientPingMapper
|
||||
.pingRecordIdByConnectionId(connectionId);
|
||||
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.warn("Detected multiple client ping entries for connection: " + connectionId
|
||||
+ ". Force rollback to prevent");
|
||||
|
||||
// force rollback
|
||||
throw new RuntimeException("Detected multiple client ping entries");
|
||||
}
|
||||
}
|
||||
|
||||
return recordId;
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.error("Failed to initialize ping for connection -> {}", connectionId, e);
|
||||
|
||||
// force rollback
|
||||
throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Deletes a existing SEB client ping indicator for a given SEB client connection identifier
|
||||
* on the persistent storage.
|
||||
*
|
||||
* @param connectionId SEB client connection identifier */
|
||||
@Transactional
|
||||
public void deletePingIndicator(final Long connectionId) {
|
||||
try {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("*** Delete ping record for SEB connection: {}", connectionId);
|
||||
}
|
||||
|
||||
final Collection<ClientLastPingRecord> records = this.clientPingMapper
|
||||
.selectByExample()
|
||||
.where(ClientIndicatorRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
|
||||
.and(ClientIndicatorRecordDynamicSqlSupport.type, isEqualTo(ClientIndicatorType.LAST_PING.id))
|
||||
.build()
|
||||
.execute();
|
||||
|
||||
if (records == null || records.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Long id = records.iterator().next().id;
|
||||
this.pingCache.remove(id);
|
||||
this.clientIndicatorRecordMapper.deleteByPrimaryKey(id);
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to delete ping for connection -> {}", connectionId, e);
|
||||
try {
|
||||
log.info(
|
||||
"Because of failed ping record deletion, "
|
||||
+ "flushing the ping cache to ensure no dead connections pings remain in the cache");
|
||||
this.pingCache.clear();
|
||||
} catch (final Exception ee) {
|
||||
log.error("Failed to force flushing the ping cache: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Use this to get the last ping time indicator value with a given indicator identifier (PK)
|
||||
* This fist tries to get the ping time from internal cache. If not present, tries to get
|
||||
* the ping indicator value from persistent storage and put it to the cache.
|
||||
*
|
||||
* @param pingRecordId The ping indicator record id (PK). Get one for a given SEB client connection identifier by
|
||||
* calling: initPingForConnection
|
||||
* @return The actual (last) ping time. */
|
||||
public Long getLastPing(final Long pingRecordId) {
|
||||
try {
|
||||
|
||||
Long ping = this.pingCache.get(pingRecordId);
|
||||
if (ping == null) {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("*** Get and cache ping time: {}", pingRecordId);
|
||||
}
|
||||
|
||||
ping = this.clientPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
|
||||
}
|
||||
|
||||
return ping;
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to get last ping from storage: {}", e.getMessage());
|
||||
return 0L;
|
||||
}
|
||||
}
|
||||
|
||||
/** Updates the internal ping cache by loading all actual SEB client ping indicators from persistent storage
|
||||
* and put it in the cache.
|
||||
* This is internally periodically scheduled by the task scheduler but also implements an execution drop if
|
||||
* the last update was less then 2/3 of the schedule interval ago. This is to prevent task queue overflows
|
||||
* and wait with update when there is a persistent storage leak or a lot of network latency. */
|
||||
private void updatePingCache() {
|
||||
if (this.pingCache.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long millisecondsNow = Utils.getMillisecondsNow();
|
||||
if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) {
|
||||
log.warn("Skip ping update schedule because the last one was less then 2 seconds ago");
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.trace("*** Update distributed ping cache: {}", this.pingCache);
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
final Map<Long, Long> mapping = this.clientPingMapper
|
||||
.selectByExample()
|
||||
.where(
|
||||
ClientIndicatorRecordDynamicSqlSupport.type,
|
||||
isEqualTo(ClientIndicatorType.LAST_PING.id))
|
||||
.build()
|
||||
.execute()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(entry -> entry.id, entry -> entry.lastPingTime));
|
||||
|
||||
if (mapping != null) {
|
||||
this.pingCache.clear();
|
||||
this.pingCache.putAll(mapping);
|
||||
this.lastUpdate = millisecondsNow;
|
||||
}
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e);
|
||||
}
|
||||
|
||||
this.lastUpdate = millisecondsNow;
|
||||
}
|
||||
|
||||
/** Update last ping time on persistent storage asynchronously within a defines thread pool with no
|
||||
* waiting queue to skip further ping updates if all update threads are busy **/
|
||||
void updatePingAsync(final PingUpdate pingUpdate) {
|
||||
try {
|
||||
this.pingUpdateExecutor.execute(pingUpdate);
|
||||
} catch (final Exception e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.warn("Failed to schedule ping task: {}" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Create a PingUpdate for a specified SEB client connectionId.
|
||||
*
|
||||
* @param connectionId SEB client connection identifier
|
||||
* @return PingUpdate for a specified SEB client connectionId */
|
||||
PingUpdate createPingUpdate(final Long connectionId) {
|
||||
return new PingUpdate(
|
||||
this.clientPingMapper,
|
||||
this.initPingForConnection(connectionId));
|
||||
}
|
||||
|
||||
/** Encapsulates a SEB client ping update on persistent storage */
|
||||
static final class PingUpdate implements Runnable {
|
||||
|
||||
private final ClientPingMapper clientPingMapper;
|
||||
final Long pingRecord;
|
||||
|
||||
public PingUpdate(final ClientPingMapper clientPingMapper, final Long pingRecord) {
|
||||
this.clientPingMapper = clientPingMapper;
|
||||
this.pingRecord = pingRecord;
|
||||
}
|
||||
|
||||
@Override
|
||||
/** Processes the ping update on persistent storage by using the current time stamp. */
|
||||
public void run() {
|
||||
try {
|
||||
this.clientPingMapper
|
||||
.updatePingTime(this.pingRecord, Utils.getMillisecondsNow());
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to update ping: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (this.taskRef != null) {
|
||||
try {
|
||||
final boolean cancel = this.taskRef.cancel(true);
|
||||
if (!cancel) {
|
||||
log.warn("Failed to cancel distributed ping cache update task");
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to cancel distributed ping cache update task: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -22,8 +22,11 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecord
|
|||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
public final class ErrorLogCountClientIndicator extends AbstractLogLevelCountIndicator {
|
||||
|
||||
protected ErrorLogCountClientIndicator(final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
super(clientEventRecordMapper, EventType.ERROR_LOG);
|
||||
protected ErrorLogCountClientIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
|
||||
super(distributedPingCache, clientEventRecordMapper, EventType.ERROR_LOG);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -31,9 +34,4 @@ public final class ErrorLogCountClientIndicator extends AbstractLogLevelCountInd
|
|||
return IndicatorType.ERROR_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.ERROR_LOG_COUNT;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,8 +22,11 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecord
|
|||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
public class InfoLogCountClientIndicator extends AbstractLogLevelCountIndicator {
|
||||
|
||||
protected InfoLogCountClientIndicator(final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
super(clientEventRecordMapper, EventType.INFO_LOG);
|
||||
protected InfoLogCountClientIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
|
||||
super(distributedPingCache, clientEventRecordMapper, EventType.INFO_LOG);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -31,9 +34,4 @@ public class InfoLogCountClientIndicator extends AbstractLogLevelCountIndicator
|
|||
return IndicatorType.INFO_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.INFO_LOG_COUNT;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.gbl.Constants;
|
|||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||
|
||||
@Lazy
|
||||
|
@ -42,11 +43,16 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
private boolean missingPing = false;
|
||||
private boolean hidden = false;
|
||||
|
||||
public PingIntervalClientIndicator(final DistributedPingService distributedPingCache) {
|
||||
public PingIntervalClientIndicator(final DistributedIndicatorValueService distributedPingCache) {
|
||||
super(distributedPingCache);
|
||||
this.cachingEnabled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long initValue() {
|
||||
return Utils.getMillisecondsNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(
|
||||
final Indicator indicatorDefinition,
|
||||
|
@ -56,13 +62,9 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
|
||||
|
||||
final long now = DateTimeUtils.currentTimeMillis();
|
||||
this.currentValue = computeValueAt(now);
|
||||
if (Double.isNaN(this.currentValue)) {
|
||||
this.currentValue = now;
|
||||
}
|
||||
|
||||
// init ping error threshold
|
||||
try {
|
||||
|
||||
indicatorDefinition
|
||||
.getThresholds()
|
||||
.stream()
|
||||
|
@ -74,10 +76,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
this.pingErrorThreshold = DEFAULT_PING_ERROR_THRESHOLD;
|
||||
}
|
||||
|
||||
// init missing ping indicator
|
||||
if (!cachingEnabled) {
|
||||
try {
|
||||
final double value = getValue();
|
||||
this.missingPing = this.pingErrorThreshold < value;
|
||||
this.missingPing = this.pingErrorThreshold < getValue();
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to initialize missingPing: {}", e.getMessage());
|
||||
this.missingPing = true;
|
||||
|
@ -86,11 +88,6 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.LAST_PING;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public final boolean isMissingPing() {
|
||||
return this.missingPing;
|
||||
|
@ -113,8 +110,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
@Override
|
||||
public double getValue() {
|
||||
final double value = super.getValue();
|
||||
return DateTimeUtils.currentTimeMillis() - value;
|
||||
if (!this.initialized) {
|
||||
return Double.NaN;
|
||||
}
|
||||
return DateTimeUtils.currentTimeMillis() - this.currentValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -129,9 +128,12 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
@Override
|
||||
public final double computeValueAt(final long timestamp) {
|
||||
if (!this.cachingEnabled && super.pingUpdate != null) {
|
||||
|
||||
final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord);
|
||||
if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) {
|
||||
|
||||
final Long lastPing = this.distributedPingCache
|
||||
.getIndicatorValue(super.ditributedIndicatorValueRecordId);
|
||||
|
||||
if (lastPing != null) {
|
||||
final double doubleValue = lastPing.doubleValue();
|
||||
return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
|
||||
|
@ -140,11 +142,15 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
return this.currentValue;
|
||||
}
|
||||
|
||||
return !this.valueInitializes ? timestamp : this.currentValue;
|
||||
return !this.initialized ? timestamp : this.currentValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean missingPingUpdate(final long now) {
|
||||
if (this.currentValue <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final long value = now - (long) super.currentValue;
|
||||
if (this.missingPing) {
|
||||
if (this.pingErrorThreshold > value) {
|
||||
|
|
|
@ -24,8 +24,11 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecord
|
|||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
public class WLANStatusIndicator extends AbstractLogNumberIndicator {
|
||||
|
||||
protected WLANStatusIndicator(final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
super(clientEventRecordMapper, EventType.INFO_LOG);
|
||||
protected WLANStatusIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
|
||||
super(distributedPingCache, clientEventRecordMapper, EventType.INFO_LOG);
|
||||
super.tags = new String[] { API.LOG_EVENT_TAG_WLAN_STATUS };
|
||||
}
|
||||
|
||||
|
@ -45,9 +48,4 @@ public class WLANStatusIndicator extends AbstractLogNumberIndicator {
|
|||
return IndicatorType.WLAN_STATUS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.WLAN_STATUS;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,8 +22,11 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecord
|
|||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
public class WarnLogCountClientIndicator extends AbstractLogLevelCountIndicator {
|
||||
|
||||
protected WarnLogCountClientIndicator(final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
super(clientEventRecordMapper, EventType.WARN_LOG);
|
||||
protected WarnLogCountClientIndicator(
|
||||
final DistributedIndicatorValueService distributedPingCache,
|
||||
final ClientEventRecordMapper clientEventRecordMapper) {
|
||||
|
||||
super(distributedPingCache, clientEventRecordMapper, EventType.WARN_LOG);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -31,8 +34,4 @@ public class WarnLogCountClientIndicator extends AbstractLogLevelCountIndicator
|
|||
return IndicatorType.WARN_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientIndicatorType indicatorType() {
|
||||
return ClientIndicatorType.WARN_LOG_COUNT;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ spring.datasource.hikari.leakDetectionThreshold=2000
|
|||
sebserver.http.client.connect-timeout=15000
|
||||
sebserver.http.client.connection-request-timeout=10000
|
||||
sebserver.http.client.read-timeout=20000
|
||||
sebserver.webservice.distributed.pingUpdate=1000
|
||||
sebserver.webservice.distributed.updateInterval=1000
|
||||
sebserver.webservice.distributed.connectionUpdate=2000
|
||||
sebserver.webservice.clean-db-on-startup=false
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ sebserver.webservice.internalSecret=${sebserver.password}
|
|||
### webservice networking
|
||||
sebserver.webservice.forceMaster=false
|
||||
sebserver.webservice.distributed=true
|
||||
sebserver.webservice.distributed.pingUpdate=3000
|
||||
sebserver.webservice.distributed.updateInterval=3000
|
||||
sebserver.webservice.http.external.scheme=https
|
||||
sebserver.webservice.http.external.servername=
|
||||
sebserver.webservice.http.external.port=
|
||||
|
|
|
@ -245,7 +245,7 @@ public class ModelObjectJSONGenerator {
|
|||
System.out.println(writerWithDefaultPrettyPrinter.writeValueAsString(domainObject));
|
||||
|
||||
domainObject =
|
||||
new SimpleIndicatorValue(1L, IndicatorType.LAST_PING, 1.0);
|
||||
new SimpleIndicatorValue(1L, 1.0);
|
||||
System.out.println(domainObject.getClass().getSimpleName() + ":");
|
||||
System.out.println(writerWithDefaultPrettyPrinter.writeValueAsString(domainObject));
|
||||
final long currentTimeMillis = System.currentTimeMillis();
|
||||
|
@ -267,9 +267,9 @@ public class ModelObjectJSONGenerator {
|
|||
null,
|
||||
null),
|
||||
Arrays.asList(
|
||||
new SimpleIndicatorValue(1L, IndicatorType.LAST_PING, 1.0),
|
||||
new SimpleIndicatorValue(2L, IndicatorType.ERROR_COUNT, 2.0),
|
||||
new SimpleIndicatorValue(3L, IndicatorType.WARN_COUNT, 3.0)));
|
||||
new SimpleIndicatorValue(1L, 1.0),
|
||||
new SimpleIndicatorValue(2L, 2.0),
|
||||
new SimpleIndicatorValue(3L, 3.0)));
|
||||
System.out.println(domainObject.getClass().getSimpleName() + ":");
|
||||
System.out.println(writerWithDefaultPrettyPrinter.writeValueAsString(domainObject));
|
||||
|
||||
|
|
|
@ -2149,7 +2149,7 @@ public class UseCasesIntegrationTest extends GuiIntegrationTest {
|
|||
assertEquals(exam.id, conData.clientConnection.examId);
|
||||
assertFalse(conData.indicatorValues.isEmpty());
|
||||
final IndicatorValue indicatorValue = conData.indicatorValues.get(0);
|
||||
assertEquals("LAST_PING", indicatorValue.getType().name);
|
||||
assertEquals("1", String.valueOf(indicatorValue.getIndicatorId())); // LAST_PING indicator
|
||||
|
||||
// disable connection
|
||||
final Result<String> disableCall = restService.getBuilder(DisableClientConnection.class)
|
||||
|
|
|
@ -28,7 +28,6 @@ import ch.ethz.seb.sebserver.gbl.api.API;
|
|||
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
|
||||
import ch.ethz.seb.sebserver.gbl.api.APIMessage.ErrorMessage;
|
||||
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||
import ch.ethz.seb.sebserver.gbl.model.session.IndicatorValue;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordMapper;
|
||||
|
@ -466,7 +465,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
assertNotNull(ccdi.clientConnection.examId);
|
||||
assertFalse(ccdi.indicatorValues.isEmpty());
|
||||
final IndicatorValue pingIndicator = ccdi.indicatorValues.iterator().next();
|
||||
assertTrue(pingIndicator.getType() == IndicatorType.LAST_PING);
|
||||
assertTrue(pingIndicator.getIndicatorId() == 1L);
|
||||
|
||||
super.sendPing(accessToken, connectionToken, 1);
|
||||
Thread.sleep(200);
|
||||
|
@ -524,7 +523,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
|||
assertNotNull(ccdi.clientConnection.examId);
|
||||
assertFalse(ccdi.indicatorValues.isEmpty());
|
||||
final IndicatorValue pingIndicator = ccdi.indicatorValues.iterator().next();
|
||||
assertTrue(pingIndicator.getType() == IndicatorType.LAST_PING);
|
||||
assertTrue(pingIndicator.getIndicatorId() == 1L);
|
||||
|
||||
MockHttpServletResponse sendEvent = super.sendEvent(
|
||||
accessToken,
|
||||
|
|
|
@ -32,6 +32,7 @@ import ch.ethz.seb.sebserver.webservice.integration.api.admin.AdministrationAPII
|
|||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.AbstractLogIndicator;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.AbstractLogLevelCountIndicator;
|
||||
|
@ -94,27 +95,27 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
assertNotNull(connectionData);
|
||||
final Optional<? extends IndicatorValue> findFirst = connectionData.indicatorValues
|
||||
.stream()
|
||||
.filter(indicator -> indicator.getType() == IndicatorType.ERROR_COUNT)
|
||||
.filter(indicator -> ((ClientIndicator) indicator).getType() == IndicatorType.ERROR_COUNT)
|
||||
.findFirst();
|
||||
assertTrue(findFirst.isPresent());
|
||||
final IndicatorValue clientIndicator = findFirst.get();
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token1",
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token1",
|
||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
// test reset indicator value and load it from persistent storage
|
||||
((AbstractLogLevelCountIndicator) clientIndicator).reset();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.ERROR_COUNT));
|
||||
|
||||
}
|
||||
|
||||
|
@ -138,37 +139,37 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
assertNotNull(connectionData);
|
||||
final Optional<? extends IndicatorValue> findFirst = connectionData.indicatorValues
|
||||
.stream()
|
||||
.filter(indicator -> indicator.getType() == IndicatorType.INFO_COUNT)
|
||||
.filter(indicator -> ((ClientIndicator) indicator).getType() == IndicatorType.INFO_COUNT)
|
||||
.findFirst();
|
||||
assertTrue(findFirst.isPresent());
|
||||
final IndicatorValue clientIndicator = findFirst.get();
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<top> some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||
waitForExecutor();
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token2",
|
||||
|
@ -176,7 +177,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
waitForExecutor();
|
||||
// test reset indicator value and load it from persistent storage
|
||||
((AbstractLogLevelCountIndicator) clientIndicator).reset();
|
||||
assertEquals("3", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("3", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.INFO_COUNT));
|
||||
|
||||
}
|
||||
|
||||
|
@ -210,12 +211,12 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
assertNotNull(connectionData);
|
||||
final Optional<? extends IndicatorValue> findFirst = connectionData.indicatorValues
|
||||
.stream()
|
||||
.filter(indicator -> indicator.getType() == IndicatorType.BATTERY_STATUS)
|
||||
.filter(indicator -> ((ClientIndicator) indicator).getType() == IndicatorType.BATTERY_STATUS)
|
||||
.findFirst();
|
||||
assertTrue(findFirst.isPresent());
|
||||
final IndicatorValue clientIndicator = findFirst.get();
|
||||
|
||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token3",
|
||||
|
@ -225,23 +226,23 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
|||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some info other"));
|
||||
waitForExecutor();
|
||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 90.0, "<battery> some info other"));
|
||||
waitForExecutor();
|
||||
assertEquals("90", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("90", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
this.sebClientConnectionService.notifyClientEvent(
|
||||
"token3",
|
||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 40.0, "<battery> some info other"));
|
||||
waitForExecutor();
|
||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
|
||||
// test reset indicator value and load it from persistent storage
|
||||
((AbstractLogIndicator) clientIndicator).reset();
|
||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator));
|
||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator, IndicatorType.BATTERY_STATUS));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,19 +11,23 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
|
||||
public class IndicatorValueJSONTest {
|
||||
|
||||
@Test
|
||||
public void testJSONForExtendedIndicatorValue() throws JsonProcessingException {
|
||||
final JSONMapper jsonMapper = new JSONMapper();
|
||||
final ErrorLogCountClientIndicator indicator = new ErrorLogCountClientIndicator(null);
|
||||
final DistributedIndicatorValueService mock = Mockito.mock(DistributedIndicatorValueService.class);
|
||||
final ErrorLogCountClientIndicator indicator = new ErrorLogCountClientIndicator(mock, null);
|
||||
indicator.init(new Indicator(1L, null, null, null, null, null, null, null), 2L, true, true);
|
||||
final String json = jsonMapper.writeValueAsString(indicator);
|
||||
assertEquals("{\"indicatorType\":\"ERROR_COUNT\",\"indicatorValue\":\"NaN\"}", json);
|
||||
assertEquals("{\"indicatorId\":1,\"indicatorValue\":\"NaN\"}", json);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.mockito.Mockito;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
||||
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
|
||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||
|
||||
public class PingIntervalClientIndicatorTest {
|
||||
|
||||
|
@ -31,10 +32,12 @@ public class PingIntervalClientIndicatorTest {
|
|||
|
||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||
|
||||
final DistributedPingService distributedPingCache = Mockito.mock(DistributedPingService.class);
|
||||
final DistributedIndicatorValueService distributedPingCache =
|
||||
Mockito.mock(DistributedIndicatorValueService.class);
|
||||
|
||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||
new PingIntervalClientIndicator(distributedPingCache);
|
||||
pingIntervalClientIndicator.init(null, null, true, true);
|
||||
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
||||
}
|
||||
|
||||
|
@ -43,10 +46,12 @@ public class PingIntervalClientIndicatorTest {
|
|||
|
||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||
|
||||
final DistributedPingService distributedPingCache = Mockito.mock(DistributedPingService.class);
|
||||
final DistributedIndicatorValueService distributedPingCache =
|
||||
Mockito.mock(DistributedIndicatorValueService.class);
|
||||
|
||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||
new PingIntervalClientIndicator(distributedPingCache);
|
||||
pingIntervalClientIndicator.init(null, null, true, true);
|
||||
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
||||
|
||||
DateTimeUtils.setCurrentMillisProvider(() -> 10L);
|
||||
|
@ -58,13 +63,15 @@ public class PingIntervalClientIndicatorTest {
|
|||
public void testSerialization() throws JsonProcessingException {
|
||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||
|
||||
final DistributedPingService distributedPingCache = Mockito.mock(DistributedPingService.class);
|
||||
final DistributedIndicatorValueService distributedPingCache =
|
||||
Mockito.mock(DistributedIndicatorValueService.class);
|
||||
|
||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||
new PingIntervalClientIndicator(distributedPingCache);
|
||||
pingIntervalClientIndicator.init(new Indicator(2L, null, null, null, null, null, null, null), 1L, true, true);
|
||||
final JSONMapper jsonMapper = new JSONMapper();
|
||||
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
|
||||
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);
|
||||
assertEquals("{\"indicatorId\":2,\"indicatorValue\":0.0}", json);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue