diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/MainDBConfig.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/MainDBConfig.java index 0a5cb752..5978f4ac 100644 --- a/apps/dnet-is-application/src/main/java/eu/dnetlib/MainDBConfig.java +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/MainDBConfig.java @@ -20,7 +20,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @Configuration @EnableTransactionManagement @EnableJpaRepositories(entityManagerFactoryRef = "mainEntityManagerFactory", transactionManagerRef = "mainTransactionManager", basePackages = { - "eu.dnetlib.is", "eu.dnetlib.data.mdstore", "eu.dnetlib.msro" + "eu.dnetlib.is", "eu.dnetlib.data.mdstore", "eu.dnetlib.manager" }) public class MainDBConfig { @@ -38,7 +38,7 @@ public class MainDBConfig { @Qualifier("mainDataSource") final DataSource ds) { return builder .dataSource(ds) - .packages("eu.dnetlib.is.model", "eu.dnetlib.msro.model", "eu.dnetlib.data.mdstore.model") + .packages("eu.dnetlib.is.model", "eu.dnetlib.manager.wf.model", "eu.dnetlib.data.mdstore.model") .persistenceUnit("is") .build(); } diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/OldProfilesImporter.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/OldProfilesImporter.java index 58a395d8..e37519b2 100644 --- a/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/OldProfilesImporter.java +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/OldProfilesImporter.java @@ -1,6 +1,6 @@ package eu.dnetlib.is.importer; -import java.util.Date; +import java.time.LocalDateTime; import javax.transaction.Transactional; @@ -47,7 +47,7 @@ public class OldProfilesImporter { final Document doc = DocumentHelper.parseText(xml); final String id = StringUtils.substringBefore(doc.valueOf("//RESOURCE_IDENTIFIER/@value"), "_"); - final Date now = new Date(); + final LocalDateTime now = LocalDateTime.now(); final SimpleResource res = new SimpleResource(); res.setId(id); diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/WfHistoryImporter.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/WfHistoryImporter.java index 95d1d6c2..a1eaa4b0 100644 --- a/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/WfHistoryImporter.java +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/is/importer/WfHistoryImporter.java @@ -1,11 +1,13 @@ package eu.dnetlib.is.importer; import java.io.File; -import java.util.Date; +import java.time.Instant; +import java.time.LocalDateTime; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.TimeZone; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -17,8 +19,8 @@ import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.msro.history.repository.WfProcessExecutionRepository; -import eu.dnetlib.msro.model.history.WfProcessExecution; +import eu.dnetlib.manager.wf.model.WfProcessExecution; +import eu.dnetlib.manager.wf.repository.WfProcessExecutionRepository; @Service public class WfHistoryImporter { @@ -53,8 +55,10 @@ public class WfHistoryImporter { wf.setDsApi(node.get("dataprovider:interface").asText()); } - wf.setStartDate(new Date(NumberUtils.toLong(node.get("system:startDate").asText()))); - wf.setEndDate(new Date(NumberUtils.toLong(node.get("system:endDate").asText()))); + wf.setStartDate(LocalDateTime + .ofInstant(Instant.ofEpochMilli(NumberUtils.toLong(node.get("system:startDate").asText())), TimeZone.getDefault().toZoneId())); + + wf.setEndDate(LocalDateTime.ofInstant(Instant.ofEpochMilli(NumberUtils.toLong(node.get("system:endDate").asText())), TimeZone.getDefault().toZoneId())); if (BooleanUtils.toBoolean(node.get("system:isCompletedSuccessfully").asText())) { wf.setStatus("success"); diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java new file mode 100644 index 00000000..cccf4db3 --- /dev/null +++ b/apps/dnet-is-application/src/main/java/eu/dnetlib/manager/wf/WfHistoryAjaxController.java @@ -0,0 +1,34 @@ +package eu.dnetlib.manager.wf; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import eu.dnetlib.manager.wf.model.WfProcessExecution; + +@RestController +@RequestMapping("/ajax/wfs") +public class WfHistoryAjaxController { + + @Autowired + private WorkflowLogger logger; + + @GetMapping("/") + public List history( + @RequestParam(required = true) final int total, + @RequestParam(required = false) final Long from, + @RequestParam(required = false) final Long to) { + return logger.history(total, from, to); + } + + @GetMapping("/{processId}") + public WfProcessExecution getProcessExecution(@PathVariable final String processId) { + return logger.getProcessExecution(processId); + } + +} diff --git a/apps/dnet-is-application/src/main/java/eu/dnetlib/msro/history/WfHistoryAjaxController.java b/apps/dnet-is-application/src/main/java/eu/dnetlib/msro/history/WfHistoryAjaxController.java deleted file mode 100644 index a6ccc190..00000000 --- a/apps/dnet-is-application/src/main/java/eu/dnetlib/msro/history/WfHistoryAjaxController.java +++ /dev/null @@ -1,49 +0,0 @@ -package eu.dnetlib.msro.history; - -import java.util.Date; -import java.util.List; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Sort; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -import eu.dnetlib.msro.history.repository.WfProcessExecutionRepository; -import eu.dnetlib.msro.model.history.WfProcessExecution; - -@RestController -@RequestMapping("/ajax/wfs") -public class WfHistoryAjaxController { - - @Autowired - private WfProcessExecutionRepository wfProcessExecutionRepository; - - @Deprecated - public static final int MAX_NUMBER_OF_RECENT_WFS = 100; - - @GetMapping("/") - public List history( - @RequestParam(required = true) final int total, - @RequestParam(required = false) final Long from, - @RequestParam(required = false) final Long to) { - if (from == null && to == null) { - return wfProcessExecutionRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); - } else if (from == null) { - return wfProcessExecutionRepository.findByEndDateBetweenOrderByEndDateDesc(new Date(0), new Date(to)); - } else if (to == null) { - return wfProcessExecutionRepository.findByEndDateBetweenOrderByEndDateDesc(new Date(from), new Date()); - } else { - return wfProcessExecutionRepository.findByEndDateBetweenOrderByEndDateDesc(new Date(from), new Date(to)); - } - } - - @GetMapping("/{processId}") - public WfProcessExecution getProcess(@PathVariable final String processId) { - return wfProcessExecutionRepository.findById(processId).get(); - } - -} diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java index db7f48b1..41adb299 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java @@ -1,7 +1,8 @@ package eu.dnetlib.data.mdstore; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Comparator; -import java.util.Date; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -112,9 +113,9 @@ public class MDStoreService { private MDStoreVersion newMDStoreVersion(final MDStore md, final boolean writing) { final MDStoreVersion v = new MDStoreVersion(); - final Date now = new Date(); + final LocalDateTime now = LocalDateTime.now(); - final String versionId = md.getId() + "-" + now.getTime(); + final String versionId = md.getId() + "-" + now.toEpochSecond(ZoneOffset.UTC); v.setId(versionId); v.setMdstore(md.getId()); v.setLastUpdate(null); @@ -189,7 +190,7 @@ public class MDStoreService { mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v)); v.setWriting(false); v.setSize(size); - v.setLastUpdate(new Date()); + v.setLastUpdate(LocalDateTime.now()); mdstoreVersionRepository.save(v); return v; @@ -258,7 +259,7 @@ public class MDStoreService { md.setLayout(layout); md.setType(type); md.setInterpretation(interpretation); - md.setCreationDate(new Date()); + md.setCreationDate(LocalDateTime.now()); md.setDatasourceName(dsName); md.setDatasourceId(dsId); md.setApiId(apiId); diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java index 66ba910b..5b1a1d84 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java @@ -1,8 +1,8 @@ package eu.dnetlib.data.mdstore.backends; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -49,8 +49,8 @@ public class MockBackend implements MDStoreBackend { rec.setOriginalId("mck-" + i); rec.setId("mock________::mck-" + i); rec.setBody("" + i + ""); - rec.setDateOfCollection(new Date().getTime()); - rec.setDateOfTransformation(new Date().getTime()); + rec.setDateOfCollection(Instant.now().toEpochMilli()); + rec.setDateOfTransformation(Instant.now().toEpochMilli()); rec.setEncoding("XML"); rec.setProvenance(MOCK_PROVENANCE); list.add(rec); diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStore.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStore.java index 82548da9..aaf488da 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStore.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStore.java @@ -1,7 +1,7 @@ package eu.dnetlib.data.mdstore.model; import java.io.Serializable; -import java.util.Date; +import java.time.LocalDateTime; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -56,7 +56,7 @@ public class MDStore implements Serializable { @Column(name = "creation_date") @Temporal(TemporalType.TIMESTAMP) - private Date creationDate; + private LocalDateTime creationDate; @Override public int hashCode() { @@ -143,11 +143,11 @@ public class MDStore implements Serializable { this.params = params; } - public Date getCreationDate() { + public LocalDateTime getCreationDate() { return creationDate; } - public void setCreationDate(final Date creationDate) { + public void setCreationDate(final LocalDateTime creationDate) { this.creationDate = creationDate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreVersion.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreVersion.java index d2e07c08..4ba17ffa 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreVersion.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreVersion.java @@ -1,7 +1,7 @@ package eu.dnetlib.data.mdstore.model; import java.io.Serializable; -import java.util.Date; +import java.time.LocalDateTime; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -37,7 +37,7 @@ public class MDStoreVersion implements Serializable { @Column(name = "lastupdate") @Temporal(TemporalType.TIMESTAMP) - private Date lastUpdate; + private LocalDateTime lastUpdate; @Column(name = "size") private long size = 0; @@ -78,11 +78,11 @@ public class MDStoreVersion implements Serializable { this.readCount = readCount; } - public Date getLastUpdate() { + public LocalDateTime getLastUpdate() { return lastUpdate; } - public void setLastUpdate(final Date lastUpdate) { + public void setLastUpdate(final LocalDateTime lastUpdate) { this.lastUpdate = lastUpdate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreWithInfo.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreWithInfo.java index 17bdb4f8..b95133f1 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreWithInfo.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreWithInfo.java @@ -1,7 +1,7 @@ package eu.dnetlib.data.mdstore.model; import java.io.Serializable; -import java.util.Date; +import java.time.LocalDateTime; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -55,11 +55,11 @@ public class MDStoreWithInfo implements Serializable { @Column(name = "creation_date") @Temporal(TemporalType.TIMESTAMP) - private Date creationDate; + private LocalDateTime creationDate; @Column(name = "lastupdate") @Temporal(TemporalType.TIMESTAMP) - private Date lastUpdate; + private LocalDateTime lastUpdate; @Column(name = "size") private long size = 0; @@ -143,19 +143,19 @@ public class MDStoreWithInfo implements Serializable { this.currentVersion = currentVersion; } - public Date getCreationDate() { + public LocalDateTime getCreationDate() { return creationDate; } - public void setCreationDate(final Date creationDate) { + public void setCreationDate(final LocalDateTime creationDate) { this.creationDate = creationDate; } - public Date getLastUpdate() { + public LocalDateTime getLastUpdate() { return lastUpdate; } - public void setLastUpdate(final Date lastUpdate) { + public void setLastUpdate(final LocalDateTime lastUpdate) { this.lastUpdate = lastUpdate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Datasource.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Datasource.java index 36f357c4..fe882b1a 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Datasource.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Datasource.java @@ -2,6 +2,7 @@ package eu.dnetlib.dsm.model; import java.io.Serializable; import java.sql.Date; +import java.time.LocalDateTime; import java.util.HashSet; import java.util.Set; @@ -89,7 +90,7 @@ public class Datasource implements Serializable { private String registeredby; - private Date registrationdate; + private LocalDateTime registrationdate; private String subjects; @@ -99,10 +100,10 @@ public class Datasource implements Serializable { private Boolean consentTermsOfUse; @Column(name = "consenttermsofusedate") - private Date consentTermsOfUseDate; + private LocalDateTime consentTermsOfUseDate; @Column(name = "lastconsenttermsofusedate") - private Date lastConsentTermsOfUseDate; + private LocalDateTime lastConsentTermsOfUseDate; @Column(name = "fulltextdownload") private Boolean fullTextDownload; @@ -401,11 +402,11 @@ public class Datasource implements Serializable { this.registeredby = registeredby; } - public Date getRegistrationdate() { + public LocalDateTime getRegistrationdate() { return registrationdate; } - public void setRegistrationdate(final Date registrationdate) { + public void setRegistrationdate(final LocalDateTime registrationdate) { this.registrationdate = registrationdate; } @@ -433,19 +434,19 @@ public class Datasource implements Serializable { this.consentTermsOfUse = consentTermsOfUse; } - public Date getConsentTermsOfUseDate() { + public LocalDateTime getConsentTermsOfUseDate() { return consentTermsOfUseDate; } - public void setConsentTermsOfUseDate(final Date consentTermsOfUseDate) { + public void setConsentTermsOfUseDate(final LocalDateTime consentTermsOfUseDate) { this.consentTermsOfUseDate = consentTermsOfUseDate; } - public Date getLastConsentTermsOfUseDate() { + public LocalDateTime getLastConsentTermsOfUseDate() { return lastConsentTermsOfUseDate; } - public void setLastConsentTermsOfUseDate(final Date lastConsentTermsOfUseDate) { + public void setLastConsentTermsOfUseDate(final LocalDateTime lastConsentTermsOfUseDate) { this.lastConsentTermsOfUseDate = lastConsentTermsOfUseDate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Organization.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Organization.java index ec5c9e87..25ee8533 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Organization.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/dsm/model/Organization.java @@ -1,7 +1,7 @@ package eu.dnetlib.dsm.model; import java.io.Serializable; -import java.sql.Date; +import java.time.LocalDateTime; import java.util.Set; import javax.persistence.CascadeType; @@ -34,7 +34,7 @@ public class Organization implements Serializable { private String country; private String collectedfrom; - private Date dateofcollection; + private LocalDateTime dateofcollection; private String provenanceaction; @ManyToMany(cascade = { @@ -98,11 +98,11 @@ public class Organization implements Serializable { this.collectedfrom = collectedfrom; } - public Date getDateofcollection() { + public LocalDateTime getDateofcollection() { return dateofcollection; } - public void setDateofcollection(final Date dateofcollection) { + public void setDateofcollection(final LocalDateTime dateofcollection) { this.dateofcollection = dateofcollection; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/errors/WorkflowManagerException.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/errors/WorkflowManagerException.java new file mode 100644 index 00000000..bc29ade1 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/errors/WorkflowManagerException.java @@ -0,0 +1,14 @@ +package eu.dnetlib.errors; + +public class WorkflowManagerException extends Exception { + + private static final long serialVersionUID = -9067581185191425823L; + + public WorkflowManagerException(final String message, final Throwable cause) { + super(message, cause); + } + + public WorkflowManagerException(final String message) { + super(message); + } +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/is/model/resource/SimpleResource.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/is/model/resource/SimpleResource.java index 60565e0b..a2ec6c79 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/is/model/resource/SimpleResource.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/is/model/resource/SimpleResource.java @@ -1,7 +1,7 @@ package eu.dnetlib.is.model.resource; import java.io.Serializable; -import java.util.Date; +import java.time.LocalDateTime; import javax.persistence.Column; import javax.persistence.Entity; @@ -31,11 +31,11 @@ public class SimpleResource implements Serializable { @Temporal(TemporalType.TIMESTAMP) @Column(name = "creation_date") - private Date creationDate; + private LocalDateTime creationDate; @Temporal(TemporalType.TIMESTAMP) @Column(name = "modification_date") - private Date modificationDate; + private LocalDateTime modificationDate; public String getId() { return id; @@ -69,19 +69,19 @@ public class SimpleResource implements Serializable { this.description = description; } - public Date getCreationDate() { + public LocalDateTime getCreationDate() { return creationDate; } - public void setCreationDate(final Date creationDate) { + public void setCreationDate(final LocalDateTime creationDate) { this.creationDate = creationDate; } - public Date getModificationDate() { + public LocalDateTime getModificationDate() { return modificationDate; } - public void setModificationDate(final Date modificationDate) { + public void setModificationDate(final LocalDateTime modificationDate) { this.modificationDate = modificationDate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/EmailTemplate.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/EmailTemplate.java new file mode 100644 index 00000000..0ba91984 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/EmailTemplate.java @@ -0,0 +1,50 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; + +@Entity +@Table(name = "emails") +public class EmailTemplate implements Serializable { + + private static final long serialVersionUID = -8424437958140000626L; + + @Id + @Column(name = "id") + private String id; + + @Column(name = "subject") + private String subject; + + @Column(name = "message") + private String message; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getSubject() { + return subject; + } + + public void setSubject(final String subject) { + this.subject = subject; + } + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java new file mode 100644 index 00000000..5ec1447f --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphArcDbEntry.java @@ -0,0 +1,28 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +public class GraphArcDbEntry implements Serializable { + + private static final long serialVersionUID = 7866138976929522262L; + + private String name; + private String to; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getTo() { + return to; + } + + public void setTo(final String to) { + this.to = to; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java new file mode 100644 index 00000000..ca1e0f91 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphNodeDbEntry.java @@ -0,0 +1,64 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +public class GraphNodeDbEntry implements Serializable { + + private static final long serialVersionUID = -3695762832959801906L; + + private String name; + private String type; + private boolean isStart = false; + private boolean isJoin = false; + private GraphArcDbEntry[] arcs; + private GraphParameterDbEntry[] params; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public boolean isStart() { + return isStart; + } + + public void setStart(final boolean isStart) { + this.isStart = isStart; + } + + public boolean isJoin() { + return isJoin; + } + + public void setJoin(final boolean isJoin) { + this.isJoin = isJoin; + } + + public GraphArcDbEntry[] getArcs() { + return arcs; + } + + public void setArcs(final GraphArcDbEntry[] arcs) { + this.arcs = arcs; + } + + public GraphParameterDbEntry[] getParams() { + return params; + } + + public void setParams(final GraphParameterDbEntry[] params) { + this.params = params; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java new file mode 100644 index 00000000..6edcf0f7 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterDbEntry.java @@ -0,0 +1,39 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class GraphParameterDbEntry extends GraphParameterValueDbEntry implements Serializable { + + private static final long serialVersionUID = 1894419948433994453L; + + private String name; + private List values; + private Map map; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public List getValues() { + return values; + } + + public void setValues(final List values) { + this.values = values; + } + + public Map getMap() { + return map; + } + + public void setMap(final Map map) { + this.map = map; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java new file mode 100644 index 00000000..c04a6e30 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/GraphParameterValueDbEntry.java @@ -0,0 +1,45 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +public class GraphParameterValueDbEntry implements Serializable { + + private static final long serialVersionUID = 7815785723401725707L; + + private String value; + private String ref; + private String property; + private String env; + + public String getValue() { + return value; + } + + public void setValue(final String value) { + this.value = value; + } + + public String getRef() { + return ref; + } + + public void setRef(final String ref) { + this.ref = ref; + } + + public String getProperty() { + return property; + } + + public void setProperty(final String property) { + this.property = property; + } + + public String getEnv() { + return env; + } + + public void setEnv(final String env) { + this.env = env; + } +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/NotificationCondition.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/NotificationCondition.java new file mode 100644 index 00000000..dbb0c205 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/NotificationCondition.java @@ -0,0 +1,5 @@ +package eu.dnetlib.manager.wf.model; + +public enum NotificationCondition { + ALWAYS, NEVER, ONLY_SUCCESS, ONLY_FAILED +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/msro/model/history/WfProcessExecution.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WfProcessExecution.java similarity index 81% rename from libs/dnet-is-common/src/main/java/eu/dnetlib/msro/model/history/WfProcessExecution.java rename to libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WfProcessExecution.java index 0bea598f..4863e27d 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/msro/model/history/WfProcessExecution.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WfProcessExecution.java @@ -1,7 +1,7 @@ -package eu.dnetlib.msro.model.history; +package eu.dnetlib.manager.wf.model; import java.io.Serializable; -import java.util.Date; +import java.time.LocalDateTime; import java.util.Map; import javax.persistence.Column; @@ -32,6 +32,9 @@ public class WfProcessExecution implements Serializable { @Column(name = "process_id") private String processId; + @Column(name = "wf_instance_id") + private String wfInstanceId; + @Column(name = "name") private String name; @@ -43,11 +46,11 @@ public class WfProcessExecution implements Serializable { @Temporal(TemporalType.TIMESTAMP) @Column(name = "start_date") - private Date startDate; + private LocalDateTime startDate; @Temporal(TemporalType.TIMESTAMP) @Column(name = "end_date") - private Date endDate; + private LocalDateTime endDate; @Column(name = "ds_id") private String dsId; @@ -70,6 +73,14 @@ public class WfProcessExecution implements Serializable { this.processId = processId; } + public String getWfInstanceId() { + return wfInstanceId; + } + + public void setWfInstanceId(final String wfInstanceId) { + this.wfInstanceId = wfInstanceId; + } + public String getName() { return name; } @@ -94,19 +105,19 @@ public class WfProcessExecution implements Serializable { this.status = status; } - public Date getStartDate() { + public LocalDateTime getStartDate() { return startDate; } - public void setStartDate(final Date startDate) { + public void setStartDate(final LocalDateTime startDate) { this.startDate = startDate; } - public Date getEndDate() { + public LocalDateTime getEndDate() { return endDate; } - public void setEndDate(final Date endDate) { + public void setEndDate(final LocalDateTime endDate) { this.endDate = endDate; } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java new file mode 100644 index 00000000..f991bec7 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowDbEntry.java @@ -0,0 +1,83 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; + +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; +import org.hibernate.annotations.TypeDefs; + +import com.vladmihalcea.hibernate.type.json.JsonBinaryType; +import com.vladmihalcea.hibernate.type.json.JsonStringType; + +@Entity +@Table(name = "workflows") +@TypeDefs({ + @TypeDef(name = "json", typeClass = JsonStringType.class), + @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class) +}) +public class WorkflowDbEntry implements Serializable { + + private static final long serialVersionUID = -4684952453322136556L; + + @Id + @Column(name = "id") + private String id; + + @Column(name = "name") + private String name; + + @Column(name = "family") + private String family; + + @Column(name = "description") + private String description; + + @Type(type = "jsonb") + @Column(name = "data", columnDefinition = "jsonb") + private GraphNodeDbEntry[] graph; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getFamily() { + return family; + } + + public void setFamily(final String family) { + this.family = family; + } + + public String getDescription() { + return description; + } + + public void setDescription(final String description) { + this.description = description; + } + + public GraphNodeDbEntry[] getGraph() { + return graph; + } + + public void setGraph(final GraphNodeDbEntry[] graph) { + this.graph = graph; + } +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java new file mode 100644 index 00000000..a0ddca0a --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowInstance.java @@ -0,0 +1,195 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; + +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; +import org.hibernate.annotations.TypeDefs; + +import com.vladmihalcea.hibernate.type.json.JsonBinaryType; +import com.vladmihalcea.hibernate.type.json.JsonStringType; + +@Entity +@Table(name = "workflow_instances") +@TypeDefs({ + @TypeDef(name = "json", typeClass = JsonStringType.class), + @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class) +}) +public class WorkflowInstance implements Serializable { + + private static final long serialVersionUID = 7503841966138333044L; + + @Id + @Column(name = "id") + private String id; + + @Type(type = "jsonb") + @Column(name = "data", columnDefinition = "jsonb") + private Map details = new LinkedHashMap<>(); + + @Column(name = "priority") + private Integer priority; + + @Column(name = "dsid") + private String dsId; + + @Column(name = "dsname") + private String dsName; + + @Column(name = "apiid") + private String apiId; + + @Column(name = "enabled") + private boolean enabled; + + @Column(name = "configured") + private boolean configured; + + @Column(name = "scheduling_enabled") + private boolean schedulingEnabled; + + @Column(name = "scheduling_cron") + private String cronExpression; + + @Column(name = "scheduling_min_interval") + private int cronMinInterval; + + @Column(name = "workflow") + private String workflow; + + @Column(name = "destroy_wf") + private String destroyWf; + + @Column(name = "system_params") + private Map systemParams = new LinkedHashMap<>(); + + @Column(name = "user_params") + private Map userParams = new LinkedHashMap<>(); + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public Map getDetails() { + return details; + } + + public void setDetails(final Map details) { + this.details = details; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(final Integer priority) { + this.priority = priority; + } + + public String getDsId() { + return dsId; + } + + public void setDsId(final String dsId) { + this.dsId = dsId; + } + + public String getDsName() { + return dsName; + } + + public void setDsName(final String dsName) { + this.dsName = dsName; + } + + public String getApiId() { + return apiId; + } + + public void setApiId(final String apiId) { + this.apiId = apiId; + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + + public boolean isConfigured() { + return configured; + } + + public void setConfigured(final boolean configured) { + this.configured = configured; + } + + public boolean isSchedulingEnabled() { + return schedulingEnabled; + } + + public void setSchedulingEnabled(final boolean schedulingEnabled) { + this.schedulingEnabled = schedulingEnabled; + } + + public String getCronExpression() { + return cronExpression; + } + + public void setCronExpression(final String cronExpression) { + this.cronExpression = cronExpression; + } + + public int getCronMinInterval() { + return cronMinInterval; + } + + public void setCronMinInterval(final int cronMinInterval) { + this.cronMinInterval = cronMinInterval; + } + + public String getWorkflow() { + return workflow; + } + + public void setWorkflow(final String workflow) { + this.workflow = workflow; + } + + public String getDestroyWf() { + return destroyWf; + } + + public void setDestroyWf(final String destroyWf) { + this.destroyWf = destroyWf; + } + + public Map getSystemParams() { + return systemParams; + } + + public void setSystemParams(final Map systemParams) { + this.systemParams = systemParams; + } + + public Map getUserParams() { + return userParams; + } + + public void setUserParams(final Map userParams) { + this.userParams = userParams; + } +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowParamDesc.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowParamDesc.java new file mode 100644 index 00000000..9e0f177f --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowParamDesc.java @@ -0,0 +1,5 @@ +package eu.dnetlib.manager.wf.model; + +public class WorkflowParamDesc { + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscription.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscription.java new file mode 100644 index 00000000..9442e279 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscription.java @@ -0,0 +1,68 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.IdClass; +import javax.persistence.Table; + +@Entity +@Table(name = "workflow_subscriptions") +@IdClass(WorkflowSubscriptionPK.class) +public class WorkflowSubscription implements Serializable { + + private static final long serialVersionUID = -3662770213782581404L; + + @Id + @Column(name = "wf_instance_id") + private String wfInstanceId; + + @Id + @Column(name = "condition") + @Enumerated(EnumType.STRING) + private NotificationCondition condition; + + @Id + @Column(name = "email") + private String email; + + @Column(name = "message_id") + private String messageId; + + public String getWfInstanceId() { + return wfInstanceId; + } + + public void setWfInstanceId(final String wfInstanceId) { + this.wfInstanceId = wfInstanceId; + } + + public NotificationCondition getCondition() { + return condition; + } + + public void setCondition(final NotificationCondition condition) { + this.condition = condition; + } + + public String getEmail() { + return email; + } + + public void setEmail(final String email) { + this.email = email; + } + + public String getMessageId() { + return messageId; + } + + public void setMessageId(final String messageId) { + this.messageId = messageId; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java new file mode 100644 index 00000000..61b3abfc --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/manager/wf/model/WorkflowSubscriptionPK.java @@ -0,0 +1,39 @@ +package eu.dnetlib.manager.wf.model; + +import java.io.Serializable; + +public class WorkflowSubscriptionPK implements Serializable { + + private static final long serialVersionUID = -7569690774071644848L; + + private String wfInstanceId; + + private NotificationCondition condition; + + private String email; + + public String getWfInstanceId() { + return wfInstanceId; + } + + public void setWfInstanceId(final String wfInstanceId) { + this.wfInstanceId = wfInstanceId; + } + + public NotificationCondition getCondition() { + return condition; + } + + public void setCondition(final NotificationCondition condition) { + this.condition = condition; + } + + public String getEmail() { + return email; + } + + public void setEmail(final String email) { + this.email = email; + } + +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/DateUtils.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/DateUtils.java index 63691bf7..3cdda237 100644 --- a/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/DateUtils.java +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/DateUtils.java @@ -1,9 +1,11 @@ package eu.dnetlib.utils; import java.io.StringWriter; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.Locale; +import java.util.TimeZone; public class DateUtils { @@ -12,7 +14,7 @@ public class DateUtils { private static final long HOUR = MINUTE * 60; private static final long DAY = HOUR * 24; private static final long YEAR = DAY * 365; - private static final SimpleDateFormat ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US); + private static final DateTimeFormatter ISO8601FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ", Locale.getDefault()); public static String elapsedTime(long t) { final StringWriter a = new StringWriter(); @@ -78,7 +80,13 @@ public class DateUtils { } public static String calculate_ISO8601(final long l) { - String result = ISO8601FORMAT.format(new Date(l)); + + final LocalDateTime time = + LocalDateTime.ofInstant(Instant.ofEpochMilli(l), TimeZone + .getDefault() + .toZoneId()); + + String result = time.format(ISO8601FORMAT); // convert YYYYMMDDTHH:mm:ss+HH00 into YYYYMMDDTHH:mm:ss+HH:00 // - note the added colon for the Timezone result = result.substring(0, result.length() - 2) + ":" + result.substring(result.length() - 2); @@ -89,4 +97,8 @@ public class DateUtils { return Long.toString(Math.floorDiv(d, n)); } + public static long now() { + return Instant.now().toEpochMilli(); + } + } diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/Stoppable.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/Stoppable.java new file mode 100644 index 00000000..dd1fb957 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/Stoppable.java @@ -0,0 +1,10 @@ +package eu.dnetlib.utils; + +public interface Stoppable { + + void stop(); + + void resume(); + + public StoppableDetails getStopDetails(); +} diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/StoppableDetails.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/StoppableDetails.java new file mode 100644 index 00000000..708198c0 --- /dev/null +++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/utils/StoppableDetails.java @@ -0,0 +1,33 @@ +package eu.dnetlib.utils; + +public class StoppableDetails { + + public enum StopStatus { + STOPPED, + STOPPING, + RUNNING + }; + + private final String title; + private final String description; + private final StopStatus status; + + public StoppableDetails(final String title, final String description, final StopStatus status) { + this.title = title; + this.description = description; + this.status = status; + } + + public String getTitle() { + return title; + } + + public String getDescription() { + return description; + } + + public StopStatus getStatus() { + return status; + } + +} diff --git a/libs/dnet-is-common/src/main/resources/sql/schema.sql b/libs/dnet-is-common/src/main/resources/sql/schema.sql index a17f71f8..c734c2e3 100644 --- a/libs/dnet-is-common/src/main/resources/sql/schema.sql +++ b/libs/dnet-is-common/src/main/resources/sql/schema.sql @@ -105,16 +105,17 @@ CREATE INDEX ON context_cat_concepts_lvl_2 (parent); -- WF History CREATE TABLE wf_history ( - process_id text PRIMARY KEY, - name text NOT NULL, - family text NOT NULL, - status text NOT NULL, - start_date timestamp NOT NULL, - end_date timestamp NOT NULL, - ds_id text, - ds_name text, - ds_api text, - details jsonb + process_id text PRIMARY KEY, + wf_instance_id text NOT NULL, + name text NOT NULL, + family text NOT NULL, + status text NOT NULL, + start_date timestamp NOT NULL, + end_date timestamp NOT NULL, + ds_id text, + ds_name text, + ds_api text, + details jsonb ); -- Other Resources @@ -241,3 +242,55 @@ GROUP BY md.id, v1.lastupdate, v1.size; +-- Email Templates +CREATE TABLE emails ( + id text PRIMARY KEY, + subject text NOT NULL, + message text NOT NULL +); + +-- Workflows + +CREATE TABLE workflows ( + id text PRIMARY KEY, + name text NOT NULL, + family text NOT NULL, + description text, + data jsonb NOT NULL +); + +CREATE TABLE workflow_instances ( + id text PRIMARY KEY, + details jsonb NOT NULL DEFAULT '{}', + priority int, + dsid text, + dsname text, + apiid text, + enabled boolean NOT NULL DEFAULT false, + configured boolean NOT NULL DEFAULT false, + scheduling_enabled boolean NOT NULL DEFAULT false, + scheduling_cron text, + scheduling_min_interval int, + workflow text REFERENCES workflows(id), + destroy_wf text REFERENCES workflows(id), + system_params jsonb NOT NULL DEFAULT '{}', + user_params jsonb NOT NULL DEFAULT '{}' +); + +CREATE TABLE workflow_expected_params ( + wf_id text REFERENCES workflows(id), + name text, + description text, + type text, + required boolean, + default_value text, + PRIMARY KEY (wf_id, name) +); + +CREATE TABLE workflow_subscriptions ( + wf_instance_id text NOT NULL REFERENCES workflow_instances(id), + condition text NOT NULL, + email text NOT NULL, + message_id text NOT NULL REFERENCES emails(id), + PRIMARY KEY (wf_instance_id, condition, email) +); diff --git a/libs/dnet-is-services/pom.xml b/libs/dnet-is-services/pom.xml index a21a4f9d..709c81b9 100644 --- a/libs/dnet-is-services/pom.xml +++ b/libs/dnet-is-services/pom.xml @@ -24,6 +24,11 @@ jaxen jaxen + + + org.springframework.boot + spring-boot-starter-thymeleaf + diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java index 5f292ce1..f9a1c8a3 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java @@ -9,7 +9,7 @@ import static eu.dnetlib.dsm.utils.DsmMappingUtils.createId; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.sql.Date; +import java.time.LocalDate; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -368,7 +368,7 @@ public class DsmService { private void ensureRegistrationDate(final String dsId) { if (!dsRepository.hasRegistrationdate(dsId)) { log.info("setting registration date for datasource: " + dsId); - dsRepository.setRegistrationDate(dsId, new Date(System.currentTimeMillis())); + dsRepository.setRegistrationDate(dsId, LocalDate.now()); } } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/ApiDetails.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/ApiDetails.java index 6bf9c977..8e12cb5c 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/ApiDetails.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/ApiDetails.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.sql.Date; +import java.time.LocalDateTime; import java.util.Set; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -27,15 +27,15 @@ public class ApiDetails extends ApiIgnoredProperties { private Integer lastCollectionTotal; - private Date lastCollectionDate; + private LocalDateTime lastCollectionDate; private Integer lastAggregationTotal; - private Date lastAggregationDate; + private LocalDateTime lastAggregationDate; private Integer lastDownloadTotal; - private Date lastDownloadDate; + private LocalDateTime lastDownloadDate; private String baseurl; @@ -71,7 +71,7 @@ public class ApiDetails extends ApiIgnoredProperties { return lastCollectionTotal; } - public Date getLastCollectionDate() { + public LocalDateTime getLastCollectionDate() { return lastCollectionDate; } @@ -79,7 +79,7 @@ public class ApiDetails extends ApiIgnoredProperties { return lastAggregationTotal; } - public Date getLastAggregationDate() { + public LocalDateTime getLastAggregationDate() { return lastAggregationDate; } @@ -87,7 +87,7 @@ public class ApiDetails extends ApiIgnoredProperties { return lastDownloadTotal; } - public Date getLastDownloadDate() { + public LocalDateTime getLastDownloadDate() { return lastDownloadDate; } @@ -125,7 +125,7 @@ public class ApiDetails extends ApiIgnoredProperties { return this; } - public ApiDetails setLastCollectionDate(final Date lastCollectionDate) { + public ApiDetails setLastCollectionDate(final LocalDateTime lastCollectionDate) { this.lastCollectionDate = lastCollectionDate; return this; } @@ -135,7 +135,7 @@ public class ApiDetails extends ApiIgnoredProperties { return this; } - public ApiDetails setLastAggregationDate(final Date lastAggregationDate) { + public ApiDetails setLastAggregationDate(final LocalDateTime lastAggregationDate) { this.lastAggregationDate = lastAggregationDate; return this; } @@ -145,7 +145,7 @@ public class ApiDetails extends ApiIgnoredProperties { return this; } - public ApiDetails setLastDownloadDate(final Date lastDownloadDate) { + public ApiDetails setLastDownloadDate(final LocalDateTime lastDownloadDate) { this.lastDownloadDate = lastDownloadDate; return this; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetails.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetails.java index 096a76dd..ef7c0a08 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetails.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetails.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.sql.Date; +import java.time.LocalDate; import java.util.Set; import javax.persistence.Transient; @@ -48,12 +48,12 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { private String languages; - private Date dateofvalidation; + private LocalDate dateofvalidation; @NotBlank private String eoscDatasourceType; - private Date dateofcollection; + private LocalDate dateofcollection; private String platform; @@ -82,9 +82,9 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { private Boolean fullTextDownload; - private Date consentTermsOfUseDate; + private LocalDate consentTermsOfUseDate; - private Date lastConsentTermsOfUseDate; + private LocalDate lastConsentTermsOfUseDate; private Set organizations; @@ -95,7 +95,7 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { @Deprecated private String typology; - private Date registrationdate; + private LocalDate registrationdate; public String getId() { return id; @@ -145,7 +145,7 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return namespaceprefix; } - public Date getDateofvalidation() { + public LocalDate getDateofvalidation() { return dateofvalidation; } @@ -153,7 +153,7 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return eoscDatasourceType; } - public Date getDateofcollection() { + public LocalDate getDateofcollection() { return dateofcollection; } @@ -277,7 +277,7 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return this; } - public DatasourceDetails setDateofvalidation(final Date dateofvalidation) { + public DatasourceDetails setDateofvalidation(final LocalDate dateofvalidation) { this.dateofvalidation = dateofvalidation; return this; } @@ -287,7 +287,7 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return this; } - public DatasourceDetails setDateofcollection(final Date dateofcollection) { + public DatasourceDetails setDateofcollection(final LocalDate dateofcollection) { this.dateofcollection = dateofcollection; return this; } @@ -367,11 +367,11 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return this; } - public Date getConsentTermsOfUseDate() { + public LocalDate getConsentTermsOfUseDate() { return consentTermsOfUseDate; } - public DatasourceDetails setConsentTermsOfUseDate(final Date consentTermsOfUseDate) { + public DatasourceDetails setConsentTermsOfUseDate(final LocalDate consentTermsOfUseDate) { this.consentTermsOfUseDate = consentTermsOfUseDate; return this; } @@ -396,20 +396,20 @@ public class DatasourceDetails extends DatasourceIgnoredProperties { return this; } - public Date getLastConsentTermsOfUseDate() { + public LocalDate getLastConsentTermsOfUseDate() { return lastConsentTermsOfUseDate; } - public DatasourceDetails setLastConsentTermsOfUseDate(final Date lastConsentTermsOfUseDate) { + public DatasourceDetails setLastConsentTermsOfUseDate(final LocalDate lastConsentTermsOfUseDate) { this.lastConsentTermsOfUseDate = lastConsentTermsOfUseDate; return this; } - public Date getRegistrationdate() { + public LocalDate getRegistrationdate() { return registrationdate; } - public DatasourceDetails setRegistrationdate(final Date registrationdate) { + public DatasourceDetails setRegistrationdate(final LocalDate registrationdate) { this.registrationdate = registrationdate; return this; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetailsUpdate.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetailsUpdate.java index e9fe39f8..3451ba70 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetailsUpdate.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceDetailsUpdate.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.sql.Date; +import java.time.LocalDate; import java.util.Set; import javax.validation.constraints.Email; @@ -57,9 +57,9 @@ public class DatasourceDetailsUpdate { private Boolean consentTermsOfUse; - private Date consentTermsOfUseDate; + private LocalDate consentTermsOfUseDate; - private Date lastConsentTermsOfUseDate; + private LocalDate lastConsentTermsOfUseDate; private Boolean fullTextDownload; @@ -207,11 +207,11 @@ public class DatasourceDetailsUpdate { return this; } - public Date getConsentTermsOfUseDate() { + public LocalDate getConsentTermsOfUseDate() { return consentTermsOfUseDate; } - public DatasourceDetailsUpdate setConsentTermsOfUseDate(final Date consentTermsOfUseDate) { + public DatasourceDetailsUpdate setConsentTermsOfUseDate(final LocalDate consentTermsOfUseDate) { this.consentTermsOfUseDate = consentTermsOfUseDate; return this; } @@ -225,11 +225,11 @@ public class DatasourceDetailsUpdate { return this; } - public Date getLastConsentTermsOfUseDate() { + public LocalDate getLastConsentTermsOfUseDate() { return lastConsentTermsOfUseDate; } - public DatasourceDetailsUpdate setLastConsentTermsOfUseDate(final Date lastConsentTermsOfUseDate) { + public DatasourceDetailsUpdate setLastConsentTermsOfUseDate(final LocalDate lastConsentTermsOfUseDate) { this.lastConsentTermsOfUseDate = lastConsentTermsOfUseDate; return this; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceIgnoredProperties.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceIgnoredProperties.java index 4a1a3e10..dfea4420 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceIgnoredProperties.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceIgnoredProperties.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.sql.Date; +import java.time.LocalDate; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -13,10 +13,10 @@ public abstract class DatasourceIgnoredProperties { protected String provenanceaction; @JsonIgnore - protected Date releasestartdate; + protected LocalDate releasestartdate; @JsonIgnore - protected Date releaseenddate; + protected LocalDate releaseenddate; @JsonIgnore protected String missionstatementurl; @@ -76,19 +76,19 @@ public abstract class DatasourceIgnoredProperties { this.provenanceaction = provenanceaction; } - public Date getReleasestartdate() { + public LocalDate getReleasestartdate() { return releasestartdate; } - public void setReleasestartdate(final Date releasestartdate) { + public void setReleasestartdate(final LocalDate releasestartdate) { this.releasestartdate = releasestartdate; } - public Date getReleaseenddate() { + public LocalDate getReleaseenddate() { return releaseenddate; } - public void setReleaseenddate(final Date releaseenddate) { + public void setReleaseenddate(final LocalDate releaseenddate) { this.releaseenddate = releaseenddate; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceSnippetExtended.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceSnippetExtended.java index b44f19d8..adf99be5 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceSnippetExtended.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/DatasourceSnippetExtended.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.util.Date; +import java.time.LocalDateTime; import java.util.Set; import javax.validation.constraints.Email; @@ -28,7 +28,7 @@ public class DatasourceSnippetExtended { @Email private String registeredby; - private Date registrationdate; + private LocalDateTime registrationdate; private String eoscDatasourceType; @@ -38,9 +38,9 @@ public class DatasourceSnippetExtended { private Boolean consentTermsOfUse; - private Date consentTermsOfUseDate; + private LocalDateTime consentTermsOfUseDate; - private Date lastConsentTermsOfUseDate; + private LocalDateTime lastConsentTermsOfUseDate; private Boolean fullTextDownload; @@ -89,11 +89,11 @@ public class DatasourceSnippetExtended { this.registeredby = registeredby; } - public Date getRegistrationdate() { + public LocalDateTime getRegistrationdate() { return registrationdate; } - public void setRegistrationdate(final Date registrationdate) { + public void setRegistrationdate(final LocalDateTime registrationdate) { this.registrationdate = registrationdate; } @@ -118,11 +118,11 @@ public class DatasourceSnippetExtended { return this; } - public Date getConsentTermsOfUseDate() { + public LocalDateTime getConsentTermsOfUseDate() { return consentTermsOfUseDate; } - public DatasourceSnippetExtended setConsentTermsOfUseDate(final Date consentTermsOfUseDate) { + public DatasourceSnippetExtended setConsentTermsOfUseDate(final LocalDateTime consentTermsOfUseDate) { this.consentTermsOfUseDate = consentTermsOfUseDate; return this; } @@ -170,11 +170,11 @@ public class DatasourceSnippetExtended { return this; } - public Date getLastConsentTermsOfUseDate() { + public LocalDateTime getLastConsentTermsOfUseDate() { return lastConsentTermsOfUseDate; } - public DatasourceSnippetExtended setLastConsentTermsOfUseDate(final Date lastConsentTermsOfUseDate) { + public DatasourceSnippetExtended setLastConsentTermsOfUseDate(final LocalDateTime lastConsentTermsOfUseDate) { this.lastConsentTermsOfUseDate = lastConsentTermsOfUseDate; return this; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/OrganizationIgnoredProperties.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/OrganizationIgnoredProperties.java index 2a87622f..be9b507a 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/OrganizationIgnoredProperties.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/domain/OrganizationIgnoredProperties.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.domain; -import java.util.Date; +import java.time.LocalDateTime; import java.util.Set; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -14,7 +14,7 @@ public class OrganizationIgnoredProperties { protected String collectedfrom; @JsonIgnore - protected Date dateofcollection; + protected LocalDateTime dateofcollection; @JsonIgnore protected String provenanceaction; @@ -38,11 +38,11 @@ public class OrganizationIgnoredProperties { this.collectedfrom = collectedfrom; } - public Date getDateofcollection() { + public LocalDateTime getDateofcollection() { return dateofcollection; } - public void setDateofcollection(final Date dateofcollection) { + public void setDateofcollection(final LocalDateTime dateofcollection) { this.dateofcollection = dateofcollection; } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/DatasourceRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/DatasourceRepository.java index b7319c14..c426dee0 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/DatasourceRepository.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/DatasourceRepository.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.repository; -import java.sql.Date; +import java.time.LocalDate; import java.util.Optional; import javax.transaction.Transactional; @@ -64,7 +64,7 @@ public interface DatasourceRepository extends JpaRepository, @Modifying @Transactional @Query("update #{#entityName} d set d.registrationdate = ?2 where d.id = ?1") - void setRegistrationDate(String id, Date registrationdate); + void setRegistrationDate(String id, LocalDate registrationdate); @Modifying @Transactional diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/utils/DsmMappingUtils.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/utils/DsmMappingUtils.java index efe75a43..666842ce 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/utils/DsmMappingUtils.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/utils/DsmMappingUtils.java @@ -1,6 +1,6 @@ package eu.dnetlib.dsm.utils; -import java.sql.Date; +import java.time.LocalDateTime; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; @@ -98,7 +98,7 @@ public class DsmMappingUtils { final String prefix = StringUtils.isNotBlank(dbe.getNamespaceprefix()) ? dbe.getNamespaceprefix() : dbe.getId(); o.setId(prefix + ID_SEPARATOR + o.getLegalname()); if (o.getDateofcollection() == null) { - o.setDateofcollection(new Date(System.currentTimeMillis())); + o.setDateofcollection(LocalDateTime.now()); } o.setCollectedfrom(dbe.getCollectedfrom()); }); diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/is/resource/SimpleResourceService.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/is/resource/SimpleResourceService.java index 10da2b98..42fbb7cb 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/is/resource/SimpleResourceService.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/is/resource/SimpleResourceService.java @@ -1,6 +1,6 @@ package eu.dnetlib.is.resource; -import java.util.Date; +import java.time.LocalDateTime; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; @@ -65,7 +65,7 @@ public class SimpleResourceService { resourceValidator.validate(type, content); - final Date now = new Date(); + final LocalDateTime now = LocalDateTime.now(); final SimpleResource res = new SimpleResource(); res.setId(UUID.randomUUID().toString()); diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/WorkflowLogger.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/WorkflowLogger.java new file mode 100644 index 00000000..31ae50cd --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/WorkflowLogger.java @@ -0,0 +1,51 @@ +package eu.dnetlib.manager.wf; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; +import java.util.TimeZone; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; +import org.springframework.stereotype.Service; + +import eu.dnetlib.manager.wf.model.WfProcessExecution; +import eu.dnetlib.manager.wf.repository.WfProcessExecutionRepository; + +@Service +public class WorkflowLogger { + + @Autowired + private WfProcessExecutionRepository wfProcessExecutionRepository; + + public List history(final int total, final Long from, final Long to) { + if (from == null && to == null) { + return wfProcessExecutionRepository.findAll(PageRequest.of(0, total, Sort.by("endDate").descending())).toList(); + } else { + + final LocalDateTime fromTime = from != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(from), TimeZone + .getDefault() + .toZoneId()) : LocalDateTime.MIN; + final LocalDateTime toTime = to != null ? LocalDateTime.ofInstant(Instant.ofEpochMilli(to), TimeZone + .getDefault() + .toZoneId()) : LocalDateTime.MAX; + + return wfProcessExecutionRepository.findByEndDateBetweenOrderByEndDateDesc(fromTime, toTime); + } + } + + public WfProcessExecution getProcessExecution(final String processId) { + return wfProcessExecutionRepository.findById(processId).get(); + } + + public void saveProcessExecution(final WfProcessExecution proc) { + wfProcessExecutionRepository.save(proc); + } + + public Optional getLastExecutionForInstance(final String id) { + return wfProcessExecutionRepository.findOneByWfInstanceIdOrderByEndDateAsc(id); + } + +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/EmailTemplateRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/EmailTemplateRepository.java new file mode 100644 index 00000000..5e98cf23 --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/EmailTemplateRepository.java @@ -0,0 +1,9 @@ +package eu.dnetlib.manager.wf.repository; + +import org.springframework.data.jpa.repository.JpaRepository; + +import eu.dnetlib.manager.wf.model.EmailTemplate; + +public interface EmailTemplateRepository extends JpaRepository { + +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WfProcessExecutionRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WfProcessExecutionRepository.java new file mode 100644 index 00000000..cf58142d --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WfProcessExecutionRepository.java @@ -0,0 +1,16 @@ +package eu.dnetlib.manager.wf.repository; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; + +import org.springframework.data.jpa.repository.JpaRepository; + +import eu.dnetlib.manager.wf.model.WfProcessExecution; + +public interface WfProcessExecutionRepository extends JpaRepository { + + List findByEndDateBetweenOrderByEndDateDesc(LocalDateTime start, LocalDateTime end); + + Optional findOneByWfInstanceIdOrderByEndDateAsc(String id); +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java new file mode 100644 index 00000000..9e007e54 --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowDbEntryRepository.java @@ -0,0 +1,9 @@ +package eu.dnetlib.manager.wf.repository; + +import org.springframework.data.jpa.repository.JpaRepository; + +import eu.dnetlib.manager.wf.model.WorkflowDbEntry; + +public interface WorkflowDbEntryRepository extends JpaRepository { + +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java new file mode 100644 index 00000000..cb477905 --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowInstanceRepository.java @@ -0,0 +1,9 @@ +package eu.dnetlib.manager.wf.repository; + +import org.springframework.data.jpa.repository.JpaRepository; + +import eu.dnetlib.manager.wf.model.WorkflowInstance; + +public interface WorkflowInstanceRepository extends JpaRepository { + +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowSubscriptionRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowSubscriptionRepository.java new file mode 100644 index 00000000..ff493ca7 --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/manager/wf/repository/WorkflowSubscriptionRepository.java @@ -0,0 +1,13 @@ +package eu.dnetlib.manager.wf.repository; + +import java.util.List; + +import org.springframework.data.jpa.repository.JpaRepository; + +import eu.dnetlib.manager.wf.model.WorkflowSubscription; +import eu.dnetlib.manager.wf.model.WorkflowSubscriptionPK; + +public interface WorkflowSubscriptionRepository extends JpaRepository { + + List findByWfInstanceId(String wfInstanceId); +} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/msro/history/repository/WfProcessExecutionRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/msro/history/repository/WfProcessExecutionRepository.java deleted file mode 100644 index be3a8c9a..00000000 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/msro/history/repository/WfProcessExecutionRepository.java +++ /dev/null @@ -1,13 +0,0 @@ -package eu.dnetlib.msro.history.repository; - -import java.util.Date; -import java.util.List; - -import org.springframework.data.jpa.repository.JpaRepository; - -import eu.dnetlib.msro.model.history.WfProcessExecution; - -public interface WfProcessExecutionRepository extends JpaRepository { - - List findByEndDateBetweenOrderByEndDateDesc(Date start, Date end); -} diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/notifications/mail/EmailDispatcher.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/notifications/mail/EmailDispatcher.java new file mode 100644 index 00000000..c5396b69 --- /dev/null +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/notifications/mail/EmailDispatcher.java @@ -0,0 +1,147 @@ +package eu.dnetlib.notifications.mail; + +import java.util.Date; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.annotation.PostConstruct; +import javax.mail.Authenticator; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.PasswordAuthentication; +import javax.mail.Session; +import javax.mail.Transport; +import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeMessage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import com.google.common.base.Splitter; + +import eu.dnetlib.manager.wf.repository.EmailTemplateRepository; + +@Service +public class EmailDispatcher { + + private static final Log log = LogFactory.getLog(EmailDispatcher.class); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + + @Value("${'dnet.configuration.mail.sender.email'}") + private String from; + @Value("${'dnet.configuration.mail.sender.name'}") + private String fromName; + @Value("${'dnet.configuration.mail.cc'}") + private String cc; + @Value("${'dnet.configuration.mail.smtp.host'}") + private String smtpHost; + @Value("${'dnet.configuration.mail.smtp.port'}") + private final int smtpPort = 587; + @Value("${'dnet.configuration.mail.smtp.user'}") + private String smtpUser; + @Value("${'dnet.configuration.mail.smtp.password'}") + private String smtpPassword; + @Value("${'dnet.configuration.baseUrl'}") + private String baseUrl; + @Value("${'dnet.configuration.infrastructure'}") + private String infrastructure; + + @Autowired + private EmailTemplateRepository emailTemplateRepository; + + @PostConstruct + private void init() { + new Thread(() -> { + while (true) { + try { + final Message message = this.queue.take(); + if (message != null) { + try { + log.info("Sending mail..."); + Transport.send(message); + log.info("...sent"); + } catch (final MessagingException e) { + log.error("Error sending email", e); + this.queue.add(message); + } + } + } catch (final InterruptedException e1) { + throw new RuntimeException(e1); + } + } + }).run(); + } + + public void sendMail(final String to, final String subject, final String message) { + try { + final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator()); + + final MimeMessage mimeMessage = new MimeMessage(session); + mimeMessage.setFrom(new InternetAddress(this.from, this.fromName)); + mimeMessage.setSubject(subject); + mimeMessage.setContent(message, "text/html; charset=utf-8"); + mimeMessage.setSentDate(new Date()); + + mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(to)); + + if (this.cc != null && !this.cc.isEmpty()) { + for (final String aCC : Splitter.on(",").omitEmptyStrings().trimResults().split(cc)) { + mimeMessage.addRecipient(Message.RecipientType.CC, new InternetAddress(aCC)); + } + } + + this.queue.add(mimeMessage); + + log.info("Mail to " + to + " in queue"); + } catch (final Exception e) { + log.error("Error sending mail", e); + } + } + + public void sendStoredMail(final String to, final String emailId, final Map params) { + + // TODO use a real template library + emailTemplateRepository.findById(emailId).ifPresent(tmpl -> { + String msg = tmpl.getMessage(); + String subject = tmpl.getSubject(); + + for (final Entry e : params.entrySet()) { + msg = msg.replaceAll("{" + e.getKey() + "}", e.getValue().toString()); + subject = subject.replaceAll("{" + e.getKey() + "}", e.getValue().toString()); + } + sendMail(to, subject, msg); + }); + + } + + private Properties obtainProperties() { + final Properties props = new Properties(); + props.put("mail.transport.protocol", "smtp"); + props.put("mail.smtp.host", this.smtpHost); + props.put("mail.smtp.port", this.smtpPort); + props.put("mail.smtp.auth", Boolean.toString(this.smtpUser != null && !this.smtpUser.isEmpty())); + return props; + } + + private Authenticator obtainAuthenticator() { + if (this.smtpUser == null || this.smtpUser.isEmpty()) { return null; } + + return new Authenticator() { + + private final PasswordAuthentication authentication = new PasswordAuthentication(EmailDispatcher.this.smtpUser, EmailDispatcher.this.smtpPassword); + + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return this.authentication; + } + + }; + } + +} diff --git a/libs/dnet-wf-service/pom.xml b/libs/dnet-wf-service/pom.xml new file mode 100644 index 00000000..6a398249 --- /dev/null +++ b/libs/dnet-wf-service/pom.xml @@ -0,0 +1,48 @@ + + + + eu.dnetlib.dhp + libs + 3.3.3-SNAPSHOT + ../ + + + 4.0.0 + dnet-wf-service + jar + + + + eu.dnetlib.dhp + dnet-is-services + ${project.version} + + + + eu.dnetlib.dhp + dnet-data-services + ${project.version} + + + + + org.junit.jupiter + junit-jupiter + test + + + + org.mockito + mockito-core + test + + + + org.mockito + mockito-junit-jupiter + test + + + + + diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/NodeInfo.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/NodeInfo.java new file mode 100644 index 00000000..5301a8b0 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/NodeInfo.java @@ -0,0 +1,26 @@ +package eu.dnetlib.manager.wf; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class NodeInfo { + + private String name; + private Map params = new LinkedHashMap<>(); + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public Map getParams() { + return params; + } + + public void setParams(final Map params) { + this.params = params; + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WfManager.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WfManager.java new file mode 100644 index 00000000..8955711d --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/WfManager.java @@ -0,0 +1,20 @@ +package eu.dnetlib.manager.wf; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +@Service +public class WfManager { + + @Autowired + private ApplicationContext applicationContext; + + /* + * public Object obtainBaseNode(final String name) { return applicationContext.getBean(name); } + * + * public NodeInfo obtainInfo(final BaseNode node) { final NodeInfo info = new NodeInfo(); final WfNode annotation = + * node.getClass().getAnnotation(WfNode.class); info.setName(annotation.name()); for (final WfParam p : annotation.inputParams()) { + * info.getParams().put(p.name(), p.type()); } return info; } + */ +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/StreamMimeType.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/StreamMimeType.java new file mode 100644 index 00000000..ee5f811f --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/StreamMimeType.java @@ -0,0 +1,8 @@ +package eu.dnetlib.manager.wf.annotations; + +public enum StreamMimeType { + XML, + JSON, + TEXT, + UNDEFINED +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNode.java new file mode 100644 index 00000000..fa5930f1 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNode.java @@ -0,0 +1,20 @@ +package eu.dnetlib.manager.wf.annotations; + +public @interface WfNode { + + String name(); + + WfNodeOperation operation(); + + Class inputStreamType() default void.class; + + StreamMimeType inputStreamMimeType() default StreamMimeType.UNDEFINED; + + Class outputStreamType() default void.class; + + StreamMimeType outputStreamMimeType() default StreamMimeType.UNDEFINED; + + WfParam[] inputParams() default {}; + + WfParam[] outputParams() default {}; +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNodeOperation.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNodeOperation.java new file mode 100644 index 00000000..05a88378 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfNodeOperation.java @@ -0,0 +1,11 @@ +package eu.dnetlib.manager.wf.annotations; + +public enum WfNodeOperation { + CREATE, + DROP, + READ, + WRITE, + PRODUCER, + TRANSFORM, + SETENV +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfParam.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfParam.java new file mode 100644 index 00000000..5566dc12 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/annotations/WfParam.java @@ -0,0 +1,8 @@ +package eu.dnetlib.manager.wf.annotations; + +public @interface WfParam { + + String name(); + + String type(); +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java new file mode 100644 index 00000000..4f07469c --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/cron/ScheduledWorkflowLauncher.java @@ -0,0 +1,138 @@ +package eu.dnetlib.manager.wf.cron; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.support.CronExpression; +import org.springframework.stereotype.Service; + +import eu.dnetlib.manager.wf.WorkflowLogger; +import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; +import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; +import eu.dnetlib.utils.DateUtils; + +@Service +public class ScheduledWorkflowLauncher { + + private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class); + + private static final DateUtils dateUtils = new DateUtils(); + + @Autowired + private WorkflowExecutor workflowExecutor; + + @Autowired + private ProcessRegistry processRegistry; + + @Autowired + private WorkflowInstanceRepository workflowInstanceRepository; + + @Autowired + private WorkflowLogger logger; + + @Value("${'dnet.workflow.scheduler.windowSize'}") + private int windowSize; // 1800000 are 30 minutes + + @Scheduled(cron = "${'dnet.workflow.scheduler.cron'}") + public void verifySheduledWorkflows() { + log.debug("Verifying scheduled workflows - START"); + + workflowInstanceRepository.findAll() + .stream() + .filter(WorkflowInstance::isEnabled) + .filter(WorkflowInstance::isConfigured) + .filter(WorkflowInstance::isSchedulingEnabled) + .filter(this::isNotRunning) + .filter(this::isReady) + .forEach(instance -> { + try { + workflowExecutor.startWorkflowInstance(instance, null, null); + } catch (final Exception e) { + log.error("Error launching scheduled wf instance: " + instance.getId(), e); + } + }); + log.debug("Verifying scheduled workflows - END"); + } + + private boolean isReady(final WorkflowInstance instance) { + final LocalDateTime lastExecutionDate = calculateLastExecutionDate(instance.getId()); + + final LocalDateTime now = LocalDateTime.now(); + + final String cron = instance.getCronExpression(); + + if (CronExpression.isValidExpression(cron)) { + final int minInterval = instance.getCronMinInterval(); // in minutes + + final boolean res; + if (lastExecutionDate != null) { + final long elapsed = ChronoUnit.MINUTES.between(lastExecutionDate, now); + res = elapsed > minInterval && verifyCron(cron, now); + } else { + res = verifyCron(cron, now); + } + + if (log.isDebugEnabled()) { + log.debug("**************************************************************"); + log.debug("WORKFLOW INSTANCE ID : " + instance.getId()); + log.debug("NOW : " + now); + log.debug("LAST EXECUTION DATE : " + lastExecutionDate); + log.debug("MIN INTERVAL (minutes) : " + minInterval); + log.debug("WINDOW SIZE (ms) : " + windowSize); + log.debug("MUST BE EXECUTED : " + res); + log.debug("**************************************************************"); + } + + return res; + } + + return false; + } + + private LocalDateTime calculateLastExecutionDate(final String id) { + return logger.getLastExecutionForInstance(id) + .map(e -> e.getEndDate()) + .orElse(LocalDateTime.MIN); + } + + private boolean verifyCron(final String cronExpression, final LocalDateTime now) { + try { + final CronExpression cron = CronExpression.parse(cronExpression); + + final LocalDateTime date = now.minus(windowSize, ChronoUnit.MINUTES); + final LocalDateTime nextDate = cron.next(date); + + if (log.isDebugEnabled()) { + log.debug("NEXT EXECUTION DATE: " + nextDate); + log.debug("FIRED : " + nextDate.isBefore(now)); + } + return nextDate.isBefore(now); + } catch (final Exception e) { + log.error("Error calculating next cron event: " + cronExpression, e); + return false; + } + } + + private boolean isNotRunning(final WorkflowInstance instance) { + for (final WorkflowProcess p : processRegistry.findProcsByOtherId(instance.getId())) { + switch (p.getStatus()) { + case CREATED: + return false; + case EXECUTING: + return false; + default: + break; + } + } + return true; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java new file mode 100644 index 00000000..3e8807f3 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AbstractJobNode.java @@ -0,0 +1,45 @@ +package eu.dnetlib.manager.wf.nodes; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import eu.dnetlib.manager.wf.workflows.procs.Env; +import eu.dnetlib.manager.wf.workflows.procs.Token; + +public abstract class AbstractJobNode extends ProcessNode { + + private static final Log log = LogFactory.getLog(SimpleJobNode.class); + + protected void doExecute(final Token token) { + try { + log.debug("START NODE: " + getBeanName()); + beforeStart(token); + final String arc = execute(token.getEnv()); + beforeCompleted(token); + log.debug("END NODE (SUCCESS): " + getBeanName()); + + token.release(arc); + + } catch (final Throwable e) { + log.error("got exception while executing workflow node", e); + log.debug("END NODE (FAILED): " + getBeanName()); + beforeFailed(token); + token.releaseAsFailed(e); + } + } + + abstract protected String execute(final Env env) throws Exception; + + protected void beforeStart(final Token token) { + // For optional overwrites + } + + protected void beforeCompleted(final Token token) { + // For optional overwrites + } + + protected void beforeFailed(final Token token) { + // For optional overwrites + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AsyncJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AsyncJobNode.java new file mode 100644 index 00000000..fe84b46d --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/AsyncJobNode.java @@ -0,0 +1,28 @@ +package eu.dnetlib.manager.wf.nodes; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +public abstract class AsyncJobNode extends AbstractJobNode { + + /** + * logger. + */ + private static final Log log = LogFactory.getLog(AsyncJobNode.class); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + @Override + public final void execute(final Token token) { + + log.info("executing async node"); + + executor.execute(() -> doExecute(token)); + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/CollectOAINode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/CollectOAINode.java new file mode 100644 index 00000000..b8321c00 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/CollectOAINode.java @@ -0,0 +1,29 @@ +package eu.dnetlib.manager.wf.nodes; + +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import eu.dnetlib.manager.wf.annotations.StreamMimeType; +import eu.dnetlib.manager.wf.annotations.WfNode; +import eu.dnetlib.manager.wf.annotations.WfNodeOperation; +import eu.dnetlib.manager.wf.annotations.WfParam; + +@Component("oai_collect") +@Scope("prototype") +@WfNode(name = "oai_collect", operation = WfNodeOperation.PRODUCER, inputParams = { + @WfParam(name = "configuration", type = "ApiDescriptor.class"), +}, outputStreamType = String.class, outputStreamMimeType = StreamMimeType.XML) +public class CollectOAINode implements Supplier> { + + private String datasourceID; + + @Override + public Stream get() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java new file mode 100644 index 00000000..32a8c750 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/DefaultJobNode.java @@ -0,0 +1,21 @@ +package eu.dnetlib.manager.wf.nodes; + +import eu.dnetlib.manager.wf.workflows.graph.Arc; +import eu.dnetlib.manager.wf.workflows.procs.Token; + +/** + * Created by michele on 26/11/15. + */ +public final class DefaultJobNode extends ProcessNode { + + public DefaultJobNode(final String name) { + super(); + setNodeName(name); + } + + @Override + public void execute(final Token token) { + token.setNextArc(Arc.DEFAULT_ARC); + token.release(); + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java new file mode 100644 index 00000000..1818bda1 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/LaunchWorkflowJobNode.java @@ -0,0 +1,78 @@ +package eu.dnetlib.manager.wf.nodes; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import eu.dnetlib.manager.wf.workflows.graph.Arc; +import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; +import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; +import eu.dnetlib.manager.wf.workflows.procs.Token; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowExecutor; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; +import eu.dnetlib.manager.wf.workflows.util.SubWorkflowProgressProvider; + +/** + * Created by michele on 18/11/15. + */ +public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware { + + private static final Log log = LogFactory.getLog(LaunchWorkflowJobNode.class); + + private String wfInstanceId; + + @Autowired + private WorkflowExecutor executor; + + @Autowired + private ProcessRegistry processRegistry; + + private WorkflowProcess process; + + @Override + public final void execute(final Token token) { + + try { + final String procId = executor.startWorkflowInstance(getWfInstanceId(), new ProcessCallback() { + + @Override + public void onSuccess() { + log.debug("Child workflow has been completed successfully"); + token.setNextArc(Arc.DEFAULT_ARC); + token.release(); + } + + @Override + public void onFail() { + log.error("Child workflow is failed"); + token.releaseAsFailed("Child workflow is failed"); + } + }, process.getWfInstanceId()); + + if (log.isDebugEnabled()) { + log.debug("The child workflow [instance: " + getWfInstanceId() + "] is starting with procId: " + procId); + } + + token.setProgressProvider(new SubWorkflowProgressProvider(procId, processRegistry)); + + } catch (final Throwable e) { + log.error("got exception while launching child workflow", e); + token.releaseAsFailed(e); + } + } + + public String getWfInstanceId() { + return wfInstanceId; + } + + public void setWfInstanceId(final String wfInstanceId) { + this.wfInstanceId = wfInstanceId; + } + + @Override + public void setProcess(final WorkflowProcess process) { + this.process = process; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/NodeStatus.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/NodeStatus.java new file mode 100644 index 00000000..ea9aa2cb --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/NodeStatus.java @@ -0,0 +1,8 @@ +package eu.dnetlib.manager.wf.nodes; + +public enum NodeStatus { + CONFIGURED, + NOT_CONFIGURED, + DISABLED, + SYSTEM +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java new file mode 100644 index 00000000..5052bb31 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/ProcessNode.java @@ -0,0 +1,39 @@ +package eu.dnetlib.manager.wf.nodes; + +import org.springframework.beans.factory.BeanNameAware; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +/** + * Created by michele on 19/11/15. + */ +public abstract class ProcessNode implements BeanNameAware { + + private String beanName; + + private String nodeName; + + public abstract void execute(final Token token); + + public String getBeanName() { + return this.beanName; + } + + @Override + public void setBeanName(final String beanName) { + this.beanName = beanName; + } + + public String getNodeName() { + return this.nodeName; + } + + public void setNodeName(final String nodeName) { + this.nodeName = nodeName; + } + + @Override + public String toString() { + return String.format("[node beanName=%s, name=%s]", this.beanName, this.nodeName); + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SimpleJobNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SimpleJobNode.java new file mode 100644 index 00000000..1dd14429 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SimpleJobNode.java @@ -0,0 +1,12 @@ +package eu.dnetlib.manager.wf.nodes; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +public abstract class SimpleJobNode extends AbstractJobNode { + + @Override + public final void execute(final Token token) { + doExecute(token); + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java new file mode 100644 index 00000000..022d3cec --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/SuccessNode.java @@ -0,0 +1,17 @@ +package eu.dnetlib.manager.wf.nodes; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +/** + * Created by michele on 26/11/15. + */ +public class SuccessNode extends ProcessNode { + + public SuccessNode() { + super(); + setNodeName("success"); + } + + @Override + public void execute(final Token token) {} +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/notification/EmailSender.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/notification/EmailSender.java new file mode 100644 index 00000000..e844d6d1 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/notification/EmailSender.java @@ -0,0 +1,46 @@ +package eu.dnetlib.manager.wf.notification; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import eu.dnetlib.manager.wf.model.NotificationCondition; +import eu.dnetlib.manager.wf.repository.WorkflowSubscriptionRepository; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess.Status; +import eu.dnetlib.notifications.mail.EmailDispatcher; + +@Service +public class EmailSender { + + private static final Log log = LogFactory.getLog(EmailSender.class); + + @Autowired + private WorkflowSubscriptionRepository wfSubscriptionRepository; + + @Autowired + private EmailDispatcher dispatcher; + + public void sendMails(final WorkflowProcess proc) { + + wfSubscriptionRepository.findByWfInstanceId(proc.getWfInstanceId()).forEach(s -> { + if (s.getCondition() == NotificationCondition.ALWAYS || + s.getCondition() == NotificationCondition.ONLY_FAILED && proc.getStatus() == Status.FAILURE || + s.getCondition() == NotificationCondition.ONLY_SUCCESS && proc.getStatus() == Status.SUCCESS) { + try { + final Map params = new HashMap<>(); + + dispatcher.sendStoredMail(s.getEmail(), s.getMessageId(), params); + + } catch (final Exception e) { + log.error("Error sending mail to " + s.getEmail(), e); + } + } + + }); + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Arc.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Arc.java new file mode 100644 index 00000000..a0347f36 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Arc.java @@ -0,0 +1,35 @@ +package eu.dnetlib.manager.wf.workflows.graph; + +import org.apache.commons.lang3.StringUtils; + +public class Arc { + + public static final String DEFAULT_ARC = null; + + private final String name; + private final String from; + private final String to; + + public Arc(final String name, final String from, final String to) { + this.name = name; + this.from = from; + this.to = to; + } + + public String getName() { + return this.name; + } + + public String getFrom() { + return this.from; + } + + public String getTo() { + return this.to; + } + + @Override + public String toString() { + return String.format("[ %s: %s -> %s ]", StringUtils.isBlank(this.name) ? "DEFAULT" : this.name, this.from, this.to); + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Graph.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Graph.java new file mode 100644 index 00000000..c9027e7f --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/Graph.java @@ -0,0 +1,80 @@ +package eu.dnetlib.manager.wf.workflows.graph; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; + +public class Graph { + + private final Map nodes = new HashMap<>(); + private List arcs = new ArrayList<>(); + + public void addArc(final Arc arc) { + this.arcs.add(arc); + } + + public void addNode(final GraphNode node) { + this.nodes.put(node.getName(), node); + } + + public Set nodeNames() { + return this.nodes.keySet(); + } + + public Collection nodes() { + return this.nodes.values(); + } + + public GraphNode getNode(final String name) { + return this.nodes.get(name); + } + + public List getArcs() { + return this.arcs; + } + + public void setArcs(final List arcs) { + this.arcs = arcs; + } + + public Set startNodes() { + final Set res = new HashSet<>(); + for (final GraphNode n : this.nodes.values()) { + if (n.isStart()) { + res.add(n); + } + } + return res; + } + + public Set nextNodes(final GraphNode current, final String arcName) { + final Set res = new HashSet<>(); + for (final Arc arc : this.arcs) { + if (StringUtils.equals(arc.getFrom(), current.getName()) && StringUtils.equals(arc.getName(), arcName)) { + res.add(this.nodes.get(arc.getTo())); + } + } + return res; + } + + public int getNumberOfIncomingArcs(final GraphNode node) { + int count = 0; + for (final Arc arc : this.arcs) { + if (arc.getTo().equals(node.getName())) { + count++; + } + } + return count; + } + + @Override + public String toString() { + return "\n************************\nNodes: " + this.nodes + "\nArcs: " + this.arcs + "\n************************\n"; + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java new file mode 100644 index 00000000..18f5d2d6 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphLoader.java @@ -0,0 +1,180 @@ +package eu.dnetlib.manager.wf.workflows.graph; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Sets; + +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.model.GraphArcDbEntry; +import eu.dnetlib.manager.wf.model.GraphNodeDbEntry; +import eu.dnetlib.manager.wf.model.GraphParameterDbEntry; +import eu.dnetlib.manager.wf.model.GraphParameterValueDbEntry; +import eu.dnetlib.manager.wf.workflows.util.NodeHelper; + +@Service +public class GraphLoader { + + private static final Log log = LogFactory.getLog(GraphLoader.class); + + private final String regExRef = "\\$\\{(\\w*)\\}"; + + final Pattern pattern = Pattern.compile(regExRef, Pattern.MULTILINE); + + @Autowired + private NodeHelper nodeHelper; + + @Autowired + private Environment env; + + public Graph loadGraph(final GraphNodeDbEntry[] workflowGraph, final Map globalParams) throws WorkflowManagerException { + final Graph graph = new Graph(); + + for (final GraphNodeDbEntry node : workflowGraph) { + final String nodeName = node.getName(); + final String nodeType = node.getType(); + final boolean isStart = node.isStart(); + final boolean isJoin = node.isJoin(); + + final Map params = calculateParamsForNode(node, globalParams); + + if (isStart) { + graph.addNode(GraphNode.newStartNode(nodeName, nodeType, params)); + } else if (isJoin) { + graph.addNode(GraphNode.newJoinNode(nodeName, nodeType, params)); + } else { + graph.addNode(GraphNode.newNode(nodeName, nodeType, params)); + } + + if (graph.getArcs() != null) { + for (final GraphArcDbEntry a : node.getArcs()) { + final String arcName = a.getName(); + final String to = a.getTo(); + graph.addArc(new Arc(StringUtils.isNotBlank(arcName) ? arcName : Arc.DEFAULT_ARC, nodeName, to)); + } + } + + graph.addNode(GraphNode.newSuccessNode()); + } + + checkValidity(graph); + + return graph; + } + + public Map calculateParamsForNode(final GraphNodeDbEntry node, final Map globalParams) { + + final Map params = new HashMap<>(); + + if (node.getParams() != null) { + for (final GraphParameterDbEntry p : node.getParams()) { + + final String pName = p.getName(); + + final GraphNodeParameter pValue = calculateSimpleValue(p, globalParams); + + if (pValue != null) { + params.put(pName, pValue); + } else if (p.getMap() != null) { + + final Map map = p.getMap() + .entrySet() + .stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> { + final GraphNodeParameter gnp = calculateSimpleValue(e.getValue(), globalParams); + if (gnp == null) { + final String msg = String.format("missing value for param: \"%s\"", e.getKey()); + log.debug(msg); + return GraphNodeParameter.newNullParam(); + } + return gnp; + })); + params.put(pName, GraphNodeParameter.newMapParam(map)); + } else if (p.getValues() != null) { + final List list = p.getValues() + .stream() + .map(e -> calculateSimpleValue(e, globalParams)) + .collect(Collectors.toList()); + params.put(pName, GraphNodeParameter.newListParam(list)); + } + } + } + + return params; + } + + private GraphNodeParameter calculateSimpleValue(final GraphParameterValueDbEntry graphValue, final Map globalParams) { + String value = graphValue.getValue(); + final String ref = graphValue.getRef(); + final String prop = graphValue.getProperty(); + final String envRef = graphValue.getEnv(); + + if (StringUtils.isNotBlank(ref) && StringUtils.isNotBlank(globalParams.get(ref))) { + return GraphNodeParameter.newSimpleParam(globalParams.get(ref)); + } else if (StringUtils.isNotBlank(envRef)) { + return GraphNodeParameter.newEnvParam(envRef); + } else if (StringUtils.isNotBlank(value)) { + final Matcher matcher = pattern.matcher(value); + while (matcher.find()) { + final String rName = matcher.group(1); + final String rValue = globalParams.get(rName); + if (StringUtils.isBlank(rValue)) { return null; } + value = value.replaceAll(Pattern.quote(matcher.group(0)), rValue); + System.out.println("NEW VALUE " + value); + } + return GraphNodeParameter.newSimpleParam(value); + } else if (StringUtils.isNotBlank(prop)) { + return GraphNodeParameter.newSimpleParam(env.getProperty(prop)); + } else { + return null; + } + + } + + private void checkValidity(final Graph graph) throws WorkflowManagerException { + + final Set nodesFromArcs = new HashSet<>(); + + boolean foundSuccess = false; + boolean foundStart = false; + + for (final Arc arc : graph.getArcs()) { + if (StringUtils.isBlank(arc.getFrom()) || StringUtils.isBlank(arc.getFrom())) { + throw new WorkflowManagerException("Invalid arc: missing from e/o to"); + } + if (StringUtils.equals(arc.getTo(), GraphNode.SUCCESS_NODE)) { + foundSuccess = true; + } + nodesFromArcs.add(arc.getFrom()); + nodesFromArcs.add(arc.getTo()); + } + + if (!foundSuccess) { throw new WorkflowManagerException("Arc to success not found"); } + + final Set diff = Sets.symmetricDifference(graph.nodeNames(), nodesFromArcs); + if (!diff.isEmpty()) { throw new WorkflowManagerException("Missing or invalid nodes in arcs: " + diff); } + + for (final GraphNode n : graph.nodes()) { + if (StringUtils.isBlank(n.getName())) { throw new WorkflowManagerException("Invalid node: missing name"); } + if (n.isStart()) { + foundStart = true; + } + if (!this.nodeHelper.isValidType(n.getType())) { throw new WorkflowManagerException("Invalid node type: " + n.getType()); } + } + if (!foundStart) { throw new WorkflowManagerException("Start node not found"); } + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java new file mode 100644 index 00000000..db9c5c67 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNode.java @@ -0,0 +1,149 @@ +package eu.dnetlib.manager.wf.workflows.graph; + +import java.io.StringWriter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.manager.wf.workflows.procs.Env; + +public class GraphNode { + + public static final String SUCCESS_NODE = "success"; + + private final String name; + private final String type; + private final boolean isStart; + private final boolean isJoin; + private final boolean isSucessNode; + private final Map nodeParams; + + private GraphNode(final String name, + final String type, + final boolean isStart, + final boolean isJoin, + final boolean isSuccessNode, + final Map nodeParams) { + this.name = name; + this.type = type; + this.isStart = isStart; + this.isJoin = isJoin; + this.isSucessNode = isSuccessNode; + this.nodeParams = nodeParams; + } + + public static GraphNode newNode(final String name, + final String type, + final Map nodeParams) { + return new GraphNode(name, type, false, false, false, nodeParams); + } + + public static GraphNode newStartNode(final String name, + final String type, + final Map nodeParams) { + return new GraphNode(name, type, true, false, false, nodeParams); + } + + public static GraphNode newJoinNode(final String name, + final String type, + final Map nodeParams) { + return new GraphNode(name, type, false, true, false, nodeParams); + } + + public static GraphNode newSuccessNode() { + return new GraphNode(SUCCESS_NODE, null, false, true, true, null); + } + + public String getName() { + return this.name; + } + + public String getType() { + return this.type; + } + + public boolean isStart() { + return this.isStart; + } + + public boolean isJoin() { + return this.isJoin; + } + + public boolean isSucessNode() { + return this.isSucessNode; + } + + @Override + public String toString() { + final StringWriter sw = new StringWriter(); + sw.append("[ name: "); + sw.append(this.name); + if (StringUtils.isNotBlank(this.type)) { + sw.append(", type: "); + sw.append(this.type); + } + if (isStart()) { + sw.append(" - isStart"); + } + if (isJoin()) { + sw.append(" - isJoin"); + } + sw.append(" ]"); + return sw.toString(); + } + + public Map getNodeParams() { + return this.nodeParams; + } + + public Map resolveParamsWithNoEnv() { + return resolveParams(null); + } + + @SuppressWarnings("unchecked") + public Map resolveParams(final Env env) { + final Map params = new HashMap<>(); + + if (this.nodeParams != null) { + + for (final Map.Entry e : this.nodeParams.entrySet()) { + final String pName = e.getKey(); + final GraphNodeParameter param = e.getValue(); + + if (param.isEnvParam()) { + params.put(pName, resolveFromEnv(param, env)); + } else if (param.isMap()) { + + final Map map = new HashMap<>(); + + for (final Map.Entry e1 : ((Map) param.getValue()).entrySet()) { + map.put(e1.getKey(), e1.getValue().isEnvParam() ? resolveFromEnv(e1.getValue(), env) : e1.getValue().getValue()); + } + + params.put(pName, map); + + } else if (param.isList()) { + params.put(pName, ((List) param.getValue()) + .stream() + .map(p -> p.isEnvParam() ? resolveFromEnv(p, env) : p.getValue()) + .collect(Collectors.toList())); + + } else { + params.put(pName, param.getValue()); + } + + } + } + + return params; + } + + private Object resolveFromEnv(final GraphNodeParameter param, final Env env) { + return env != null ? env.getAttribute(param.getEnvVariable()) : "[this value will be resolved using the runtime ENV]"; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java new file mode 100644 index 00000000..bbe054eb --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/graph/GraphNodeParameter.java @@ -0,0 +1,56 @@ +package eu.dnetlib.manager.wf.workflows.graph; + +import java.util.List; +import java.util.Map; + +public class GraphNodeParameter { + + private final Object value; + private final String envVariable; + + private GraphNodeParameter(final Object value, final String envVariable) { + this.value = value; + this.envVariable = envVariable; + } + + public static GraphNodeParameter newNullParam() { + return new GraphNodeParameter(null, null); + } + + public static GraphNodeParameter newSimpleParam(final Object value) { + return new GraphNodeParameter(value, null); + } + + public static GraphNodeParameter newMapParam(final Map map) { + return new GraphNodeParameter(map, null); + } + + public static GraphNodeParameter newListParam(final List list) { + return new GraphNodeParameter(list, null); + } + + public static GraphNodeParameter newEnvParam(final String envVariable) { + return new GraphNodeParameter(null, envVariable); + } + + public Object getValue() { + return this.value; + } + + public boolean isEnvParam() { + return this.envVariable != null; + } + + public String getEnvVariable() { + return this.envVariable; + } + + public boolean isMap() { + return this.value != null && (this.value instanceof Map); + } + + public boolean isList() { + return this.value != null && (this.value instanceof List); + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Env.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Env.java new file mode 100644 index 00000000..30ef34e7 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Env.java @@ -0,0 +1,54 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by michele on 23/11/15. + */ +public class Env { + + private final Map attrs; + + public Env() { + this.attrs = new HashMap<>(); + } + + public Env(final Map attrs) { + this.attrs = attrs; + } + + public Map getAttributes() { + return attrs; + } + + public void clear() { + attrs.clear(); + } + + public void addAttributes(final Map map) { + if (map != null) { + attrs.putAll(map); + } + } + + public void setAttribute(final String name, final Object value) { + attrs.put(name, value); + } + + public Object getAttribute(final String name) { + return attrs.get(name); + } + + public T getAttribute(final String name, Class clazz) { + return clazz.cast(attrs.get(name)); + } + + public boolean hasAttribute(final String name) { + return attrs.containsKey(name); + } + + public Object removeAttribute(final String name) { + return attrs.remove(name); + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessAware.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessAware.java new file mode 100644 index 00000000..37645ccb --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessAware.java @@ -0,0 +1,10 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +/** + * Created by michele on 24/11/15. + */ +public interface ProcessAware { + + void setProcess(WorkflowProcess process); + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java new file mode 100644 index 00000000..64beaccd --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessEngine.java @@ -0,0 +1,203 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.google.common.base.Throwables; + +import eu.dnetlib.manager.wf.WorkflowLogger; +import eu.dnetlib.manager.wf.model.WfProcessExecution; +import eu.dnetlib.manager.wf.nodes.ProcessNode; +import eu.dnetlib.manager.wf.notification.EmailSender; +import eu.dnetlib.manager.wf.workflows.graph.GraphNode; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess.Status; +import eu.dnetlib.manager.wf.workflows.util.NodeHelper; +import eu.dnetlib.manager.wf.workflows.util.NodeTokenCallback; +import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; + +@Service +public class ProcessEngine { + + private static final Log log = LogFactory.getLog(ProcessEngine.class); + + @Autowired + private NodeHelper nodeHelper; + @Autowired + private EmailSender emailSender; + @Autowired + private WorkflowLogger wfLogger; + + public void startProcess(final WorkflowProcess process) { + log.info(process.getGraph()); + + log.info("Starting workflow: " + process); + + final LocalDateTime now = LocalDateTime.now(); + process.setStatus(WorkflowProcess.Status.EXECUTING); + process.setStartDate(now); + process.setLastActivityDate(now); + + try { + for (final GraphNode node : process.getGraph().startNodes()) { + final ProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv()); + final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); + + token.getEnv().addAttributes(process.getEnv().getAttributes()); + process.getTokens().add(token); + + pNode.execute(token); + } + } catch (final Throwable e) { + log.error("WorkflowProcess node instantiation failed", e); + process.setStatus(WorkflowProcess.Status.FAILURE); + } + } + + public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) { + process.setLastActivityDate(LocalDateTime.now()); + + try { + for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) { + if (node.isJoin() || node.isSucessNode()) { + if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) { + process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList()); + } + + final List list = process.getPausedJoinNodeTokens().get(node.getName()); + + list.add(oldToken); + + if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) { + final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); + token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()]))); + final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); + + process.getTokens().add(token); + process.setLastActivityDate(LocalDateTime.now()); + + if (node.isSucessNode()) { + markAsCompleted(process, token); + } else { + pNode.execute(token); + } + } + } else { + final Token token = new Token(node.getName(), newNodeTokenCallback(process, node)); + token.getEnv().addAttributes(oldToken.getEnv().getAttributes()); + final ProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv()); + + process.getTokens().add(token); + process.setLastActivityDate(LocalDateTime.now()); + pNode.execute(token); + } + } + } catch (final Throwable e) { + log.error("WorkflowProcess node instantiation failed", e); + process.setStatus(WorkflowProcess.Status.FAILURE); + process.setError(e.getMessage()); + process.setErrorStacktrace(Throwables.getStackTraceAsString(e)); + process.setLastActivityDate(LocalDateTime.now()); + } + + } + + private NodeTokenCallback newNodeTokenCallback(final WorkflowProcess process, final GraphNode node) { + return new NodeTokenCallback() { + + @Override + public void onSuccess(final Token token) { + releaseToken(process, node, token); + } + + @Override + public void onFail(final Token token) { + completeProcess(process, token); + } + }; + } + + private Map mergeEnvParams(final Token... tokens) { + final Map map = new HashMap<>(); + Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes())); + return map; + } + + private void markAsCompleted(final WorkflowProcess process, final Token token) { + completeProcess(process, token); + } + + private void completeProcess(final WorkflowProcess process, final Token token) { + if (token.isActive()) { + if (StringUtils.isNotBlank(token.getError())) { + token.releaseAsFailed(token.getError()); + } else { + token.release(); + } + } + + final LocalDateTime now = token.getEndDate(); + + process.setLastActivityDate(now); + process.setEndDate(now); + process.setStatus(token.isFailed() ? WorkflowProcess.Status.FAILURE : WorkflowProcess.Status.SUCCESS); + + if (token.isFailed()) { + process.setStatus(Status.FAILURE); + process.setError(token.getError()); + process.setErrorStacktrace(token.getErrorStackTrace()); + process.setLastActivityDate(LocalDateTime.now()); + } + + if (process.getCallback() != null) { + if (token.isFailed()) { + process.getCallback().onFail(); + } else { + process.getCallback().onSuccess(); + } + } + + final Map details = new LinkedHashMap<>(); + details.putAll(process.getOutputParams()); + details.put(WorkflowsConstants.LOG_WF_PRIORITY, "" + process.getPriority()); + details.put(WorkflowsConstants.LOG_WF_ID, process.getWfId()); + details.put(WorkflowsConstants.LOG_WF_ID, process.getWfInstanceId()); + + if (process.getError() != null) { + details.put(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError()); + details.put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace()); + } + + final WfProcessExecution pe = new WfProcessExecution(); + pe.setProcessId(process.getId()); + pe.setName(process.getName()); + pe.setFamily(process.getFamily()); + + pe.setDsId(process.getDsId()); + pe.setDsName(process.getDsName()); + pe.setDsApi(process.getDsInterface()); + + pe.setStartDate(process.getStartDate()); + pe.setEndDate(process.getEndDate()); + + pe.setStatus(process.getStatus().toString()); + + pe.setDetails(details); + + wfLogger.saveProcessExecution(pe); + + emailSender.sendMails(process); + + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java new file mode 100644 index 00000000..8cf041e3 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessFactory.java @@ -0,0 +1,67 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.model.WorkflowDbEntry; +import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; + +@Component +public class ProcessFactory { + + private static final Log log = LogFactory.getLog(ProcessFactory.class); + + private String oldGeneratedId = ""; + + private final DateTimeFormatter processIdFormatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss_S"); + + @Autowired + private GraphLoader graphLoader; + + public WorkflowProcess newProcess(final WorkflowDbEntry wf, + final WorkflowInstance instance, + final ProcessCallback processCallback, + final String parent) throws WorkflowManagerException { + + final Map globalParams = new HashMap<>(); + globalParams.putAll(instance.getSystemParams()); + globalParams.putAll(instance.getUserParams()); + + return new WorkflowProcess(generateProcessId(), + wf.getName(), + wf.getFamily(), + instance.getDsId(), + instance.getDsName(), + instance.getApiId(), + graphLoader.loadGraph(wf.getGraph(), globalParams), + instance.getPriority(), + wf.getId(), + instance.getId(), + globalParams, + + processCallback, parent); + + } + + private synchronized String generateProcessId() { + String id = ""; + do { + id = "wf_" + LocalDateTime.now().format(processIdFormatter); + log.info("Generated processID " + id); + } while (id.equals(oldGeneratedId)); + + oldGeneratedId = id; + + return id; + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java new file mode 100644 index 00000000..45533072 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/ProcessRegistry.java @@ -0,0 +1,132 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.PriorityBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; + +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; + +@Service +public class ProcessRegistry { + + private static final Log log = LogFactory.getLog(ProcessRegistry.class); + private final BiMap procs = HashBiMap.create(); + private final Map> byOtherId = new HashMap<>(); + + private final PriorityBlockingQueue pendingProcs = new PriorityBlockingQueue<>(); + + @Value("${'dnet.wf.registry.size'}") + private int maxSize; + + synchronized public int countRunningWfs() { + int count = 0; + for (final Map.Entry e : this.procs.entrySet()) { + final WorkflowProcess proc = e.getValue(); + if (!proc.isTerminated()) { + count++; + } + } + return count; + } + + public WorkflowProcess findProcess(final String procId) { + return this.procs.get(procId); + } + + public Set listProcesses() { + return this.procs.values(); + } + + public Collection findProcsByOtherId(final String id) { + synchronized (this) { + final Collection res = this.byOtherId.get(id); + return res != null ? res : new ArrayList<>(); + } + } + + public String registerProcess(final WorkflowProcess process, final String... ids) throws WorkflowManagerException { + if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) { + log.error("Already registerd process: " + process); + throw new WorkflowManagerException("Already registerd process: " + process); + } + + if (this.procs.size() >= this.maxSize) { + removeOldestProcess(); + } + + this.procs.put(process.getId(), process); + for (final String id : ids) { + synchronized (this) { + if (!this.byOtherId.containsKey(id)) { + this.byOtherId.put(id, new ArrayList()); + } + this.byOtherId.get(id).add(process); + } + } + + synchronized (this.pendingProcs) { + if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) { + log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE); + throw new WorkflowManagerException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE); + } + this.pendingProcs.put(process); + + log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority()); + } + + return process.getId(); + } + + private void removeOldestProcess() { + LocalDateTime oldDate = LocalDateTime.now(); + String oldId = null; + + for (final Map.Entry e : this.procs.entrySet()) { + final WorkflowProcess proc = e.getValue(); + + if (proc.isTerminated()) { + final LocalDateTime date = proc.getLastActivityDate(); + if (date.isBefore(oldDate)) { + oldDate = date; + oldId = e.getKey(); + } + } + } + + if (oldId != null) { + unregisterProcess(oldId); + } + + } + + public void unregisterProcess(final String procId) { + synchronized (this) { + final WorkflowProcess process = this.procs.remove(procId); + if (process != null) { + for (final Collection processes : this.byOtherId.values()) { + processes.remove(process); + } + } + } + } + + public WorkflowProcess nextProcessToStart() { + synchronized (this.pendingProcs) { + return this.pendingProcs.poll(); + } + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java new file mode 100644 index 00000000..9524dfe1 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/Token.java @@ -0,0 +1,147 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.time.LocalDateTime; +import java.util.UUID; + +import com.google.common.base.Throwables; + +import eu.dnetlib.manager.wf.workflows.graph.Arc; +import eu.dnetlib.manager.wf.workflows.util.NodeTokenCallback; +import eu.dnetlib.manager.wf.workflows.util.ProgressProvider; + +/** + * Created by michele on 19/11/15. + */ +public class Token { + + private final String id; + private final String nodeName; + private final Env env = new Env(); + private final LocalDateTime startDate; + private final NodeTokenCallback callback; + private boolean failed = false; + private LocalDateTime endDate = LocalDateTime.MIN; + private String nextArc = Arc.DEFAULT_ARC; + private boolean active = true; + private String error = ""; + private String errorStackTrace = ""; + private ProgressProvider progressProvider; + + public Token(final String nodeName, final NodeTokenCallback callback) { + this.id = "token-" + UUID.randomUUID(); + this.nodeName = nodeName; + this.startDate = LocalDateTime.now(); + this.callback = callback; + } + + public String getId() { + return this.id; + } + + public Env getEnv() { + return this.env; + } + + public LocalDateTime getStartDate() { + return this.startDate; + } + + public LocalDateTime getEndDate() { + return this.endDate; + } + + public void setEndDate(final LocalDateTime endDate) { + this.endDate = endDate; + } + + public String getNextArc() { + return this.nextArc; + } + + public void setNextArc(final String nextArc) { + this.nextArc = nextArc; + } + + public boolean isActive() { + return this.active; + } + + public void setActive(final boolean active) { + this.active = active; + } + + public boolean isFailed() { + return this.failed; + } + + public void setFailed(final boolean failed) { + this.failed = failed; + } + + public void release(final String arcName) { + setNextArc(arcName); + setEndDate(LocalDateTime.now()); + setActive(false); + if (this.callback != null) { + this.callback.onSuccess(this); + } + } + + public void release() { + setEndDate(LocalDateTime.now()); + setActive(false); + if (this.callback != null) { + this.callback.onSuccess(this); + } + } + + public void releaseAsFailed(final Throwable e) { + setEndDate(LocalDateTime.now()); + setActive(false); + setFailed(true); + setError(e.getMessage()); + setErrorStackTrace(Throwables.getStackTraceAsString(e)); + if (this.callback != null) { + this.callback.onFail(this); + } + } + + public void releaseAsFailed(final String error) { + setEndDate(LocalDateTime.now()); + setActive(false); + setFailed(true); + setError(error); + if (this.callback != null) { + this.callback.onFail(this); + } + } + + public String getNodeName() { + return this.nodeName; + } + + public String getError() { + return this.error; + } + + public void setError(final String error) { + this.error = error; + } + + public String getErrorStackTrace() { + return this.errorStackTrace; + } + + public void setErrorStackTrace(final String errorStackTrace) { + this.errorStackTrace = errorStackTrace; + } + + public ProgressProvider getProgressProvider() { + return this.progressProvider; + } + + public void setProgressProvider(final ProgressProvider progressProvider) { + this.progressProvider = progressProvider; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java new file mode 100644 index 00000000..525d8bb1 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowExecutor.java @@ -0,0 +1,155 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import eu.dnetlib.dsm.DsmService; +import eu.dnetlib.errors.DsmException; +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.model.WorkflowDbEntry; +import eu.dnetlib.manager.wf.model.WorkflowInstance; +import eu.dnetlib.manager.wf.repository.WorkflowDbEntryRepository; +import eu.dnetlib.manager.wf.repository.WorkflowInstanceRepository; +import eu.dnetlib.manager.wf.workflows.graph.GraphLoader; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; +import eu.dnetlib.manager.wf.workflows.util.WorkflowsConstants; +import eu.dnetlib.utils.Stoppable; +import eu.dnetlib.utils.StoppableDetails; + +@Service +public class WorkflowExecutor implements Stoppable { + + private static final Log log = LogFactory.getLog(WorkflowExecutor.class); + + @Autowired + private GraphLoader graphLoader; + @Autowired + private ProcessRegistry processRegistry; + @Autowired + private ProcessFactory processFactory; + @Autowired + private ProcessEngine processEngine; + @Autowired + private DsmService dsmService; + + @Autowired + private WorkflowDbEntryRepository workflowDbEntryRepository; + @Autowired + private WorkflowInstanceRepository workflowInstanceRepository; + + private boolean paused = false; + + public void init() { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + if (isPaused() || processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; } + + final WorkflowProcess process = processRegistry.nextProcessToStart(); + if (process != null) { + processEngine.startProcess(process); + } else { + log.debug("WorkflowProcess queue is empty"); + } + }, 10, 10, TimeUnit.SECONDS); + } + + public String startRepoHiWorkflow(final String wfId, final String dsId, final String apiId, final ProcessCallback processCallback, final String parent) + throws WorkflowManagerException { + + if (isPaused()) { + log.warn("Wf " + wfId + " not launched, because WorkflowExecutor is preparing for shutdown"); + throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown"); + } + + try { + final String dsName = dsmService.getDs(dsId).getOfficialname(); + + final WorkflowInstance instance = new WorkflowInstance(); + instance.setId("REPO_HI_" + UUID.randomUUID()); + instance.setDetails(new HashMap<>()); + instance.setPriority(100); + instance.setDsId(dsId); + instance.setDsName(dsName); + instance.setApiId(apiId); + instance.setEnabled(true); + instance.setConfigured(true); + instance.setSchedulingEnabled(false); + instance.setCronExpression(""); + instance.setCronMinInterval(0); + instance.setWorkflow(wfId); + instance.setDestroyWf(null); + instance.setSystemParams(new HashMap<>()); + instance.setUserParams(new HashMap<>()); + + return startWorkflowInstance(instance, processCallback, parent); + } catch (final DsmException e) { + throw new WorkflowManagerException("Invalid datasource: " + dsId, e); + } + } + + public String startWorkflowInstance(final String wfInstanceId, final ProcessCallback processCallback, final String parent) throws Exception { + + if (isPaused()) { + log.warn("Wf instance " + wfInstanceId + " not launched, because WorkflowExecutor is preparing for shutdown"); + throw new WorkflowManagerException("WorkflowExecutor is preparing for shutdown"); + } + + final WorkflowInstance instance = + workflowInstanceRepository.findById(wfInstanceId).orElseThrow(() -> new WorkflowManagerException("WF instance not found: " + wfInstanceId)); + return startWorkflowInstance(instance, processCallback, parent); + } + + public String startWorkflowInstance(final WorkflowInstance instance, final ProcessCallback processCallback, final String parent) + throws WorkflowManagerException { + final WorkflowDbEntry wf = workflowDbEntryRepository.findById(instance.getWorkflow()) + .orElseThrow(() -> new WorkflowManagerException("WF not found: " + instance.getWorkflow())); + + if (!instance.isEnabled() || !instance.isConfigured()) { + log.warn("Wf instance " + instance.getId() + " not launched, because it is not ready to start or it is disabled"); + throw new WorkflowManagerException("Workflow " + instance.getId() + " is not ready to start"); + } + + final WorkflowProcess process = + processFactory.newProcess(wf, instance, processCallback, parent); + + return processRegistry.registerProcess(process, instance.getId()); + } + + @Override + public void stop() { + paused = true; + } + + @Override + public void resume() { + paused = false; + } + + @Override + public StoppableDetails getStopDetails() { + final int count = processRegistry.countRunningWfs(); + + final StoppableDetails.StopStatus status = + isPaused() ? count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING : StoppableDetails.StopStatus.RUNNING; + + return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status); + } + + public ProcessRegistry getProcessRegistry() { + return processRegistry; + } + + public boolean isPaused() { + return paused; + } + + public void setPaused(final boolean paused) { + this.paused = paused; + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java new file mode 100644 index 00000000..9a479806 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/procs/WorkflowProcess.java @@ -0,0 +1,258 @@ +package eu.dnetlib.manager.wf.workflows.procs; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.commons.lang3.math.NumberUtils; + +import eu.dnetlib.manager.wf.workflows.graph.Graph; +import eu.dnetlib.manager.wf.workflows.util.ProcessCallback; + +/** + * Created by michele on 19/11/15. + */ +public class WorkflowProcess implements Comparable { + + private final String id; + private final String name; + private final String family; + private String dsId; + private String dsName; + private String dsInterface; + private final Graph graph; + private final ProcessCallback callback; + private final int priority; + private final Env env; + private final List tokens = new CopyOnWriteArrayList<>(); + private LocalDateTime lastActivityDate; + private Status status; + private LocalDateTime startDate = LocalDateTime.MIN; + private LocalDateTime endDate = LocalDateTime.MIN; + private final String wfId; + private final String wfInstanceId; + private Map> pausedJoinNodeTokens = new HashMap<>(); + private Map globalParams; + private String error; + private String errorStacktrace; + private Map outputParams = new HashMap<>(); + private String parentProfileId; + + public WorkflowProcess( + final String id, + final String name, + final String family, + final String dsId, + final String dsName, + final String dsInterface, + final Graph graph, + final int priority, + final String wfId, + final String wfInstanceId, + final Map globalParams, + final ProcessCallback callback, + final String parentProfileId) { + this.id = id; + this.name = name; + this.family = family; + this.dsId = dsId; + this.dsName = dsName; + this.dsInterface = dsInterface; + this.graph = graph; + this.priority = priority; + this.callback = callback; + this.status = Status.CREATED; + this.env = new Env(); + this.wfId = wfId; + this.wfInstanceId = wfInstanceId; + this.globalParams = globalParams; + this.lastActivityDate = LocalDateTime.now(); + this.parentProfileId = parentProfileId; + } + + public void setParentProfileId(final String parentProfileId) { + this.parentProfileId = parentProfileId; + } + + public String getDsId() { + return dsId; + } + + public void setDsId(final String dsId) { + this.dsId = dsId; + } + + public String getDsName() { + return dsName; + } + + public void setDsName(final String dsName) { + this.dsName = dsName; + } + + public String getDsInterface() { + return dsInterface; + } + + public void setDsInterface(final String dsInterface) { + this.dsInterface = dsInterface; + } + + public Map> getPausedJoinNodeTokens() { + return pausedJoinNodeTokens; + } + + public void setPausedJoinNodeTokens(final Map> pausedJoinNodeTokens) { + this.pausedJoinNodeTokens = pausedJoinNodeTokens; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public Env getEnv() { + return env; + } + + public Status getStatus() { + return status; + } + + public void setStatus(final Status status) { + this.status = status; + } + + public Graph getGraph() { + return graph; + } + + public List getTokens() { + return tokens; + } + + public void kill() { + setStatus(Status.KILLED); + } + + public ProcessCallback getCallback() { + return callback; + } + + public int getPriority() { + return priority; + } + + public boolean isTerminated() { + switch (status) { + case SUCCESS: + case FAILURE: + case KILLED: + return true; + default: + return false; + } + } + + public LocalDateTime getLastActivityDate() { + return lastActivityDate; + } + + public void setLastActivityDate(final LocalDateTime lastActivityDate) { + this.lastActivityDate = lastActivityDate; + } + + @Override + public String toString() { + return String.format("[process id='%s' name='%s']", id, name); + } + + @Override + public int compareTo(final WorkflowProcess wp) { + return NumberUtils.compare(getPriority(), wp.getPriority()); + } + + public Map getGlobalParams() { + return globalParams; + } + + public void setGlobalParams(final Map globalParams) { + this.globalParams = globalParams; + } + + public String getFamily() { + return family; + } + + public String getWfId() { + return wfId; + } + + public String getWfInstanceId() { + return wfInstanceId; + } + + public void setStartDate(final LocalDateTime startDate) { + this.startDate = startDate; + } + + public void setEndDate(final LocalDateTime endDate) { + this.endDate = endDate; + } + + public LocalDateTime getStartDate() { + return startDate; + } + + public LocalDateTime getEndDate() { + return endDate; + } + + public enum Status { + CREATED, + EXECUTING, + SUCCESS, + FAILURE, + KILLED; + } + + public enum StartMode { + AUTO, + MANUAL, + DISABLED + } + + public String getError() { + return error; + } + + public void setError(final String error) { + this.error = error; + } + + public String getErrorStacktrace() { + return errorStacktrace; + } + + public void setErrorStacktrace(final String errorStacktrace) { + this.errorStacktrace = errorStacktrace; + } + + public void setOutputParams(final Map outputParams) { + this.outputParams = outputParams; + } + + public Map getOutputParams() { + return outputParams; + } + + public String getParentProfileId() { + return parentProfileId; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java new file mode 100644 index 00000000..35cf72ce --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeHelper.java @@ -0,0 +1,66 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.PropertyAccessorFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.nodes.DefaultJobNode; +import eu.dnetlib.manager.wf.nodes.ProcessNode; +import eu.dnetlib.manager.wf.nodes.SuccessNode; +import eu.dnetlib.manager.wf.workflows.graph.GraphNode; +import eu.dnetlib.manager.wf.workflows.procs.Env; +import eu.dnetlib.manager.wf.workflows.procs.ProcessAware; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; + +/** + * Created by michele on 19/11/15. + */ +public class NodeHelper implements ApplicationContextAware { + + public static final String beanNamePrefix = "wfNode"; + private static final Log log = LogFactory.getLog(NodeHelper.class); + + private ApplicationContext applicationContext; + + public ProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process, final Env env) throws WorkflowManagerException { + if (node.isSucessNode()) { + return new SuccessNode(); + } else if (StringUtils.isBlank(node.getType())) { + return new DefaultJobNode(node.getName()); + } else { + final ProcessNode pnode = this.applicationContext.getBean(beanNamePrefix + node.getType(), ProcessNode.class); + if (pnode != null) { + pnode.setNodeName(node.getName()); + // I invoke the setter methods using the static params of the graph node + try { + PropertyAccessorFactory.forBeanPropertyAccess(pnode).setPropertyValues(node.resolveParams(env)); + } catch (final Throwable e) { + throw new WorkflowManagerException(String.format("error setting parameters in wfNode %s", node.getName()), e); + } + if (pnode instanceof ProcessAware) { + ((ProcessAware) pnode).setProcess(process); + } + return pnode; + } else { + log.error("cannot find bean " + beanNamePrefix + node.getType()); + throw new WorkflowManagerException("cannot find bean " + beanNamePrefix + node.getType()); + } + } + } + + public boolean isValidType(final String type) { + return StringUtils.isBlank(type) || this.applicationContext.isPrototype(beanNamePrefix + type) && this.applicationContext + .isTypeMatch(beanNamePrefix + type, ProcessNode.class); + } + + @Override + public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java new file mode 100644 index 00000000..7496186b --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/NodeTokenCallback.java @@ -0,0 +1,13 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import eu.dnetlib.manager.wf.workflows.procs.Token; + +/** + * Created by michele on 26/11/15. + */ +public interface NodeTokenCallback { + + void onSuccess(Token token); + + void onFail(Token token); +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java new file mode 100644 index 00000000..4f70b5da --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProcessCallback.java @@ -0,0 +1,12 @@ +package eu.dnetlib.manager.wf.workflows.util; + +/** + * Created by michele on 18/11/15. + */ +public interface ProcessCallback { + + void onSuccess(); + + void onFail(); + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java new file mode 100644 index 00000000..d9f84f45 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ProgressProvider.java @@ -0,0 +1,6 @@ +package eu.dnetlib.manager.wf.workflows.util; + +public interface ProgressProvider { + + String getProgressDescription(); +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java new file mode 100644 index 00000000..499349a3 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/SubWorkflowProgressProvider.java @@ -0,0 +1,37 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import java.util.List; +import java.util.stream.Collectors; + +import eu.dnetlib.manager.wf.workflows.procs.ProcessRegistry; +import eu.dnetlib.manager.wf.workflows.procs.WorkflowProcess; + +public class SubWorkflowProgressProvider implements ProgressProvider { + + private final String procId; + private final ProcessRegistry processRegistry; + + public SubWorkflowProgressProvider(final String procId, final ProcessRegistry processRegistry) { + super(); + this.procId = procId; + this.processRegistry = processRegistry; + } + + @Override + public String getProgressDescription() { + final WorkflowProcess proc = this.processRegistry.findProcess(this.procId); + + if (proc == null) { return "-"; } + + final List list = proc.getTokens() + .stream() + .filter(t -> t.isActive()) + .map(t -> t.getProgressProvider() != null ? String.format("%s (%s)", t.getNodeName(), t.getProgressProvider().getProgressDescription()) + : t.getNodeName()) + .collect(Collectors.toList()); + if (!list.isEmpty()) { return list.stream().collect(Collectors.joining(", ")); } + + return "-"; + } + +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ValidNodeValuesFetcher.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ValidNodeValuesFetcher.java new file mode 100644 index 00000000..956abac4 --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/ValidNodeValuesFetcher.java @@ -0,0 +1,71 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; + +import eu.dnetlib.errors.WorkflowManagerException; +import eu.dnetlib.manager.wf.workflows.util.ValidNodeValuesFetcher.DnetParamValue; + +public abstract class ValidNodeValuesFetcher implements Function, List> { + + private String name; + + private static final Log log = LogFactory.getLog(ValidNodeValuesFetcher.class); + + public class DnetParamValue implements Comparable { + + private final String id; + private final String name; + + public DnetParamValue(final String id, final String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return this.id; + } + + public String getName() { + return this.name; + } + + @Override + public int compareTo(final DnetParamValue o) { + return getName().compareTo(o.getName()); + } + } + + @Override + final public List apply(final Map params) { + try { + return obtainValues(params); + } catch (final Throwable e) { + log.error("Error obtaing values", e); + return new ArrayList<>(); + } + } + + abstract protected List obtainValues(Map params) throws Exception; + + public String getName() { + return this.name; + } + + @Required + public void setName(final String name) { + this.name = name; + } + + protected void verifyParams(final Map params, final String... pnames) throws WorkflowManagerException { + for (final String s : pnames) { + if (!params.containsKey(s)) { throw new WorkflowManagerException("Parameter not found: " + s); } + } + } +} diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/WorkflowsConstants.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/WorkflowsConstants.java new file mode 100644 index 00000000..3be95afc --- /dev/null +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/workflows/util/WorkflowsConstants.java @@ -0,0 +1,119 @@ +package eu.dnetlib.manager.wf.workflows.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Required; + +import com.google.gson.Gson; + +public class WorkflowsConstants { + + public static final String DATASOURCE_PREFIX = "datasource:"; + + public static final String LOG_WF_NAME = "system:wfName"; + public static final String LOG_WF_ID = "system:wfId"; + public static final String LOG_WF_INSTANCE_ID = "system:wfInstanceId"; + + public static final String LOG_WF_FAMILY = "system:family"; + public static final String LOG_WF_PRIORITY = "system:priority"; + public static final String LOG_WF_PROCESS_ID = "system:processId"; + public static final String LOG_WF_PROCESS_STATUS = "system:processStatus"; + public static final String LOG_WF_PROCESS_START_DATE = "system:startDate"; + public static final String LOG_WF_PROCESS_END_DATE = "system:endDate"; + public static final String LOG_WF_PARENT = "system:parentProfileId"; + + public static final String LOG_SYSTEM_ERROR = "system:error"; + public static final String LOG_SYSTEM_ERROR_STACKTRACE = "system:error:stacktrace"; + + public static final String LOG_DATASOURCE_NAME = WorkflowsConstants.DATASOURCE_PREFIX + "name"; + public static final String LOG_DATASOURCE_ID = WorkflowsConstants.DATASOURCE_PREFIX + "id"; + public static final String LOG_DATASOURCE_INTERFACE = WorkflowsConstants.DATASOURCE_PREFIX + "interface"; + + public static final String BLACKBOARD_IS_BLACKBOARD = "blackboard:isBlackboard"; + public static final String BLACKBOARD_JOB = "blackboard:job"; + public static final String BLACKBOARD_SERVICE_ID = "blackboard:serviceId"; + public static final String BLACKBOARD_IS_GOING = "blackboard:isOngoing"; + public static final String BLACKBOARD_PARAM_PREFIX = "blackboard:param:"; + + // public static final String DATASOURCE_ACRONYM = WorkflowsConstants.DATASOURCE_PREFIX + "acronym"; + // public static final String DATASOURCE_URL = WorkflowsConstants.DATASOURCE_PREFIX + "url"; + + public static final int MIN_WF_PRIORITY = 0; + public static final int MAX_WF_PRIORITY = 100; + public static final int DEFAULT_WF_PRIORITY = 50; + public static final int MAX_PENDING_PROCS_SIZE = 100; + public static final int MAX_RUNNING_PROCS_SIZE = 100; + + public static final String MAIN_LOG_PREFIX = "mainlog:"; + + private String datasourceProtocolsJson; + private String datasourceTypologiesJson; + private List> datasourceProtocols; + private List> datasourceTypologies; + private List> datasourceWorkflowStatuses; + + @SuppressWarnings("unchecked") + public void init() { + final Gson gson = new Gson(); + datasourceProtocols = gson.fromJson(datasourceProtocolsJson, List.class); + datasourceTypologies = gson.fromJson(datasourceTypologiesJson, List.class); + datasourceWorkflowStatuses = new ArrayList<>(); + for (final WorkflowStatus s : WorkflowStatus.values()) { + final Map map = new HashMap<>(); + map.put("name", s.displayName); + map.put("icon", s.icon); + map.put("value", s.toString()); + datasourceWorkflowStatuses.add(map); + } + } + + public String getDatasourceProtocolsJson() { + return datasourceProtocolsJson; + } + + @Required + public void setDatasourceProtocolsJson(final String datasourceProtocolsJson) { + this.datasourceProtocolsJson = datasourceProtocolsJson; + } + + public String getDatasourceTypologiesJson() { + return datasourceTypologiesJson; + } + + @Required + public void setDatasourceTypologiesJson(final String datasourceTypologiesJson) { + this.datasourceTypologiesJson = datasourceTypologiesJson; + } + + public List> getDatasourceProtocols() { + return datasourceProtocols; + } + + public List> getDatasourceTypologies() { + return datasourceTypologies; + } + + public List> getDatasourceWorkflowStatuses() { + return datasourceWorkflowStatuses; + } + + public enum WorkflowStatus { + + EXECUTABLE("Executable", "icon-ok"), + WAIT_USER_SETTINGS("Waiting user settings", "icon-edit"), + WAIT_SYS_SETTINGS("Waiting system settings", + "icon-refresh"); + + public String displayName; + public String icon; + + WorkflowStatus(final String displayName, final String icon) { + this.displayName = displayName; + this.icon = icon; + } + } + +} diff --git a/libs/pom.xml b/libs/pom.xml index e7256d31..20ba7d51 100644 --- a/libs/pom.xml +++ b/libs/pom.xml @@ -19,6 +19,7 @@ dnet-is-common dnet-is-services dnet-data-services + dnet-wf-service diff --git a/pom.xml b/pom.xml index b7839b62..2de8e485 100644 --- a/pom.xml +++ b/pom.xml @@ -347,8 +347,8 @@ maven-compiler-plugin ${maven.compiler.plugin.version} - 1.8 - 1.8 + 17 + 17 ${project.build.sourceEncoding} @@ -451,7 +451,7 @@ UTF-8 UTF-8 - 3.6.0 + 3.9.0 1.8 2.14.0 7.1.0