From 91e7220f2099f3d9519a8b960c9663cde34ce650 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 29 Apr 2021 10:09:52 +0200 Subject: [PATCH] cleaned up workflow for actionset migration, adjusted dnet|cnr* dependency versions --- dhp-common/pom.xml | 1 - dhp-workflows/dhp-actionmanager/pom.xml | 5 - .../migration/LicenseComparator.java | 69 -- .../migration/MigrateActionSet.java | 196 ----- .../migration/ProtoConverter.java | 719 ------------------ .../migration/TransformActions.java | 172 ----- .../migrate_actionsets_parameters.json | 56 -- .../transform_actionsets_parameters.json | 20 - .../wf/migration/oozie_app/config-default.xml | 30 - .../wf/migration/oozie_app/workflow.xml | 138 ---- pom.xml | 31 +- 11 files changed, 18 insertions(+), 1419 deletions(-) delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 9f6bf78d5..acac3594f 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -107,7 +107,6 @@ eu.dnetlib.dhp dhp-schemas - stable_ids diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml index a1173ad75..f63f3aa01 100644 --- a/dhp-workflows/dhp-actionmanager/pom.xml +++ b/dhp-workflows/dhp-actionmanager/pom.xml @@ -51,11 +51,6 @@ hadoop-distcp - - eu.dnetlib - dnet-openaire-data-protos - - eu.dnetlib dnet-actionmanager-api diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java deleted file mode 100644 index 7b6046f8b..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java +++ /dev/null @@ -1,69 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.migration; - -import java.util.Comparator; - -import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; - -public class LicenseComparator implements Comparator { - - @Override - public int compare(Qualifier left, Qualifier right) { - - if (left == null && right == null) - return 0; - if (left == null) - return 1; - if (right == null) - return -1; - - String lClass = left.getClassid(); - String rClass = right.getClassid(); - - if (lClass.equals(rClass)) - return 0; - - if (lClass.equals("OPEN SOURCE")) - return -1; - if (rClass.equals("OPEN SOURCE")) - return 1; - - if (lClass.equals("OPEN")) - return -1; - if (rClass.equals("OPEN")) - return 1; - - if (lClass.equals("6MONTHS")) - return -1; - if (rClass.equals("6MONTHS")) - return 1; - - if (lClass.equals("12MONTHS")) - return -1; - if (rClass.equals("12MONTHS")) - return 1; - - if (lClass.equals("EMBARGO")) - return -1; - if (rClass.equals("EMBARGO")) - return 1; - - if (lClass.equals("RESTRICTED")) - return -1; - if (rClass.equals("RESTRICTED")) - return 1; - - if (lClass.equals("CLOSED")) - return -1; - if (rClass.equals("CLOSED")) - return 1; - - if (lClass.equals("UNKNOWN")) - return -1; - if (rClass.equals("UNKNOWN")) - return 1; - - // Else (but unlikely), lexicographical ordering will do. - return lClass.compareTo(rClass); - } -} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java deleted file mode 100644 index 77be7652e..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java +++ /dev/null @@ -1,196 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.migration; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.tools.DistCp; -import org.apache.hadoop.tools.DistCpOptions; -import org.apache.hadoop.util.ToolRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; - -public class MigrateActionSet { - - private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class); - - private static final String SEPARATOR = "/"; - private static final String TARGET_PATHS = "target_paths"; - private static final String RAWSET_PREFIX = "rawset_"; - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MigrateActionSet.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json"))); - parser.parseArgument(args); - - new MigrateActionSet().run(parser); - } - - private void run(ArgumentApplicationParser parser) throws Exception { - - final String isLookupUrl = parser.get("isLookupUrl"); - final String sourceNN = parser.get("sourceNameNode"); - final String targetNN = parser.get("targetNameNode"); - final String workDir = parser.get("workingDirectory"); - final Integer distcp_num_maps = Integer.parseInt(parser.get("distcp_num_maps")); - - final String distcp_memory_mb = parser.get("distcp_memory_mb"); - final String distcp_task_timeout = parser.get("distcp_task_timeout"); - - final String transform_only_s = parser.get("transform_only"); - - log.info("transform only param: {}", transform_only_s); - - final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only")); - - log.info("transform only: {}", transformOnly); - - ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); - - Configuration conf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps); - FileSystem targetFS = FileSystem.get(conf); - - Configuration sourceConf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps); - sourceConf.set(FileSystem.FS_DEFAULT_NAME_KEY, sourceNN); - FileSystem sourceFS = FileSystem.get(sourceConf); - - Properties props = new Properties(); - - List targetPaths = new ArrayList<>(); - - final List sourcePaths = getSourcePaths(sourceNN, isLookUp); - log - .info( - "paths to process:\n{}", sourcePaths - .stream() - .map(p -> p.toString()) - .collect(Collectors.joining("\n"))); - - for (Path source : sourcePaths) { - - if (!sourceFS.exists(source)) { - log.warn("skipping unexisting path: {}", source); - } else { - - LinkedList pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath())); - - final String rawSet = pathQ.pollLast(); - log.info("got RAWSET: {}", rawSet); - - if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) { - - final String actionSetDirectory = pathQ.pollLast(); - - final Path targetPath = new Path( - targetNN + workDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawSet); - - log.info("using TARGET PATH: {}", targetPath); - - if (!transformOnly) { - if (targetFS.exists(targetPath)) { - targetFS.delete(targetPath, true); - } - runDistcp( - distcp_num_maps, distcp_memory_mb, distcp_task_timeout, conf, source, targetPath); - } - - targetPaths.add(targetPath); - } - } - } - - final String targetPathsCsv = targetPaths.stream().map(p -> p.toString()).collect(Collectors.joining(",")); - props.setProperty(TARGET_PATHS, targetPathsCsv); - File file = new File(System.getProperty("oozie.action.output.properties")); - - try (OutputStream os = new FileOutputStream(file)) { - props.store(os, ""); - } - System.out.println(file.getAbsolutePath()); - } - - private void runDistcp( - Integer distcp_num_maps, - String distcp_memory_mb, - String distcp_task_timeout, - Configuration conf, - Path source, - Path targetPath) - throws Exception { - - final DistCpOptions op = new DistCpOptions(source, targetPath); - op.setMaxMaps(distcp_num_maps); - op.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); - op.preserve(DistCpOptions.FileAttribute.REPLICATION); - op.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE); - - int res = ToolRunner - .run( - new DistCp(conf, op), - new String[] { - "-Dmapred.task.timeout=" + distcp_task_timeout, - "-Dmapreduce.map.memory.mb=" + distcp_memory_mb, - "-pb", - "-m " + distcp_num_maps, - source.toString(), - targetPath.toString() - }); - - if (res != 0) { - throw new RuntimeException(String.format("distcp exited with code %s", res)); - } - } - - private Configuration getConfiguration( - String distcp_task_timeout, String distcp_memory_mb, Integer distcp_num_maps) { - final Configuration conf = new Configuration(); - conf.set("dfs.webhdfs.socket.connect-timeout", distcp_task_timeout); - conf.set("dfs.webhdfs.socket.read-timeout", distcp_task_timeout); - conf.set("dfs.http.client.retry.policy.enabled", "true"); - conf.set("mapred.task.timeout", distcp_task_timeout); - conf.set("mapreduce.map.memory.mb", distcp_memory_mb); - conf.set("mapred.map.tasks", String.valueOf(distcp_num_maps)); - return conf; - } - - private List getSourcePaths(String sourceNN, ISLookUpService isLookUp) - throws ISLookUpException { - String XQUERY = "distinct-values(\n" - + "let $basePath := collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()\n" - + "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') \n" - + "let $setDir := $x//SET/@directory/string()\n" - + "let $rawSet := $x//RAW_SETS/LATEST/@id/string()\n" - + "return concat($basePath, '/', $setDir, '/', $rawSet))"; - - log.info(String.format("running xquery:\n%s", XQUERY)); - return isLookUp - .quickSearchProfile(XQUERY) - .stream() - .map(p -> sourceNN + p) - .map(Path::new) - .collect(Collectors.toList()); - } -} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java deleted file mode 100644 index 70f37fba3..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java +++ /dev/null @@ -1,719 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.migration; - -import static eu.dnetlib.dhp.schema.common.ModelConstants.*; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - -import com.google.common.collect.Lists; -import com.googlecode.protobuf.format.JsonFormat; - -import eu.dnetlib.data.proto.*; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; - -public class ProtoConverter implements Serializable { - - public static Oaf convert(OafProtos.Oaf oaf) { - try { - switch (oaf.getKind()) { - case entity: - return convertEntity(oaf); - case relation: - return convertRelation(oaf); - default: - throw new IllegalArgumentException("invalid kind " + oaf.getKind()); - } - } catch (Throwable e) { - throw new RuntimeException("error on getting " + JsonFormat.printToString(oaf), e); - } - } - - private static Relation convertRelation(OafProtos.Oaf oaf) { - final OafProtos.OafRel r = oaf.getRel(); - final Relation rel = new Relation(); - rel.setDataInfo(mapDataInfo(oaf.getDataInfo())); - rel.setLastupdatetimestamp(oaf.getLastupdatetimestamp()); - rel.setSource(r.getSource()); - rel.setTarget(r.getTarget()); - rel.setRelType(r.getRelType().toString()); - rel.setSubRelType(r.getSubRelType().toString()); - rel.setRelClass(r.getRelClass()); - rel - .setCollectedfrom( - r.getCollectedfromCount() > 0 - ? r.getCollectedfromList().stream().map(kv -> mapKV(kv)).collect(Collectors.toList()) - : null); - return rel; - } - - private static OafEntity convertEntity(OafProtos.Oaf oaf) { - - switch (oaf.getEntity().getType()) { - case result: - final Result r = convertResult(oaf); - r.setInstance(convertInstances(oaf)); - r.setExternalReference(convertExternalRefs(oaf)); - return r; - case project: - return convertProject(oaf); - case datasource: - return convertDataSource(oaf); - case organization: - return convertOrganization(oaf); - default: - throw new RuntimeException("received unknown type"); - } - } - - private static List convertInstances(OafProtos.Oaf oaf) { - - final ResultProtos.Result r = oaf.getEntity().getResult(); - if (r.getInstanceCount() > 0) { - return r.getInstanceList().stream().map(i -> convertInstance(i)).collect(Collectors.toList()); - } - return Lists.newArrayList(); - } - - private static Instance convertInstance(ResultProtos.Result.Instance ri) { - final Instance i = new Instance(); - i.setAccessright(mapAccessRight(ri.getAccessright())); - i.setCollectedfrom(mapKV(ri.getCollectedfrom())); - i.setDateofacceptance(mapStringField(ri.getDateofacceptance())); - i.setDistributionlocation(ri.getDistributionlocation()); - i.setHostedby(mapKV(ri.getHostedby())); - i.setInstancetype(mapQualifier(ri.getInstancetype())); - i.setLicense(mapStringField(ri.getLicense())); - i - .setUrl( - ri.getUrlList() != null ? ri - .getUrlList() - .stream() - .distinct() - .collect(Collectors.toCollection(ArrayList::new)) : null); - i.setRefereed(mapRefereed(ri.getRefereed())); - i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount())); - i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency())); - return i; - } - - private static Qualifier mapRefereed(FieldTypeProtos.StringField refereed) { - Qualifier q = new Qualifier(); - q.setClassid(refereed.getValue()); - q.setSchemename(refereed.getValue()); - q.setSchemeid(DNET_REVIEW_LEVELS); - q.setSchemename(DNET_REVIEW_LEVELS); - return q; - } - - private static List convertExternalRefs(OafProtos.Oaf oaf) { - ResultProtos.Result r = oaf.getEntity().getResult(); - if (r.getExternalReferenceCount() > 0) { - return r - .getExternalReferenceList() - .stream() - .map(e -> convertExtRef(e)) - .collect(Collectors.toList()); - } - return Lists.newArrayList(); - } - - private static ExternalReference convertExtRef(ResultProtos.Result.ExternalReference e) { - ExternalReference ex = new ExternalReference(); - ex.setUrl(e.getUrl()); - ex.setSitename(e.getSitename()); - ex.setRefidentifier(e.getRefidentifier()); - ex.setQuery(e.getQuery()); - ex.setQualifier(mapQualifier(e.getQualifier())); - ex.setLabel(e.getLabel()); - ex.setDataInfo(ex.getDataInfo()); - return ex; - } - - private static Organization convertOrganization(OafProtos.Oaf oaf) { - final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata(); - final Organization org = setOaf(new Organization(), oaf); - setEntity(org, oaf); - org.setLegalshortname(mapStringField(m.getLegalshortname())); - org.setLegalname(mapStringField(m.getLegalname())); - org - .setAlternativeNames( - m - .getAlternativeNamesList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - org.setWebsiteurl(mapStringField(m.getWebsiteurl())); - org.setLogourl(mapStringField(m.getLogourl())); - org.setEclegalbody(mapStringField(m.getEclegalbody())); - org.setEclegalperson(mapStringField(m.getEclegalperson())); - org.setEcnonprofit(mapStringField(m.getEcnonprofit())); - org.setEcresearchorganization(mapStringField(m.getEcresearchorganization())); - org.setEchighereducation(mapStringField(m.getEchighereducation())); - org - .setEcinternationalorganizationeurinterests( - mapStringField(m.getEcinternationalorganizationeurinterests())); - org.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization())); - org.setEcenterprise(mapStringField(m.getEcenterprise())); - org.setEcsmevalidated(mapStringField(m.getEcsmevalidated())); - org.setEcnutscode(mapStringField(m.getEcnutscode())); - org.setCountry(mapQualifier(m.getCountry())); - - return org; - } - - private static Datasource convertDataSource(OafProtos.Oaf oaf) { - final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata(); - final Datasource datasource = setOaf(new Datasource(), oaf); - setEntity(datasource, oaf); - datasource - .setAccessinfopackage( - m - .getAccessinfopackageList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - datasource.setCertificates(mapStringField(m.getCertificates())); - datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())); - datasource.setContactemail(mapStringField(m.getContactemail())); - datasource.setDatabaseaccessrestriction(mapStringField(m.getDatabaseaccessrestriction())); - datasource.setDatabaseaccesstype(mapStringField(m.getDatabaseaccesstype())); - datasource.setDataprovider(mapBoolField(m.getDataprovider())); - datasource.setDatasourcetype(mapQualifier(m.getDatasourcetype())); - datasource.setDatauploadrestriction(mapStringField(m.getDatauploadrestriction())); - datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl())); - datasource.setDatauploadtype(mapStringField(m.getDatauploadtype())); - datasource.setDateofvalidation(mapStringField(m.getDateofvalidation())); - datasource.setDescription(mapStringField(m.getDescription())); - datasource.setEnglishname(mapStringField(m.getEnglishname())); - datasource.setLatitude(mapStringField(m.getLatitude())); - datasource.setLongitude(mapStringField(m.getLongitude())); - datasource.setLogourl(mapStringField(m.getLogourl())); - datasource.setMissionstatementurl(mapStringField(m.getMissionstatementurl())); - datasource.setNamespaceprefix(mapStringField(m.getNamespaceprefix())); - datasource - .setOdcontenttypes( - m - .getOdcontenttypesList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - datasource - .setOdlanguages( - m - .getOdlanguagesList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - datasource.setOdnumberofitems(mapStringField(m.getOdnumberofitems())); - datasource.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate())); - datasource.setOdpolicies(mapStringField(m.getOdpolicies())); - datasource.setOfficialname(mapStringField(m.getOfficialname())); - datasource.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility())); - datasource.setPidsystems(mapStringField(m.getPidsystems())); - datasource - .setPolicies( - m.getPoliciesList().stream().map(ProtoConverter::mapKV).collect(Collectors.toList())); - datasource.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind())); - datasource.setReleaseenddate(mapStringField(m.getReleaseenddate())); - datasource.setServiceprovider(mapBoolField(m.getServiceprovider())); - datasource.setReleasestartdate(mapStringField(m.getReleasestartdate())); - datasource - .setSubjects( - m - .getSubjectsList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - datasource.setVersioning(mapBoolField(m.getVersioning())); - datasource.setWebsiteurl(mapStringField(m.getWebsiteurl())); - datasource.setJournal(mapJournal(m.getJournal())); - - return datasource; - } - - private static Project convertProject(OafProtos.Oaf oaf) { - final ProjectProtos.Project.Metadata m = oaf.getEntity().getProject().getMetadata(); - final Project project = setOaf(new Project(), oaf); - setEntity(project, oaf); - project.setAcronym(mapStringField(m.getAcronym())); - project.setCallidentifier(mapStringField(m.getCallidentifier())); - project.setCode(mapStringField(m.getCode())); - project.setContactemail(mapStringField(m.getContactemail())); - project.setContactfax(mapStringField(m.getContactfax())); - project.setContactfullname(mapStringField(m.getContactfullname())); - project.setContactphone(mapStringField(m.getContactphone())); - project.setContracttype(mapQualifier(m.getContracttype())); - project.setCurrency(mapStringField(m.getCurrency())); - project.setDuration(mapStringField(m.getDuration())); - project.setEcarticle29_3(mapStringField(m.getEcarticle293())); - project.setEcsc39(mapStringField(m.getEcsc39())); - project.setOamandatepublications(mapStringField(m.getOamandatepublications())); - project.setStartdate(mapStringField(m.getStartdate())); - project.setEnddate(mapStringField(m.getEnddate())); - project.setFundedamount(m.getFundedamount()); - project.setTotalcost(m.getTotalcost()); - project.setKeywords(mapStringField(m.getKeywords())); - project - .setSubjects( - m - .getSubjectsList() - .stream() - .map(sp -> mapStructuredProperty(sp)) - .collect(Collectors.toList())); - project.setTitle(mapStringField(m.getTitle())); - project.setWebsiteurl(mapStringField(m.getWebsiteurl())); - project - .setFundingtree( - m.getFundingtreeList().stream().map(f -> mapStringField(f)).collect(Collectors.toList())); - project.setJsonextrainfo(mapStringField(m.getJsonextrainfo())); - project.setSummary(mapStringField(m.getSummary())); - project.setOptional1(mapStringField(m.getOptional1())); - project.setOptional2(mapStringField(m.getOptional2())); - return project; - } - - private static Result convertResult(OafProtos.Oaf oaf) { - switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) { - case "dataset": - return createDataset(oaf); - case "publication": - return createPublication(oaf); - case "software": - return createSoftware(oaf); - case "other": - return createORP(oaf); - default: - Result result = setOaf(new Result(), oaf); - setEntity(result, oaf); - return setResult(result, oaf); - } - } - - private static Software createSoftware(OafProtos.Oaf oaf) { - ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); - Software software = setOaf(new Software(), oaf); - setEntity(software, oaf); - setResult(software, oaf); - - software - .setDocumentationUrl( - m - .getDocumentationUrlList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - software - .setLicense( - m - .getLicenseList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - software.setCodeRepositoryUrl(mapStringField(m.getCodeRepositoryUrl())); - software.setProgrammingLanguage(mapQualifier(m.getProgrammingLanguage())); - return software; - } - - private static OtherResearchProduct createORP(OafProtos.Oaf oaf) { - ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); - OtherResearchProduct otherResearchProducts = setOaf(new OtherResearchProduct(), oaf); - setEntity(otherResearchProducts, oaf); - setResult(otherResearchProducts, oaf); - otherResearchProducts - .setContactperson( - m - .getContactpersonList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - otherResearchProducts - .setContactgroup( - m - .getContactgroupList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - otherResearchProducts - .setTool( - m.getToolList().stream().map(ProtoConverter::mapStringField).collect(Collectors.toList())); - - return otherResearchProducts; - } - - private static Publication createPublication(OafProtos.Oaf oaf) { - - ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); - Publication publication = setOaf(new Publication(), oaf); - setEntity(publication, oaf); - setResult(publication, oaf); - publication.setJournal(mapJournal(m.getJournal())); - return publication; - } - - private static Dataset createDataset(OafProtos.Oaf oaf) { - - ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); - Dataset dataset = setOaf(new Dataset(), oaf); - setEntity(dataset, oaf); - setResult(dataset, oaf); - dataset.setStoragedate(mapStringField(m.getStoragedate())); - dataset.setDevice(mapStringField(m.getDevice())); - dataset.setSize(mapStringField(m.getSize())); - dataset.setVersion(mapStringField(m.getVersion())); - dataset.setLastmetadataupdate(mapStringField(m.getLastmetadataupdate())); - dataset.setMetadataversionnumber(mapStringField(m.getMetadataversionnumber())); - dataset - .setGeolocation( - m - .getGeolocationList() - .stream() - .map(ProtoConverter::mapGeolocation) - .collect(Collectors.toList())); - return dataset; - } - - public static T setOaf(T oaf, OafProtos.Oaf o) { - oaf.setDataInfo(mapDataInfo(o.getDataInfo())); - oaf.setLastupdatetimestamp(o.getLastupdatetimestamp()); - return oaf; - } - - public static T setEntity(T entity, OafProtos.Oaf oaf) { - // setting Entity fields - final OafProtos.OafEntity e = oaf.getEntity(); - entity.setId(e.getId()); - entity.setOriginalId(e.getOriginalIdList()); - entity - .setCollectedfrom( - e.getCollectedfromList().stream().map(ProtoConverter::mapKV).collect(Collectors.toList())); - entity - .setPid( - e - .getPidList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - entity.setDateofcollection(e.getDateofcollection()); - entity.setDateoftransformation(e.getDateoftransformation()); - entity - .setExtraInfo( - e - .getExtraInfoList() - .stream() - .map(ProtoConverter::mapExtraInfo) - .collect(Collectors.toList())); - return entity; - } - - public static T setResult(T entity, OafProtos.Oaf oaf) { - // setting Entity fields - final ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata(); - entity - .setAuthor( - m.getAuthorList().stream().map(ProtoConverter::mapAuthor).collect(Collectors.toList())); - entity.setResulttype(mapQualifier(m.getResulttype())); - entity.setLanguage(mapQualifier(m.getLanguage())); - entity - .setCountry( - m - .getCountryList() - .stream() - .map(ProtoConverter::mapQualifierAsCountry) - .collect(Collectors.toList())); - entity - .setSubject( - m - .getSubjectList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - entity - .setTitle( - m - .getTitleList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - entity - .setRelevantdate( - m - .getRelevantdateList() - .stream() - .map(ProtoConverter::mapStructuredProperty) - .collect(Collectors.toList())); - entity - .setDescription( - m - .getDescriptionList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity.setDateofacceptance(mapStringField(m.getDateofacceptance())); - entity.setPublisher(mapStringField(m.getPublisher())); - entity.setEmbargoenddate(mapStringField(m.getEmbargoenddate())); - entity - .setSource( - m - .getSourceList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity - .setFulltext( - m - .getFulltextList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity - .setFormat( - m - .getFormatList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity - .setContributor( - m - .getContributorList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity.setResourcetype(mapQualifier(m.getResourcetype())); - entity - .setCoverage( - m - .getCoverageList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - entity - .setContext( - m.getContextList().stream().map(ProtoConverter::mapContext).collect(Collectors.toList())); - - entity.setBestaccessright(getBestAccessRights(oaf.getEntity().getResult().getInstanceList())); - - return entity; - } - - private static Qualifier getBestAccessRights(List instanceList) { - if (instanceList != null) { - final Optional min = instanceList - .stream() - .map(i -> i.getAccessright()) - .min(new LicenseComparator()); - - final Qualifier rights = min.isPresent() ? mapAccessRight(min.get()) : new Qualifier(); - - if (StringUtils.isBlank(rights.getClassid())) { - rights.setClassid(UNKNOWN); - } - if (StringUtils.isBlank(rights.getClassname()) - || UNKNOWN.equalsIgnoreCase(rights.getClassname())) { - rights.setClassname(NOT_AVAILABLE); - } - if (StringUtils.isBlank(rights.getSchemeid())) { - rights.setSchemeid(DNET_ACCESS_MODES); - } - if (StringUtils.isBlank(rights.getSchemename())) { - rights.setSchemename(DNET_ACCESS_MODES); - } - - return rights; - } - return null; - } - - private static Context mapContext(ResultProtos.Result.Context context) { - if (context == null || StringUtils.isBlank(context.getId())) { - return null; - } - final Context entity = new Context(); - entity.setId(context.getId()); - entity - .setDataInfo( - context - .getDataInfoList() - .stream() - .map(ProtoConverter::mapDataInfo) - .collect(Collectors.toList())); - return entity; - } - - public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) { - if (kv == null || StringUtils.isBlank(kv.getKey()) & StringUtils.isBlank(kv.getValue())) { - return null; - } - - final KeyValue keyValue = new KeyValue(); - keyValue.setKey(kv.getKey()); - keyValue.setValue(kv.getValue()); - keyValue.setDataInfo(mapDataInfo(kv.getDataInfo())); - return keyValue; - } - - public static DataInfo mapDataInfo(FieldTypeProtos.DataInfo d) { - final DataInfo dataInfo = new DataInfo(); - dataInfo.setDeletedbyinference(d.getDeletedbyinference()); - dataInfo.setInferenceprovenance(d.getInferenceprovenance()); - dataInfo.setInferred(d.getInferred()); - dataInfo.setInvisible(d.getInvisible()); - dataInfo.setProvenanceaction(mapQualifier(d.getProvenanceaction())); - dataInfo.setTrust(d.getTrust()); - return dataInfo; - } - - public static Qualifier mapQualifier(FieldTypeProtos.Qualifier q) { - final Qualifier qualifier = new Qualifier(); - qualifier.setClassid(q.getClassid()); - qualifier.setClassname(q.getClassname()); - qualifier.setSchemeid(q.getSchemeid()); - qualifier.setSchemename(q.getSchemename()); - return qualifier; - } - - public static AccessRight mapAccessRight(FieldTypeProtos.Qualifier q) { - final AccessRight accessRight = new AccessRight(); - accessRight.setClassid(q.getClassid()); - accessRight.setClassname(q.getClassname()); - accessRight.setSchemeid(q.getSchemeid()); - accessRight.setSchemename(q.getSchemename()); - return accessRight; - } - - public static Country mapQualifierAsCountry(FieldTypeProtos.Qualifier q) { - final Country c = new Country(); - c.setClassid(q.getClassid()); - c.setClassname(q.getClassname()); - c.setSchemeid(q.getSchemeid()); - c.setSchemename(q.getSchemename()); - c.setDataInfo(mapDataInfo(q.getDataInfo())); - return c; - } - - public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) { - if (sp == null | StringUtils.isBlank(sp.getValue())) { - return null; - } - - final StructuredProperty structuredProperty = new StructuredProperty(); - structuredProperty.setValue(sp.getValue()); - structuredProperty.setQualifier(mapQualifier(sp.getQualifier())); - structuredProperty.setDataInfo(mapDataInfo(sp.getDataInfo())); - return structuredProperty; - } - - public static ExtraInfo mapExtraInfo(FieldTypeProtos.ExtraInfo extraInfo) { - final ExtraInfo entity = new ExtraInfo(); - entity.setName(extraInfo.getName()); - entity.setTypology(extraInfo.getTypology()); - entity.setProvenance(extraInfo.getProvenance()); - entity.setTrust(extraInfo.getTrust()); - entity.setValue(extraInfo.getValue()); - return entity; - } - - public static OAIProvenance mapOAIProvenance(FieldTypeProtos.OAIProvenance oaiProvenance) { - final OAIProvenance entity = new OAIProvenance(); - entity.setOriginDescription(mapOriginalDescription(oaiProvenance.getOriginDescription())); - return entity; - } - - public static OriginDescription mapOriginalDescription( - FieldTypeProtos.OAIProvenance.OriginDescription originDescription) { - final OriginDescription originDescriptionResult = new OriginDescription(); - originDescriptionResult.setHarvestDate(originDescription.getHarvestDate()); - originDescriptionResult.setAltered(originDescription.getAltered()); - originDescriptionResult.setBaseURL(originDescription.getBaseURL()); - originDescriptionResult.setIdentifier(originDescription.getIdentifier()); - originDescriptionResult.setDatestamp(originDescription.getDatestamp()); - originDescriptionResult.setMetadataNamespace(originDescription.getMetadataNamespace()); - return originDescriptionResult; - } - - public static Field mapStringField(FieldTypeProtos.StringField s) { - if (s == null || StringUtils.isBlank(s.getValue())) { - return null; - } - - final Field stringField = new Field<>(); - stringField.setValue(s.getValue()); - stringField.setDataInfo(mapDataInfo(s.getDataInfo())); - return stringField; - } - - public static Field mapBoolField(FieldTypeProtos.BoolField b) { - if (b == null) { - return null; - } - - final Field booleanField = new Field<>(); - booleanField.setValue(b.getValue()); - booleanField.setDataInfo(mapDataInfo(b.getDataInfo())); - return booleanField; - } - - public static Journal mapJournal(FieldTypeProtos.Journal j) { - final Journal journal = new Journal(); - journal.setConferencedate(j.getConferencedate()); - journal.setConferenceplace(j.getConferenceplace()); - journal.setEdition(j.getEdition()); - journal.setEp(j.getEp()); - journal.setIss(j.getIss()); - journal.setIssnLinking(j.getIssnLinking()); - journal.setIssnOnline(j.getIssnOnline()); - journal.setIssnPrinted(j.getIssnPrinted()); - journal.setName(j.getName()); - journal.setSp(j.getSp()); - journal.setVol(j.getVol()); - journal.setDataInfo(mapDataInfo(j.getDataInfo())); - return journal; - } - - public static Author mapAuthor(FieldTypeProtos.Author author) { - final Author entity = new Author(); - entity.setFullname(author.getFullname()); - entity.setName(author.getName()); - entity.setSurname(author.getSurname()); - entity.setRank(author.getRank()); - entity - .setPid( - author - .getPidList() - .stream() - .map( - kv -> { - final StructuredProperty sp = new StructuredProperty(); - sp.setValue(kv.getValue()); - final Qualifier q = new Qualifier(); - q.setClassid(kv.getKey()); - q.setClassname(kv.getKey()); - sp.setQualifier(q); - return sp; - }) - .collect(Collectors.toList())); - entity - .setAffiliation( - author - .getAffiliationList() - .stream() - .map(ProtoConverter::mapStringField) - .collect(Collectors.toList())); - return entity; - } - - public static GeoLocation mapGeolocation(ResultProtos.Result.GeoLocation geoLocation) { - final GeoLocation entity = new GeoLocation(); - entity.setPoint(geoLocation.getPoint()); - entity.setBox(geoLocation.getBox()); - entity.setPlace(geoLocation.getPlace()); - return entity; - } -} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java deleted file mode 100644 index 490668606..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java +++ /dev/null @@ -1,172 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.migration; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.io.Serializable; -import java.util.LinkedList; -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.protobuf.InvalidProtocolBufferException; - -import eu.dnetlib.data.proto.OafProtos; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import scala.Tuple2; - -public class TransformActions implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(TransformActions.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static final String SEPARATOR = "/"; - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MigrateActionSet.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json"))); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final String inputPaths = parser.get("inputPaths"); - - if (StringUtils.isBlank(inputPaths)) { - throw new RuntimeException("empty inputPaths"); - } - log.info("inputPaths: {}", inputPaths); - - final String targetBaseDir = getTargetBaseDir(isLookupUrl); - - SparkConf conf = new SparkConf(); - - runWithSparkSession( - conf, isSparkSessionManaged, spark -> transformActions(inputPaths, targetBaseDir, spark)); - } - - private static void transformActions(String inputPaths, String targetBaseDir, SparkSession spark) - throws IOException { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - - for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) { - - LinkedList pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath)); - - final String rawset = pathQ.pollLast(); - final String actionSetDirectory = pathQ.pollLast(); - - final Path targetDirectory = new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset); - - if (fs.exists(targetDirectory)) { - log.info("found target directory '{}", targetDirectory); - fs.delete(targetDirectory, true); - log.info("deleted target directory '{}", targetDirectory); - } - - log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory); - - sc - .sequenceFile(sourcePath, Text.class, Text.class) - .map(a -> eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString())) - .map(TransformActions::doTransform) - .filter(Objects::nonNull) - .mapToPair( - a -> new Tuple2<>(a.getClazz().toString(), OBJECT_MAPPER.writeValueAsString(a))) - .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2()))) - .saveAsNewAPIHadoopFile( - targetDirectory.toString(), - Text.class, - Text.class, - SequenceFileOutputFormat.class, - sc.hadoopConfiguration()); - } - } - - private static AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa) - throws InvalidProtocolBufferException { - - // dedup similarity relations had empty target value, don't migrate them - if (aa.getTargetValue().length == 0) { - return null; - } - final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue()); - final Oaf oaf = ProtoConverter.convert(proto_oaf); - switch (proto_oaf.getKind()) { - case entity: - switch (proto_oaf.getEntity().getType()) { - case datasource: - return new AtomicAction<>(Datasource.class, (Datasource) oaf); - case organization: - return new AtomicAction<>(Organization.class, (Organization) oaf); - case project: - return new AtomicAction<>(Project.class, (Project) oaf); - case result: - final String resulttypeid = proto_oaf - .getEntity() - .getResult() - .getMetadata() - .getResulttype() - .getClassid(); - switch (resulttypeid) { - case "publication": - return new AtomicAction<>(Publication.class, (Publication) oaf); - case "software": - return new AtomicAction<>(Software.class, (Software) oaf); - case "other": - return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf); - case "dataset": - return new AtomicAction<>(Dataset.class, (Dataset) oaf); - default: - // can be an update, where the resulttype is not specified - return new AtomicAction<>(Result.class, (Result) oaf); - } - default: - throw new IllegalArgumentException( - "invalid entity type: " + proto_oaf.getEntity().getType()); - } - case relation: - return new AtomicAction<>(Relation.class, (Relation) oaf); - default: - throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind()); - } - } - - private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException { - ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); - String XQUERY = "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()"; - return isLookUp.getResourceProfileByQuery(XQUERY); - } -} diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json deleted file mode 100644 index c7b931c44..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json +++ /dev/null @@ -1,56 +0,0 @@ -[ - { - "paramName": "issm", - "paramLongName": "isSparkSessionManaged", - "paramDescription": "when true will stop SparkSession after job execution", - "paramRequired": false - }, - { - "paramName": "is", - "paramLongName": "isLookupUrl", - "paramDescription": "URL of the isLookUp Service", - "paramRequired": true - }, - { - "paramName": "sn", - "paramLongName": "sourceNameNode", - "paramDescription": "nameNode of the source cluster", - "paramRequired": true - }, - { - "paramName": "tn", - "paramLongName": "targetNameNode", - "paramDescription": "namoNode of the target cluster", - "paramRequired": true - }, - { - "paramName": "w", - "paramLongName": "workingDirectory", - "paramDescription": "working directory", - "paramRequired": true - }, - { - "paramName": "nm", - "paramLongName": "distcp_num_maps", - "paramDescription": "maximum number of map tasks used in the distcp process", - "paramRequired": true - }, - { - "paramName": "mm", - "paramLongName": "distcp_memory_mb", - "paramDescription": "memory for distcp action copying actionsets from remote cluster", - "paramRequired": true - }, - { - "paramName": "tt", - "paramLongName": "distcp_task_timeout", - "paramDescription": "timeout for distcp copying actions from remote cluster", - "paramRequired": true - }, - { - "paramName": "tr", - "paramLongName": "transform_only", - "paramDescription": "activate tranform-only mode. Only apply transformation step", - "paramRequired": true - } -] diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json deleted file mode 100644 index 85c39c5b3..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json +++ /dev/null @@ -1,20 +0,0 @@ -[ - { - "paramName": "issm", - "paramLongName": "isSparkSessionManaged", - "paramDescription": "when true will stop SparkSession after job execution", - "paramRequired": false - }, - { - "paramName": "is", - "paramLongName": "isLookupUrl", - "paramDescription": "URL of the isLookUp Service", - "paramRequired": true - }, - { - "paramName": "i", - "paramLongName": "inputPaths", - "paramDescription": "URL of the isLookUp Service", - "paramRequired": true - } -] diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml deleted file mode 100644 index 9637ebdc6..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml +++ /dev/null @@ -1,30 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - sourceNN - webhdfs://namenode2.hadoop.dm.openaire.eu:50071 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088 - - - spark2EventLogDir - /user/spark/applicationHistory - - \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml deleted file mode 100644 index d8888de9d..000000000 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml +++ /dev/null @@ -1,138 +0,0 @@ - - - - sourceNN - the source name node - - - isLookupUrl - the isLookup service endpoint - - - workingDirectory - working directory - - - distcp_memory_mb - 6144 - memory for distcp copying actionsets from remote cluster - - - distcp_task_timeout - 60000000 - timeout for distcp copying actions from remote cluster - - - distcp_num_maps - 1 - mmaximum number of map tasks used in the distcp process - - - transform_only - activate tranform-only mode. Only apply transformation step - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - - eu.dnetlib.dhp.actionmanager.migration.MigrateActionSet - -Dmapred.task.timeout=${distcp_task_timeout} - --isLookupUrl${isLookupUrl} - --sourceNameNode${sourceNN} - --targetNameNode${nameNode} - --workingDirectory${workingDirectory} - --distcp_num_maps${distcp_num_maps} - --distcp_memory_mb${distcp_memory_mb} - --distcp_task_timeout${distcp_task_timeout} - --transform_only${transform_only} - - - - - - - - - yarn - cluster - transform_actions - eu.dnetlib.dhp.actionmanager.migration.TransformActions - dhp-actionmanager-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --isLookupUrl${isLookupUrl} - --inputPaths${wf:actionData('migrate_actionsets')['target_paths']} - - - - - - - migrate_actions failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index 07bf7954d..ff1532101 100644 --- a/pom.xml +++ b/pom.xml @@ -136,8 +136,7 @@ eu.dnetlib.dhp dhp-schemas - 2.2.5-SNAPSHOT - stable_ids + ${dhp-schemas.version} org.apache.hadoop @@ -344,7 +343,7 @@ eu.dnetlib dnet-actionmanager-common - 6.0.5 + ${dnet-actionmanager-common.version} org.apache.hadoop @@ -355,29 +354,30 @@ eu.dnetlib dnet-actionmanager-api - [4.0.1,5.0.0) + ${dnet-actionmanager-api.version} + + + eu.dnetlib + cnr-misc-utils + + - - eu.dnetlib - dnet-openaire-data-protos - 3.9.8-proto250 - eu.dnetlib dnet-pace-core - 4.0.5 + ${dnet-pace-core.version} eu.dnetlib cnr-rmi-api - [2.0.0,3.0.0) + ${cnr-rmi-api.version} eu.dnetlib.dhp dnet-openaire-broker-common - ${dnet.openaire.broker.common} + ${dnet-openaire-broker-common.version} @@ -725,7 +725,12 @@ 3.3.3 3.4.2 [2.12,3.0) - 3.1.6 + [2.2.5] + [4.0.3] + [6.0.5] + [3.1.6] + [4.0.5] + [2.6.1] 7.5.0 4.7.2 1.20