From 9c84e21b87fea1141e06700a7abb2da5754d5bab Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Mar 2020 15:56:52 +0100 Subject: [PATCH] added workflow to migrate latest version of each actionset content from DM to OCEAN cluster, mapping the targetValues from the old protobuf data model to the dhp.OAF datamodel --- dhp-workflows/dhp-aggregation/pom.xml | 54 ++ .../migration/actions/LicenseComparator.java | 49 ++ .../migration/actions/MigrateActionSet.java | 170 +++++ .../dhp/migration/actions/ProtoConverter.java | 580 ++++++++++++++++++ .../migration/actions/TransformActions.java | 120 ++++ .../migrate_actionsets_parameters.json | 10 + .../transform_actionsets_parameters.json | 5 + .../wfs/actions/oozie_app/config-default.xml | 30 + .../wfs/actions/oozie_app/workflow.xml | 111 ++++ pom.xml | 16 + 10 files changed, 1145 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml index 78831073fd..09dac83499 100644 --- a/dhp-workflows/dhp-aggregation/pom.xml +++ b/dhp-workflows/dhp-aggregation/pom.xml @@ -24,6 +24,12 @@ eu.dnetlib.dhp dhp-common ${project.version} + + + com.sun.xml.bind + jaxb-core + + @@ -32,6 +38,49 @@ ${project.version} + + eu.dnetlib + dnet-actionmanager-common + + + eu.dnetlib + dnet-openaireplus-mapping-utils + + + saxonica + saxon + + + saxonica + saxon-dom + + + jgrapht + jgrapht + + + net.sf.ehcache + ehcache + + + org.springframework + spring-test + + + org.apache.* + * + + + apache + * + + + + + eu.dnetlib + dnet-openaire-data-protos + + net.sf.saxon Saxon-HE @@ -55,6 +104,11 @@ org.mongodb mongo-java-driver + + + org.apache.hadoop + hadoop-distcp + org.postgresql diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java new file mode 100644 index 0000000000..9d0e82aca9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.migration.actions; + +import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; + +import java.util.Comparator; + +public class LicenseComparator implements Comparator { + + @Override + public int compare(Qualifier left, Qualifier right) { + + if (left == null && right == null) return 0; + if (left == null) return 1; + if (right == null) return -1; + + String lClass = left.getClassid(); + String rClass = right.getClassid(); + + if (lClass.equals(rClass)) return 0; + + if (lClass.equals("OPEN SOURCE")) return -1; + if (rClass.equals("OPEN SOURCE")) return 1; + + if (lClass.equals("OPEN")) return -1; + if (rClass.equals("OPEN")) return 1; + + if (lClass.equals("6MONTHS")) return -1; + if (rClass.equals("6MONTHS")) return 1; + + if (lClass.equals("12MONTHS")) return -1; + if (rClass.equals("12MONTHS")) return 1; + + if (lClass.equals("EMBARGO")) return -1; + if (rClass.equals("EMBARGO")) return 1; + + if (lClass.equals("RESTRICTED")) return -1; + if (rClass.equals("RESTRICTED")) return 1; + + if (lClass.equals("CLOSED")) return -1; + if (rClass.equals("CLOSED")) return 1; + + if (lClass.equals("UNKNOWN")) return -1; + if (rClass.equals("UNKNOWN")) return 1; + + // Else (but unlikely), lexicographical ordering will do. + return lClass.compareTo(rClass); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java new file mode 100644 index 0000000000..487fac359c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java @@ -0,0 +1,170 @@ +package eu.dnetlib.dhp.migration.actions; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.util.ToolRunner; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.*; +import java.util.stream.Collectors; + +public class MigrateActionSet { + + private static final Log log = LogFactory.getLog(MigrateActionSet.class); + + private static final String SEPARATOR = "/"; + private static final String TARGET_PATHS = "target_paths"; + private static final String RAWSET_PREFIX = "rawset_"; + + private static Boolean DEFAULT_TRANSFORM_ONLY = false; + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateActionSet.class.getResourceAsStream( + "/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json"))); + parser.parseArgument(args); + + new MigrateActionSet().run(parser); + } + + private void run(ArgumentApplicationParser parser) throws Exception { + + final String isLookupUrl = parser.get("isLookupUrl"); + final String sourceNN = parser.get("sourceNameNode"); + final String targetNN = parser.get("targetNameNode"); + final String workDir = parser.get("workingDirectory"); + final Integer distcp_num_maps = Integer.parseInt(parser.get("distcp_num_maps")); + + final String distcp_memory_mb = parser.get("distcp_memory_mb"); + final String distcp_task_timeout = parser.get("distcp_task_timeout"); + + final String transform_only_s = parser.get("transform_only"); + + log.info("transform only param: " + transform_only_s); + + final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only")); + + log.info("transform only: " + transformOnly); + + ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); + + Configuration conf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps); + FileSystem targetFS = FileSystem.get(conf); + + Configuration sourceConf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps); + sourceConf.set(FileSystem.FS_DEFAULT_NAME_KEY, sourceNN); + FileSystem sourceFS = FileSystem.get(sourceConf); + + Properties props = new Properties(); + + List targetPaths = new ArrayList<>(); + + final List sourcePaths = getSourcePaths(sourceNN, isLookUp); + log.info(String.format("paths to process:\n%s", sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n")))); + for(Path source : sourcePaths) { + + if (!sourceFS.exists(source)) { + log.warn(String.format("skipping unexisting path: %s", source)); + } else { + + LinkedList pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath())); + + final String rawSet = pathQ.pollLast(); + log.info(String.format("got RAWSET: %s", rawSet)); + + if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) { + + final String actionSetDirectory = pathQ.pollLast(); + + final Path targetPath = new Path(targetNN + workDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawSet); + + log.info(String.format("using TARGET PATH: %s", targetPath)); + + if (!transformOnly) { + if (targetFS.exists(targetPath)) { + targetFS.delete(targetPath, true); + } + runDistcp(distcp_num_maps, distcp_memory_mb, distcp_task_timeout, conf, source, targetPath); + } + + targetPaths.add(targetPath); + } + } + } + + props.setProperty(TARGET_PATHS, targetPaths + .stream() + .map(p -> p.toString()) + .collect(Collectors.joining(","))); + File file = new File(System.getProperty("oozie.action.output.properties")); + + try(OutputStream os = new FileOutputStream(file)) { + props.store(os, ""); + } + System.out.println(file.getAbsolutePath()); + } + + private void runDistcp(Integer distcp_num_maps, String distcp_memory_mb, String distcp_task_timeout, Configuration conf, Path source, Path targetPath) throws Exception { + + final DistCpOptions op = new DistCpOptions(source, targetPath); + op.setMaxMaps(distcp_num_maps); + op.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); + op.preserve(DistCpOptions.FileAttribute.REPLICATION); + op.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE); + + int res = ToolRunner.run(new DistCp(conf, op), new String[]{ + "-Dmapred.task.timeout=" + distcp_task_timeout, + "-Dmapreduce.map.memory.mb=" + distcp_memory_mb, + "-pb", + "-m " + distcp_num_maps, + source.toString(), + targetPath.toString()}); + + if (res != 0) { + throw new RuntimeException(String.format("distcp exited with code %s", res)); + } + } + + private Configuration getConfiguration(String distcp_task_timeout, String distcp_memory_mb, Integer distcp_num_maps) { + final Configuration conf = new Configuration(); + conf.set("dfs.webhdfs.socket.connect-timeout", distcp_task_timeout); + conf.set("dfs.webhdfs.socket.read-timeout", distcp_task_timeout); + conf.set("dfs.http.client.retry.policy.enabled", "true"); + conf.set("mapred.task.timeout", distcp_task_timeout); + conf.set("mapreduce.map.memory.mb", distcp_memory_mb); + conf.set("mapred.map.tasks", String.valueOf(distcp_num_maps)); + return conf; + } + + private List getSourcePaths(String sourceNN, ISLookUpService isLookUp) throws ISLookUpException { + String XQUERY = "distinct-values(\n" + + "let $basePath := collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()\n" + + "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') \n" + + "let $setDir := $x//SET/@directory/string()\n" + + "let $rawSet := $x//RAW_SETS/LATEST/@id/string()\n" + + "return concat($basePath, '/', $setDir, '/', $rawSet))"; + + log.info(String.format("running xquery:\n%s", XQUERY)); + return isLookUp.quickSearchProfile(XQUERY) + .stream() + .map(p -> sourceNN + p) + .map(Path::new) + .collect(Collectors.toList()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java new file mode 100644 index 0000000000..a7e70ee813 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java @@ -0,0 +1,580 @@ +package eu.dnetlib.dhp.migration.actions; + +import com.google.common.collect.Lists; +import com.googlecode.protobuf.format.JsonFormat; +import eu.dnetlib.data.proto.*; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class ProtoConverter implements Serializable { + + public static final String UNKNOWN = "UNKNOWN"; + public static final String NOT_AVAILABLE = "not available"; + public static final String DNET_ACCESS_MODES = "dnet:access_modes"; + + public static Oaf convert(OafProtos.Oaf oaf) { + try { + switch (oaf.getKind()) { + case entity: + return convertEntity(oaf); + case relation: + return convertRelation(oaf); + default: + throw new IllegalArgumentException("invalid kind " + oaf.getKind()); + } + } catch (Throwable e) { + throw new RuntimeException("error on getting " + JsonFormat.printToString(oaf), e); + } + } + + private static Relation convertRelation(OafProtos.Oaf oaf) { + final OafProtos.OafRel r = oaf.getRel(); + final Relation rel = new Relation(); + rel.setDataInfo(mapDataInfo(oaf.getDataInfo())); + rel.setLastupdatetimestamp(oaf.getLastupdatetimestamp()); + rel.setSource(r.getSource()); + rel.setTarget(r.getTarget()); + rel.setRelType(r.getRelType().toString()); + rel.setSubRelType(r.getSubRelType().toString()); + rel.setRelClass(r.getRelClass()); + rel.setCollectedFrom(r.getCollectedfromCount() > 0 ? + r.getCollectedfromList().stream() + .map(kv -> mapKV(kv)) + .collect(Collectors.toList()) : null); + return rel; + } + + private static OafEntity convertEntity(OafProtos.Oaf oaf) { + + switch (oaf.getEntity().getType()) { + case result: + final Result r = convertResult(oaf); + r.setInstance(convertInstances(oaf)); + return r; + case project: + return convertProject(oaf); + case datasource: + return convertDataSource(oaf); + case organization: + return convertOrganization(oaf); + default: + throw new RuntimeException("received unknown type"); + } + } + + private static List convertInstances(OafProtos.Oaf oaf) { + + final ResultProtos.Result r = oaf.getEntity().getResult(); + if (r.getInstanceCount() > 0) { + return r.getInstanceList() + .stream() + .map(i -> convertInstance(i)) + .collect(Collectors.toList()); + } + return Lists.newArrayList(); + } + + private static Instance convertInstance(ResultProtos.Result.Instance ri) { + final Instance i = new Instance(); + i.setAccessright(mapQualifier(ri.getAccessright())); + i.setCollectedfrom(mapKV(ri.getCollectedfrom())); + i.setDateofacceptance(mapStringField(ri.getDateofacceptance())); + i.setDistributionlocation(ri.getDistributionlocation()); + i.setHostedby(mapKV(ri.getHostedby())); + i.setInstancetype(mapQualifier(ri.getInstancetype())); + i.setLicense(mapStringField(ri.getLicense())); + i.setUrl(ri.getUrlList()); + i.setRefereed(mapStringField(ri.getRefereed())); + i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount())); + i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency())); + return i; + } + + private static Organization convertOrganization(OafProtos.Oaf oaf) { + final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata(); + final Organization org = setOaf(new Organization(), oaf); + setEntity(org, oaf); + org.setLegalshortname(mapStringField(m.getLegalshortname())); + org.setLegalname(mapStringField(m.getLegalname())); + org.setAlternativeNames(m.getAlternativeNamesList(). + stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + org.setWebsiteurl(mapStringField(m.getWebsiteurl())); + org.setLogourl(mapStringField(m.getLogourl())); + org.setEclegalbody(mapStringField(m.getEclegalbody())); + org.setEclegalperson(mapStringField(m.getEclegalperson())); + org.setEcnonprofit(mapStringField(m.getEcnonprofit())); + org.setEcresearchorganization(mapStringField(m.getEcresearchorganization())); + org.setEchighereducation(mapStringField(m.getEchighereducation())); + org.setEcinternationalorganizationeurinterests(mapStringField(m.getEcinternationalorganizationeurinterests())); + org.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization())); + org.setEcenterprise(mapStringField(m.getEcenterprise())); + org.setEcsmevalidated(mapStringField(m.getEcsmevalidated())); + org.setEcnutscode(mapStringField(m.getEcnutscode())); + org.setCountry(mapQualifier(m.getCountry())); + + return org; + } + + private static Datasource convertDataSource(OafProtos.Oaf oaf) { + final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); + final Datasource datasource = setOaf(new Datasource(), oaf); + setEntity(datasource, oaf); + datasource.setAccessinfopackage(m.getAccessinfopackageList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + datasource.setCertificates(mapStringField(m.getCertificates())); + datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())); + datasource.setContactemail(mapStringField(m.getContactemail())); + datasource.setDatabaseaccessrestriction(mapStringField(m.getDatabaseaccessrestriction())); + datasource.setDatabaseaccesstype(mapStringField(m.getDatabaseaccesstype())); + datasource.setDataprovider(mapBoolField(m.getDataprovider())); + datasource.setDatasourcetype(mapQualifier(m.getDatasourcetype())); + datasource.setDatauploadrestriction(mapStringField(m.getDatauploadrestriction())); + datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())); + datasource.setDatauploadtype(mapStringField(m.getDatauploadtype())); + datasource.setDateofvalidation(mapStringField(m.getDateofvalidation())); + datasource.setDescription(mapStringField(m.getDescription())); + datasource.setEnglishname(mapStringField(m.getEnglishname())); + datasource.setLatitude(mapStringField(m.getLatitude())); + datasource.setLongitude(mapStringField(m.getLongitude())); + datasource.setLogourl(mapStringField(m.getLogourl())); + datasource.setMissionstatementurl(mapStringField(m.getMissionstatementurl())); + datasource.setNamespaceprefix(mapStringField(m.getNamespaceprefix())); + datasource.setOdcontenttypes(m.getOdcontenttypesList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + datasource.setOdlanguages(m.getOdlanguagesList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + datasource.setOdnumberofitems(mapStringField(m.getOdnumberofitems())); + datasource.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate())); + datasource.setOdpolicies(mapStringField(m.getOdpolicies())); + datasource.setOfficialname(mapStringField(m.getOfficialname())); + datasource.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility())); + datasource.setPidsystems(mapStringField(m.getPidsystems())); + datasource.setPolicies(m.getPoliciesList() + .stream() + .map(ProtoConverter::mapKV) + .collect(Collectors.toList())); + datasource.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind())); + datasource.setReleaseenddate(mapStringField(m.getReleaseenddate())); + datasource.setServiceprovider(mapBoolField(m.getServiceprovider())); + datasource.setReleasestartdate(mapStringField(m.getReleasestartdate())); + datasource.setSubjects(m.getSubjectsList() + .stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + datasource.setVersioning(mapBoolField(m.getVersioning())); + datasource.setWebsiteurl(mapStringField(m.getWebsiteurl())); + datasource.setJournal(mapJournal(m.getJournal())); + + + return datasource; + } + + private static Project convertProject(OafProtos.Oaf oaf) { + final ProjectProtos.Project.Metadata m = oaf.getEntity().getProject().getMetadata(); + final Project project = setOaf(new Project(), oaf); + setEntity(project, oaf); + project.setAcronym(mapStringField(m.getAcronym())); + project.setCallidentifier(mapStringField(m.getCallidentifier())); + project.setCode(mapStringField(m.getCode())); + project.setContactemail(mapStringField(m.getContactemail())); + project.setContactfax(mapStringField(m.getContactfax())); + project.setContactfullname(mapStringField(m.getContactfullname())); + project.setContactphone(mapStringField(m.getContactphone())); + project.setContracttype(mapQualifier(m.getContracttype())); + project.setCurrency(mapStringField(m.getCurrency())); + project.setDuration(mapStringField(m.getDuration())); + project.setEcarticle29_3(mapStringField(m.getEcarticle293())); + project.setEcsc39(mapStringField(m.getEcsc39())); + project.setOamandatepublications(mapStringField(m.getOamandatepublications())); + project.setStartdate(mapStringField(m.getStartdate())); + project.setEnddate(mapStringField(m.getEnddate())); + project.setFundedamount(m.getFundedamount()); + project.setTotalcost(m.getTotalcost()); + project.setKeywords(mapStringField(m.getKeywords())); + project.setSubjects(m.getSubjectsList().stream() + .map(sp -> mapStructuredProperty(sp)) + .collect(Collectors.toList())); + project.setTitle(mapStringField(m.getTitle())); + project.setWebsiteurl(mapStringField(m.getWebsiteurl())); + project.setFundingtree(m.getFundingtreeList().stream() + .map(f -> mapStringField(f)) + .collect(Collectors.toList())); + project.setJsonextrainfo(mapStringField(m.getJsonextrainfo())); + project.setSummary(mapStringField(m.getSummary())); + project.setOptional1(mapStringField(m.getOptional1())); + project.setOptional2(mapStringField(m.getOptional2())); + return project; + } + + private static Result convertResult(OafProtos.Oaf oaf) { + switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) { + case "dataset": + return createDataset(oaf); + case "publication": + return createPublication(oaf); + case "software": + return createSoftware(oaf); + case "other": + return createORP(oaf); + default: + Result result = setOaf(new Result(), oaf); + setEntity(result, oaf); + return setResult(result, oaf); + } + } + + private static Software createSoftware(OafProtos.Oaf oaf) { + ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); + Software software = setOaf(new Software(), oaf); + setEntity(software, oaf); + setResult(software, oaf); + + software.setDocumentationUrl(m.getDocumentationUrlList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + software.setLicense(m.getLicenseList() + .stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + software.setCodeRepositoryUrl(mapStringField(m.getCodeRepositoryUrl())); + software.setProgrammingLanguage(mapQualifier(m.getProgrammingLanguage())); + return software; + } + + private static OtherResearchProduct createORP(OafProtos.Oaf oaf) { + ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); + OtherResearchProduct otherResearchProducts = setOaf(new OtherResearchProduct(), oaf); + setEntity(otherResearchProducts, oaf); + setResult(otherResearchProducts, oaf); + otherResearchProducts.setContactperson(m.getContactpersonList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + otherResearchProducts.setContactgroup(m.getContactgroupList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + otherResearchProducts.setTool(m.getToolList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + + return otherResearchProducts; + } + + private static Publication createPublication(OafProtos.Oaf oaf) { + + ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); + Publication publication = setOaf(new Publication(), oaf); + setEntity(publication, oaf); + setResult(publication, oaf); + publication.setJournal(mapJournal(m.getJournal())); + return publication; + } + + private static Dataset createDataset(OafProtos.Oaf oaf) { + + ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); + Dataset dataset = setOaf(new Dataset(), oaf); + setEntity(dataset, oaf); + setResult(dataset, oaf); + dataset.setStoragedate(mapStringField(m.getStoragedate())); + dataset.setDevice(mapStringField(m.getDevice())); + dataset.setSize(mapStringField(m.getSize())); + dataset.setVersion(mapStringField(m.getVersion())); + dataset.setLastmetadataupdate(mapStringField(m.getLastmetadataupdate())); + dataset.setMetadataversionnumber(mapStringField(m.getMetadataversionnumber())); + dataset.setGeolocation(m.getGeolocationList() + .stream() + .map(ProtoConverter::mapGeolocation) + .collect(Collectors.toList())); + return dataset; + + } + + public static T setOaf(T oaf, OafProtos.Oaf o) { + oaf.setDataInfo(mapDataInfo(o.getDataInfo())); + oaf.setLastupdatetimestamp(o.getLastupdatetimestamp()); + return oaf; + } + + public static T setEntity(T entity, OafProtos.Oaf oaf) { + //setting Entity fields + final OafProtos.OafEntity e = oaf.getEntity(); + entity.setId(e.getId()); + entity.setOriginalId(e.getOriginalIdList()); + entity.setCollectedfrom(e.getCollectedfromList() + .stream() + .map(ProtoConverter::mapKV) + .collect(Collectors.toList())); + entity.setPid(e.getPidList().stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + entity.setDateofcollection(e.getDateofcollection()); + entity.setDateoftransformation(e.getDateoftransformation()); + entity.setExtraInfo(e.getExtraInfoList() + .stream() + .map(ProtoConverter::mapExtraInfo) + .collect(Collectors.toList())); + return entity; + } + + public static T setResult(T entity, OafProtos.Oaf oaf) { + //setting Entity fields + final ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); + entity.setAuthor(m.getAuthorList() + .stream() + .map(ProtoConverter::mapAuthor) + .collect(Collectors.toList())); + entity.setResulttype(mapQualifier(m.getResulttype())); + entity.setLanguage(mapQualifier(m.getLanguage())); + entity.setCountry(m.getCountryList() + .stream() + .map(ProtoConverter::mapQualifierAsCountry) + .collect(Collectors.toList())); + entity.setSubject(m.getSubjectList() + .stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + entity.setTitle(m.getTitleList() + .stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + entity.setRelevantdate(m.getRelevantdateList() + .stream() + .map(ProtoConverter::mapStructuredProperty) + .collect(Collectors.toList())); + entity.setDescription(m.getDescriptionList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setDateofacceptance(mapStringField(m.getDateofacceptance())); + entity.setPublisher(mapStringField(m.getPublisher())); + entity.setEmbargoenddate(mapStringField(m.getEmbargoenddate())); + entity.setSource(m.getSourceList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setFulltext(m.getFulltextList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setFormat(m.getFormatList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setContributor(m.getContributorList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setResourcetype(mapQualifier(m.getResourcetype())); + entity.setCoverage(m.getCoverageList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + entity.setContext(m.getContextList() + .stream() + .map(ProtoConverter::mapContext) + .collect(Collectors.toList())); + + entity.setBestaccessright(getBestAccessRights(oaf.getEntity().getResult().getInstanceList())); + + return entity; + } + + private static Qualifier getBestAccessRights(List instanceList) { + if (instanceList != null) { + final Optional min = instanceList.stream() + .map(i -> i.getAccessright()).min(new LicenseComparator()); + + final Qualifier rights = min.isPresent() ? mapQualifier(min.get()) : new Qualifier(); + + if (StringUtils.isBlank(rights.getClassid())) { + rights.setClassid(UNKNOWN); + } + if (StringUtils.isBlank(rights.getClassname()) || UNKNOWN.equalsIgnoreCase(rights.getClassname())) { + rights.setClassname(NOT_AVAILABLE); + } + if (StringUtils.isBlank(rights.getSchemeid())) { + rights.setSchemeid(DNET_ACCESS_MODES); + } + if (StringUtils.isBlank(rights.getSchemename())) { + rights.setSchemename(DNET_ACCESS_MODES); + } + + return rights; + } + return null; + } + + private static Context mapContext(ResultProtos.Result.Context context) { + + final Context entity = new Context(); + entity.setId(context.getId()); + entity.setDataInfo(context.getDataInfoList() + .stream() + .map(ProtoConverter::mapDataInfo) + .collect(Collectors.toList())); + return entity; + } + + + public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) { + final KeyValue keyValue = new KeyValue(); + keyValue.setKey(kv.getKey()); + keyValue.setValue(kv.getValue()); + keyValue.setDataInfo(mapDataInfo(kv.getDataInfo())); + return keyValue; + } + + public static DataInfo mapDataInfo(FieldTypeProtos.DataInfo d) { + final DataInfo dataInfo = new DataInfo(); + dataInfo.setDeletedbyinference(d.getDeletedbyinference()); + dataInfo.setInferenceprovenance(d.getInferenceprovenance()); + dataInfo.setInferred(d.getInferred()); + dataInfo.setInvisible(d.getInvisible()); + dataInfo.setProvenanceaction(mapQualifier(d.getProvenanceaction())); + dataInfo.setTrust(d.getTrust()); + return dataInfo; + } + + public static Qualifier mapQualifier(FieldTypeProtos.Qualifier q) { + final Qualifier qualifier = new Qualifier(); + qualifier.setClassid(q.getClassid()); + qualifier.setClassname(q.getClassname()); + qualifier.setSchemeid(q.getSchemeid()); + qualifier.setSchemename(q.getSchemename()); + return qualifier; + } + + public static Country mapQualifierAsCountry(FieldTypeProtos.Qualifier q) { + final Country c = new Country(); + c.setClassid(q.getClassid()); + c.setClassname(q.getClassname()); + c.setSchemeid(q.getSchemeid()); + c.setSchemename(q.getSchemename()); + c.setDataInfo(mapDataInfo(q.getDataInfo())); + return c; + } + + public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) { + final StructuredProperty structuredProperty = new StructuredProperty(); + structuredProperty.setValue(sp.getValue()); + structuredProperty.setQualifier(mapQualifier(sp.getQualifier())); + structuredProperty.setDataInfo(mapDataInfo(sp.getDataInfo())); + return structuredProperty; + } + + public static ExtraInfo mapExtraInfo(FieldTypeProtos.ExtraInfo extraInfo) { + final ExtraInfo entity = new ExtraInfo(); + entity.setName(extraInfo.getName()); + entity.setTypology(extraInfo.getTypology()); + entity.setProvenance(extraInfo.getProvenance()); + entity.setTrust(extraInfo.getTrust()); + entity.setValue(extraInfo.getValue()); + return entity; + } + + public static OAIProvenance mapOAIProvenance(FieldTypeProtos.OAIProvenance oaiProvenance) { + final OAIProvenance entity = new OAIProvenance(); + entity.setOriginDescription(mapOriginalDescription(oaiProvenance.getOriginDescription())); + return entity; + } + + public static OriginDescription mapOriginalDescription(FieldTypeProtos.OAIProvenance.OriginDescription originDescription) { + final OriginDescription originDescriptionResult = new OriginDescription(); + originDescriptionResult.setHarvestDate(originDescription.getHarvestDate()); + originDescriptionResult.setAltered(originDescription.getAltered()); + originDescriptionResult.setBaseURL(originDescription.getBaseURL()); + originDescriptionResult.setIdentifier(originDescription.getIdentifier()); + originDescriptionResult.setDatestamp(originDescription.getDatestamp()); + originDescriptionResult.setMetadataNamespace(originDescription.getMetadataNamespace()); + return originDescriptionResult; + } + + public static Field mapStringField(FieldTypeProtos.StringField s) { + final Field stringField = new Field<>(); + stringField.setValue(s.getValue()); + stringField.setDataInfo(mapDataInfo(s.getDataInfo())); + return stringField; + } + + public static Field mapBoolField(FieldTypeProtos.BoolField b) { + final Field booleanField = new Field<>(); + booleanField.setValue(b.getValue()); + booleanField.setDataInfo(mapDataInfo(b.getDataInfo())); + return booleanField; + } + + public static Field mapIntField(FieldTypeProtos.IntField b) { + final Field entity = new Field<>(); + entity.setValue(b.getValue()); + entity.setDataInfo(mapDataInfo(b.getDataInfo())); + return entity; + } + + public static Journal mapJournal(FieldTypeProtos.Journal j) { + final Journal journal = new Journal(); + journal.setConferencedate(j.getConferencedate()); + journal.setConferenceplace(j.getConferenceplace()); + journal.setEdition(j.getEdition()); + journal.setEp(j.getEp()); + journal.setIss(j.getIss()); + journal.setIssnLinking(j.getIssnLinking()); + journal.setIssnOnline(j.getIssnOnline()); + journal.setIssnPrinted(j.getIssnPrinted()); + journal.setName(j.getName()); + journal.setSp(j.getSp()); + journal.setVol(j.getVol()); + journal.setDataInfo(mapDataInfo(j.getDataInfo())); + return journal; + } + + public static Author mapAuthor(FieldTypeProtos.Author author) { + final Author entity = new Author(); + entity.setFullname(author.getFullname()); + entity.setName(author.getName()); + entity.setSurname(author.getSurname()); + entity.setRank(author.getRank()); + entity.setPid(author.getPidList() + .stream() + .map(kv -> { + final StructuredProperty sp = new StructuredProperty(); + sp.setValue(kv.getValue()); + final Qualifier q = new Qualifier(); + q.setClassid(kv.getKey()); + q.setClassname(kv.getKey()); + sp.setQualifier(q); + return sp; + }) + .collect(Collectors.toList())); + entity.setAffiliation(author.getAffiliationList() + .stream() + .map(ProtoConverter::mapStringField) + .collect(Collectors.toList())); + return entity; + + } + + public static GeoLocation mapGeolocation(ResultProtos.Result.GeoLocation geoLocation) { + final GeoLocation entity = new GeoLocation(); + entity.setPoint(geoLocation.getPoint()); + entity.setBox(geoLocation.getBox()); + entity.setPlace(geoLocation.getPlace()); + return entity; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java new file mode 100644 index 0000000000..9b6e7654fb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java @@ -0,0 +1,120 @@ +package eu.dnetlib.dhp.migration.actions; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import eu.dnetlib.actionmanager.actions.AtomicAction; +import eu.dnetlib.data.proto.OafProtos; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.IOException; +import java.io.Serializable; +import java.util.LinkedList; + +public class TransformActions implements Serializable { + + private static final Log log = LogFactory.getLog(TransformActions.class); + private static final String SEPARATOR = "/"; + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateActionSet.class.getResourceAsStream( + "/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json"))); + parser.parseArgument(args); + + new TransformActions().run(parser); + } + + private void run(ArgumentApplicationParser parser) throws ISLookUpException, IOException { + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: " + isLookupUrl); + + final String inputPaths = parser.get("inputPaths"); + + if (StringUtils.isBlank(inputPaths)) { + throw new RuntimeException("empty inputPaths"); + } + log.info("inputPaths: " + inputPaths); + + final String targetBaseDir = getTargetBaseDir(isLookupUrl); + + try(SparkSession spark = getSparkSession(parser)) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + + for(String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) { + + LinkedList pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath)); + + final String rawset = pathQ.pollLast(); + final String actionSetDirectory = pathQ.pollLast(); + + final Path targetDirectory = new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset); + + if (fs.exists(targetDirectory)) { + log.info(String.format("found target directory '%s", targetDirectory)); + fs.delete(targetDirectory, true); + log.info(String.format("deleted target directory '%s", targetDirectory)); + } + + log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory)); + + sc.sequenceFile(sourcePath, Text.class, Text.class) + .mapToPair(a -> new Tuple2<>(a._1(), AtomicAction.fromJSON(a._2().toString()))) + .mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._2()))) + + .saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + } + } + + private Text transformAction(AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { + + final ObjectMapper mapper = new ObjectMapper(); + if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) { + Oaf oaf = ProtoConverter.convert(OafProtos.Oaf.parseFrom(aa.getTargetValue())); + aa.setTargetValue(mapper.writeValueAsString(oaf).getBytes()); + } + + return new Text(mapper.writeValueAsString(aa)); + } + + private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException { + ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); + String XQUERY = "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()"; + return isLookUp.getResourceProfileByQuery(XQUERY); + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession + .builder() + .appName(TransformActions.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json new file mode 100644 index 0000000000..c4910ec61b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json @@ -0,0 +1,10 @@ +[ + {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, + {"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true}, + {"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true}, + {"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true}, + {"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true}, + {"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true}, + {"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true} +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json new file mode 100644 index 0000000000..ce72f53ca6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, + {"paramName":"i", "paramLongName":"inputPaths", "paramDescription": "URL of the isLookUp Service", "paramRequired": true} +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml new file mode 100644 index 0000000000..9637ebdc62 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + sourceNN + webhdfs://namenode2.hadoop.dm.openaire.eu:50071 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088 + + + spark2EventLogDir + /user/spark/applicationHistory + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml new file mode 100644 index 0000000000..ec2861a0ee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml @@ -0,0 +1,111 @@ + + + + sourceNN + the source name node + + + isLookupUrl + the isLookup service endpoint + + + workingDirectory + /tmp/actionsets + working directory + + + distcp_memory_mb + 6144 + memory for distcp copying actionsets from remote cluster + + + distcp_task_timeout + 60000000 + timeout for distcp copying actions from remote cluster + + + distcp_num_maps + 1 + mmaximum number of map tasks used in the distcp process + + + transform_only + activate tranform-only mode. Only apply transformation step + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.actions.MigrateActionSet + -Dmapred.task.timeout=${distcp_task_timeout} + -is${isLookupUrl} + -sn${sourceNN} + -tn${nameNode} + -w${workingDirectory} + -nm${distcp_num_maps} + -mm${distcp_memory_mb} + -tt${distcp_task_timeout} + -tr${transform_only} + + + + + + + + + ${jobTracker} + ${nameNode} + yarn + cluster + transform_actions + eu.dnetlib.dhp.migration.actions.TransformActions + dhp-aggregation-${projectVersion}.jar + + --executor-cores ${sparkExecutorCores} + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + -mtyarn + -is${isLookupUrl} + --inputPaths${wf:actionData('migrate_actionsets')['target_paths']} + + + + + + + migrate_actions failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 74003a4078..0310a3f44e 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,12 @@ ${dhp.hadoop.version} provided + + org.apache.hadoop + hadoop-distcp + ${dhp.hadoop.version} + provided + org.apache.spark spark-core_2.11 @@ -262,6 +268,16 @@ provided + + eu.dnetlib + dnet-actionmanager-common + 6.0.5 + + + eu.dnetlib + dnet-openaire-data-protos + 3.9.8-proto250 + eu.dnetlib dnet-pace-core