diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 78831073f..09dac8349 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -24,6 +24,12 @@
 			<groupId>eu.dnetlib.dhp</groupId>
 			<artifactId>dhp-common</artifactId>
 			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.xml.bind</groupId>
+					<artifactId>jaxb-core</artifactId>
+				</exclusion>
+			</exclusions>
@@ -32,6 +38,49 @@
 			<version>${project.version}</version>
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-actionmanager-common</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>eu.dnetlib</groupId>
+					<artifactId>dnet-openaireplus-mapping-utils</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>saxonica</groupId>
+					<artifactId>saxon</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>saxonica</groupId>
+					<artifactId>saxon-dom</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>jgrapht</groupId>
+					<artifactId>jgrapht</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.ehcache</groupId>
+					<artifactId>ehcache</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.springframework</groupId>
+					<artifactId>spring-test</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.*</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>apache</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-openaire-data-protos</artifactId>
+		</dependency>
 			<groupId>net.sf.saxon</groupId>
 			<artifactId>Saxon-HE</artifactId>
@@ -55,6 +104,11 @@
 			<groupId>org.mongodb</groupId>
 			<artifactId>mongo-java-driver</artifactId>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-distcp</artifactId>
+		</dependency>
 			<groupId>org.postgresql</groupId>
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java
new file mode 100644
index 000000000..9d0e82aca
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java
@@ -0,0 +1,49 @@
+package eu.dnetlib.dhp.migration.actions;
+
+import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
+
+import java.util.Comparator;
+
+public class LicenseComparator implements Comparator<Qualifier> {
+
+ @Override
+ public int compare(Qualifier left, Qualifier right) {
+
+ if (left == null && right == null) return 0;
+ if (left == null) return 1;
+ if (right == null) return -1;
+
+ String lClass = left.getClassid();
+ String rClass = right.getClassid();
+
+ if (lClass.equals(rClass)) return 0;
+
+ if (lClass.equals("OPEN SOURCE")) return -1;
+ if (rClass.equals("OPEN SOURCE")) return 1;
+
+ if (lClass.equals("OPEN")) return -1;
+ if (rClass.equals("OPEN")) return 1;
+
+ if (lClass.equals("6MONTHS")) return -1;
+ if (rClass.equals("6MONTHS")) return 1;
+
+ if (lClass.equals("12MONTHS")) return -1;
+ if (rClass.equals("12MONTHS")) return 1;
+
+ if (lClass.equals("EMBARGO")) return -1;
+ if (rClass.equals("EMBARGO")) return 1;
+
+ if (lClass.equals("RESTRICTED")) return -1;
+ if (rClass.equals("RESTRICTED")) return 1;
+
+ if (lClass.equals("CLOSED")) return -1;
+ if (rClass.equals("CLOSED")) return 1;
+
+ if (lClass.equals("UNKNOWN")) return -1;
+ if (rClass.equals("UNKNOWN")) return 1;
+
+ // Otherwise (unlikely): fall back to lexicographical ordering.
+ return lClass.compareTo(rClass);
+ }
+
+}
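
Note: the comparator encodes a most-open-first ordering over access-right class ids, so taking `Stream.min()` under it selects the best available right (see `getBestAccessRights` in ProtoConverter below). A minimal usage sketch, assuming the protobuf-generated builder API of `FieldTypeProtos.Qualifier`; the class name and sample values are illustrative:

```java
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LicenseComparatorSketch {
    public static void main(String[] args) {
        // hypothetical qualifiers carrying only the classid, which drives the ordering
        List<Qualifier> rights = Arrays.asList(
                Qualifier.newBuilder().setClassid("CLOSED").build(),
                Qualifier.newBuilder().setClassid("EMBARGO").build(),
                Qualifier.newBuilder().setClassid("OPEN").build());

        // min() under LicenseComparator yields the most open access right
        Optional<Qualifier> best = rights.stream().min(new LicenseComparator());
        System.out.println(best.map(Qualifier::getClassid).orElse("UNKNOWN")); // prints OPEN
    }
}
```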
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java
new file mode 100644
index 000000000..487fac359
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java
@@ -0,0 +1,170 @@
+package eu.dnetlib.dhp.migration.actions;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class MigrateActionSet {
+
+ private static final Log log = LogFactory.getLog(MigrateActionSet.class);
+
+ private static final String SEPARATOR = "/";
+ private static final String TARGET_PATHS = "target_paths";
+ private static final String RAWSET_PREFIX = "rawset_";
+
+ private static Boolean DEFAULT_TRANSFORM_ONLY = false;
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateActionSet.class.getResourceAsStream(
+ "/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json")));
+ parser.parseArgument(args);
+
+ new MigrateActionSet().run(parser);
+ }
+
+ private void run(ArgumentApplicationParser parser) throws Exception {
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ final String sourceNN = parser.get("sourceNameNode");
+ final String targetNN = parser.get("targetNameNode");
+ final String workDir = parser.get("workingDirectory");
+ final Integer distcp_num_maps = Integer.parseInt(parser.get("distcp_num_maps"));
+
+ final String distcp_memory_mb = parser.get("distcp_memory_mb");
+ final String distcp_task_timeout = parser.get("distcp_task_timeout");
+
+ final String transform_only_s = parser.get("transform_only");
+
+ log.info("transform only param: " + transform_only_s);
+
+ final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only"));
+
+ log.info("transform only: " + transformOnly);
+
+ ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+
+ Configuration conf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps);
+ FileSystem targetFS = FileSystem.get(conf);
+
+ Configuration sourceConf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps);
+ sourceConf.set(FileSystem.FS_DEFAULT_NAME_KEY, sourceNN);
+ FileSystem sourceFS = FileSystem.get(sourceConf);
+
+ Properties props = new Properties();
+
+ List<Path> targetPaths = new ArrayList<>();
+
+ final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
+ log.info(String.format("paths to process:\n%s", sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n"))));
+ for(Path source : sourcePaths) {
+
+ if (!sourceFS.exists(source)) {
+ log.warn(String.format("skipping unexisting path: %s", source));
+ } else {
+
+ LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
+
+ final String rawSet = pathQ.pollLast();
+ log.info(String.format("got RAWSET: %s", rawSet));
+
+ if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) {
+
+ final String actionSetDirectory = pathQ.pollLast();
+
+ final Path targetPath = new Path(targetNN + workDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawSet);
+
+ log.info(String.format("using TARGET PATH: %s", targetPath));
+
+ if (!transformOnly) {
+ if (targetFS.exists(targetPath)) {
+ targetFS.delete(targetPath, true);
+ }
+ runDistcp(distcp_num_maps, distcp_memory_mb, distcp_task_timeout, conf, source, targetPath);
+ }
+
+ targetPaths.add(targetPath);
+ }
+ }
+ }
+
+ props.setProperty(TARGET_PATHS, targetPaths
+ .stream()
+ .map(p -> p.toString())
+ .collect(Collectors.joining(",")));
+ File file = new File(System.getProperty("oozie.action.output.properties"));
+
+ try(OutputStream os = new FileOutputStream(file)) {
+ props.store(os, "");
+ }
+ System.out.println(file.getAbsolutePath());
+ }
+
+ private void runDistcp(Integer distcp_num_maps, String distcp_memory_mb, String distcp_task_timeout, Configuration conf, Path source, Path targetPath) throws Exception {
+
+ final DistCpOptions op = new DistCpOptions(source, targetPath);
+ op.setMaxMaps(distcp_num_maps);
+ op.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+ op.preserve(DistCpOptions.FileAttribute.REPLICATION);
+ op.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+
+ int res = ToolRunner.run(new DistCp(conf, op), new String[]{
+ "-Dmapred.task.timeout=" + distcp_task_timeout,
+ "-Dmapreduce.map.memory.mb=" + distcp_memory_mb,
+ "-pb",
+ "-m " + distcp_num_maps,
+ source.toString(),
+ targetPath.toString()});
+
+ if (res != 0) {
+ throw new RuntimeException(String.format("distcp exited with code %s", res));
+ }
+ }
+
+ private Configuration getConfiguration(String distcp_task_timeout, String distcp_memory_mb, Integer distcp_num_maps) {
+ final Configuration conf = new Configuration();
+ conf.set("dfs.webhdfs.socket.connect-timeout", distcp_task_timeout);
+ conf.set("dfs.webhdfs.socket.read-timeout", distcp_task_timeout);
+ conf.set("dfs.http.client.retry.policy.enabled", "true");
+ conf.set("mapred.task.timeout", distcp_task_timeout);
+ conf.set("mapreduce.map.memory.mb", distcp_memory_mb);
+ conf.set("mapred.map.tasks", String.valueOf(distcp_num_maps));
+ return conf;
+ }
+
+ private List<Path> getSourcePaths(String sourceNN, ISLookUpService isLookUp) throws ISLookUpException {
+ String XQUERY = "distinct-values(\n" +
+ "let $basePath := collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()\n" +
+ "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') \n" +
+ "let $setDir := $x//SET/@directory/string()\n" +
+ "let $rawSet := $x//RAW_SETS/LATEST/@id/string()\n" +
+ "return concat($basePath, '/', $setDir, '/', $rawSet))";
+
+ log.info(String.format("running xquery:\n%s", XQUERY));
+ return isLookUp.quickSearchProfile(XQUERY)
+ .stream()
+ .map(p -> sourceNN + p)
+ .map(Path::new)
+ .collect(Collectors.toList());
+ }
+
+}
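
Note: the `target_paths` property written at the end of `run(...)` is the hand-off point to the next workflow node: Oozie's `<capture-output/>` parses the file named by the `oozie.action.output.properties` system property and exposes its entries through `wf:actionData('migrate_actionsets')['target_paths']`. A self-contained sketch of that mechanism, with the system property simulated via a temp file and hypothetical paths:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

public class CaptureOutputSketch {
    public static void main(String[] args) throws IOException {
        // Oozie sets this property for java actions declaring <capture-output/>;
        // here we point it at a temp file just for the demo
        File out = File.createTempFile("oozie-action-output", ".properties");
        System.setProperty("oozie.action.output.properties", out.getAbsolutePath());

        // what MigrateActionSet does: store the comma separated target paths
        Properties props = new Properties();
        props.setProperty("target_paths", "/tmp/actionsets/some_set/rawset_1,/tmp/actionsets/other_set/rawset_2");
        File file = new File(System.getProperty("oozie.action.output.properties"));
        try (OutputStream os = new FileOutputStream(file)) {
            props.store(os, "");
        }

        // what Oozie does after the action exits: read the captured output back
        Properties captured = new Properties();
        try (InputStream is = new FileInputStream(file)) {
            captured.load(is);
        }
        System.out.println(captured.getProperty("target_paths"));
    }
}
```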
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java
new file mode 100644
index 000000000..a7e70ee81
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java
@@ -0,0 +1,580 @@
+package eu.dnetlib.dhp.migration.actions;
+
+import com.google.common.collect.Lists;
+import com.googlecode.protobuf.format.JsonFormat;
+import eu.dnetlib.data.proto.*;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class ProtoConverter implements Serializable {
+
+ public static final String UNKNOWN = "UNKNOWN";
+ public static final String NOT_AVAILABLE = "not available";
+ public static final String DNET_ACCESS_MODES = "dnet:access_modes";
+
+ public static Oaf convert(OafProtos.Oaf oaf) {
+ try {
+ switch (oaf.getKind()) {
+ case entity:
+ return convertEntity(oaf);
+ case relation:
+ return convertRelation(oaf);
+ default:
+ throw new IllegalArgumentException("invalid kind " + oaf.getKind());
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException("error on getting " + JsonFormat.printToString(oaf), e);
+ }
+ }
+
+ private static Relation convertRelation(OafProtos.Oaf oaf) {
+ final OafProtos.OafRel r = oaf.getRel();
+ final Relation rel = new Relation();
+ rel.setDataInfo(mapDataInfo(oaf.getDataInfo()));
+ rel.setLastupdatetimestamp(oaf.getLastupdatetimestamp());
+ rel.setSource(r.getSource());
+ rel.setTarget(r.getTarget());
+ rel.setRelType(r.getRelType().toString());
+ rel.setSubRelType(r.getSubRelType().toString());
+ rel.setRelClass(r.getRelClass());
+ rel.setCollectedFrom(r.getCollectedfromCount() > 0 ?
+ r.getCollectedfromList().stream()
+ .map(kv -> mapKV(kv))
+ .collect(Collectors.toList()) : null);
+ return rel;
+ }
+
+ private static OafEntity convertEntity(OafProtos.Oaf oaf) {
+
+ switch (oaf.getEntity().getType()) {
+ case result:
+ final Result r = convertResult(oaf);
+ r.setInstance(convertInstances(oaf));
+ return r;
+ case project:
+ return convertProject(oaf);
+ case datasource:
+ return convertDataSource(oaf);
+ case organization:
+ return convertOrganization(oaf);
+ default:
+ throw new RuntimeException("received unknown type");
+ }
+ }
+
+ private static List<Instance> convertInstances(OafProtos.Oaf oaf) {
+
+ final ResultProtos.Result r = oaf.getEntity().getResult();
+ if (r.getInstanceCount() > 0) {
+ return r.getInstanceList()
+ .stream()
+ .map(i -> convertInstance(i))
+ .collect(Collectors.toList());
+ }
+ return Lists.newArrayList();
+ }
+
+ private static Instance convertInstance(ResultProtos.Result.Instance ri) {
+ final Instance i = new Instance();
+ i.setAccessright(mapQualifier(ri.getAccessright()));
+ i.setCollectedfrom(mapKV(ri.getCollectedfrom()));
+ i.setDateofacceptance(mapStringField(ri.getDateofacceptance()));
+ i.setDistributionlocation(ri.getDistributionlocation());
+ i.setHostedby(mapKV(ri.getHostedby()));
+ i.setInstancetype(mapQualifier(ri.getInstancetype()));
+ i.setLicense(mapStringField(ri.getLicense()));
+ i.setUrl(ri.getUrlList());
+ i.setRefereed(mapStringField(ri.getRefereed()));
+ i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
+ i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
+ return i;
+ }
+
+ private static Organization convertOrganization(OafProtos.Oaf oaf) {
+ final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata();
+ final Organization org = setOaf(new Organization(), oaf);
+ setEntity(org, oaf);
+ org.setLegalshortname(mapStringField(m.getLegalshortname()));
+ org.setLegalname(mapStringField(m.getLegalname()));
+ org.setAlternativeNames(m.getAlternativeNamesList().
+ stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ org.setWebsiteurl(mapStringField(m.getWebsiteurl()));
+ org.setLogourl(mapStringField(m.getLogourl()));
+ org.setEclegalbody(mapStringField(m.getEclegalbody()));
+ org.setEclegalperson(mapStringField(m.getEclegalperson()));
+ org.setEcnonprofit(mapStringField(m.getEcnonprofit()));
+ org.setEcresearchorganization(mapStringField(m.getEcresearchorganization()));
+ org.setEchighereducation(mapStringField(m.getEchighereducation()));
+ org.setEcinternationalorganizationeurinterests(mapStringField(m.getEcinternationalorganizationeurinterests()));
+ org.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization()));
+ org.setEcenterprise(mapStringField(m.getEcenterprise()));
+ org.setEcsmevalidated(mapStringField(m.getEcsmevalidated()));
+ org.setEcnutscode(mapStringField(m.getEcnutscode()));
+ org.setCountry(mapQualifier(m.getCountry()));
+
+ return org;
+ }
+
+ private static Datasource convertDataSource(OafProtos.Oaf oaf) {
+ final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
+ final Datasource datasource = setOaf(new Datasource(), oaf);
+ setEntity(datasource, oaf);
+ datasource.setAccessinfopackage(m.getAccessinfopackageList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ datasource.setCertificates(mapStringField(m.getCertificates()));
+ datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
+ datasource.setContactemail(mapStringField(m.getContactemail()));
+ datasource.setDatabaseaccessrestriction(mapStringField(m.getDatabaseaccessrestriction()));
+ datasource.setDatabaseaccesstype(mapStringField(m.getDatabaseaccesstype()));
+ datasource.setDataprovider(mapBoolField(m.getDataprovider()));
+ datasource.setDatasourcetype(mapQualifier(m.getDatasourcetype()));
+ datasource.setDatauploadrestriction(mapStringField(m.getDatauploadrestriction()));
+ datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
+ datasource.setDatauploadtype(mapStringField(m.getDatauploadtype()));
+ datasource.setDateofvalidation(mapStringField(m.getDateofvalidation()));
+ datasource.setDescription(mapStringField(m.getDescription()));
+ datasource.setEnglishname(mapStringField(m.getEnglishname()));
+ datasource.setLatitude(mapStringField(m.getLatitude()));
+ datasource.setLongitude(mapStringField(m.getLongitude()));
+ datasource.setLogourl(mapStringField(m.getLogourl()));
+ datasource.setMissionstatementurl(mapStringField(m.getMissionstatementurl()));
+ datasource.setNamespaceprefix(mapStringField(m.getNamespaceprefix()));
+ datasource.setOdcontenttypes(m.getOdcontenttypesList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ datasource.setOdlanguages(m.getOdlanguagesList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ datasource.setOdnumberofitems(mapStringField(m.getOdnumberofitems()));
+ datasource.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()));
+ datasource.setOdpolicies(mapStringField(m.getOdpolicies()));
+ datasource.setOfficialname(mapStringField(m.getOfficialname()));
+ datasource.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()));
+ datasource.setPidsystems(mapStringField(m.getPidsystems()));
+ datasource.setPolicies(m.getPoliciesList()
+ .stream()
+ .map(ProtoConverter::mapKV)
+ .collect(Collectors.toList()));
+ datasource.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()));
+ datasource.setReleaseenddate(mapStringField(m.getReleaseenddate()));
+ datasource.setServiceprovider(mapBoolField(m.getServiceprovider()));
+ datasource.setReleasestartdate(mapStringField(m.getReleasestartdate()));
+ datasource.setSubjects(m.getSubjectsList()
+ .stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ datasource.setVersioning(mapBoolField(m.getVersioning()));
+ datasource.setWebsiteurl(mapStringField(m.getWebsiteurl()));
+ datasource.setJournal(mapJournal(m.getJournal()));
+
+
+ return datasource;
+ }
+
+ private static Project convertProject(OafProtos.Oaf oaf) {
+ final ProjectProtos.Project.Metadata m = oaf.getEntity().getProject().getMetadata();
+ final Project project = setOaf(new Project(), oaf);
+ setEntity(project, oaf);
+ project.setAcronym(mapStringField(m.getAcronym()));
+ project.setCallidentifier(mapStringField(m.getCallidentifier()));
+ project.setCode(mapStringField(m.getCode()));
+ project.setContactemail(mapStringField(m.getContactemail()));
+ project.setContactfax(mapStringField(m.getContactfax()));
+ project.setContactfullname(mapStringField(m.getContactfullname()));
+ project.setContactphone(mapStringField(m.getContactphone()));
+ project.setContracttype(mapQualifier(m.getContracttype()));
+ project.setCurrency(mapStringField(m.getCurrency()));
+ project.setDuration(mapStringField(m.getDuration()));
+ project.setEcarticle29_3(mapStringField(m.getEcarticle293()));
+ project.setEcsc39(mapStringField(m.getEcsc39()));
+ project.setOamandatepublications(mapStringField(m.getOamandatepublications()));
+ project.setStartdate(mapStringField(m.getStartdate()));
+ project.setEnddate(mapStringField(m.getEnddate()));
+ project.setFundedamount(m.getFundedamount());
+ project.setTotalcost(m.getTotalcost());
+ project.setKeywords(mapStringField(m.getKeywords()));
+ project.setSubjects(m.getSubjectsList().stream()
+ .map(sp -> mapStructuredProperty(sp))
+ .collect(Collectors.toList()));
+ project.setTitle(mapStringField(m.getTitle()));
+ project.setWebsiteurl(mapStringField(m.getWebsiteurl()));
+ project.setFundingtree(m.getFundingtreeList().stream()
+ .map(f -> mapStringField(f))
+ .collect(Collectors.toList()));
+ project.setJsonextrainfo(mapStringField(m.getJsonextrainfo()));
+ project.setSummary(mapStringField(m.getSummary()));
+ project.setOptional1(mapStringField(m.getOptional1()));
+ project.setOptional2(mapStringField(m.getOptional2()));
+ return project;
+ }
+
+ private static Result convertResult(OafProtos.Oaf oaf) {
+ switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
+ case "dataset":
+ return createDataset(oaf);
+ case "publication":
+ return createPublication(oaf);
+ case "software":
+ return createSoftware(oaf);
+ case "other":
+ return createORP(oaf);
+ default:
+ Result result = setOaf(new Result(), oaf);
+ setEntity(result, oaf);
+ return setResult(result, oaf);
+ }
+ }
+
+ private static Software createSoftware(OafProtos.Oaf oaf) {
+ ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
+ Software software = setOaf(new Software(), oaf);
+ setEntity(software, oaf);
+ setResult(software, oaf);
+
+ software.setDocumentationUrl(m.getDocumentationUrlList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ software.setLicense(m.getLicenseList()
+ .stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ software.setCodeRepositoryUrl(mapStringField(m.getCodeRepositoryUrl()));
+ software.setProgrammingLanguage(mapQualifier(m.getProgrammingLanguage()));
+ return software;
+ }
+
+ private static OtherResearchProduct createORP(OafProtos.Oaf oaf) {
+ ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
+ OtherResearchProduct otherResearchProducts = setOaf(new OtherResearchProduct(), oaf);
+ setEntity(otherResearchProducts, oaf);
+ setResult(otherResearchProducts, oaf);
+ otherResearchProducts.setContactperson(m.getContactpersonList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ otherResearchProducts.setContactgroup(m.getContactgroupList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ otherResearchProducts.setTool(m.getToolList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+
+ return otherResearchProducts;
+ }
+
+ private static Publication createPublication(OafProtos.Oaf oaf) {
+
+ ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
+ Publication publication = setOaf(new Publication(), oaf);
+ setEntity(publication, oaf);
+ setResult(publication, oaf);
+ publication.setJournal(mapJournal(m.getJournal()));
+ return publication;
+ }
+
+ private static Dataset createDataset(OafProtos.Oaf oaf) {
+
+ ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
+ Dataset dataset = setOaf(new Dataset(), oaf);
+ setEntity(dataset, oaf);
+ setResult(dataset, oaf);
+ dataset.setStoragedate(mapStringField(m.getStoragedate()));
+ dataset.setDevice(mapStringField(m.getDevice()));
+ dataset.setSize(mapStringField(m.getSize()));
+ dataset.setVersion(mapStringField(m.getVersion()));
+ dataset.setLastmetadataupdate(mapStringField(m.getLastmetadataupdate()));
+ dataset.setMetadataversionnumber(mapStringField(m.getMetadataversionnumber()));
+ dataset.setGeolocation(m.getGeolocationList()
+ .stream()
+ .map(ProtoConverter::mapGeolocation)
+ .collect(Collectors.toList()));
+ return dataset;
+
+ }
+
+ public static <T extends Oaf> T setOaf(T oaf, OafProtos.Oaf o) {
+ oaf.setDataInfo(mapDataInfo(o.getDataInfo()));
+ oaf.setLastupdatetimestamp(o.getLastupdatetimestamp());
+ return oaf;
+ }
+
+ public static <T extends OafEntity> T setEntity(T entity, OafProtos.Oaf oaf) {
+ //setting Entity fields
+ final OafProtos.OafEntity e = oaf.getEntity();
+ entity.setId(e.getId());
+ entity.setOriginalId(e.getOriginalIdList());
+ entity.setCollectedfrom(e.getCollectedfromList()
+ .stream()
+ .map(ProtoConverter::mapKV)
+ .collect(Collectors.toList()));
+ entity.setPid(e.getPidList().stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ entity.setDateofcollection(e.getDateofcollection());
+ entity.setDateoftransformation(e.getDateoftransformation());
+ entity.setExtraInfo(e.getExtraInfoList()
+ .stream()
+ .map(ProtoConverter::mapExtraInfo)
+ .collect(Collectors.toList()));
+ return entity;
+ }
+
+ public static <T extends Result> T setResult(T entity, OafProtos.Oaf oaf) {
+ //setting Entity fields
+ final ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
+ entity.setAuthor(m.getAuthorList()
+ .stream()
+ .map(ProtoConverter::mapAuthor)
+ .collect(Collectors.toList()));
+ entity.setResulttype(mapQualifier(m.getResulttype()));
+ entity.setLanguage(mapQualifier(m.getLanguage()));
+ entity.setCountry(m.getCountryList()
+ .stream()
+ .map(ProtoConverter::mapQualifierAsCountry)
+ .collect(Collectors.toList()));
+ entity.setSubject(m.getSubjectList()
+ .stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ entity.setTitle(m.getTitleList()
+ .stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ entity.setRelevantdate(m.getRelevantdateList()
+ .stream()
+ .map(ProtoConverter::mapStructuredProperty)
+ .collect(Collectors.toList()));
+ entity.setDescription(m.getDescriptionList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setDateofacceptance(mapStringField(m.getDateofacceptance()));
+ entity.setPublisher(mapStringField(m.getPublisher()));
+ entity.setEmbargoenddate(mapStringField(m.getEmbargoenddate()));
+ entity.setSource(m.getSourceList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setFulltext(m.getFulltextList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setFormat(m.getFormatList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setContributor(m.getContributorList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setResourcetype(mapQualifier(m.getResourcetype()));
+ entity.setCoverage(m.getCoverageList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ entity.setContext(m.getContextList()
+ .stream()
+ .map(ProtoConverter::mapContext)
+ .collect(Collectors.toList()));
+
+ entity.setBestaccessright(getBestAccessRights(oaf.getEntity().getResult().getInstanceList()));
+
+ return entity;
+ }
+
+ private static Qualifier getBestAccessRights(List<ResultProtos.Result.Instance> instanceList) {
+ if (instanceList != null) {
+ final Optional<FieldTypeProtos.Qualifier> min = instanceList.stream()
+ .map(i -> i.getAccessright()).min(new LicenseComparator());
+
+ final Qualifier rights = min.isPresent() ? mapQualifier(min.get()) : new Qualifier();
+
+ if (StringUtils.isBlank(rights.getClassid())) {
+ rights.setClassid(UNKNOWN);
+ }
+ if (StringUtils.isBlank(rights.getClassname()) || UNKNOWN.equalsIgnoreCase(rights.getClassname())) {
+ rights.setClassname(NOT_AVAILABLE);
+ }
+ if (StringUtils.isBlank(rights.getSchemeid())) {
+ rights.setSchemeid(DNET_ACCESS_MODES);
+ }
+ if (StringUtils.isBlank(rights.getSchemename())) {
+ rights.setSchemename(DNET_ACCESS_MODES);
+ }
+
+ return rights;
+ }
+ return null;
+ }
+
+ private static Context mapContext(ResultProtos.Result.Context context) {
+
+ final Context entity = new Context();
+ entity.setId(context.getId());
+ entity.setDataInfo(context.getDataInfoList()
+ .stream()
+ .map(ProtoConverter::mapDataInfo)
+ .collect(Collectors.toList()));
+ return entity;
+ }
+
+
+ public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) {
+ final KeyValue keyValue = new KeyValue();
+ keyValue.setKey(kv.getKey());
+ keyValue.setValue(kv.getValue());
+ keyValue.setDataInfo(mapDataInfo(kv.getDataInfo()));
+ return keyValue;
+ }
+
+ public static DataInfo mapDataInfo(FieldTypeProtos.DataInfo d) {
+ final DataInfo dataInfo = new DataInfo();
+ dataInfo.setDeletedbyinference(d.getDeletedbyinference());
+ dataInfo.setInferenceprovenance(d.getInferenceprovenance());
+ dataInfo.setInferred(d.getInferred());
+ dataInfo.setInvisible(d.getInvisible());
+ dataInfo.setProvenanceaction(mapQualifier(d.getProvenanceaction()));
+ dataInfo.setTrust(d.getTrust());
+ return dataInfo;
+ }
+
+ public static Qualifier mapQualifier(FieldTypeProtos.Qualifier q) {
+ final Qualifier qualifier = new Qualifier();
+ qualifier.setClassid(q.getClassid());
+ qualifier.setClassname(q.getClassname());
+ qualifier.setSchemeid(q.getSchemeid());
+ qualifier.setSchemename(q.getSchemename());
+ return qualifier;
+ }
+
+ public static Country mapQualifierAsCountry(FieldTypeProtos.Qualifier q) {
+ final Country c = new Country();
+ c.setClassid(q.getClassid());
+ c.setClassname(q.getClassname());
+ c.setSchemeid(q.getSchemeid());
+ c.setSchemename(q.getSchemename());
+ c.setDataInfo(mapDataInfo(q.getDataInfo()));
+ return c;
+ }
+
+ public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
+ final StructuredProperty structuredProperty = new StructuredProperty();
+ structuredProperty.setValue(sp.getValue());
+ structuredProperty.setQualifier(mapQualifier(sp.getQualifier()));
+ structuredProperty.setDataInfo(mapDataInfo(sp.getDataInfo()));
+ return structuredProperty;
+ }
+
+ public static ExtraInfo mapExtraInfo(FieldTypeProtos.ExtraInfo extraInfo) {
+ final ExtraInfo entity = new ExtraInfo();
+ entity.setName(extraInfo.getName());
+ entity.setTypology(extraInfo.getTypology());
+ entity.setProvenance(extraInfo.getProvenance());
+ entity.setTrust(extraInfo.getTrust());
+ entity.setValue(extraInfo.getValue());
+ return entity;
+ }
+
+ public static OAIProvenance mapOAIProvenance(FieldTypeProtos.OAIProvenance oaiProvenance) {
+ final OAIProvenance entity = new OAIProvenance();
+ entity.setOriginDescription(mapOriginalDescription(oaiProvenance.getOriginDescription()));
+ return entity;
+ }
+
+ public static OriginDescription mapOriginalDescription(FieldTypeProtos.OAIProvenance.OriginDescription originDescription) {
+ final OriginDescription originDescriptionResult = new OriginDescription();
+ originDescriptionResult.setHarvestDate(originDescription.getHarvestDate());
+ originDescriptionResult.setAltered(originDescription.getAltered());
+ originDescriptionResult.setBaseURL(originDescription.getBaseURL());
+ originDescriptionResult.setIdentifier(originDescription.getIdentifier());
+ originDescriptionResult.setDatestamp(originDescription.getDatestamp());
+ originDescriptionResult.setMetadataNamespace(originDescription.getMetadataNamespace());
+ return originDescriptionResult;
+ }
+
+ public static Field<String> mapStringField(FieldTypeProtos.StringField s) {
+ final Field<String> stringField = new Field<>();
+ stringField.setValue(s.getValue());
+ stringField.setDataInfo(mapDataInfo(s.getDataInfo()));
+ return stringField;
+ }
+
+ public static Field<Boolean> mapBoolField(FieldTypeProtos.BoolField b) {
+ final Field<Boolean> booleanField = new Field<>();
+ booleanField.setValue(b.getValue());
+ booleanField.setDataInfo(mapDataInfo(b.getDataInfo()));
+ return booleanField;
+ }
+
+ public static Field<Integer> mapIntField(FieldTypeProtos.IntField b) {
+ final Field<Integer> entity = new Field<>();
+ entity.setValue(b.getValue());
+ entity.setDataInfo(mapDataInfo(b.getDataInfo()));
+ return entity;
+ }
+
+ public static Journal mapJournal(FieldTypeProtos.Journal j) {
+ final Journal journal = new Journal();
+ journal.setConferencedate(j.getConferencedate());
+ journal.setConferenceplace(j.getConferenceplace());
+ journal.setEdition(j.getEdition());
+ journal.setEp(j.getEp());
+ journal.setIss(j.getIss());
+ journal.setIssnLinking(j.getIssnLinking());
+ journal.setIssnOnline(j.getIssnOnline());
+ journal.setIssnPrinted(j.getIssnPrinted());
+ journal.setName(j.getName());
+ journal.setSp(j.getSp());
+ journal.setVol(j.getVol());
+ journal.setDataInfo(mapDataInfo(j.getDataInfo()));
+ return journal;
+ }
+
+ public static Author mapAuthor(FieldTypeProtos.Author author) {
+ final Author entity = new Author();
+ entity.setFullname(author.getFullname());
+ entity.setName(author.getName());
+ entity.setSurname(author.getSurname());
+ entity.setRank(author.getRank());
+ entity.setPid(author.getPidList()
+ .stream()
+ .map(kv -> {
+ final StructuredProperty sp = new StructuredProperty();
+ sp.setValue(kv.getValue());
+ final Qualifier q = new Qualifier();
+ q.setClassid(kv.getKey());
+ q.setClassname(kv.getKey());
+ sp.setQualifier(q);
+ return sp;
+ })
+ .collect(Collectors.toList()));
+ entity.setAffiliation(author.getAffiliationList()
+ .stream()
+ .map(ProtoConverter::mapStringField)
+ .collect(Collectors.toList()));
+ return entity;
+
+ }
+
+ public static GeoLocation mapGeolocation(ResultProtos.Result.GeoLocation geoLocation) {
+ final GeoLocation entity = new GeoLocation();
+ entity.setPoint(geoLocation.getPoint());
+ entity.setBox(geoLocation.getBox());
+ entity.setPlace(geoLocation.getPlace());
+ return entity;
+ }
+}
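
Note: end to end, the converter turns the serialized protobuf payload of an action into a `dhp-schema` model object that Jackson can serialize, which is exactly what `TransformActions.transformAction()` below does with `targetValue`. A minimal sketch of that round trip (the wrapper class and method are illustrative):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.schema.oaf.Oaf;

public class ProtoConverterSketch {

    // decode a serialized OafProtos.Oaf message and re-serialize it as JSON
    public static String protoToJson(byte[] serializedOaf) throws Exception {
        OafProtos.Oaf proto = OafProtos.Oaf.parseFrom(serializedOaf);
        Oaf converted = ProtoConverter.convert(proto);
        return new ObjectMapper().writeValueAsString(converted);
    }
}
```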
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
new file mode 100644
index 000000000..9b6e7654f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
@@ -0,0 +1,120 @@
+package eu.dnetlib.dhp.migration.actions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import eu.dnetlib.actionmanager.actions.AtomicAction;
+import eu.dnetlib.data.proto.OafProtos;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+
+public class TransformActions implements Serializable {
+
+ private static final Log log = LogFactory.getLog(TransformActions.class);
+ private static final String SEPARATOR = "/";
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateActionSet.class.getResourceAsStream(
+ "/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json")));
+ parser.parseArgument(args);
+
+ new TransformActions().run(parser);
+ }
+
+ private void run(ArgumentApplicationParser parser) throws ISLookUpException, IOException {
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: " + isLookupUrl);
+
+ final String inputPaths = parser.get("inputPaths");
+
+ if (StringUtils.isBlank(inputPaths)) {
+ throw new RuntimeException("empty inputPaths");
+ }
+ log.info("inputPaths: " + inputPaths);
+
+ final String targetBaseDir = getTargetBaseDir(isLookupUrl);
+
+ try(SparkSession spark = getSparkSession(parser)) {
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+
+ for(String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
+
+ LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
+
+ final String rawset = pathQ.pollLast();
+ final String actionSetDirectory = pathQ.pollLast();
+
+ final Path targetDirectory = new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
+
+ if (fs.exists(targetDirectory)) {
+ log.info(String.format("found target directory '%s", targetDirectory));
+ fs.delete(targetDirectory, true);
+ log.info(String.format("deleted target directory '%s", targetDirectory));
+ }
+
+ log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory));
+
+ sc.sequenceFile(sourcePath, Text.class, Text.class)
+ .mapToPair(a -> new Tuple2<>(a._1(), AtomicAction.fromJSON(a._2().toString())))
+ .mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._2())))
+
+ .saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+ }
+ }
+ }
+
+ private Text transformAction(AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
+
+ final ObjectMapper mapper = new ObjectMapper();
+ if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
+ Oaf oaf = ProtoConverter.convert(OafProtos.Oaf.parseFrom(aa.getTargetValue()));
+ aa.setTargetValue(mapper.writeValueAsString(oaf).getBytes());
+ }
+
+ return new Text(mapper.writeValueAsString(aa));
+ }
+
+ private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
+ ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+ String XQUERY = "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
+ return isLookUp.getResourceProfileByQuery(XQUERY);
+ }
+
+ private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
+ SparkConf conf = new SparkConf();
+
+ return SparkSession
+ .builder()
+ .appName(TransformActions.class.getSimpleName())
+ .master(parser.get("master"))
+ .config(conf)
+ .enableHiveSupport()
+ .getOrCreate();
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json
new file mode 100644
index 000000000..c4910ec61
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json
@@ -0,0 +1,10 @@
+[
+ {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
+ {"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true},
+ {"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true},
+ {"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true},
+ {"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true},
+ {"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true},
+ {"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true}
+]
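
Note: these definitions drive `ArgumentApplicationParser` in `MigrateActionSet.main(...)`: each short `paramName` becomes a CLI flag and values are read back by `paramLongName`. A hypothetical invocation sketch; the endpoint and paths are made up:

```java
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.actions.MigrateActionSet;
import org.apache.commons.io.IOUtils;

public class ParserUsageSketch {
    public static void main(String[] args) throws Exception {
        // same resource MigrateActionSet loads at startup
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(MigrateActionSet.class.getResourceAsStream(
                        "/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json")));

        parser.parseArgument(new String[]{
                "-is", "http://services.example.org/is/services/isLookUp", // hypothetical endpoint
                "-sn", "webhdfs://source-nn.example.org:50071",            // hypothetical source name node
                "-tn", "hdfs://target-nameservice",
                "-w", "/tmp/actionsets",
                "-nm", "1",
                "-mm", "6144",
                "-tt", "60000000",
                "-tr", "false"});

        System.out.println(parser.get("isLookupUrl")); // values are retrieved by paramLongName
    }
}
```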
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json
new file mode 100644
index 000000000..ce72f53ca
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
+ {"paramName":"i", "paramLongName":"inputPaths", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}
+]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml
new file mode 100644
index 000000000..9637ebdc6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>sourceNN</name>
+        <value>webhdfs://namenode2.hadoop.dm.openaire.eu:50071</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/applicationHistory</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
new file mode 100644
index 000000000..ec2861a0e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
@@ -0,0 +1,111 @@
+<workflow-app xmlns="uri:oozie:workflow:0.5" name="migrate_actions">
+
+    <parameters>
+        <property>
+            <name>sourceNN</name>
+            <description>the source name node</description>
+        </property>
+        <property>
+            <name>isLookupUrl</name>
+            <description>the isLookup service endpoint</description>
+        </property>
+        <property>
+            <name>workingDirectory</name>
+            <value>/tmp/actionsets</value>
+            <description>working directory</description>
+        </property>
+        <property>
+            <name>distcp_memory_mb</name>
+            <value>6144</value>
+            <description>memory for distcp copying actionsets from remote cluster</description>
+        </property>
+        <property>
+            <name>distcp_task_timeout</name>
+            <value>60000000</value>
+            <description>timeout for distcp copying actions from remote cluster</description>
+        </property>
+        <property>
+            <name>distcp_num_maps</name>
+            <value>1</value>
+            <description>maximum number of map tasks used in the distcp process</description>
+        </property>
+        <property>
+            <name>transform_only</name>
+            <description>activate transform-only mode: only apply the transformation step</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <start to="migrate_actionsets"/>
+
+    <action name="migrate_actionsets">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class>
+            <java-opts>-Dmapred.task.timeout=${distcp_task_timeout}</java-opts>
+            <arg>-is</arg><arg>${isLookupUrl}</arg>
+            <arg>-sn</arg><arg>${sourceNN}</arg>
+            <arg>-tn</arg><arg>${nameNode}</arg>
+            <arg>-w</arg><arg>${workingDirectory}</arg>
+            <arg>-nm</arg><arg>${distcp_num_maps}</arg>
+            <arg>-mm</arg><arg>${distcp_memory_mb}</arg>
+            <arg>-tt</arg><arg>${distcp_task_timeout}</arg>
+            <arg>-tr</arg><arg>${transform_only}</arg>
+            <capture-output/>
+        </java>
+        <ok to="transform_actions"/>
+        <error to="fail"/>
+    </action>
+
+    <action name="transform_actions">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>transform_actions</name>
+            <class>eu.dnetlib.dhp.migration.actions.TransformActions</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores ${sparkExecutorCores}
+                --executor-memory ${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
+                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>-mt</arg><arg>yarn</arg>
+            <arg>-is</arg><arg>${isLookupUrl}</arg>
+            <arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg>
+        </spark>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+
+    <kill name="fail">
+        <message>migrate_actions failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <end name="end"/>
+</workflow-app>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 74003a407..0310a3f44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,12 @@
 				<version>${dhp.hadoop.version}</version>
 				<scope>provided</scope>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-distcp</artifactId>
+				<version>${dhp.hadoop.version}</version>
+				<scope>provided</scope>
+			</dependency>
 			<dependency>
 				<groupId>org.apache.spark</groupId>
 				<artifactId>spark-core_2.11</artifactId>
@@ -262,6 +268,16 @@
 				<scope>provided</scope>
+			<dependency>
+				<groupId>eu.dnetlib</groupId>
+				<artifactId>dnet-actionmanager-common</artifactId>
+				<version>6.0.5</version>
+			</dependency>
+			<dependency>
+				<groupId>eu.dnetlib</groupId>
+				<artifactId>dnet-openaire-data-protos</artifactId>
+				<version>3.9.8-proto250</version>
+			</dependency>
 			<dependency>
 				<groupId>eu.dnetlib</groupId>
 				<artifactId>dnet-pace-core</artifactId>