refactoring

Michele Artini committed on 2023-09-19 12:36:14 +02:00
parent 700dc7a33f
commit 4610fb6ab5
113 changed files with 1716 additions and 691 deletions

View File

@ -37,7 +37,12 @@ server {
location /resource-manager/ {
proxy_pass http://resource-manager:8080/resource-manager/;
}
location /vocabulary-manager/ {
proxy_pass http://vocabulary-manager:8080/vocabulary-manager/;
}
location /wf-exec-postgres/ {
proxy_pass http://wf-exec-postgres:8080/wf-exec/;
}

View File

@ -0,0 +1,14 @@
# Java base image (OpenJDK 18)
FROM openjdk:18
# Setting up work directory
WORKDIR /app
# Copy the jar file into our app
COPY ./target/*.jar /app/app.jar
# Exposing port 8080
EXPOSE 8080
# Starting the application
CMD ["java", "-jar", "app.jar"]

View File

@ -1,31 +1,31 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>eu.dnetlib.docker</groupId>
<artifactId>apps</artifactId>
<version>7.0.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>eu.dnetlib.docker</groupId>
<artifactId>apps</artifactId>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dnet-transformer-postgres</artifactId>
<artifactId>dnet-vocabulary-manager</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-db-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- hot swapping, disable cache for template, enable live reload -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -0,0 +1,22 @@
package eu.dnetlib.services.vocabularies;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import eu.dnetlib.common.app.AbstractDnetApp;
import eu.dnetlib.domain.context.Category;
import eu.dnetlib.domain.context.ConceptLevel0;
import eu.dnetlib.domain.context.ConceptLevel1;
import eu.dnetlib.domain.context.ConceptLevel2;
import eu.dnetlib.domain.context.Context;
@SpringBootApplication
@EntityScan(basePackageClasses = { Context.class, Category.class, ConceptLevel0.class, ConceptLevel1.class, ConceptLevel2.class })
public class VocabulariesApplication extends AbstractDnetApp {
public static void main(final String[] args) {
SpringApplication.run(VocabulariesApplication.class, args);
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.controller;
package eu.dnetlib.services.vocabularies.controller;
import java.util.List;
@ -9,7 +9,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.vocabulary.Vocabulary;
import eu.dnetlib.domain.vocabulary.VocabularyTerm;
import eu.dnetlib.service.transform.vocabulary.VocabularyService;
import eu.dnetlib.services.vocabularies.service.VocabularyService;
public class AbstractVocabularyController extends DnetRestController {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.controller;
package eu.dnetlib.services.vocabularies.controller;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -12,7 +12,7 @@ import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.vocabulary.Vocabulary;
import eu.dnetlib.service.transform.importer.VocabularyImporter;
import eu.dnetlib.services.vocabularies.importer.VocabularyImporter;
import jakarta.servlet.http.HttpServletRequest;
@RestController

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.controller;
package eu.dnetlib.services.vocabularies.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

View File

@ -1,10 +1,10 @@
package eu.dnetlib.service.collector.controller;
package eu.dnetlib.services.vocabularies.controller;
import org.springframework.stereotype.Controller;
import eu.dnetlib.common.controller.DnetMainController;
@Controller
public class CollectorMainController extends DnetMainController {
public class VocabularyMainController extends DnetMainController {
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.importer;
package eu.dnetlib.services.vocabularies.importer;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
@ -9,8 +9,8 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.domain.vocabulary.Synonym;
import eu.dnetlib.domain.vocabulary.Vocabulary;
import eu.dnetlib.domain.vocabulary.VocabularyTerm;
import eu.dnetlib.service.transform.vocabulary.VocabularyRepository;
import eu.dnetlib.service.transform.vocabulary.VocabularyTermRepository;
import eu.dnetlib.services.vocabularies.repository.VocabularyRepository;
import eu.dnetlib.services.vocabularies.repository.VocabularyTermRepository;
import jakarta.transaction.Transactional;
@Service

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.vocabulary;
package eu.dnetlib.services.vocabularies.repository;
import org.springframework.data.jpa.repository.JpaRepository;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.vocabulary;
package eu.dnetlib.services.vocabularies.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.vocabulary;
package eu.dnetlib.services.vocabularies.service;
import java.util.List;
import java.util.stream.Collectors;
@ -13,6 +13,8 @@ import eu.dnetlib.domain.vocabulary.Vocabulary;
import eu.dnetlib.domain.vocabulary.VocabularyTerm;
import eu.dnetlib.domain.vocabulary.VocabularyTermPK;
import eu.dnetlib.errors.InformationServiceException;
import eu.dnetlib.services.vocabularies.repository.VocabularyRepository;
import eu.dnetlib.services.vocabularies.repository.VocabularyTermRepository;
import jakarta.transaction.Transactional;
@Service

View File

@ -0,0 +1,6 @@
server.title = D-NET Vocabulary Manager
server.description = D-NET Vocabulary Manager
server.servlet.context-path = /vocabulary-manager
server.public_url =
maven.pom.path = /META-INF/maven/eu.dnetlib.docker/dnet-vocabulary-manager/effective-pom.xml

View File

@ -13,6 +13,12 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-wf-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-db-common</artifactId>

View File

@ -1,46 +0,0 @@
package eu.dnetlib.service.collector.utils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.domain.mdstore.records.Provenance;
import eu.dnetlib.domain.dsm.ApiDesc;
public class MetadataBuilder {
private final String xpath;
private final Provenance provenance;
public MetadataBuilder(final ApiDesc api) {
this.xpath = api.getMetadataIdentifierPath();
this.provenance = new Provenance(api.getDatasourceId(), api.getDatasourceName(), api.getNsPrefix());
}
public MetadataRecord mapRecord(final String xml) throws Exception {
try {
final Document doc = DocumentHelper.parseText(xml);
final String origId = doc.valueOf(xpath);
// TODO populate the xml record with missing oaf fields, see:
// https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/xslt/openaireMdBuilder.xslt.st
final MetadataRecord md = new MetadataRecordImpl();
md.setId(MetadataRecord.generateIdentifier(origId, provenance.getNsPrefix()));
md.setOriginalId(origId);
md.setBody(doc.asXML());
md.setEncoding("XML");
md.setDateOfCollection(System.currentTimeMillis());
md.setDateOfTransformation(null);
md.setProvenance(provenance);
return md;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager;
package eu.dnetlib.wfs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -19,6 +19,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-wf-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- hot swapping, disable cache for template, enable live reload -->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -1,17 +0,0 @@
package eu.dnetlib.manager.wf.client;
import java.time.LocalDateTime;
import eu.dnetlib.domain.dsm.Api;
public class CollectorClient {
public void collect(final Api api) {
// TODO Auto-generated method stub
}
public void collect(final Api api, final LocalDateTime from, final LocalDateTime until) {
// TODO Auto-generated method stub
}
}

View File

@ -1,20 +0,0 @@
package eu.dnetlib.manager.wf.client;
public class TransformerClient {
public void clean(final String inputMdId, final String outputMdId, final String ruleId) {
// TODO Auto-generated method stub
}
public void filter(final String inputMdId, final String outputMdId, final String xpath) {
// TODO Auto-generated method stub
}
public void transform(final String inputMdId, final String outputMdId, final String ruleId) {
// TODO Auto-generated method stub
}
}

View File

@ -1,36 +0,0 @@
package eu.dnetlib.manager.wf.controller;
import java.util.Arrays;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.manager.wf.importer.WfHistoryImporter;
@RestController
@RequestMapping("/import")
public class ImporterController extends DnetRestController {
// EXAMPLE:
// find ./VocabularyDSResourceType/ -name "*.xml" -exec curl -X POST "http://localhost:8280/api/import/vocabulary" -H "accept: */*" -H
// "Content-Type: text/plain" --data-binary @{} \;
// find ./DedupConfigurationDSResources/ -name "*.xml" -exec curl -X POST "http://localhost:8280/api/import/resource" -H "accept: */*"
// -H "Content-Type: text/plain" --data-binary @{} \;
@Autowired
private WfHistoryImporter wfHistoryImporter;
@GetMapping(value = "/wf_logs")
public List<String> importWfLogs(@RequestParam final String path) throws Exception {
// mongoexport -d dnet_logs -c wf_logs --jsonArray -o /tmp/mongodump.json
wfHistoryImporter.load(path);
return Arrays.asList("Done.");
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.controller;
package eu.dnetlib.manager.wf.controller;
import java.nio.charset.StandardCharsets;
@ -13,7 +13,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.service.transform.cleaner.CleanerFactory;
import eu.dnetlib.wfs.utils.cleaner.CleanerFactory;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.controller;
package eu.dnetlib.manager.wf.controller;
import org.springframework.stereotype.Controller;

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.DnetRestController;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.manager.wf.service.WorkflowLogger;
import eu.dnetlib.wfs.utils.WorkflowLogger;
@RestController
@RequestMapping("/ajax/wf_history")

View File

@ -18,7 +18,7 @@ import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.domain.wfs.WorkflowSection;
import eu.dnetlib.domain.wfs.WorkflowSubscription;
import eu.dnetlib.manager.wf.service.WorkflowManagerService;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.wfs.procs.ExecutionStatus;
@RestController
@RequestMapping("/api/wfs")

View File

@ -12,11 +12,11 @@ import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.manager.wf.service.WorkflowLogger;
import eu.dnetlib.manager.wf.service.WorkflowManagerService;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.wfs.procs.ProcessRegistry;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.repository.WorkflowConfigurationRepository;
import eu.dnetlib.wfs.utils.WorkflowLogger;
@Service
public class ScheduledWorkflowLauncher {
@ -43,19 +43,19 @@ public class ScheduledWorkflowLauncher {
log.debug("Verifying scheduled workflows - START");
workflowConfigurationRepository.findAll()
.stream()
.filter(WorkflowConfiguration::isEnabled)
.filter(WorkflowConfiguration::isConfigured)
.filter(WorkflowConfiguration::isSchedulingEnabled)
.filter(this::isNotRunning)
.filter(this::isReady)
.forEach(conf -> {
try {
wfManagerService.startWorkflowConfiguration(conf);
} catch (final Exception e) {
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
}
});
.stream()
.filter(WorkflowConfiguration::isEnabled)
.filter(WorkflowConfiguration::isConfigured)
.filter(WorkflowConfiguration::isSchedulingEnabled)
.filter(this::isNotRunning)
.filter(this::isReady)
.forEach(conf -> {
try {
wfManagerService.startWorkflowConfiguration(conf);
} catch (final Exception e) {
log.error("Error launching scheduled wf conf: " + conf.getId(), e);
}
});
log.debug("Verifying scheduled workflows - END");
}
@ -72,7 +72,7 @@ public class ScheduledWorkflowLauncher {
final boolean res;
if (lastExecutionDate != null) {
final long elapsed = ChronoUnit.MINUTES.between(lastExecutionDate, now);
res = elapsed > minInterval && verifyCron(cron, now);
res = (elapsed > minInterval) && verifyCron(cron, now);
} else {
res = verifyCron(cron, now);
}
@ -96,8 +96,8 @@ public class ScheduledWorkflowLauncher {
private LocalDateTime calculateLastExecutionDate(final String id) {
return logger.getLastLogForConfiguration(id)
.map(e -> e.getEndDate())
.orElse(LocalDateTime.MIN);
.map(e -> e.getEndDate())
.orElse(LocalDateTime.MIN);
}
private boolean verifyCron(final String cronExpression, final LocalDateTime now) {
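The body of verifyCron() is cut off by this hunk. Purely as a hypothetical sketch (an assumption, not the project's actual implementation), a check of whether a cron expression matches the current minute with Spring's CronExpression could look like this:

import java.time.LocalDateTime;
import org.springframework.scheduling.support.CronExpression;

// Illustrative sketch: a cron expression "fires now" if the first trigger time
// strictly after (now - 1 minute) is not later than the current minute.
public class CronCheckExample {

    public static boolean matchesNow(final String cron, final LocalDateTime now) {
        final LocalDateTime next = CronExpression.parse(cron).next(now.minusMinutes(1));
        return (next != null) && !next.isAfter(now);
    }

    public static void main(final String[] args) {
        final LocalDateTime now = LocalDateTime.now().withSecond(0).withNano(0);
        System.out.println(matchesNow("0 * * * * *", now)); // true: fires at second 0 of every minute
    }
}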

View File

@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.manager.wf.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
@Service
public class WfHistoryImporter {
@ -35,7 +35,7 @@ public class WfHistoryImporter {
final JsonNode rootNode = mapper.readTree(new File(path));
rootNode.forEach(wf -> saveWf(wf));
rootNode.forEach(this::saveWf);
}
private void saveWf(final JsonNode node) {
@ -56,7 +56,7 @@ public class WfHistoryImporter {
}
wf.setStartDate(LocalDateTime
.ofInstant(Instant.ofEpochMilli(NumberUtils.toLong(node.get("system:startDate").asText())), TimeZone.getDefault().toZoneId()));
.ofInstant(Instant.ofEpochMilli(NumberUtils.toLong(node.get("system:startDate").asText())), TimeZone.getDefault().toZoneId()));
wf.setEndDate(LocalDateTime.ofInstant(Instant.ofEpochMilli(NumberUtils.toLong(node.get("system:endDate").asText())), TimeZone.getDefault().toZoneId()));

View File

@ -1,8 +0,0 @@
package eu.dnetlib.manager.wf.nodes;
public enum NodeStatus {
CONFIGURED,
NOT_CONFIGURED,
DISABLED,
SYSTEM
}

View File

@ -1,25 +0,0 @@
package eu.dnetlib.manager.wf.nodes.collect;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.CollectorClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("collect")
public class CollectNode extends AbstractJobNode {
@WfInputParam
private Api api;
@Autowired
private CollectorClient collector;
@Override
protected void execute() throws Exception {
collector.collect(api);
}
}

View File

@ -1,33 +0,0 @@
package eu.dnetlib.manager.wf.nodes.collect;
import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.CollectorClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("dateRangeCollect")
public class DateRangeCollectNode extends AbstractJobNode {
@WfInputParam
private Api api;
@WfInputParam
private LocalDateTime from;
@WfInputParam
private LocalDateTime until;
@Autowired
private CollectorClient collector;
@Override
protected void execute() {
collector.collect(api, from, until);
}
}

View File

@ -1,51 +0,0 @@
package eu.dnetlib.manager.wf.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.client.DsmClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("updateApiExtraFields")
public class UpdateApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
@WfOutputParam
private Api api;
@WfInputParam
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
@WfInputParam
private String mdId;
@WfInputParam
private long total;
@Autowired
private DsmClient dsm;
@Override
protected void execute() throws Exception {
switch (infoType.toUpperCase()) {
case "COLLECT":
api = dsm.updateUpdateApiCollectionInfo(api.getId(), mdId, total);
break;
case "AGGREGATOR":
case "TRANSFORM":
api = dsm.updateUpdateApiAggregationInfo(api.getId(), mdId, total);
break;
case "DOWNLOAD":
api = dsm.updateUpdateApiDownloadInfo(api.getId(), mdId, total);
break;
default:
throw new RuntimeException("Invalid infoType: " + infoType);
}
}
}

View File

@ -1,24 +0,0 @@
package eu.dnetlib.manager.wf.nodes.store;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.MDStoreClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("deleteMdStore")
public class DeleteMdStoreNode extends AbstractJobNode {
@WfInputParam
private String mdId;
@Autowired
private MDStoreClient mdStore;
@Override
protected void execute() throws Exception {
mdStore.deleteMdStore(mdId);
}
}

View File

@ -1,30 +0,0 @@
package eu.dnetlib.manager.wf.nodes.transfrom;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.TransformerClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("clean")
public class MetadataCleanerNode extends AbstractJobNode {
@WfInputParam
private String inputMdId;
@WfInputParam
private String outputMdId;
@WfInputParam
private String ruleId;
@Autowired
private TransformerClient transformer;
@Override
protected void execute() throws Exception {
transformer.clean(inputMdId, outputMdId, ruleId);
}
}

View File

@ -1,31 +0,0 @@
package eu.dnetlib.manager.wf.nodes.transfrom;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.TransformerClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("xpath_filter")
public class MetadataFilterNode extends AbstractJobNode {
@WfInputParam
private String inputMdId;
@WfInputParam
private String outputMdId;
@WfInputParam
private String xpath;
@Autowired
private TransformerClient transformer;
@Override
protected void execute() throws Exception {
transformer.filter(inputMdId, outputMdId, xpath);
}
}

View File

@ -1,30 +0,0 @@
package eu.dnetlib.manager.wf.nodes.transfrom;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.TransformerClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("applyXslt")
public class MetadataXsltTransformNode extends AbstractJobNode {
@WfInputParam
private String inputMdId;
@WfInputParam
private String outputMdId;
@WfInputParam
private String ruleId;
@Autowired
private TransformerClient transformer;
@Override
protected void execute() throws Exception {
transformer.transform(inputMdId, outputMdId, ruleId);
}
}

View File

@ -22,18 +22,18 @@ import eu.dnetlib.domain.wfs.WorkflowTemplate;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.client.DsmClient;
import eu.dnetlib.manager.wf.client.SimpleResourceClient;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSectionRepository;
import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessEngine;
import eu.dnetlib.manager.wf.workflows.procs.ProcessFactory;
import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails;
import eu.dnetlib.wfs.procs.ExecutionStatus;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.ProcessFactory;
import eu.dnetlib.wfs.procs.ProcessRegistry;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.repository.WorkflowConfigurationRepository;
import eu.dnetlib.wfs.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WorkflowsConstants;
import jakarta.transaction.Transactional;
@Service
@ -64,7 +64,7 @@ public class WorkflowManagerService implements Stoppable {
@PostConstruct
public void init() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
if (isPaused() || (processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE)) { return; }
final WorkflowProcess process = processRegistry.nextProcessToStart();
if (process != null) {
@ -80,9 +80,9 @@ public class WorkflowManagerService implements Stoppable {
}
public ExecutionStatus startRepoHiWorkflow(final String wfId,
final String dsId,
final String apiId,
final ProcessCallback callback) throws WorkflowManagerException {
final String dsId,
final String apiId,
final ProcessCallback callback) throws WorkflowManagerException {
if (isPaused()) {
log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown");
@ -125,9 +125,9 @@ public class WorkflowManagerService implements Stoppable {
}
public ExecutionStatus startWorkflow(final String wfId,
final WorkflowConfiguration conf,
final ProcessCallback callback)
throws WorkflowManagerException {
final WorkflowConfiguration conf,
final ProcessCallback callback)
throws WorkflowManagerException {
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
@ -135,12 +135,12 @@ public class WorkflowManagerService implements Stoppable {
}
final SimpleResource wfMetadata = simpleResourceClient.findResource(wfId);
if (!wfMetadata.getType().equals("wf_template")) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
if (!"wf_template".equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
final WorkflowTemplate wfTmpl = simpleResourceClient.findResourceContent(wfId, WorkflowTemplate.class);
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfTmpl, conf, callback);
processFactory.newProcess(wfMetadata, wfTmpl, conf, callback);
processRegistry.registerProcess(process, conf);
@ -162,7 +162,7 @@ public class WorkflowManagerService implements Stoppable {
final int count = processRegistry.countRunningWfs();
final StoppableDetails.StopStatus status =
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
}

View File

@ -1,11 +0,0 @@
package eu.dnetlib.manager.wf.workflows.util;
import eu.dnetlib.manager.wf.workflows.procs.Token;
public interface NodeCallback {
void onComplete(Token t);
void onFail(Token t);
}

View File

@ -12,7 +12,7 @@ import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.wfs.procs.Env;
public class GraphLoaderTest {

View File

@ -15,6 +15,7 @@
<module>dnet-resource-manager</module>
<module>dnet-mdstore-manager</module>
<module>dnet-context-manager</module>
<module>dnet-vocabulary-manager</module>
<module>dnet-datasource-manager</module>
<module>dnet-wf-manager</module>
<module>dnet-wf-executor-postgres</module>

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>eu.dnetlib.docker</groupId>
<artifactId>libs</artifactId>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dnet-wf-common</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>dnet-app-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>com.github.sisyphsu</groupId>
<artifactId>dateparser</artifactId>
<version>1.0.11</version>
</dependency>
<!-- for jpa annotations -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
<scope>provided</scope>
</dependency>
<!-- for /metrics and /health controllers -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-model</artifactId>
</dependency>
<!-- Swagger -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
</dependency>
<!-- for /metrics and /health controllers -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_spring_boot</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_servlet</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_spring_web</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.annotations;
package eu.dnetlib.wfs.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.annotations;
package eu.dnetlib.wfs.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.annotations;
package eu.dnetlib.wfs.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@ -1,8 +1,8 @@
package eu.dnetlib.manager.wf.workflows.graph;
package eu.dnetlib.wfs.graph;
import java.util.function.Function;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.wfs.procs.Env;
public class Arc {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.graph;
package eu.dnetlib.wfs.graph;
import java.util.ArrayList;
import java.util.Collection;
@ -11,7 +11,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.wfs.procs.Env;
public class Graph {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.graph;
package eu.dnetlib.wfs.graph;
import java.util.Collection;
import java.util.HashSet;
@ -23,9 +23,9 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.WorkflowTemplate;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.procs.Env;
@Service
public class GraphLoader {
@ -60,7 +60,7 @@ public class GraphLoader {
}
public Graph loadGraph(final WorkflowTemplate wfTemplate, final Map<String, String> globalParams)
throws WorkflowManagerException {
throws WorkflowManagerException {
final Graph graph = new Graph();
graph.addNode(GraphNode.newSuccessNode());
@ -82,7 +82,7 @@ public class GraphLoader {
graph.addNode(GraphNode.newNode(nodeName, nodeType, params, envParams));
}
if (node.getArcs() == null || node.getArcs().isEmpty()) {
if ((node.getArcs() == null) || node.getArcs().isEmpty()) {
graph.addArc(new Arc(nodeName, GraphNode.SUCCESS_NODE, generateFunction(null)));
} else {
for (final WorkflowTemplate.Arc a : node.getArcs()) {
@ -100,18 +100,15 @@ public class GraphLoader {
}
private Function<Env, Boolean> generateFunction(final String condition) {
if (StringUtils.isBlank(condition)) {
return (env) -> true;
} else {
return env -> {
final ExpressionParser parser = new SpelExpressionParser();
if (StringUtils.isBlank(condition)) { return env -> true; }
return env -> {
final ExpressionParser parser = new SpelExpressionParser();
final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes());
context.addPropertyAccessor(new MapAccessor());
final StandardEvaluationContext context = new StandardEvaluationContext(env.getAttributes());
context.addPropertyAccessor(new MapAccessor());
return parser.parseExpression(condition).getValue(context, Boolean.class);
};
}
return parser.parseExpression(condition).getValue(context, Boolean.class);
};
}
private void checkValidity(final Graph graph) throws WorkflowManagerException {
@ -143,7 +140,7 @@ public class GraphLoader {
if (n.isStart()) {
foundStart = true;
}
if (n.getType() != null && !validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
if ((n.getType() != null) && !validTypes.contains(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); }
}
if (!foundStart) { throw new WorkflowManagerException("Start node not found"); }
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.graph;
package eu.dnetlib.wfs.graph;
import java.io.StringWriter;
import java.util.HashMap;

View File

@ -1,10 +1,10 @@
package eu.dnetlib.manager.wf.nodes;
package eu.dnetlib.wfs.nodes;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.utils.NodeCallback;
public abstract class AbstractJobNode extends ProcessNode {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes;
package eu.dnetlib.wfs.nodes;
public final class DefaultJobNode extends AbstractJobNode {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes;
package eu.dnetlib.wfs.nodes;
import java.util.HashMap;
import java.util.UUID;
@ -8,15 +8,15 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.service.WorkflowManagerService;
import eu.dnetlib.manager.wf.workflows.procs.ExecutionStatus;
import eu.dnetlib.manager.wf.workflows.procs.ProcessAware;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.procs.ExecutionStatus;
import eu.dnetlib.wfs.procs.ProcessAware;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.service.WfExecutorService;
import eu.dnetlib.wfs.utils.NodeCallback;
import eu.dnetlib.wfs.utils.ProcessCallback;
@WfNode("launchWorkflow")
public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
@ -27,7 +27,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
private String wfId;
@Autowired
private WorkflowManagerService wfManagerService;
private WfExecutorService wfExecutorService;
private WorkflowProcess process;
@ -52,7 +52,7 @@ public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
conf.setSystemParams(process.getGlobalParams());
conf.setUserParams(new HashMap<>());
final ExecutionStatus info = wfManagerService.startWorkflow(wfId, conf, new ProcessCallback() {
final ExecutionStatus info = wfExecutorService.startWorkflow(wfId, conf, new ProcessCallback() {
@Override
public void onSuccess(final WorkflowProcess t) {

View File

@ -0,0 +1,5 @@
package eu.dnetlib.wfs.nodes;
public enum NodeStatus {
CONFIGURED, NOT_CONFIGURED, DISABLED, SYSTEM
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes;
package eu.dnetlib.wfs.nodes;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
@ -12,12 +12,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.workflows.procs.Env;
import eu.dnetlib.manager.wf.workflows.procs.Token;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.procs.Env;
import eu.dnetlib.wfs.procs.Token;
import eu.dnetlib.wfs.utils.NodeCallback;
public abstract class ProcessNode implements BeanNameAware {
@ -94,7 +94,7 @@ public abstract class ProcessNode implements BeanNameAware {
}
public String getBeanName() {
return this.beanName;
return beanName;
}
@Override
@ -103,7 +103,7 @@ public abstract class ProcessNode implements BeanNameAware {
}
public String getNodeName() {
return this.nodeName;
return nodeName;
}
public void setNodeName(final String nodeName) {
@ -112,7 +112,7 @@ public abstract class ProcessNode implements BeanNameAware {
@Override
public String toString() {
return String.format("[node beanName=%s, name=%s, object: %s]", this.beanName, this.nodeName, super.toString());
return String.format("[node beanName=%s, name=%s, object: %s]", beanName, nodeName, super.toString());
}
private final Set<Field> findFields(final Class<?> clazz, final Class<? extends Annotation> ann) {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes;
package eu.dnetlib.wfs.nodes;
public class SuccessNode extends AbstractJobNode {

View File

@ -0,0 +1,36 @@
package eu.dnetlib.wfs.nodes.collect;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.service.CollectorService;
import eu.dnetlib.wfs.stream.StreamSupplierNode;
@WfNode("collect")
public class CollectNode extends StreamSupplierNode<String> {
@WfInputParam
private Api api;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<String> prepareStream() {
return collectorService.collect(api);
}
@Override
protected void before() {}
@Override
protected void after() {}
@Override
protected void abort() {}
}
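The StreamSupplierNode base class referenced here (eu.dnetlib.wfs.stream.StreamSupplierNode) is not shown in this commit view. Purely as a hypothetical sketch of the contract implied by CollectNode and DateRangeCollectNode, it would look roughly like a node that produces a stream of records and exposes a few lifecycle hooks:

import java.util.stream.Stream;

// Hypothetical sketch only: the real class presumably plugs into the ProcessNode
// execution machinery; the methods below are the ones overridden by CollectNode.
public abstract class StreamSupplierNode<T> {

    // produce the stream of records emitted by this node
    protected abstract Stream<T> prepareStream();

    // lifecycle hooks invoked around stream consumption
    protected abstract void before();

    protected abstract void after();

    protected abstract void abort();
}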

View File

@ -0,0 +1,43 @@
package eu.dnetlib.wfs.nodes.collect;
import java.time.LocalDateTime;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.service.CollectorService;
import eu.dnetlib.wfs.stream.StreamSupplierNode;
@WfNode("dateRangeCollect")
public class DateRangeCollectNode extends StreamSupplierNode<String> {
@WfInputParam
private Api api;
@WfInputParam
private LocalDateTime from;
@WfInputParam
private LocalDateTime until;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<String> prepareStream() {
return collectorService.collect(api, from, until);
}
@Override
protected void before() {}
@Override
protected void after() {}
@Override
protected void abort() {}
}

View File

@ -1,12 +1,12 @@
package eu.dnetlib.manager.wf.nodes.collect;
package eu.dnetlib.wfs.nodes.collect;
import java.time.LocalDateTime;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
@WfNode("findDateRangeForIncrementalHarvesting")
public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {

View File

@ -1,9 +1,9 @@
package eu.dnetlib.manager.wf.nodes.conf;
package eu.dnetlib.wfs.nodes.conf;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.SimpleResourceClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.SimpleResourceClient;
@WfNode("deleteWfConfiguration")
public class DeleteWfConfigurationNode extends AbstractJobNode {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes.conf;
package eu.dnetlib.wfs.nodes.conf;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -10,11 +10,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.manager.wf.repository.WorkflowConfigurationRepository;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.repository.WorkflowConfigurationRepository;
@WfNode("registerWfConfiguration")
public class RegisterWfConfigurationNode extends AbstractJobNode {

View File

@ -1,11 +1,11 @@
package eu.dnetlib.manager.wf.nodes.ds;
package eu.dnetlib.wfs.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.DsmClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.DsmClient;
@WfNode("clearApiExtraFields")
public class ClearApiExtraFieldsNode extends AbstractJobNode {

View File

@ -1,14 +1,14 @@
package eu.dnetlib.manager.wf.nodes.ds;
package eu.dnetlib.wfs.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.client.DsmClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.DsmClient;
@WfNode("loadDatasourceInfo")
public class LoadDatasourceInfoNode extends AbstractJobNode {

View File

@ -0,0 +1,43 @@
package eu.dnetlib.wfs.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.DsmClient;
@WfNode("updateApiExtraFields")
public class UpdateApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
@WfOutputParam
private Api api;
@WfInputParam
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
@WfInputParam
private String mdId;
@WfInputParam
private long total;
@Autowired
private DsmClient dsm;
@Override
protected void execute() throws Exception {
api = switch (infoType.toUpperCase()) {
case "COLLECT" -> dsm.updateUpdateApiCollectionInfo(api.getId(), mdId, total);
case "AGGREGATOR", "TRANSFORM" -> dsm.updateUpdateApiAggregationInfo(api.getId(), mdId, total);
case "DOWNLOAD" -> dsm.updateUpdateApiDownloadInfo(api.getId(), mdId, total);
default -> throw new RuntimeException("Invalid infoType: " + infoType);
};
}
}

View File

@ -1,13 +1,13 @@
package eu.dnetlib.manager.wf.nodes.ds;
package eu.dnetlib.wfs.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.client.DsmClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.DsmClient;
@WfNode("updateApiStatus")
public class UpdateApiStatusNode extends AbstractJobNode {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.nodes.ds;
package eu.dnetlib.wfs.nodes.ds;
import java.util.Arrays;
@ -7,9 +7,9 @@ import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
@WfNode("verifyDatasource")
public class VerifyDatasourceNode extends AbstractJobNode {
@ -33,18 +33,18 @@ public class VerifyDatasourceNode extends AbstractJobNode {
if (StringUtils.isNotBlank(expectedEoscDatasourceTypes)) {
Arrays.stream(expectedEoscDatasourceTypes.split(","))
.filter(StringUtils::isNotBlank)
.filter(s -> s.equalsIgnoreCase(type))
.findFirst()
.orElseThrow(() -> new Exception("Invalid type: " + type));
.filter(StringUtils::isNotBlank)
.filter(s -> s.equalsIgnoreCase(type))
.findFirst()
.orElseThrow(() -> new Exception("Invalid type: " + type));
}
if (StringUtils.isNotBlank(expectedCompatibilities)) {
Arrays.stream(expectedCompatibilities.split(","))
.filter(StringUtils::isNotBlank)
.filter(s -> s.equalsIgnoreCase(compatibility))
.findFirst()
.orElseThrow(() -> new Exception("Invalid type: " + compatibility));;
.filter(StringUtils::isNotBlank)
.filter(s -> s.equalsIgnoreCase(compatibility))
.findFirst()
.orElseThrow(() -> new Exception("Invalid type: " + compatibility));
}
}

View File

@ -1,15 +1,15 @@
package eu.dnetlib.manager.wf.nodes.store;
package eu.dnetlib.wfs.nodes.store;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.mdstore.MDStoreType;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.client.MDStoreClient;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.domain.mdstore.MDStoreType;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.MDStoreClient;
@WfNode("createMdStore")
public class CreateMdStoreNode extends AbstractJobNode {
@ -36,11 +36,11 @@ public class CreateMdStoreNode extends AbstractJobNode {
private String mdId;
@Autowired
private MDStoreClient mdStore;
private MDStoreClient mdStoreClient;
@Override
protected void execute() throws Exception {
mdId = mdStore.createMDStore(format, layout, interpretation, MDStoreType.valueOf(backend), ds.getOfficialname(), ds.getId(), api.getId());
mdId = mdStoreClient.createMDStore(format, layout, interpretation, MDStoreType.valueOf(backend), ds.getOfficialname(), ds.getId(), api.getId());
}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.wfs.nodes.store;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
import eu.dnetlib.wfs.utils.clients.MDStoreClient;
@WfNode("deleteMdStore")
public class DeleteMdStoreNode extends AbstractJobNode {
@WfInputParam
private String mdId;
@Autowired
private MDStoreClient mdStoreClient;
@Override
protected void execute() throws Exception {
mdStoreClient.deleteMdStore(mdId);
}
}

View File

@ -1,9 +1,9 @@
package eu.dnetlib.manager.wf.nodes.test;
package eu.dnetlib.wfs.nodes.test;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
@WfNode("test01")
public class Test01Node extends AbstractJobNode {

View File

@ -0,0 +1,83 @@
package eu.dnetlib.wfs.nodes.transform;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.domain.mdstore.records.Provenance;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamMapperNode;
import eu.dnetlib.wfs.utils.clients.DsmClient;
@WfNode("mdbuilder")
public class MetadataBuilderNode extends StreamMapperNode<String, MetadataRecord> {
@WfInputParam
private Datasource ds;
@WfInputParam
private Api api;
@WfInputParam
private boolean inferred;
@WfInputParam
private boolean deletedbyinference;
@WfInputParam
private String inferenceprovenance;
@WfInputParam
private float trust;
@WfInputParam
private String provenanceactionclassname;
@WfInputParam
private String provenanceactionclassid;
@Autowired
private DsmClient dsmClient;
private String xpath;
private long now;
private Provenance provenance;
@Override
protected void init() throws Exception {
xpath = api.getMetadataIdentifierPath();
provenance = new Provenance();
provenance.setDatasourceId(ds.getId());
provenance.setDatasourceName(ds.getOfficialname());
provenance.setNsPrefix(ds.getNamespaceprefix());
now = System.currentTimeMillis();
}
@Override
protected MetadataRecord mapRecord(final String xml) throws Exception {
try {
final Document doc = DocumentHelper.parseText(xml);
final String origId = doc.valueOf(xpath);
// TODO populate the xml record with missing oaf fields, see:
// https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/xslt/openaireMdBuilder.xslt.st
final MetadataRecord md = new MetadataRecordImpl();
md.setId(MetadataRecord.generateIdentifier(origId, ds.getNamespaceprefix()));
md.setOriginalId(origId);
md.setBody(doc.asXML());
md.setEncoding("XML");
md.setDateOfCollection(now);
md.setDateOfTransformation(null);
md.setProvenance(provenance);
return md;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,39 @@
package eu.dnetlib.wfs.nodes.transform;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamMapperNode;
import eu.dnetlib.wfs.utils.cleaner.Cleaner;
import eu.dnetlib.wfs.utils.cleaner.CleanerFactory;
@WfNode("clean")
public class MetadataCleanerNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@WfInputParam
private String ruleId;
@Autowired
private CleanerFactory cleanerFactory;
private Cleaner cleaner;
@Override
protected void init() throws Exception {
cleaner = cleanerFactory.obtainCleaningRule(ruleId);
}
@Override
protected MetadataRecord mapRecord(final MetadataRecord input) throws Exception {
final MetadataRecord out = new MetadataRecordImpl();
BeanUtils.copyProperties(input, out);
out.setBody(cleaner.transform(input.getBody()));
out.setDateOfTransformation(System.currentTimeMillis());
return out;
}
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.wfs.nodes.transform;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamFilterNode;
@WfNode("xpath_filter")
public class MetadataFilterNode extends StreamFilterNode<MetadataRecord> {
@WfInputParam
private String xpath;
@Override
protected void init() throws Exception {}
@Override
protected boolean filterRecord(final MetadataRecord input) throws Exception {
try {
final Document doc = DocumentHelper.parseText(input.getBody());
return !doc.selectNodes(xpath).isEmpty();
} catch (final DocumentException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,48 @@
package eu.dnetlib.wfs.nodes.transform;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamMapperNode;
import eu.dnetlib.wfs.utils.xslt.RecordTransformer;
import eu.dnetlib.wfs.utils.xslt.XsltTransformFactory;
@WfNode("applyXslt")
public class MetadataXsltTransformNode extends StreamMapperNode<MetadataRecord, MetadataRecord> {
@WfInputParam
private String ruleId;
@Autowired
private XsltTransformFactory xsltTransformFactory;
private RecordTransformer<String, String> xslt;
private long now;
@Override
protected void init() throws Exception {
now = System.currentTimeMillis();
final Map<String, String> params = new HashMap<>();
// TODO: which params ?
xslt = xsltTransformFactory.getTransformer(ruleId, params);
}
@Override
protected MetadataRecord mapRecord(final MetadataRecord input) throws Exception {
final MetadataRecord output = new MetadataRecordImpl();
BeanUtils.copyProperties(input, output);
output.setBody(xslt.transform(input.getBody()));
output.setDateOfTransformation(now);
return output;
}
}
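XsltTransformFactory and RecordTransformer are likewise not included in this commit view. As a generic, hypothetical illustration of what applying an XSLT rule to a record body involves (plain JAXP, which the Saxon-HE dependency of the new dnet-wf-common module can back), not the project's actual implementation:

import java.io.StringReader;
import java.io.StringWriter;

import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class XsltExample {

    // compiles the stylesheet and applies it to a single record body
    public static String transform(final String xslt, final String xmlRecord) throws Exception {
        final Transformer t = TransformerFactory.newInstance().newTransformer(new StreamSource(new StringReader(xslt)));
        final StringWriter out = new StringWriter();
        t.transform(new StreamSource(new StringReader(xmlRecord)), new StreamResult(out));
        return out.toString();
    }
}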

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.util.HashMap;
import java.util.Map;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
/**
* Created by michele on 24/11/15.

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.util.ArrayList;
@ -17,13 +17,13 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.nodes.DefaultJobNode;
import eu.dnetlib.manager.wf.nodes.ProcessNode;
import eu.dnetlib.manager.wf.nodes.SuccessNode;
import eu.dnetlib.manager.wf.notification.EmailSender;
import eu.dnetlib.manager.wf.service.WorkflowLogger;
import eu.dnetlib.manager.wf.workflows.graph.GraphNode;
import eu.dnetlib.manager.wf.workflows.util.NodeCallback;
import eu.dnetlib.wfs.graph.GraphNode;
import eu.dnetlib.wfs.nodes.DefaultJobNode;
import eu.dnetlib.wfs.nodes.ProcessNode;
import eu.dnetlib.wfs.nodes.SuccessNode;
import eu.dnetlib.wfs.utils.EmailSender;
import eu.dnetlib.wfs.utils.NodeCallback;
import eu.dnetlib.wfs.utils.WorkflowLogger;
@Service
public class ProcessEngine {
@ -142,23 +142,18 @@ public class ProcessEngine {
}
private ProcessNode newProcessNode(final GraphNode graphNode, final WorkflowProcess process) throws WorkflowManagerException {
if (graphNode.isSucessNode()) {
return new SuccessNode();
} else if (StringUtils.isBlank(graphNode.getType())) {
return new DefaultJobNode(graphNode.getName());
} else {
final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType());
if (pnode != null) {
pnode.setNodeName(graphNode.getName());
if (pnode instanceof ProcessAware) {
((ProcessAware) pnode).setProcess(process);
}
return pnode;
} else {
log.error("cannot find bean of type " + graphNode.getType());
throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
}
if (graphNode.isSucessNode()) { return new SuccessNode(); }
if (StringUtils.isBlank(graphNode.getType())) { return new DefaultJobNode(graphNode.getName()); }
final ProcessNode pnode = (ProcessNode) applicationContext.getBean(graphNode.getType());
if (pnode == null) {
log.error("cannot find bean of type " + graphNode.getType());
throw new WorkflowManagerException("cannot find bean of type " + graphNode.getType());
}
pnode.setNodeName(graphNode.getName());
if (pnode instanceof ProcessAware) {
((ProcessAware) pnode).setProcess(process);
}
return pnode;
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@ -14,9 +14,9 @@ import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.domain.wfs.WorkflowTemplate;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.graph.GraphLoader;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.wfs.graph.Graph;
import eu.dnetlib.wfs.graph.GraphLoader;
import eu.dnetlib.wfs.utils.ProcessCallback;
@Component
public class ProcessFactory {
@ -31,9 +31,9 @@ public class ProcessFactory {
private GraphLoader graphLoader;
public WorkflowProcess newProcess(final SimpleResource wfMetadata,
final WorkflowTemplate wfTemplate,
final WorkflowConfiguration conf,
final ProcessCallback callback) throws WorkflowManagerException {
final WorkflowTemplate wfTemplate,
final WorkflowConfiguration conf,
final ProcessCallback callback) throws WorkflowManagerException {
final Map<String, String> globalParams = new HashMap<>();
globalParams.putAll(conf.getSystemParams());

View File

@ -1,9 +1,10 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
@ -13,7 +14,7 @@ import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.wfs.utils.WorkflowsConstants;
@Service
public class ProcessRegistry {
@ -29,7 +30,7 @@ public class ProcessRegistry {
synchronized public int countRunningWfs() {
int count = 0;
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
final WorkflowProcess proc = e.getValue();
if (!proc.isTerminated()) {
count++;
@ -39,36 +40,36 @@ public class ProcessRegistry {
}
public WorkflowProcess findProcess(final String procId) {
return this.procs.get(procId);
return procs.get(procId);
}
public Collection<WorkflowProcess> listProcesses() {
return this.procs.values();
return procs.values();
}
public WorkflowProcess findProcsByConfigurationId(final String id) {
return this.byConfId.get(id);
return byConfId.get(id);
}
public void registerProcess(final WorkflowProcess process, final WorkflowConfiguration conf) throws WorkflowManagerException {
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
if (procs.containsValue(process) || procs.containsKey(process.getId())) {
log.error("Already registerd process: " + process);
throw new WorkflowManagerException("Already registerd process: " + process);
}
if (this.procs.size() >= this.maxSize) {
if (procs.size() >= maxSize) {
removeOldestProcess();
}
this.procs.put(process.getId(), process);
this.byConfId.put(conf.getId(), process);
procs.put(process.getId(), process);
byConfId.put(conf.getId(), process);
synchronized (this.pendingProcs) {
if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
synchronized (pendingProcs) {
if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
throw new WorkflowManagerException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
}
this.pendingProcs.put(process);
pendingProcs.put(process);
log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority());
}
@ -78,7 +79,7 @@ public class ProcessRegistry {
LocalDateTime oldDate = LocalDateTime.now();
String oldId = null;
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
final WorkflowProcess proc = e.getValue();
if (proc.isTerminated()) {
@ -98,21 +99,21 @@ public class ProcessRegistry {
public void unregisterProcess(final String procId) {
synchronized (this) {
final WorkflowProcess process = this.procs.remove(procId);
final WorkflowProcess process = procs.remove(procId);
if (process != null) {
this.byConfId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(e -> e.getKey())
.forEach(confId -> this.byConfId.remove(confId, process));
byConfId.entrySet()
.stream()
.filter(e -> e.getValue().getId().equals(process.getId()))
.map(Entry::getKey)
.forEach(confId -> byConfId.remove(confId, process));
}
}
}
public WorkflowProcess nextProcessToStart() {
synchronized (this.pendingProcs) {
return this.pendingProcs.poll();
synchronized (pendingProcs) {
return pendingProcs.poll();
}
}
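
unregisterProcess(...) above collects the configuration ids mapped to a process and removes them with the two-argument Map.remove(key, value), which deletes an entry only if it still maps to that exact value. A standalone sketch of the idiom with illustrative data; the keys are materialized first so a plain HashMap is not modified while its entry stream is still open (the registry itself presumably relies on a concurrent map):

// Hedged sketch of the conditional-removal idiom; map contents are illustrative
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

public class ConditionalRemoveSketch {

    public static void main(final String[] args) {
        final Map<String, String> byConfId = new HashMap<>();
        byConfId.put("conf-1", "proc-A");
        byConfId.put("conf-2", "proc-A");
        byConfId.put("conf-3", "proc-B");

        final String process = "proc-A";

        byConfId.entrySet()
                .stream()
                .filter(e -> e.getValue().equals(process))
                .map(Entry::getKey)
                .toList()                                              // materialize before mutating
                .forEach(confId -> byConfId.remove(confId, process));  // removes only if still mapped to process

        System.out.println(byConfId); // {conf-3=proc-B}
    }
}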

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.util.UUID;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.procs;
package eu.dnetlib.wfs.procs;
import java.time.LocalDateTime;
import java.util.HashMap;
@ -12,9 +12,9 @@ import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.manager.wf.workflows.graph.Graph;
import eu.dnetlib.manager.wf.workflows.util.ProcessCallback;
import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
import eu.dnetlib.wfs.graph.Graph;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WorkflowsConstants;
/**
* Created by michele on 19/11/15.
@ -22,17 +22,11 @@ import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants;
public class WorkflowProcess implements Comparable<WorkflowProcess> {
public enum Status {
created,
running,
success,
failure,
killed;
created, running, success, failure, killed;
}
public enum StartMode {
AUTO,
MANUAL,
DISABLED
AUTO, MANUAL, DISABLED
}
private final String id;
@ -53,21 +47,21 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
private final Map<String, String> outputParams = new HashMap<>();
public WorkflowProcess(
final String id,
final SimpleResource wfMetadata,
final WorkflowConfiguration wfConf,
final Graph graph,
final Map<String, String> globalParams,
final ProcessCallback callback) {
final String id,
final SimpleResource wfMetadata,
final WorkflowConfiguration wfConf,
final Graph graph,
final Map<String, String> globalParams,
final ProcessCallback callback) {
this.id = id;
this.wfMetadata = wfMetadata;
this.wfConf = wfConf;
this.graph = graph;
this.callback = callback;
this.status = Status.created;
this.env = new Env();
status = Status.created;
env = new Env();
this.globalParams = globalParams;
this.lastActivityDate = LocalDateTime.now();
lastActivityDate = LocalDateTime.now();
}
public String getId() {
@ -139,14 +133,10 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
}
public boolean isTerminated() {
switch (status) {
case success:
case failure:
case killed:
return true;
default:
return false;
}
return switch (status) {
case success, failure, killed -> true;
default -> false;
};
}
public LocalDateTime getLastActivityDate() {
@ -231,7 +221,7 @@ public class WorkflowProcess implements Comparable<WorkflowProcess> {
if (token.isFailed()) {
callback.onFail(this);
} else {
callback.onSuccess(this);;
callback.onSuccess(this);
}
}
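
The isTerminated() rewrite above uses a switch expression, available since Java 14: arrow labels need no break, several constants share one label and the switch itself yields the boolean. A standalone sketch of the same pattern with an illustrative enum:

// Hedged sketch of the switch-expression form (requires Java 14+); the enum is illustrative
public class SwitchExpressionSketch {

    enum Status {
        created, running, success, failure, killed
    }

    static boolean isTerminated(final Status status) {
        return switch (status) {
        case success, failure, killed -> true; // terminal states
        default -> false;
        };
    }

    public static void main(final String[] args) {
        System.out.println(isTerminated(Status.running)); // false
        System.out.println(isTerminated(Status.killed));  // true
    }
}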

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.repository;
package eu.dnetlib.wfs.repository;
import java.time.LocalDateTime;
import java.util.List;

View File

@ -0,0 +1,23 @@
package eu.dnetlib.wfs.service;
import java.time.LocalDateTime;
import java.util.stream.Stream;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.dsm.Api;
@Service
public class CollectorService {
public Stream<String> collect(final Api api) {
// TODO Auto-generated method stub
return null;
}
public Stream<String> collect(final Api api, final LocalDateTime from, final LocalDateTime until) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -0,0 +1,121 @@
package eu.dnetlib.wfs.service;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.resource.SimpleResource;
import eu.dnetlib.domain.wfs.WorkflowConfiguration;
import eu.dnetlib.domain.wfs.WorkflowTemplate;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.utils.Stoppable;
import eu.dnetlib.utils.StoppableDetails;
import eu.dnetlib.wfs.procs.ExecutionStatus;
import eu.dnetlib.wfs.procs.ProcessEngine;
import eu.dnetlib.wfs.procs.ProcessFactory;
import eu.dnetlib.wfs.procs.ProcessRegistry;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.utils.ProcessCallback;
import eu.dnetlib.wfs.utils.WorkflowsConstants;
import eu.dnetlib.wfs.utils.clients.SimpleResourceClient;
@Service
public class WfExecutorService implements Stoppable {
private static final Log log = LogFactory.getLog(WfExecutorService.class);
@Autowired
private ProcessRegistry processRegistry;
@Autowired
private ProcessFactory processFactory;
@Autowired
private ProcessEngine processEngine;
@Autowired
private SimpleResourceClient simpleResourceClient;
private boolean paused = false;
@PostConstruct
public void init() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (isPaused() || (processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE)) { return; }
final WorkflowProcess process = processRegistry.nextProcessToStart();
if (process != null) {
processEngine.startProcess(process);
} else {
log.debug("WorkflowProcess queue is empty");
}
}, 10, 10, TimeUnit.SECONDS);
}
public ExecutionStatus startWorkflow(final String wfId,
final WorkflowConfiguration conf,
final ProcessCallback callback)
throws WorkflowManagerException {
if (!conf.isEnabled() || !conf.isConfigured()) {
log.warn("Wf configuration " + conf.getId() + " is not ready to start");
throw new WorkflowManagerException("Wf configuration " + conf.getId() + " is not ready to start");
}
final SimpleResource wfMetadata = simpleResourceClient.findResource(wfId);
if (!"wf_template".equals(wfMetadata.getType())) { throw new WorkflowManagerException("WF not found: " + conf.getWorkflow()); }
final WorkflowTemplate wfTmpl = simpleResourceClient.findResourceContent(wfId, WorkflowTemplate.class);
final WorkflowProcess process =
processFactory.newProcess(wfMetadata, wfTmpl, conf, callback);
processRegistry.registerProcess(process, conf);
return process.getExecutionStatus();
}
@Override
public void stop() {
paused = true;
}
@Override
public void resume() {
paused = false;
}
@Override
public StoppableDetails getStopDetails() {
final int count = processRegistry.countRunningWfs();
final StoppableDetails.StopStatus status =
isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING;
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
}
public ProcessRegistry getProcessRegistry() {
return processRegistry;
}
public boolean isPaused() {
return paused;
}
public void setPaused(final boolean paused) {
this.paused = paused;
}
public ExecutionStatus findProcess(final String procId) {
return processRegistry.findProcess(procId).getExecutionStatus();
}
public void killProcess(final String procId) {
processRegistry.findProcess(procId).kill();
}
}
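
WfExecutorService above polls the ProcessRegistry from a single-threaded scheduler every 10 seconds and starts the next pending process unless the service is paused or the running limit is reached. A standalone sketch of that polling pattern with plain JDK types; the queue contents, the pause flag and the timings are illustrative:

// Hedged sketch of a fixed-rate polling loop; data and timings are illustrative
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollingSketch {

    public static void main(final String[] args) {
        final Queue<String> pending = new ConcurrentLinkedQueue<>();
        pending.add("wf-1");
        pending.add("wf-2");

        final AtomicBoolean paused = new AtomicBoolean(false);

        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (paused.get()) { return; }
            final String next = pending.poll();
            if (next != null) {
                System.out.println("starting " + next);
            } else {
                System.out.println("process queue is empty");
            }
        }, 1, 1, TimeUnit.SECONDS);

        // stop the sketch after a few seconds so the JVM can exit
        scheduler.schedule(scheduler::shutdown, 5, TimeUnit.SECONDS);
    }
}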

View File

@ -0,0 +1,61 @@
package eu.dnetlib.wfs.stream;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.wfs.utils.SimpleCallable;
public class DnetStream<T> {
private static final Log log = LogFactory.getLog(DnetStream.class);
private final Stream<T> stream;
private final SimpleCallable before;
private final SimpleCallable after;
private final SimpleCallable abort;
public DnetStream(final Stream<T> stream, final SimpleCallable before, final SimpleCallable after, final SimpleCallable abort) {
this.stream = stream;
this.before = before;
this.after = after;
this.abort = abort;
}
public DnetStream<T> filter(final Predicate<T> predicate) {
return new DnetStream<>(stream.filter(predicate), before, after, abort);
}
public <K> DnetStream<K> map(final Function<T, K> mapper) {
return new DnetStream<>(stream.map(mapper), before, after, abort);
}
public void consume(final Consumer<T> action) {
try {
if (before != null) {
before.call();
}
stream.forEach(action);
if (after != null) {
after.call();
}
} catch (final Throwable e) {
log.error("Error reading stream: " + e);
try {
if (abort != null) {
abort.call();
}
} catch (final Exception e1) {
log.error("Error aborting stream reading: " + e1);
}
throw new RuntimeException(e);
}
}
}
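
DnetStream wraps a plain java.util.stream.Stream and fires optional callbacks before consumption, after a successful pass and when consumption aborts; filter and map stay lazy, consume is the terminal step. A minimal usage sketch against the class exactly as defined above; the records and the callback lambdas are illustrative:

// Hedged usage sketch of DnetStream; data and callbacks are made up
import java.util.stream.Stream;

import eu.dnetlib.wfs.stream.DnetStream;

public class DnetStreamUsageSketch {

    public static void main(final String[] args) {
        final DnetStream<String> records = new DnetStream<>(
                Stream.of("<r>a</r>", "<r>b</r>", ""),
                () -> System.out.println("opening source"),  // before
                () -> System.out.println("closing source"),  // after
                () -> System.out.println("rolling back"));   // abort

        records.filter(r -> !r.isEmpty())      // drop empty bodies
                .map(String::toUpperCase)      // transform each record
                .consume(System.out::println); // terminal step, runs the callbacks
    }
}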

View File

@ -0,0 +1,30 @@
package eu.dnetlib.wfs.stream;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
public abstract class StreamConsumerNode<T> extends AbstractJobNode {
@WfInputParam
private DnetStream<T> inputStream;
abstract protected void init() throws Exception;
abstract protected void consumeRecord(T input) throws Exception;
abstract protected void complete() throws Exception;
@Override
protected final void execute() throws Exception {
init();
inputStream.consume(x -> {
try {
consumeRecord(x);
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
complete();
}
}
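
StreamConsumerNode is the terminal end of a node pipeline: the framework injects the DnetStream via @WfInputParam and execute() drains it through consumeRecord(...). A hypothetical concrete subclass, offered only as a sketch; the class name, node id and counting logic are not part of the repository:

// Hypothetical consumer node; everything below is illustrative
package eu.dnetlib.wfs.nodes.sketch;

import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamConsumerNode;

@WfNode("countRecords")
public class CountRecordsNodeSketch extends StreamConsumerNode<String> {

    private long count;

    @Override
    protected void init() throws Exception {
        count = 0;
    }

    @Override
    protected void consumeRecord(final String input) throws Exception {
        count++; // each record of the injected inputStream ends up here
    }

    @Override
    protected void complete() throws Exception {
        System.out.println("consumed " + count + " records");
    }
}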

View File

@ -0,0 +1,31 @@
package eu.dnetlib.wfs.stream;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
public abstract class StreamFilterNode<T> extends AbstractJobNode {
@WfInputParam
private DnetStream<T> inputStream;
@WfOutputParam
private DnetStream<T> outputStream;
abstract protected void init() throws Exception;
abstract protected boolean filterRecord(T obj) throws Exception;
@Override
protected final void execute() throws Exception {
init();
outputStream = inputStream.filter(x -> {
try {
return filterRecord(x);
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.wfs.stream;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
public abstract class StreamMapperNode<T, K> extends AbstractJobNode {
@WfInputParam
private DnetStream<T> inputStream;
@WfOutputParam
private DnetStream<K> outputStream;
abstract protected void init() throws Exception;
abstract protected K mapRecord(T input) throws Exception;
@Override
protected void execute() throws Exception {
init();
outputStream = inputStream.map(x -> {
try {
return mapRecord(x);
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -0,0 +1,26 @@
package eu.dnetlib.wfs.stream;
import java.util.stream.Stream;
import eu.dnetlib.wfs.annotations.WfOutputParam;
import eu.dnetlib.wfs.nodes.AbstractJobNode;
public abstract class StreamSupplierNode<T> extends AbstractJobNode {
@WfOutputParam
private DnetStream<T> outputStream;
abstract protected Stream<T> prepareStream() throws Exception;
abstract protected void before() throws Exception;
abstract protected void after() throws Exception;
abstract protected void abort() throws Exception;
@Override
protected void execute() throws Exception {
outputStream = new DnetStream<>(prepareStream(), this::before, this::after, this::abort);
}
}
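
StreamSupplierNode is the opposite end: prepareStream() produces the records and the before/after/abort hooks become the DnetStream callbacks. A hypothetical supplier sketch mirroring the consumer sketch above; the node id and the in-memory data are illustrative only:

// Hypothetical supplier node; everything below is illustrative
package eu.dnetlib.wfs.nodes.sketch;

import java.util.List;
import java.util.stream.Stream;

import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.stream.StreamSupplierNode;

@WfNode("listSupplier")
public class ListSupplierNodeSketch extends StreamSupplierNode<String> {

    private final List<String> records = List.of("<r>a</r>", "<r>b</r>");

    @Override
    protected Stream<String> prepareStream() throws Exception {
        return records.stream();
    }

    @Override
    protected void before() throws Exception {
        System.out.println("opening source");
    }

    @Override
    protected void after() throws Exception {
        System.out.println("source consumed, committing");
    }

    @Override
    protected void abort() throws Exception {
        System.out.println("consumption failed, rolling back");
    }
}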

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.notification;
package eu.dnetlib.wfs.utils;
import java.util.HashMap;
import java.util.Map;
@ -9,10 +9,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.NotificationCondition;
import eu.dnetlib.manager.wf.client.EmailClient;
import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess.Status;
import eu.dnetlib.wfs.procs.WorkflowProcess;
import eu.dnetlib.wfs.procs.WorkflowProcess.Status;
import eu.dnetlib.wfs.repository.WorkflowSubscriptionRepository;
import eu.dnetlib.wfs.utils.clients.EmailSenderClient;
@Service
public class EmailSender {
@ -23,14 +23,14 @@ public class EmailSender {
private WorkflowSubscriptionRepository wfSubscriptionRepository;
@Autowired
private EmailClient emailClient;
private EmailSenderClient emailClient;
public void sendMails(final WorkflowProcess proc) {
wfSubscriptionRepository.findByWfConfigurationId(proc.getWfConfId()).forEach(s -> {
if (s.getCondition() == NotificationCondition.ALWAYS ||
s.getCondition() == NotificationCondition.ONLY_FAILED && proc.getStatus() == Status.failure ||
s.getCondition() == NotificationCondition.ONLY_SUCCESS && proc.getStatus() == Status.success) {
if ((s.getCondition() == NotificationCondition.ALWAYS) ||
((s.getCondition() == NotificationCondition.ONLY_FAILED) && (proc.getStatus() == Status.failure)) ||
((s.getCondition() == NotificationCondition.ONLY_SUCCESS) && (proc.getStatus() == Status.success))) {
try {
final Map<String, Object> params = new HashMap<>();

View File

@ -0,0 +1,11 @@
package eu.dnetlib.wfs.utils;
import eu.dnetlib.wfs.procs.Token;
public interface NodeCallback {
void onComplete(Token t);
void onFail(Token t);
}

View File

@ -1,6 +1,6 @@
package eu.dnetlib.manager.wf.workflows.util;
package eu.dnetlib.wfs.utils;
import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess;
import eu.dnetlib.wfs.procs.WorkflowProcess;
public interface ProcessCallback {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.util;
package eu.dnetlib.wfs.utils;
@FunctionalInterface
public interface SimpleCallable {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.util;
package eu.dnetlib.wfs.utils;
import java.util.ArrayList;
import java.util.List;
@ -9,7 +9,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.errors.WorkflowManagerException;
import eu.dnetlib.manager.wf.workflows.util.ValidNodeValuesFetcher.DnetParamValue;
import eu.dnetlib.wfs.utils.ValidNodeValuesFetcher.DnetParamValue;
public abstract class ValidNodeValuesFetcher implements Function<Map<String, String>, List<DnetParamValue>> {
@ -28,11 +28,11 @@ public abstract class ValidNodeValuesFetcher implements Function<Map<String, Str
}
public String getId() {
return this.id;
return id;
}
public String getName() {
return this.name;
return name;
}
@Override
@ -54,7 +54,7 @@ public abstract class ValidNodeValuesFetcher implements Function<Map<String, Str
abstract protected List<DnetParamValue> obtainValues(Map<String, String> params) throws Exception;
public String getName() {
return this.name;
return name;
}
public void setName(final String name) {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.service;
package eu.dnetlib.wfs.utils;
import java.time.Instant;
import java.time.LocalDateTime;
@ -12,7 +12,7 @@ import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.wfs.WfHistoryEntry;
import eu.dnetlib.manager.wf.repository.WfHistoryEntryRepository;
import eu.dnetlib.wfs.repository.WfHistoryEntryRepository;
@Service
public class WorkflowLogger {
@ -21,19 +21,15 @@ public class WorkflowLogger {
private WfHistoryEntryRepository wfHistoryEntryRepository;
public List<WfHistoryEntry> history(final int total, final Long from, final Long to) {
if (from == null && to == null) {
return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList();
} else {
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
if ((from == null) && (to == null)) { return wfHistoryEntryRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); }
final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MIN;
final LocalDateTime toTime = to != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(to), TimeZone
final LocalDateTime toTime = to != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(to), TimeZone
.getDefault()
.toZoneId()) : LocalDateTime.MAX;
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
}
return wfHistoryEntryRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime);
}
public List<WfHistoryEntry> history(final String wfConfId) {
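
history(total, from, to) above turns optional epoch-millisecond bounds into LocalDateTime values in the default time zone, falling back to LocalDateTime.MIN and MAX when a bound is missing. A standalone sketch of that conversion; the sample timestamp is illustrative:

// Hedged sketch of the epoch-millis to LocalDateTime conversion; the timestamp is illustrative
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;

public class EpochConversionSketch {

    public static void main(final String[] args) {
        final Long from = 1_700_000_000_000L; // a null bound would fall back to LocalDateTime.MIN

        final LocalDateTime fromTime = from != null
                ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone.getDefault().toZoneId())
                : LocalDateTime.MIN;

        System.out.println(fromTime);
    }
}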

View File

@ -1,4 +1,4 @@
package eu.dnetlib.manager.wf.workflows.util;
package eu.dnetlib.wfs.utils;
public class WorkflowsConstants {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.cleaner;
package eu.dnetlib.wfs.utils.cleaner;
import java.io.StringReader;
import java.util.ArrayList;
@ -13,8 +13,7 @@ import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;
import eu.dnetlib.service.transform.common.RecordTransformer;
import eu.dnetlib.service.transform.common.VocabularyRule;
import eu.dnetlib.wfs.utils.xslt.RecordTransformer;
public class Cleaner implements RecordTransformer<String, String> {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.cleaner;
package eu.dnetlib.wfs.utils.cleaner;
import java.io.StringReader;
@ -10,9 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import eu.dnetlib.errors.CleanerException;
import eu.dnetlib.service.transform.client.SimpleResourceClient;
import eu.dnetlib.service.transform.common.VocabularyRule;
import eu.dnetlib.service.transform.vocabulary.VocabularyService;
import eu.dnetlib.wfs.utils.clients.SimpleResourceClient;
import eu.dnetlib.wfs.utils.clients.VocabularyClient;
@Service
public class CleanerFactory {
@ -21,11 +20,11 @@ public class CleanerFactory {
private SimpleResourceClient simpleResourceClient;
@Autowired
private VocabularyService vocabularyService;
private VocabularyClient vocabularyClient;
public Cleaner obtainCleaningRule(final String ruleId) throws CleanerException {
final String prof = simpleResourceClient.getResourceContent(ruleId, "cleaning_rule");
final String prof = simpleResourceClient.findResourceContent(ruleId, String.class);
try {
final SAXReader reader = new SAXReader();
@ -38,7 +37,7 @@ public class CleanerFactory {
final String voc = node.valueOf("@vocabulary");
final boolean strict = BooleanUtils.toBoolean(node.valueOf("@strict"));
final VocabularyRule rule = new VocabularyRule(xpath, strict, voc, vocabularyService.listTerms(voc));
final VocabularyRule rule = new VocabularyRule(xpath, strict, voc, vocabularyClient.listTerms(voc));
cleaner.getRules().add(rule);
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.service.transform.common;
package eu.dnetlib.wfs.utils.cleaner;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -0,0 +1,46 @@
package eu.dnetlib.wfs.utils.clients;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
@Service
public class DsmClient {
public Api updateUpdateApiCollectionInfo(final String apiId, final String mdId, final long size) {
// TODO Auto-generated method stub
return null;
}
public Api updateUpdateApiAggregationInfo(final String apiId, final String mdId, final long size) {
// TODO Auto-generated method stub
return null;
}
public Api updateUpdateApiDownloadInfo(final String apiId, final String objId, final long size) {
// TODO Auto-generated method stub
return null;
}
public Datasource findDs(final String dsId) {
// TODO Auto-generated method stub
return null;
}
public Api findApi(final String apiId) {
// TODO Auto-generated method stub
return null;
}
public void setManaged(final Datasource ds, final boolean b) {
// TODO Auto-generated method stub
}
public void setApiActive(final Api api, final boolean b) {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.wfs.utils.clients;
import java.util.Map;
import org.springframework.stereotype.Service;
@Service
public class EmailSenderClient {
public void sendStoredMail(final String email, final String messageId, final Map<String, Object> params) {
// TODO Auto-generated method stub
}
}

Some files were not shown because too many files have changed in this diff.