SEBSERV-160 more back-end implementation
This commit is contained in:
parent
53bddff84f
commit
5b8d920a4f
4 changed files with 85 additions and 13 deletions
|
@ -22,10 +22,11 @@ public interface BatchActionService {
|
||||||
|
|
||||||
/** Use this to register a new batch action for further processing.
|
/** Use this to register a new batch action for further processing.
|
||||||
*
|
*
|
||||||
|
* @param institutionId The institution identifier
|
||||||
* @param actionType The batch action type
|
* @param actionType The batch action type
|
||||||
* @param ids comma separated String of model ids to process
|
* @param ids comma separated String of model ids to process
|
||||||
* @return Result refer to the stored batch action or to an error when happened */
|
* @return Result refer to the stored batch action or to an error when happened */
|
||||||
Result<BatchAction> registerNewBatchAction(BatchActionType actionType, String ids);
|
Result<BatchAction> registerNewBatchAction(final Long institutionId, BatchActionType actionType, String ids);
|
||||||
|
|
||||||
/** Use this to get all currently running batch actions for a given institution.
|
/** Use this to get all currently running batch actions for a given institution.
|
||||||
*
|
*
|
||||||
|
|
|
@ -9,17 +9,22 @@
|
||||||
package ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.impl;
|
package ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.impl;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.scheduling.TaskScheduler;
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import ch.ethz.seb.sebserver.gbl.Constants;
|
||||||
|
import ch.ethz.seb.sebserver.gbl.api.API;
|
||||||
import ch.ethz.seb.sebserver.gbl.api.API.BatchActionType;
|
import ch.ethz.seb.sebserver.gbl.api.API.BatchActionType;
|
||||||
import ch.ethz.seb.sebserver.gbl.api.EntityType;
|
import ch.ethz.seb.sebserver.gbl.api.EntityType;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.BatchAction;
|
import ch.ethz.seb.sebserver.gbl.model.BatchAction;
|
||||||
|
@ -29,6 +34,7 @@ import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionExec;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionExec;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionService;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionService;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.BatchActionDAO;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.BatchActionDAO;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ResourceNotFoundException;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ResourceNotFoundException;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
|
@ -59,27 +65,59 @@ public class BatchActionServiceImpl implements BatchActionService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<BatchAction> registerNewBatchAction(final BatchActionType actionType, final String ids) {
|
public Result<BatchAction> registerNewBatchAction(
|
||||||
// TODO Auto-generated method stub
|
final Long institutionId,
|
||||||
return null;
|
final BatchActionType actionType,
|
||||||
|
final String ids) {
|
||||||
|
|
||||||
|
return Result.tryCatch(() -> {
|
||||||
|
|
||||||
|
final Collection<String> sourceIds = Arrays.asList(StringUtils.split(
|
||||||
|
ids,
|
||||||
|
Constants.LIST_SEPARATOR));
|
||||||
|
|
||||||
|
return this.batchActionDAO
|
||||||
|
.createNew(new BatchAction(null, institutionId, actionType, sourceIds, null, null, null))
|
||||||
|
.map(res -> {
|
||||||
|
processNextBatchAction();
|
||||||
|
return res;
|
||||||
|
})
|
||||||
|
.getOrThrow();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<Collection<BatchAction>> getRunningActions(final Long institutionId) {
|
public Result<Collection<BatchAction>> getRunningActions(final Long institutionId) {
|
||||||
// TODO Auto-generated method stub
|
return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent(
|
||||||
return null;
|
API.PARAM_INSTITUTION_ID,
|
||||||
|
String.valueOf(institutionId)))
|
||||||
|
.map(results -> results.stream()
|
||||||
|
.filter(action -> StringUtils.isNotBlank(action.processorId) &&
|
||||||
|
!action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<Collection<BatchAction>> getRunningActions(final Long institutionId, final EntityType entityType) {
|
public Result<Collection<BatchAction>> getRunningActions(final Long institutionId, final EntityType entityType) {
|
||||||
// TODO Auto-generated method stub
|
return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent(
|
||||||
return null;
|
API.PARAM_INSTITUTION_ID,
|
||||||
|
String.valueOf(institutionId)))
|
||||||
|
.map(results -> results.stream()
|
||||||
|
.filter(action -> StringUtils.isNotBlank(action.processorId) &&
|
||||||
|
!action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED))
|
||||||
|
.filter(action -> action.actionType.entityType == entityType)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<Collection<BatchAction>> getFinishedActions(final Long institutionId) {
|
public Result<Collection<BatchAction>> getFinishedActions(final Long institutionId) {
|
||||||
// TODO Auto-generated method stub
|
return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent(
|
||||||
return null;
|
API.PARAM_INSTITUTION_ID,
|
||||||
|
String.valueOf(institutionId)))
|
||||||
|
.map(results -> results.stream()
|
||||||
|
.filter(action -> StringUtils.isNotBlank(action.processorId) &&
|
||||||
|
action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processNextBatchAction() {
|
private void processNextBatchAction() {
|
||||||
|
@ -161,7 +199,6 @@ public class BatchActionServiceImpl implements BatchActionService {
|
||||||
log.info("Skip this batch action.");
|
log.info("Skip this batch action.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,10 +15,30 @@ import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||||
|
|
||||||
public interface BatchActionDAO extends EntityDAO<BatchAction, BatchAction> {
|
public interface BatchActionDAO extends EntityDAO<BatchAction, BatchAction> {
|
||||||
|
|
||||||
|
public static final String FLAG_FINISHED = "_FINISHED";
|
||||||
|
|
||||||
|
/** This checks if there is a pending batch action to process next.
|
||||||
|
* If so this reserves the pending batch action and mark it to be processed
|
||||||
|
* by the given pocessId.
|
||||||
|
* If there is no pending batch action this results with a ResourceNotFoundException.
|
||||||
|
*
|
||||||
|
* @param processId The process id to reserve a pending batch action before processing
|
||||||
|
* @return Result refer to the batch action to process or to an error when happened */
|
||||||
Result<BatchAction> getAndReserveNext(String processId);
|
Result<BatchAction> getAndReserveNext(String processId);
|
||||||
|
|
||||||
|
/** Use this to update the processing of a running batch action
|
||||||
|
*
|
||||||
|
* @param actionId The batch action identifier
|
||||||
|
* @param processId The process identifier (must match with the processId on persistent storage)
|
||||||
|
* @param modelIds Collection of model identifiers of entities that has successfully been processed.
|
||||||
|
* @return Result refer to the involved batch action or to an error when happened. */
|
||||||
Result<BatchAction> updateProgress(Long actionId, String processId, Collection<String> modelIds);
|
Result<BatchAction> updateProgress(Long actionId, String processId, Collection<String> modelIds);
|
||||||
|
|
||||||
|
/** Use this to mark processing of a single entity of a specified batch action as successful completed.
|
||||||
|
*
|
||||||
|
* @param actionId The batch action identifier
|
||||||
|
* @param processId The process identifier (must match with the processId on persistent storage)
|
||||||
|
* @param modelId The model identifier to mark as completed for the given batch action */
|
||||||
void setSuccessfull(Long actionId, String processId, String modelId);
|
void setSuccessfull(Long actionId, String processId, String modelId);
|
||||||
|
|
||||||
/** This is used by a processing background task that is processing a batch action to finish up
|
/** This is used by a processing background task that is processing a batch action to finish up
|
||||||
|
|
|
@ -48,8 +48,6 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.TransactionHandler;
|
||||||
@WebServiceProfile
|
@WebServiceProfile
|
||||||
public class BatchActionDAOImpl implements BatchActionDAO {
|
public class BatchActionDAOImpl implements BatchActionDAO {
|
||||||
|
|
||||||
private static final String FLAG_FINISHED = "_FINISHED";
|
|
||||||
|
|
||||||
private final BatchActionRecordMapper batchActionRecordMapper;
|
private final BatchActionRecordMapper batchActionRecordMapper;
|
||||||
|
|
||||||
public BatchActionDAOImpl(final BatchActionRecordMapper batchActionRecordMapper) {
|
public BatchActionDAOImpl(final BatchActionRecordMapper batchActionRecordMapper) {
|
||||||
|
@ -140,6 +138,22 @@ public class BatchActionDAOImpl implements BatchActionDAO {
|
||||||
public void setSuccessfull(final Long actionId, final String processId, final String modelId) {
|
public void setSuccessfull(final Long actionId, final String processId, final String modelId) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
final BatchActionRecord rec = this.batchActionRecordMapper.selectByPrimaryKey(actionId);
|
||||||
|
|
||||||
|
if (!processId.equals(rec.getProcessorId())) {
|
||||||
|
throw new RuntimeException("Batch action processor id mismatch: " + processId + " " + rec);
|
||||||
|
}
|
||||||
|
|
||||||
|
final BatchActionRecord newRecord = new BatchActionRecord(
|
||||||
|
actionId,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
rec.getSuccessful() + Constants.LIST_SEPARATOR + modelId,
|
||||||
|
Utils.getMillisecondsNow(),
|
||||||
|
processId);
|
||||||
|
this.batchActionRecordMapper.updateByPrimaryKeySelective(newRecord);
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error("Failed to mark entity sucessfuly processed: modelId: {}, processId");
|
log.error("Failed to mark entity sucessfuly processed: modelId: {}, processId");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue