diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json new file mode 100644 index 0000000000..3221924bf3 --- /dev/null +++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json @@ -0,0 +1,27 @@ +[ + { + "paramName":"is", + "paramLongName":"isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName":"mt", + "paramLongName":"master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "pm", + "paramLongName":"protoMap", + "paramDescription": "the json path associated to each selection field", + "paramRequired": true + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml new file mode 100644 index 0000000000..ea3a4d9223 --- /dev/null +++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml new file mode 100644 index 0000000000..1866bb0a01 --- /dev/null +++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml @@ -0,0 +1,61 @@ + + + + sourcePath + the source path + + + allowedsemrels + the semantic relationships allowed for propagation + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + isLookupUrl + the isLookup service endpoint + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ResultToCommunitySemRelPropagation + eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob + dhp-propagation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + + -mt yarn-cluster + --sourcePath${sourcePath} + + --hive_metastore_uris${hive_metastore_uris} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/Author.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/Author.java new file mode 100644 index 0000000000..18332bc8f1 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/Author.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +public class Author { +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java new file mode 100644 index 0000000000..7e496c7cf8 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/AutoritativeAuthor.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +public class AutoritativeAuthor { +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java new file mode 100644 index 0000000000..49fbea567f --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +public class ResultWithOrcid { +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java new file mode 100644 index 0000000000..73b8895e1d --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java @@ -0,0 +1,317 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static eu.dnetlib.dhp.PropagationConstant.*; + +public class SparkOrcidToResultFromSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkOrcidToResultFromSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/orcidtoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + boolean writeUpdate = TRUE.equals(parser.get("writeUpdate")); + boolean saveGraph = TRUE.equals(parser.get("saveGraph")); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + JavaRDD relations = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)).cache(); + + JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations); + + JavaRDD publications = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); + JavaRDD datasets = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)); + JavaRDD software = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)); + JavaRDD other = sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); + + //get the results having at least one author pid we are interested in + JavaPairRDD resultswithorcid = publications.map(p -> getTypedRow(p)) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + .union(datasets.map(p -> getTypedRow(p)) + .filter(p -> !(p == null)) + .mapToPair(toPair())) + .union(software.map(p -> getTypedRow(p)) + .filter(p -> !(p == null)) + .mapToPair(toPair())) + .union(other.map(p -> getTypedRow(p)) + .filter(p -> !(p == null)) + .mapToPair(toPair())); + + + JavaPairRDD to_add_orcid_to_result = resultswithorcid.join(result_result) + .map(p -> p._2()._1().setSourceId(p._2()._2().getTargetId())) //associate the pid of the result (target) which should get the orcid to the typed row containing the authors with the orcid from the result(source) + .mapToPair(toPair()); + + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + + if(writeUpdate){ + writeResult(pubs, to_add_orcid_to_result, outputPath, "publication"); + writeResult(dss, to_add_orcid_to_result, outputPath, "dataset"); + writeResult(sfw, to_add_orcid_to_result, outputPath, "software"); + writeResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct"); + } + + if (saveGraph){ + updateResult(pubs, to_add_orcid_to_result, outputPath, "publication"); + updateResult(dss, to_add_orcid_to_result, outputPath, "dataset"); + updateResult(sfw, to_add_orcid_to_result, outputPath, "software"); + updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct"); + } + + + } + + + private static Author enrichAutor(Author autoritative_author, Author author) { + boolean toaddpid = false; + + if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) { + if (StringUtils.isNoneEmpty(author.getSurname())) { + if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) { + + //have the same surname. Check the name + if (StringUtils.isNoneEmpty(autoritative_author.getName())) { + if (StringUtils.isNoneEmpty(author.getName())) { + if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) { + toaddpid = true; + } + //they could be differently written (i.e. only the initials of the name in one of the two + if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) { + toaddpid = true; + } + } + } + } + } + } + if (toaddpid){ + StructuredProperty pid = new StructuredProperty(); + for(StructuredProperty sp : autoritative_author.getPid()){ + if (PROPAGATION_AUTHOR_PID.equals(sp.getQualifier().getClassid())){ + pid.setValue(sp.getValue()); + pid.setQualifier(getQualifier(sp.getQualifier().getClassid(),sp.getQualifier().getClassname() )); + pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); + if(author.getPid() == null){ + author.setPid(Arrays.asList(pid)); + }else{ + author.getPid().add(pid); + } + } + } + return author; + } + return null; + } + + + private static List enrichAutors(List autoritative_authors, List to_enrich_authors, boolean filter){ +// List autoritative_authors = p._2()._2().get().getAuthors(); +// List to_enrich_authors = r.getAuthor(); + + return to_enrich_authors + .stream() + .map(a -> { + if (filter) { + if (containsAllowedPid(a)) { + return a; + } + } + + List lst = autoritative_authors.stream() + .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); + if (lst.size() == 0) { + return a; + } + return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people + + }).collect(Collectors.toList()); + } + + private static void writeResult(JavaPairRDD results, JavaPairRDD toupdateresult, + String outputPath, String type) { + + results.join(toupdateresult) + .map(p -> { + Result r = p._2()._1(); + + List autoritative_authors = p._2()._2().getAuthors(); + List to_enrich_authors = r.getAuthor(); + + r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false)); +// .stream() +// .map(a -> { +// if(filter) { +// if (containsAllowedPid(a)) { +// return a; +// } +// } +// +// List lst = autoritative_authors.stream() +// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); +// if(lst.size() == 0){ +// return a; +// } +// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people +// +// }).collect(Collectors.toList())); + + return r; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type + "_update"); + } + + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, + String outputPath, String type) { + results.leftOuterJoin(toupdateresult) + .map(p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()){ + List autoritative_authors = p._2()._2().get().getAuthors(); + List to_enrich_authors = r.getAuthor(); + + r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true)); +// .stream() +// .map(a -> { +// if(filter) { +// if (containsAllowedPid(a)) { +// return a; +// } +// } +// +// List lst = autoritative_authors.stream() +// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); +// if(lst.size() == 0){ +// return a; +// } +// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people +// +// }).collect(Collectors.toList())); + } + return r; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); + } + + private static TypedRow getTypedRow(Result p) { + TypedRow tp = new TypedRow(); + tp.setSourceId(p.getId()); + List authorList = p.getAuthor() + .stream() + .map(a -> { + if (a.getPid().stream().map(pid -> { + if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { + return a; + } + return null; + }).filter(aut -> !(aut == null)).collect(Collectors.toList()).size() > 0){ + return a; + } + return null; + }).filter(a -> !(a == null)).collect(Collectors.toList()); + tp.setAuthors(authorList); + if(authorList.size() > 0){ + return tp; + } + return null; + + + } + + private static boolean containsAllowedPid(Author a){ + + + return (a.getPid().stream().map(pid -> { + if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { + return true; + } + return false; + }).filter(aut -> (aut == true)).collect(Collectors.toList()).size()) > 0; + } + +} + + +/*private ResultProtos.Result.Metadata.Builder searchMatch(List author_list){ + ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder(); + boolean updated = false; + + for (FieldTypeProtos.Author a: author_list){ + FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors); + if(author != null){ + updated = true; + metadataBuilder.addAuthor(author); + }else{ + metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a)); + } + } + if(updated) + return metadataBuilder; + return null; + } + private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List author_list){ + if(containsOrcid(a.getPidList())) + return null; + for(FieldTypeProtos.Author autoritative_author : author_list) { + if (equals(autoritative_author, a)) { + if(!containsOrcid(a.getPidList())) + return update(a, autoritative_author); + } + } + return null; + + } + + private boolean containsOrcid(List pidList){ + if(pidList == null) + return false; + return pidList + .stream() + .filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)) + .collect(Collectors.toList()).size() > 0; + } + */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java new file mode 100644 index 0000000000..563fcb3bcb --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/SparkResultToProjectThroughSemRelJob2.java @@ -0,0 +1,222 @@ +package eu.dnetlib.dhp.projecttoresult; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.File; +import java.util.*; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.PropagationConstant.toPair; + +public class SparkResultToProjectThroughSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToProjectThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"))); + parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToProjectThroughSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/projecttoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + JavaRDD all_relations = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)); + + JavaRDD relations = all_relations.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache(); + + JavaRDD result_result = relations + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())); + + org.apache.spark.sql.Dataset resres_relation = spark.createDataset(result_result.rdd(), + Encoders.bean(Relation.class)); + + JavaRDD result_project = relations + .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) + && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())); + + org.apache.spark.sql.Dataset resproj_relation = spark.createDataset(result_project.rdd(), + Encoders.bean(Relation.class)); + + resres_relation.createOrReplaceTempView("resres_relation"); + resproj_relation.createOrReplaceTempView("resproj_relation"); + + String query ="SELECT proj, collect_set(r1target) result_set " + + "FROM (" + + " SELECT r1.source as sourcer, r1.relclass as r1rel, r1.target as r1target, r2.target as proj " + + " FROM resres_relation r1 " + + " JOIN resproj_relation r2 " + + " ON r1.source = r2.source " + + " ) tmp " + + "GROUP BY proj "; + + Dataset toaddrelations = spark.sql(query); + + + JavaPairRDD project_resultlist = relations + .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass())) + .map(r -> { + TypedRow tp = new TypedRow(); + tp.setSourceId(r.getSource()); + tp.add(r.getTarget()); + return tp; + }).mapToPair(toPair()) + .reduceByKey((a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + + a.addAll(b.getAccumulator()); + return a; + }).cache(); + + + JavaRDD new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) + .leftOuterJoin(project_resultlist) + .flatMap(c -> { + List toAddRel = new ArrayList<>(); + toAddRel.addAll(c._2()._1()); + if (c._2()._2().isPresent()) { + Set originalRels = c._2()._2().get().getAccumulator(); + for (String o : originalRels) { + if (toAddRel.contains(o)) { + toAddRel.remove(o); + } + } + } + List relationList = new ArrayList<>(); + String projId = c._1(); + for (Object r : toAddRel) { + String rId = (String) r; + relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE, + RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + + } + return relationList.iterator(); + }).cache(); + + toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/toupdaterelations"); + + new_relations.map(r-> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/new_relations" ); + + all_relations.union(new_relations) + .map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/relation"); + + + //JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations); + +// JavaPairRDD result_project = relations +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) +// && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) +// .map(rel ->{ +// +// TypedRow tr = new TypedRow(); +// tr.setSourceId(rel.getSource()); +// tr.setTargetId(rel.getTarget()); +// return tr; +// }) +// .mapToPair(toPair()); +// +// //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result +// JavaPairRDD project_result = result_project.join(result_result) +// .map(c -> { +// String projectId = c._2()._1().getTargetId(); +// String resultId = c._2()._2().getTargetId(); +// TypedRow tr = new TypedRow(); tr.setSourceId(projectId); tr.setTargetId(resultId); +// return tr; +// }) +// .mapToPair(toPair()); +// +// //relationships from project to result. One Pair for each project => project id list of results related to the project +// JavaPairRDD project_results = relations +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) +// .map(r -> { +// TypedRow tr = new TypedRow(); tr.setSourceId(r.getSource()); tr.setTargetId(r.getTarget()); +// return tr; +// }) +// .mapToPair(toPair()) +// .reduceByKey((a, b) -> { +// if (a == null) { +// return b; +// } +// if (b == null) { +// return a; +// } +// a.addAll(b.getAccumulator()); +// return a; +// }); +// +// +// +// JavaRDD newRels = project_result.join(project_results) +// .flatMap(c -> { +// String resId = c._2()._1().getTargetId(); +// +// if (c._2()._2().getAccumulator().contains(resId)) { +// return null; +// } +// String progId = c._2()._1().getSourceId(); +// List rels = new ArrayList(); +// +// rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS, +// RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); +// rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS, +// RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); +// return rels.iterator(); +// }) +// .cache(); +// +// newRels.map(p -> new ObjectMapper().writeValueAsString(p)) +// .saveAsTextFile(outputPath + "/relation_new"); +// +// newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) +// .saveAsTextFile(outputPath + "/relation"); + + } + + + + +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java similarity index 97% rename from dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java rename to dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index acc411fd0c..7739ff99da 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -28,7 +28,12 @@ public class SparkResultToCommunityThroughSemRelJob { .toString(SparkResultToCommunityThroughSemRelJob.class .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); parser.parseArgument(args); - SparkConf conf = new SparkConf(); + + for(String key : parser.getObjectMap().keySet()){ + System.out.println(key + " = " + parser.get(key)); + } + + /* SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); final SparkSession spark = SparkSession .builder() @@ -152,10 +157,10 @@ public class SparkResultToCommunityThroughSemRelJob { updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - updateForDatasetDataset(toupdatesoftwareresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "software", + updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - updateForOtherDataset(toupdatepublicationreresult.toJavaRDD(), other.toJavaRDD(), outputPath, "publication", + updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); @@ -197,7 +202,7 @@ public class SparkResultToCommunityThroughSemRelJob { */ } - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ + /* private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ String query = "SELECT target_id, collect_set(co.id) context_id " + " FROM (SELECT t.id target_id, s.context source_context " + " FROM context_software s " + @@ -479,5 +484,5 @@ public class SparkResultToCommunityThroughSemRelJob { return tp; } return null; - } + }*/ } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java new file mode 100644 index 0000000000..2da8d648f8 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java @@ -0,0 +1,495 @@ +package eu.dnetlib.dhp.resulttocommunityfromsemrel; + +import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.QueryInformationSystem; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.*; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.PropagationConstant.*; + +public class SparkResultToCommunityThroughSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils + .toString(SparkResultToCommunityThroughSemRelJob.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + parser.parseArgument(args); + + for(String key : parser.getObjectMap().keySet()){ + System.out.println(key + " = " + parser.get(key)); + } + + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; + + //final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + //final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + + JavaRDD all_publication_rdd = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD publication_rdd = all_publication_rdd + .filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD all_dataset_rdd = sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, Dataset.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD dataset_rdd = all_dataset_rdd + .filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD all_software_rdd = sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, Software.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()).cache(); + JavaRDD software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); + + JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + + + org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), + Encoders.bean(Publication.class)); + + org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), + Encoders.bean(Dataset.class)); + + org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), + Encoders.bean(OtherResearchProduct.class)); + + org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), + Encoders.bean(Software.class)); + + org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), + Encoders.bean(Relation.class)); + + publication.createOrReplaceTempView("publication"); + relation.createOrReplaceTempView("relation"); + dataset.createOrReplaceTempView("dataset"); + software.createOrReplaceTempView("software"); + other.createOrReplaceTempView("other"); + +// org.apache.spark.sql.Dataset publication_context = getContext(spark, "publication"); +// publication_context.createOrReplaceTempView("publication_context"); + + org.apache.spark.sql.Dataset publication_context = spark.sql( "SELECT relation.source, " + + "publication.context , relation.target " + + "FROM publication " + + " JOIN relation " + + "ON id = source"); + + org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); + dataset_context.createOrReplaceTempView("dataset_context"); + + org.apache.spark.sql.Dataset software_context = getContext(spark, "software"); + software_context.createOrReplaceTempView("software_context"); + + org.apache.spark.sql.Dataset other_context = getContext(spark, "other"); + other_context.createOrReplaceTempView("other_context"); + + publication = spark.createDataset(all_publication_rdd.rdd(), + Encoders.bean(Publication.class)); + publication.createOrReplaceTempView("publication"); + + dataset = spark.createDataset(all_dataset_rdd.rdd(), + Encoders.bean(Dataset.class)); + dataset.createOrReplaceTempView("dataset"); + + other = spark.createDataset(all_orp_rdd.rdd(), + Encoders.bean(OtherResearchProduct.class)); + other.createOrReplaceTempView("other"); + + software = spark.createDataset(all_software_rdd.rdd(), + Encoders.bean(Software.class)); + software.createOrReplaceTempView("software"); + + + org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); + org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); + org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); + org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + + createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + + updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + +/* + JavaPairRDD resultLinkedToCommunities = publication + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + .union(datasets + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(software + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(other + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ); + + JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) + .mapToPair(toPair()); + + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + + updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] + //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla +*/ + } + + private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ + String query = "SELECT target_id, collect_set(co.id) context_id " + + " FROM (SELECT t.id target_id, s.context source_context " + + " FROM context_software s " + + " JOIN " + table + " t " + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, d.context source_context " + + " FROM dataset_context d " + + " JOIN " + table + " t" + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, p.context source_context " + + " FROM publication_context p" + + " JOIN " + table +" t " + + " on p.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, o.context source_context " + + " FROM other_context o " + + " JOIN " + table + " t " + + " ON o.target = t.id) TMP " + + " LATERAL VIEW EXPLODE(source_context) MyT as co " + + " GROUP BY target_id" ; + + return spark.sql(query); + } + + private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return toupdateresult.map(r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + contextList.add(newContext); + } + + } + + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }).filter(r -> r != null); + } + + private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r-> (Dataset)r) + .map(d -> new ObjectMapper().writeValueAsString(d)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Publication)r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r -> (OtherResearchProduct)r) + .map( o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); + } + + + + private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map(c -> { + if(! c._2()._2().isPresent()){ + return c._2()._1(); + } + + List toAddContext = c._2()._2().get(); + Set context_set = new HashSet<>(); + for(Object cId: toAddContext){ + String id = (String)cId; + if (communityIdList.contains(id)){ + context_set.add(id); + } + } + for (Context context: c._2()._1().getContext()){ + if(context_set.contains(context)){ + context_set.remove(context); + } + } + + List contextList = context_set.stream().map(co -> { + Context newContext = new Context(); + newContext.setId(co); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + return newContext; + + }).collect(Collectors.toList()); + + if(contextList.size() > 0 ){ + Result r = new Result(); + r.setId(c._1()); + r.setContext(contextList); + return r; + } + return null; + }).filter(r -> r != null); + + +// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) +// .join(result) +// .map(c -> { +// List toAddContext = c._2()._1(); +// Set context_set = new HashSet<>(); +// for(Object cId: toAddContext){ +// String id = (String)cId; +// if (communityIdList.contains(id)){ +// context_set.add(id); +// } +// } +// for (Context context: c._2()._2().getContext()){ +// if(context_set.contains(context)){ +// context_set.remove(context); +// } +// } +// +// List contextList = context_set.stream().map(co -> { +// Context newContext = new Context(); +// newContext.setId(co); +// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); +// return newContext; +// +// }).collect(Collectors.toList()); +// +// if(contextList.size() > 0 ){ +// Result r = new Result(); +// r.setId(c._1()); +// r.setContext(contextList); +// return r; +// } +// return null; +// }) +// .filter(r -> r != null); + } + + private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, + JavaRDD result, String class_id, String class_name) { + return result + .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) + .map(c -> { + Software oaf = c._2()._1(); + if (c._2()._2().isPresent()) { + + HashSet contexts = new HashSet<>(c._2()._2().get()); + + for (Context context : oaf.getContext()) { + if (contexts.contains(context.getId())){ + if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ + context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); + //community id already in the context of the result. Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for(String cId: contexts){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + cc.add(context); + } + oaf.setContext(cc); + + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair(c -> { + + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } + } + + return new Tuple2<>(c.getString(0) ,contextList); + }); + } + + + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ + String query = "SELECT relation.source, " + table +".context , relation.target " + + "FROM " + table + + " JOIN relation " + + "ON id = source" ; + + return spark.sql(query); + } + + private static Boolean relatedToCommunities(Result r, List communityIdList) { + Set result_communities = r.getContext() + .stream() + .map(c -> c.getId()) + .collect(Collectors.toSet()); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + return true; + } + } + return false; + } + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + results.leftOuterJoin(toupdateresult) + .map(p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()){ + Set communityList = p._2()._2().get().getAccumulator(); + for(Context c: r.getContext()){ + if (communityList.contains(c.getId())){ + //verify if the datainfo for this context contains propagation + if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ + c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); + //community id already in the context of the result. Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } + } + List cc = r.getContext(); + for(String cId: communityList){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); + } + return r; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); + } + + + + private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { + Set result_communities = context + .stream() + .map(c -> c.getId()) + .collect(Collectors.toSet()); + TypedRow tp = new TypedRow(); + tp.setSourceId(id); + tp.setType(type); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + tp.add(communityId); + } + } + if (tp.getAccumulator() != null) { + return tp; + } + return null; + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java new file mode 100644 index 0000000000..c55c0e8eac --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java @@ -0,0 +1,484 @@ +package eu.dnetlib.dhp.resulttocommunityfromsemrel; + +import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.QueryInformationSystem; +import eu.dnetlib.dhp.TypedRow; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.*; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.PropagationConstant.*; + +public class SparkResultToCommunityThroughSemRelJob2 { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils + .toString(SparkResultToCommunityThroughSemRelJob2.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + //final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); + final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); + //final List communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + + JavaRDD publication_rdd = sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, Publication.class)); + +// JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") +// .map(item -> new ObjectMapper().readValue(item, Dataset.class)); +// +// JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct") +// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); +// +// JavaRDD software_rdd = sc.textFile(inputPath + "/software") +// .map(item -> new ObjectMapper().readValue(item, Software.class)); + + JavaRDD relation_rdd = sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)); + +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); + + + org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(), + Encoders.bean(Publication.class)); + + org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), + Encoders.bean(Relation.class)); + +// org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(), +// Encoders.bean(Dataset.class)); +// +// org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(), +// Encoders.bean(OtherResearchProduct.class)); +// +// org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(), +// Encoders.bean(Software.class)); +// +// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), +// Encoders.bean(Relation.class)); + + publication.createOrReplaceTempView("publication"); + relation.createOrReplaceTempView("relation"); +// relation.createOrReplaceTempView("relation"); +// dataset.createOrReplaceTempView("dataset"); +// software.createOrReplaceTempView("software"); +// other.createOrReplaceTempView("other"); + + String communitylist = getConstraintList(" co.id = '", communityIdList); + + String semrellist = getConstraintList(" relClass = '", allowedsemrel ); + + + String query = "Select source, community_context, target " + + "from (select id, collect_set(co.id) community_context " + + "from publication " + + "lateral view explode (context) c as co " + + "where datainfo.deletedbyinference = false "+ communitylist + + " group by id) p " + + "JOIN " + + "(select * " + + "from relation " + + "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " + + "ON p.id = r.source"; + + + org.apache.spark.sql.Dataset publication_context = spark.sql( query); + publication_context.createOrReplaceTempView("publication_context"); + + //( source, (mes, dh-ch-, ni), target ) + query = "select target , collect_set(co) " + + "from (select target, community_context " + + "from publication_context pc join publication p on " + + "p.id = pc.source) tmp " + + "lateral view explode (community_context) c as co " + + "group by target"; + + + + org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); + + +// org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software"); +// org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset"); +// org.apache.spark.sql.Dataset toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication"); +// org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other"); + +// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + + createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update", + PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); + +// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// +// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// +// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication", +// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); +// + +/* + JavaPairRDD resultLinkedToCommunities = publication + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + .union(datasets + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(software + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ) + .union(other + .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) + .filter(p -> !(p == null)) + .mapToPair(toPair()) + ); + + JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) + .mapToPair(toPair()); + + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + + updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); + //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] + //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla +*/ + } + + private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){ + String query = "SELECT target_id, collect_set(co.id) context_id " + + " FROM (SELECT t.id target_id, s.context source_context " + + " FROM context_software s " + + " JOIN " + table + " t " + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, d.context source_context " + + " FROM dataset_context d " + + " JOIN " + table + " t" + + " ON s.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, p.context source_context " + + " FROM publication_context p" + + " JOIN " + table +" t " + + " on p.target = t.id " + + " UNION ALL " + + " SELECT t.id target_id, o.context source_context " + + " FROM other_context o " + + " JOIN " + table + " t " + + " ON o.target = t.id) TMP " + + " LATERAL VIEW EXPLODE(source_context) MyT as co " + + " GROUP BY target_id" ; + + return spark.sql(query); + } + + private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return toupdateresult.map(r -> { + List contextList = new ArrayList(); + List toAddContext = r.getList(1); + for (String cId : toAddContext) { + if (communityIdList.contains(cId)) { + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + contextList.add(newContext); + } + + } + + if (contextList.size() > 0) { + Result ret = new Result(); + ret.setId(r.getString(0)); + ret.setContext(contextList); + return ret; + } + return null; + }).filter(r -> r != null); + } + + private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Software) r) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r-> (Dataset)r) + .map(d -> new ObjectMapper().writeValueAsString(d)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map(r -> (Publication)r) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/" + type); + } + + private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); + getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList) + .map( r -> (OtherResearchProduct)r) + .map( o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/" + type); + } + + + + private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){ + return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) + .map(c -> { + if(! c._2()._2().isPresent()){ + return c._2()._1(); + } + + List toAddContext = c._2()._2().get(); + Set context_set = new HashSet<>(); + for(Object cId: toAddContext){ + String id = (String)cId; + if (communityIdList.contains(id)){ + context_set.add(id); + } + } + for (Context context: c._2()._1().getContext()){ + if(context_set.contains(context)){ + context_set.remove(context); + } + } + + List contextList = context_set.stream().map(co -> { + Context newContext = new Context(); + newContext.setId(co); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + return newContext; + + }).collect(Collectors.toList()); + + if(contextList.size() > 0 ){ + Result r = new Result(); + r.setId(c._1()); + r.setContext(contextList); + return r; + } + return null; + }).filter(r -> r != null); + + +// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) +// .join(result) +// .map(c -> { +// List toAddContext = c._2()._1(); +// Set context_set = new HashSet<>(); +// for(Object cId: toAddContext){ +// String id = (String)cId; +// if (communityIdList.contains(id)){ +// context_set.add(id); +// } +// } +// for (Context context: c._2()._2().getContext()){ +// if(context_set.contains(context)){ +// context_set.remove(context); +// } +// } +// +// List contextList = context_set.stream().map(co -> { +// Context newContext = new Context(); +// newContext.setId(co); +// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); +// return newContext; +// +// }).collect(Collectors.toList()); +// +// if(contextList.size() > 0 ){ +// Result r = new Result(); +// r.setId(c._1()); +// r.setContext(contextList); +// return r; +// } +// return null; +// }) +// .filter(r -> r != null); + } + + private static JavaRDD createUpdateForSoftwareDataset(JavaRDD toupdateresult, List communityList, + JavaRDD result, String class_id, String class_name) { + return result + .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) + .map(c -> { + Software oaf = c._2()._1(); + if (c._2()._2().isPresent()) { + + HashSet contexts = new HashSet<>(c._2()._2().get()); + + for (Context context : oaf.getContext()) { + if (contexts.contains(context.getId())){ + if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance()) + .collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ + context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); + //community id already in the context of the result. Remove it from the set that has to be added + contexts.remove(context.getId()); + } + } + } + List cc = oaf.getContext(); + for(String cId: contexts){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); + cc.add(context); + } + oaf.setContext(cc); + + } + return oaf; + }); + } + + private static JavaPairRDD> getStringResultJavaPairRDD(JavaRDD toupdateresult, List communityList) { + return toupdateresult.mapToPair(c -> { + + List contextList = new ArrayList<>(); + List contexts = c.getList(1); + for (String context : contexts) { + if (communityList.contains(context)) { + contextList.add(context); + } + } + + return new Tuple2<>(c.getString(0) ,contextList); + }); + } + + + private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table){ + String query = "SELECT relation.source, " + table +".context , relation.target " + + "FROM " + table + + " JOIN relation " + + "ON id = source" ; + + return spark.sql(query); + } + + private static Boolean relatedToCommunities(Result r, List communityIdList) { + Set result_communities = r.getContext() + .stream() + .map(c -> c.getId()) + .collect(Collectors.toSet()); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + return true; + } + } + return false; + } + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + results.leftOuterJoin(toupdateresult) + .map(p -> { + Result r = p._2()._1(); + if (p._2()._2().isPresent()){ + Set communityList = p._2()._2().get().getAccumulator(); + for(Context c: r.getContext()){ + if (communityList.contains(c.getId())){ + //verify if the datainfo for this context contains propagation + if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ + c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); + //community id already in the context of the result. Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } + } + List cc = r.getContext(); + for(String cId: communityList){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); + } + return r; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); + } + + + + private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { + Set result_communities = context + .stream() + .map(c -> c.getId()) + .collect(Collectors.toSet()); + TypedRow tp = new TypedRow(); + tp.setSourceId(id); + tp.setType(type); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + tp.add(communityId); + } + } + if (tp.getAccumulator() != null) { + return tp; + } + return null; + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json index ff1b79d9c5..81fead58fa 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json @@ -28,5 +28,17 @@ "paramLongName":"hive_metastore_uris", "paramDescription": "the hive metastore uris", "paramRequired": true + }, + { + "paramName":"wu", + "paramLongName":"writeUpdate", + "paramDescription": "true if the update must be writte. No double check if information is already present", + "paramRequired": true + }, + { + "paramName":"sg", + "paramLongName":"saveGraph", + "paramDescription": "true if the new version of the graph must be saved", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml index ea3a4d9223..2744ea92ba 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml @@ -19,4 +19,40 @@ hive_metastore_uris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + sparkExecutorNumber + 4 + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + + spark2MaxExecutors + 50 + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index 62d454fd82..aa1e6dc78e 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -24,6 +24,18 @@ sparkExecutorCores number of cores used by single executor + + sparkExecutorNumber + number of executors used + + + writeUpdate + writes the information found for the update. No double check done if the information is already present + + + saveGraph + writes new version of the graph after the propagation step + @@ -41,17 +53,25 @@ CountryPropagation eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob dhp-propagation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} + + --num-executors=${sparkExecutorNumber} + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} -mt yarn-cluster --sourcePath${sourcePath} --whitelist${whitelist} --allowedtypes${allowedtypes} --hive_metastore_uris${hive_metastore_uris} + --writeUpdate${writeUpdate} + --saveGraph${saveGraph}