From 9d33f0e6d765580c65c6afc6c1e4ff05d6e9a574 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 15 Jul 2020 10:26:11 +0200
Subject: [PATCH] first implementation for the persistence of project and
 community relations to results having a doi

---
 .../actionmanager/remapping/ASResultInfo.java      |  47 +++++
 .../actionmanager/remapping/ActionSet.java         |  42 +++++
 .../remapping/InferenceInfo.java                   |  43 +++++
 .../actionmanager/remapping/PrepareInfo.java       | 147 +++++++++++++++
 .../remapping/QueryInformationSystem.java          |  39 ++++
 .../remapping/RelationMerges.java                  |  25 +++
 .../actionmanager/remapping/ResultPid.java         |  24 +++
 .../remapping/SparkExpandResultInfo.java           |  90 +++++++++
 .../remapping/SparkPrepareInfo.java                |  71 +++++++
 .../remapping/SparkRedistributeIISResult.java      | 176 ++++++++++++++++++
 .../remapping/SparkSelectResults.java              |  77 ++++++++
 .../dhp/actionmanager/remapping/common.java        |  27 +++
 .../remapping/input_prepare_parameters.json        |   0
 .../remapping/PrepareInfoTest.java                 |   4 +
 .../actionmanager/remapping/relation/relation      |   0
 15 files changed, 812 insertions(+)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ASResultInfo.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ActionSet.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/InferenceInfo.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfo.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/QueryInformationSystem.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/RelationMerges.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ResultPid.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkExpandResultInfo.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkPrepareInfo.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkRedistributeIISResult.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkSelectResults.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/common.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfoTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/remapping/relation/relation

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ASResultInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ASResultInfo.java
new file mode 100644
index 0000000000..7c0d924fc1
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ASResultInfo.java
@@ -0,0 +1,47 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ASResultInfo implements Serializable {
+    private String id;
+    private String type; // result or relation
+    private List<InferenceInfo> value; // the community or the project
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public List<InferenceInfo> getValue() {
+        return value;
+    }
+
+    public void setValue(List<InferenceInfo> value) {
+        this.value = value;
+    }
+
+    public static ASResultInfo copy(ASResultInfo as) {
+        ASResultInfo ar = new ASResultInfo();
+        ar.id = as.id;
+        ar.type = as.type;
+        ar.value = as.value.stream().map(InferenceInfo::copy).collect(Collectors.toList());
+        return ar;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ActionSet.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ActionSet.java
new file mode 100644
index 0000000000..cbc176fb74
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ActionSet.java
@@ -0,0 +1,42 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+
+public class ActionSet implements Serializable {
+    private String name;
+    private String rawset;
+    private String directory;
+
+    public String getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(String directory) {
+        this.directory = directory;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getRawset() {
+        return rawset;
+    }
+
+    public void setRawset(String rawset) {
+        this.rawset = rawset;
+    }
+
+    public static ActionSet newInstance(String name, String rawset, String directory) {
+        ActionSet as = new ActionSet();
+        as.name = name;
+        as.rawset = rawset;
+        as.directory = directory;
+        return as;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/InferenceInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/InferenceInfo.java
new file mode 100644
index 0000000000..6451b75d7a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/InferenceInfo.java
@@ -0,0 +1,43 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+
+public class InferenceInfo implements Serializable {
+    private String value;
+    private String trust;
+    private String inference_provenance;
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public String getTrust() {
+        return trust;
+    }
+
+    public void setTrust(String trust) {
+        this.trust = trust;
+    }
+
+    public String getInference_provenance() {
+        return inference_provenance;
+    }
+
+    public void setInference_provenance(String inference_provenance) {
+        this.inference_provenance = inference_provenance;
+    }
+
+    public static InferenceInfo copy(InferenceInfo ii) {
+        InferenceInfo iinfo = new InferenceInfo();
+        iinfo.value = ii.value;
+        iinfo.trust = ii.trust;
+        iinfo.inference_provenance = ii.inference_provenance;
+        return iinfo;
+    }
+}
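For context: ASResultInfo is the bean the jobs below exchange, and copy deliberately deep-copies the value list so that a claim can be re-targeted without sharing InferenceInfo instances between records. A minimal sketch of the intended use (all ids and values are invented for illustration):

    ASResultInfo original = new ASResultInfo();
    original.setId("50|dedup_wf_001::ab12");          // a dedup result id
    original.setType("result");
    InferenceInfo ii = new InferenceInfo();
    ii.setValue("covid-19");                          // the community carried by the claim
    ii.setTrust("0.9");
    ii.setInference_provenance("iis::document_affiliations");
    original.setValue(new ArrayList<>(Arrays.asList(ii)));

    ASResultInfo retargeted = ASResultInfo.copy(original);
    retargeted.setId("50|doajarticles::cd34");        // re-point the copy at a merged result
    // the copied InferenceInfo list is independent of the original's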
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfo.java
new file mode 100644
index 0000000000..231ba21713
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfo.java
@@ -0,0 +1,147 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class PrepareInfo implements Serializable {
+    private String outputPath;
+    private Boolean isSparkSessionManaged;
+    private String asInputPath;
+    private String inputGraphPath;
+    private List<String> actionSetActive;
+    private List<ActionSet> actionSetList;
+
+    public PrepareInfo(Boolean isSparkSessionManaged,
+        String outputPath,
+        String asInputPath,
+        String inputGraphPath,
+        List<String> actionSetActive,
+        List<ActionSet> actionSetList) {
+        this.isSparkSessionManaged = isSparkSessionManaged;
+        this.outputPath = outputPath;
+        this.inputGraphPath = inputGraphPath;
+        this.asInputPath = asInputPath;
+        this.actionSetActive = actionSetActive;
+        this.actionSetList = actionSetList;
+    }
+
+    public void run() {
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                common.removeOutputDir(spark, outputPath);
+                exec(
+                    spark, asInputPath, inputGraphPath, outputPath,
+                    actionSetList
+                        .stream()
+                        .filter(as -> actionSetActive.contains(as.getName()))
+                        .collect(Collectors.toList()));
+            });
+    }
+
+    private static void exec(SparkSession spark, String asInputPath, String graphInputPath, String outputPath,
+        List<ActionSet> actionSets) {
+
+        Dataset<Relation> relation = common.readPath(spark, graphInputPath + "/relation", Relation.class);
+
+        actionSets
+            .forEach(
+                as -> resolveActionSet(
+                    spark, asInputPath + "/" + as.getDirectory() + "/" + as.getRawset(), outputPath));
+
+        relation.createOrReplaceTempView("relation");
+
+        spark
+            .sql(
+                "SELECT source dedupId, collect_set(target) AS merges " +
+                    "FROM relation " +
+                    "WHERE relclass = 'merges' " +
+                    "GROUP BY source")
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath + "/relation/");
+    }
+
+    private static void resolveActionSet(SparkSession spark, String asInputPath, String outputPath) {
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        spark
+            .createDataset(
+                sc
+                    .sequenceFile(asInputPath, Text.class, Text.class)
+                    .map(a -> common.OBJECT_MAPPER.readValue(a._2().toString(), AtomicAction.class))
+                    .map(aa -> getAsResultInfo(aa))
+                    .filter(Objects::nonNull)
+                    .rdd(),
+                Encoders.bean(ASResultInfo.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Append)
+            .json(outputPath + "/actionset/");
+    }
+
+    private static ASResultInfo getAsResultInfo(AtomicAction aa) {
+        ASResultInfo ri = new ASResultInfo();
+        if (ModelSupport.resultRelationType.get(aa.getClazz()).equals("result")) {
+            Result result = (Result) aa.getPayload();
+            ri.setId(result.getId());
+            ri.setType("result");
+            List<InferenceInfo> inferenceInfoList = new ArrayList<>();
+            result.getContext().forEach(c -> {
+                String id = c.getId();
+                c.getDataInfo().forEach(di -> {
+                    InferenceInfo ii = new InferenceInfo();
+                    ii.setValue(id);
+                    ii.setTrust(di.getTrust());
+                    ii.setInference_provenance(di.getInferenceprovenance());
+                    inferenceInfoList.add(ii);
+                });
+            });
+            ri.setValue(inferenceInfoList);
+        } else {
+            Relation rel = (Relation) aa.getPayload();
+            if (!rel.getSource().startsWith("50|")) {
+                return null;
+            }
+            ri.setId(rel.getSource());
+            ri.setType("relation");
+            InferenceInfo ii = new InferenceInfo();
+            ii.setInference_provenance(rel.getDataInfo().getInferenceprovenance());
+            ii.setTrust(rel.getDataInfo().getTrust());
+            ii.setValue(rel.getTarget());
+            ri.setValue(Arrays.asList(ii));
+        }
+        return ri;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/QueryInformationSystem.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/QueryInformationSystem.java
new file mode 100644
index 0000000000..f258206b7d
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/QueryInformationSystem.java
@@ -0,0 +1,39 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+public class QueryInformationSystem implements Serializable {
+
+    private static final String ACTION_MANAGER_PATH_QUERY = "for $x in " +
+        "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType') " +
+        "return data($x//PROPERTY[./@key='basePath']/@value)";
+
+    private static final String ACTION_SET_QUERY = "for $x in " +
+        "collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') " +
+        "return concat(data($x//SET/@id), '@@', data($x//LATEST/@id), '@@', data($x//SET/@directory))";
+
+    public static String getActionManagerPath(final String isLookupUrl)
+        throws ISLookUpException {
+        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+        return isLookUp.quickSearchProfile(ACTION_MANAGER_PATH_QUERY).get(0);
+    }
+
+    public static List<ActionSet> getActionSet(final String isLookupUrl)
+        throws ISLookUpException {
+        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+        return isLookUp
+            .quickSearchProfile(ACTION_SET_QUERY)
+            .stream()
+            .map(res -> {
+                String[] tmp = res.split("@@");
+                return ActionSet.newInstance(tmp[0], tmp[1], tmp[2]);
+            })
+            .collect(Collectors.toList());
+    }
+}
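The two XQueries above return plain strings; getActionSet expects each profile row to pack the set id, the latest rawset and the directory, separated by '@@'. A sketch of the round trip, with an invented row (the real values come from the IS profiles):

    // hypothetical profile row returned by ACTION_SET_QUERY
    String row = "iis-referenced-projects-main@@rawset-1594000000@@entities_referenceextraction_project";
    String[] tmp = row.split("@@");
    ActionSet as = ActionSet.newInstance(tmp[0], tmp[1], tmp[2]);
    // PrepareInfo then reads from <asInputPath>/<directory>/<rawset>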
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/RelationMerges.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/RelationMerges.java
new file mode 100644
index 0000000000..ef2a8bac86
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/RelationMerges.java
@@ -0,0 +1,25 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class RelationMerges implements Serializable {
+    private String dedupId;
+    private List<String> merges;
+
+    public String getDedupId() {
+        return dedupId;
+    }
+
+    public void setDedupId(String dedupId) {
+        this.dedupId = dedupId;
+    }
+
+    public List<String> getMerges() {
+        return merges;
+    }
+
+    public void setMerges(List<String> merges) {
+        this.merges = merges;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ResultPid.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ResultPid.java
new file mode 100644
index 0000000000..7fa4d570ef
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/ResultPid.java
@@ -0,0 +1,24 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+
+public class ResultPid implements Serializable {
+    private String resultId;
+    private String doi;
+
+    public String getResultId() {
+        return resultId;
+    }
+
+    public void setResultId(String resultId) {
+        this.resultId = resultId;
+    }
+
+    public String getDoi() {
+        return doi;
+    }
+
+    public void setDoi(String doi) {
+        this.doi = doi;
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkExpandResultInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkExpandResultInfo.java
new file mode 100644
index 0000000000..3861bff41a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkExpandResultInfo.java
@@ -0,0 +1,90 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import scala.Tuple2;
+
+public class SparkExpandResultInfo implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SparkExpandResultInfo.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkExpandResultInfo.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/remapping/input_expand_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String relationInputPath = parser.get("relationInputPath");
+
+        final String asInputPath = parser.get("asInputPath");
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                common.removeOutputDir(spark, outputPath);
+                exec(spark, relationInputPath, asInputPath, outputPath);
+            });
+    }
+
+    private static void exec(SparkSession spark, String relationInputPath, String asInputPath, String outputPath) {
+        Dataset<RelationMerges> rel = common.readPath(spark, relationInputPath, RelationMerges.class);
+
+        Dataset<ASResultInfo> asResultInfo = common.readPath(spark, asInputPath, ASResultInfo.class);
+
+        asResultInfo
+            .joinWith(rel, asResultInfo.col("id").equalTo(rel.col("dedupId")), "left")
+            .flatMap((FlatMapFunction<Tuple2<ASResultInfo, RelationMerges>, ASResultInfo>) value -> {
+                ASResultInfo ri = value._1();
+                if (Objects.isNull(value._2())) {
+                    return Arrays.asList(ri).iterator();
+                }
+                return value
+                    ._2()
+                    .getMerges()
+                    .stream()
+                    .map(res -> {
+                        ASResultInfo copy = ASResultInfo.copy(ri);
+                        copy.setId(res);
+                        return copy;
+                    })
+                    .collect(Collectors.toList())
+                    .iterator();
+            }, Encoders.bean(ASResultInfo.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkPrepareInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkPrepareInfo.java
new file mode 100644
index 0000000000..7bc306f80c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkPrepareInfo.java
@@ -0,0 +1,71 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class SparkPrepareInfo implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkPrepareInfo.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkPrepareInfo.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        log.info("isLookUpUrl: {}", isLookUpUrl);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String inputPath = parser.get("inputPath");
+
+        final List<String> actionSetActive = new Gson().fromJson(parser.get("actionSets"), List.class);
+
+        final String asInputPath = QueryInformationSystem.getActionManagerPath(isLookUpUrl);
+
+        final List<ActionSet> actionSetList = QueryInformationSystem.getActionSet(isLookUpUrl);
+
+        PrepareInfo prepareInfo = new PrepareInfo(
+            isSparkSessionManaged, outputPath, asInputPath, inputPath, actionSetActive, actionSetList);
+
+        prepareInfo.run();
+    }
+}
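SparkExpandResultInfo above moves each claim from a dedup id to all of the results it merges; a claim whose id matches no dedupId passes through unchanged. The core of its flatMap, isolated from Spark (sample ids invented):

    RelationMerges rm = new RelationMerges();
    rm.setDedupId("50|dedup_wf_001::ab12");
    rm.setMerges(Arrays.asList("50|doajarticles::cd34", "50|datacite____::ef56"));

    ASResultInfo ri = new ASResultInfo(); // a claim on the dedup id, as produced by PrepareInfo
    ri.setId(rm.getDedupId());
    ri.setType("result");

    List<ASResultInfo> expanded = rm.getMerges()
        .stream()
        .map(res -> {
            ASResultInfo copy = ASResultInfo.copy(ri);
            copy.setId(res);
            return copy;
        })
        .collect(Collectors.toList());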
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkRedistributeIISResult.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkRedistributeIISResult.java
new file mode 100644
index 0000000000..66f2eed49c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkRedistributeIISResult.java
@@ -0,0 +1,176 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+public class SparkRedistributeIISResult implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkRedistributeIISResult.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkRedistributeIISResult.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/remapping/input_redistribute_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String outputPathResult = parser.get("outputPathResult");
+        log.info("outputPathResult: {}", outputPathResult);
+
+        final String outputPathRelation = parser.get("outputPathRelation");
+        log.info("outputPathRelation: {}", outputPathRelation);
+
+        final String inputPath = parser.get("inputPath");
+
+        final String asInputPath = parser.get("asInputPath");
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                common.removeOutputDir(spark, outputPathResult);
+                common.removeOutputDir(spark, outputPathRelation);
+                Dataset<ResultPid> resultPidDataset = common
+                    .readPath(spark, inputPath + "/publication", ResultPid.class)
+                    .union(common.readPath(spark, inputPath + "/dataset", ResultPid.class))
+                    .union(common.readPath(spark, inputPath + "/software", ResultPid.class))
+                    .union(common.readPath(spark, inputPath + "/otherresearchproduct", ResultPid.class));
+                Dataset<ASResultInfo> asResultInfoDataset = common.readPath(spark, asInputPath, ASResultInfo.class);
+                execResult(spark, asResultInfoDataset.filter("type = 'result'"), resultPidDataset, outputPathResult);
+                execRelation(
+                    spark, asResultInfoDataset.filter("type = 'relation'"), resultPidDataset, outputPathRelation);
+            });
+    }
+
+    private static void execRelation(SparkSession spark, Dataset<ASResultInfo> asResultInfoDataset,
+        Dataset<ResultPid> resultPidDataset, String outputPathRelation) {
+        resultPidDataset
+            .joinWith(
+                asResultInfoDataset,
+                resultPidDataset.col("resultId").equalTo(asResultInfoDataset.col("id")), "left")
+            .flatMap((FlatMapFunction<Tuple2<ResultPid, ASResultInfo>, Relation>) value -> {
+                List<Relation> relationList = new ArrayList<>();
+                if (Objects.nonNull(value._2())) {
+                    relationList.add(getRelation(value._2(), "result"));
+                    relationList.add(getRelation(value._2(), "project"));
+                }
+                return relationList.iterator();
+            }, Encoders.bean(Relation.class))
+            .filter(Objects::nonNull)
+            .toJavaRDD()
+            .map(p -> new AtomicAction<>(Relation.class, p))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(common.OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(outputPathRelation, Text.class, Text.class, SequenceFileOutputFormat.class);
+    }
+
+    private static Relation getRelation(ASResultInfo asResultInfo, String type) {
+        Relation r = new Relation();
+        if (type.equals("result")) {
+            r.setSource(asResultInfo.getId());
+            r.setRelClass("isProducedBy");
+            r.setTarget(asResultInfo.getValue().get(0).getValue());
+        } else {
+            r.setRelClass("produces");
+            r.setSource(asResultInfo.getValue().get(0).getValue());
+            r.setTarget(asResultInfo.getId());
+        }
+
+        r.setRelType("resultProject");
+        r.setSubRelType("outcome");
+
+        r.setDataInfo(getDataInfo(asResultInfo));
+        return r;
+    }
+
+    private static DataInfo getDataInfo(ASResultInfo asResultInfo) {
+        DataInfo di = new DataInfo();
+        di.setInvisible(false);
+        di.setInferred(true);
+        di.setDeletedbyinference(false);
+        di.setTrust(asResultInfo.getValue().get(0).getTrust());
+        di.setInferenceprovenance(asResultInfo.getValue().get(0).getInference_provenance());
+        Qualifier pAction = new Qualifier();
+        pAction.setClassid("iis");
+        pAction.setClassname("iis");
+        pAction.setSchemeid("dnet:provenanceActions");
+        pAction.setSchemename("dnet:provenanceActions");
+        di.setProvenanceaction(pAction);
+        return di;
+    }
+
+    private static void execResult(SparkSession spark, Dataset<ASResultInfo> asResultInfoDataset,
+        Dataset<ResultPid> resultPidDataset, String outputPath) {
+
+        resultPidDataset
+            .joinWith(
+                asResultInfoDataset,
+                resultPidDataset.col("resultId").equalTo(asResultInfoDataset.col("id")), "left")
+            .map((MapFunction<Tuple2<ResultPid, ASResultInfo>, Result>) value -> {
+                if (Objects.isNull(value._2())) {
+                    return null;
+                }
+                ASResultInfo asResultInfo = value._2();
+                Result r = new Result();
+                r.setId(asResultInfo.getId());
+                StructuredProperty pid = new StructuredProperty();
+                pid.setValue(value._1().getDoi());
+                Qualifier qualifier = new Qualifier();
+                qualifier.setClassid("doi");
+                qualifier.setClassname("doi");
+                qualifier.setSchemeid("dnet:pid");
+                qualifier.setSchemename("dnet:pid");
+                pid.setQualifier(qualifier);
+                r.setPid(Arrays.asList(pid));
+                r.setContext(asResultInfo.getValue().stream().map(v -> {
+                    Context c = new Context();
+                    c.setId(v.getValue());
+                    DataInfo dataInfo = new DataInfo();
+                    dataInfo.setTrust(v.getTrust());
+                    dataInfo.setInferenceprovenance(v.getInference_provenance());
+                    dataInfo.setDeletedbyinference(false);
+                    dataInfo.setInferred(true);
+                    dataInfo.setInvisible(false);
+                    Qualifier pAction = new Qualifier();
+                    pAction.setClassid("iis");
+                    pAction.setClassname("iis");
+                    pAction.setSchemeid("dnet:provenanceActions");
+                    pAction.setSchemename("dnet:provenanceActions");
+                    dataInfo.setProvenanceaction(pAction);
+                    c.setDataInfo(Arrays.asList(dataInfo));
+                    return c;
+                }).collect(Collectors.toList()));
+                return r;
+            }, Encoders.bean(Result.class))
+            .filter(Objects::nonNull)
+            .toJavaRDD()
+            .map(p -> new AtomicAction<>(Result.class, p))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(common.OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+    }
+}
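Both saveAsHadoopFile calls above emit the layout the action manager expects: a SequenceFile of Text pairs whose key is the payload class name and whose value is the AtomicAction serialized as JSON. A record written here can be read back exactly the way PrepareInfo.resolveActionSet reads existing sets (the path below is a placeholder):

    JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    sc.sequenceFile("/tmp/remapping/actionset/relation", Text.class, Text.class)
        .map(t -> common.OBJECT_MAPPER.readValue(t._2().toString(), AtomicAction.class))
        .foreach(aa -> System.out.println(aa.getClazz().getCanonicalName()));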
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkSelectResults.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkSelectResults.java
new file mode 100644
index 0000000000..914a01fe70
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/SparkSelectResults.java
@@ -0,0 +1,77 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class SparkSelectResults implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SparkSelectResults.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkSelectResults.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/remapping/input_select_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String inputPath = parser.get("inputPath");
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                common.removeOutputDir(spark, outputPath);
+                exec(spark, inputPath, outputPath, resultClazz);
+            });
+    }
+
+    private static <R extends Result> void exec(SparkSession spark, String inputPath, String outputPath,
+        Class<R> resultClazz) {
+        Dataset<R> result = common.readPath(spark, inputPath, resultClazz);
+
+        result.createOrReplaceTempView("result");
+
+        spark
+            .sql(
+                "SELECT id resultId, persId.value doi " +
+                    "FROM result " +
+                    "LATERAL VIEW explode(pid) p AS persId " +
+                    "LATERAL VIEW explode(collectedfrom) c AS cf " +
+                    "WHERE persId.qualifier.classid = 'doi' " +
+                    "AND (cf.key = '10|openaire____::9e3be59865b2c1c335d32dae2fe7b254' OR " +
+                    "cf.key = '10|openaire____::081b82f96300b6a6e3d282bad31cb6e2') " +
+                    "AND result.id NOT LIKE '50|dedup%'")
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/common.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/common.java
new file mode 100644
index 0000000000..e3c261a5e7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/remapping/common.java
@@ -0,0 +1,27 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+import java.io.Serializable;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.common.HdfsSupport;
+
+public class common implements Serializable {
+
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static <R> Dataset<R> readPath(
+        SparkSession spark, String inputPath, Class<R> clazz) {
+        return spark
+            .read()
+            .textFile(inputPath)
+            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+    }
+
+    public static void removeOutputDir(SparkSession spark, String path) {
+        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfoTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfoTest.java
new file mode 100644
index 0000000000..ac4b59e755
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfoTest.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.actionmanager.remapping;
+
+public class PrepareInfoTest {
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/remapping/relation/relation b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/remapping/relation/relation
new file mode 100644
index 0000000000..e69de29bb2
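Note that input_prepare_parameters.json and the relation test resource are committed empty, and PrepareInfoTest is a bare stub. A possible starting point for the test, modelled on the other local-Spark test classes in dhp-aggregation (resource content and assertions still to be defined):

    public class PrepareInfoTest {

        private static SparkSession spark;

        @BeforeAll
        public static void beforeAll() {
            spark = SparkSession
                .builder()
                .appName(PrepareInfoTest.class.getSimpleName())
                .master("local[*]")
                .getOrCreate();
        }

        @AfterAll
        public static void afterAll() {
            spark.stop();
        }

        @Test
        public void testMergesGrouping() {
            // read .../remapping/relation/relation once it is populated,
            // run the 'merges' SQL from PrepareInfo.exec and assert that each
            // dedupId groups the expected set of merged ids
        }
    }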