forked from D-Net/dnet-hadoop
new config for countrypropagation
This commit is contained in:
parent
dd011f4a95
commit
beebbcf66b
|
@ -0,0 +1,27 @@
|
|||
[
|
||||
{
|
||||
"paramName":"is",
|
||||
"paramLongName":"isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"mt",
|
||||
"paramLongName":"master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pm",
|
||||
"paramLongName":"protoMap",
|
||||
"paramDescription": "the json path associated to each selection field",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,61 @@
|
|||
<workflow-app name="result_to_community_from_semrel_propagation" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<description>the source path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>allowedsemrels</name>
|
||||
<description>the semantic relationships allowed for propagation</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the isLookup service endpoint</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResultToCommunityFromSemRelPropagation"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResultToCommunityFromSemRelPropagation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ResultToCommunitySemRelPropagation</name>
|
||||
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class>
|
||||
<jar>dhp-propagation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
|
||||
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
||||
</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<!-- <arg>-allowedsemrels</arg><arg>${allowedsemrels}</arg>-->
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<!-- <arg>-isLookupUrl</arg><arg>${isLookupUrl}</arg>-->
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,4 @@
|
|||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
public class Author {
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
public class AutoritativeAuthor {
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
public class ResultWithOrcid {
|
||||
}
|
|
@ -0,0 +1,317 @@
|
|||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.TypedRow;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.Function;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
|
||||
public class SparkOrcidToResultFromSemRelJob {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkOrcidToResultFromSemRelJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String outputPath = "/tmp/provision/propagation/orcidtoresult";
|
||||
|
||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||
boolean writeUpdate = TRUE.equals(parser.get("writeUpdate"));
|
||||
boolean saveGraph = TRUE.equals(parser.get("saveGraph"));
|
||||
|
||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||
|
||||
JavaRDD<Relation> relations = sc.textFile(inputPath + "/relation")
|
||||
.map(item -> new ObjectMapper().readValue(item, Relation.class)).cache();
|
||||
|
||||
JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel, relations);
|
||||
|
||||
JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
|
||||
.map(item -> new ObjectMapper().readValue(item, Publication.class));
|
||||
JavaRDD<Dataset> datasets = sc.textFile(inputPath + "/dataset")
|
||||
.map(item -> new ObjectMapper().readValue(item, Dataset.class));
|
||||
JavaRDD<Software> software = sc.textFile(inputPath + "/software")
|
||||
.map(item -> new ObjectMapper().readValue(item, Software.class));
|
||||
JavaRDD<OtherResearchProduct> other = sc.textFile(inputPath + "/otherresearchproduct")
|
||||
.map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
|
||||
|
||||
//get the results having at least one author pid we are interested in
|
||||
JavaPairRDD<String, TypedRow> resultswithorcid = publications.map(p -> getTypedRow(p))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
.union(datasets.map(p -> getTypedRow(p))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair()))
|
||||
.union(software.map(p -> getTypedRow(p))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair()))
|
||||
.union(other.map(p -> getTypedRow(p))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair()));
|
||||
|
||||
|
||||
JavaPairRDD<String, TypedRow> to_add_orcid_to_result = resultswithorcid.join(result_result)
|
||||
.map(p -> p._2()._1().setSourceId(p._2()._2().getTargetId())) //associate the pid of the result (target) which should get the orcid to the typed row containing the authors with the orcid from the result(source)
|
||||
.mapToPair(toPair());
|
||||
|
||||
JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
|
||||
if(writeUpdate){
|
||||
writeResult(pubs, to_add_orcid_to_result, outputPath, "publication");
|
||||
writeResult(dss, to_add_orcid_to_result, outputPath, "dataset");
|
||||
writeResult(sfw, to_add_orcid_to_result, outputPath, "software");
|
||||
writeResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
|
||||
}
|
||||
|
||||
if (saveGraph){
|
||||
updateResult(pubs, to_add_orcid_to_result, outputPath, "publication");
|
||||
updateResult(dss, to_add_orcid_to_result, outputPath, "dataset");
|
||||
updateResult(sfw, to_add_orcid_to_result, outputPath, "software");
|
||||
updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static Author enrichAutor(Author autoritative_author, Author author) {
|
||||
boolean toaddpid = false;
|
||||
|
||||
if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) {
|
||||
if (StringUtils.isNoneEmpty(author.getSurname())) {
|
||||
if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) {
|
||||
|
||||
//have the same surname. Check the name
|
||||
if (StringUtils.isNoneEmpty(autoritative_author.getName())) {
|
||||
if (StringUtils.isNoneEmpty(author.getName())) {
|
||||
if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) {
|
||||
toaddpid = true;
|
||||
}
|
||||
//they could be differently written (i.e. only the initials of the name in one of the two
|
||||
if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) {
|
||||
toaddpid = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (toaddpid){
|
||||
StructuredProperty pid = new StructuredProperty();
|
||||
for(StructuredProperty sp : autoritative_author.getPid()){
|
||||
if (PROPAGATION_AUTHOR_PID.equals(sp.getQualifier().getClassid())){
|
||||
pid.setValue(sp.getValue());
|
||||
pid.setQualifier(getQualifier(sp.getQualifier().getClassid(),sp.getQualifier().getClassname() ));
|
||||
pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME));
|
||||
if(author.getPid() == null){
|
||||
author.setPid(Arrays.asList(pid));
|
||||
}else{
|
||||
author.getPid().add(pid);
|
||||
}
|
||||
}
|
||||
}
|
||||
return author;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
private static List<Author> enrichAutors(List<Author> autoritative_authors, List<Author> to_enrich_authors, boolean filter){
|
||||
// List<Author> autoritative_authors = p._2()._2().get().getAuthors();
|
||||
// List<Author> to_enrich_authors = r.getAuthor();
|
||||
|
||||
return to_enrich_authors
|
||||
.stream()
|
||||
.map(a -> {
|
||||
if (filter) {
|
||||
if (containsAllowedPid(a)) {
|
||||
return a;
|
||||
}
|
||||
}
|
||||
|
||||
List<Author> lst = autoritative_authors.stream()
|
||||
.map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
|
||||
if (lst.size() == 0) {
|
||||
return a;
|
||||
}
|
||||
return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
|
||||
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static void writeResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
|
||||
String outputPath, String type) {
|
||||
|
||||
results.join(toupdateresult)
|
||||
.map(p -> {
|
||||
Result r = p._2()._1();
|
||||
|
||||
List<Author> autoritative_authors = p._2()._2().getAuthors();
|
||||
List<Author> to_enrich_authors = r.getAuthor();
|
||||
|
||||
r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false));
|
||||
// .stream()
|
||||
// .map(a -> {
|
||||
// if(filter) {
|
||||
// if (containsAllowedPid(a)) {
|
||||
// return a;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// List<Author> lst = autoritative_authors.stream()
|
||||
// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
|
||||
// if(lst.size() == 0){
|
||||
// return a;
|
||||
// }
|
||||
// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
|
||||
//
|
||||
// }).collect(Collectors.toList()));
|
||||
|
||||
return r;
|
||||
})
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath + "/" + type + "_update");
|
||||
}
|
||||
|
||||
|
||||
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
|
||||
String outputPath, String type) {
|
||||
results.leftOuterJoin(toupdateresult)
|
||||
.map(p -> {
|
||||
Result r = p._2()._1();
|
||||
if (p._2()._2().isPresent()){
|
||||
List<Author> autoritative_authors = p._2()._2().get().getAuthors();
|
||||
List<Author> to_enrich_authors = r.getAuthor();
|
||||
|
||||
r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true));
|
||||
// .stream()
|
||||
// .map(a -> {
|
||||
// if(filter) {
|
||||
// if (containsAllowedPid(a)) {
|
||||
// return a;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// List<Author> lst = autoritative_authors.stream()
|
||||
// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
|
||||
// if(lst.size() == 0){
|
||||
// return a;
|
||||
// }
|
||||
// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
|
||||
//
|
||||
// }).collect(Collectors.toList()));
|
||||
}
|
||||
return r;
|
||||
})
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath+"/"+type);
|
||||
}
|
||||
|
||||
private static TypedRow getTypedRow(Result p) {
|
||||
TypedRow tp = new TypedRow();
|
||||
tp.setSourceId(p.getId());
|
||||
List<Author> authorList = p.getAuthor()
|
||||
.stream()
|
||||
.map(a -> {
|
||||
if (a.getPid().stream().map(pid -> {
|
||||
if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) {
|
||||
return a;
|
||||
}
|
||||
return null;
|
||||
}).filter(aut -> !(aut == null)).collect(Collectors.toList()).size() > 0){
|
||||
return a;
|
||||
}
|
||||
return null;
|
||||
}).filter(a -> !(a == null)).collect(Collectors.toList());
|
||||
tp.setAuthors(authorList);
|
||||
if(authorList.size() > 0){
|
||||
return tp;
|
||||
}
|
||||
return null;
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static boolean containsAllowedPid(Author a){
|
||||
|
||||
|
||||
return (a.getPid().stream().map(pid -> {
|
||||
if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}).filter(aut -> (aut == true)).collect(Collectors.toList()).size()) > 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/*private ResultProtos.Result.Metadata.Builder searchMatch(List<FieldTypeProtos.Author> author_list){
|
||||
ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder();
|
||||
boolean updated = false;
|
||||
|
||||
for (FieldTypeProtos.Author a: author_list){
|
||||
FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors);
|
||||
if(author != null){
|
||||
updated = true;
|
||||
metadataBuilder.addAuthor(author);
|
||||
}else{
|
||||
metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a));
|
||||
}
|
||||
}
|
||||
if(updated)
|
||||
return metadataBuilder;
|
||||
return null;
|
||||
}
|
||||
private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List<FieldTypeProtos.Author> author_list){
|
||||
if(containsOrcid(a.getPidList()))
|
||||
return null;
|
||||
for(FieldTypeProtos.Author autoritative_author : author_list) {
|
||||
if (equals(autoritative_author, a)) {
|
||||
if(!containsOrcid(a.getPidList()))
|
||||
return update(a, autoritative_author);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
private boolean containsOrcid(List<FieldTypeProtos.KeyValue> pidList){
|
||||
if(pidList == null)
|
||||
return false;
|
||||
return pidList
|
||||
.stream()
|
||||
.filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID))
|
||||
.collect(Collectors.toList()).size() > 0;
|
||||
}
|
||||
*/
|
|
@ -0,0 +1,222 @@
|
|||
package eu.dnetlib.dhp.projecttoresult;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.TypedRow;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.*;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.PropagationConstant.toPair;
|
||||
|
||||
public class SparkResultToProjectThroughSemRelJob {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToProjectThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkResultToProjectThroughSemRelJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String outputPath = "/tmp/provision/propagation/projecttoresult";
|
||||
|
||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||
|
||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||
|
||||
JavaRDD<Relation> all_relations = sc.textFile(inputPath + "/relation")
|
||||
.map(item -> new ObjectMapper().readValue(item, Relation.class));
|
||||
|
||||
JavaRDD<Relation> relations = all_relations.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache();
|
||||
|
||||
JavaRDD<Relation> result_result = relations
|
||||
.filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType()));
|
||||
|
||||
org.apache.spark.sql.Dataset<Relation> resres_relation = spark.createDataset(result_result.rdd(),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
JavaRDD<Relation> result_project = relations
|
||||
.filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass())
|
||||
&& RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()));
|
||||
|
||||
org.apache.spark.sql.Dataset<Relation> resproj_relation = spark.createDataset(result_project.rdd(),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
resres_relation.createOrReplaceTempView("resres_relation");
|
||||
resproj_relation.createOrReplaceTempView("resproj_relation");
|
||||
|
||||
String query ="SELECT proj, collect_set(r1target) result_set " +
|
||||
"FROM (" +
|
||||
" SELECT r1.source as sourcer, r1.relclass as r1rel, r1.target as r1target, r2.target as proj " +
|
||||
" FROM resres_relation r1 " +
|
||||
" JOIN resproj_relation r2 " +
|
||||
" ON r1.source = r2.source " +
|
||||
" ) tmp " +
|
||||
"GROUP BY proj ";
|
||||
|
||||
Dataset<Row> toaddrelations = spark.sql(query);
|
||||
|
||||
|
||||
JavaPairRDD<String, TypedRow> project_resultlist = relations
|
||||
.filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()))
|
||||
.map(r -> {
|
||||
TypedRow tp = new TypedRow();
|
||||
tp.setSourceId(r.getSource());
|
||||
tp.add(r.getTarget());
|
||||
return tp;
|
||||
}).mapToPair(toPair())
|
||||
.reduceByKey((a, b) -> {
|
||||
if (a == null) {
|
||||
return b;
|
||||
}
|
||||
if (b == null) {
|
||||
return a;
|
||||
}
|
||||
|
||||
a.addAll(b.getAccumulator());
|
||||
return a;
|
||||
}).cache();
|
||||
|
||||
|
||||
JavaRDD<Relation> new_relations = toaddrelations.toJavaRDD().mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
|
||||
.leftOuterJoin(project_resultlist)
|
||||
.flatMap(c -> {
|
||||
List<Object> toAddRel = new ArrayList<>();
|
||||
toAddRel.addAll(c._2()._1());
|
||||
if (c._2()._2().isPresent()) {
|
||||
Set<String> originalRels = c._2()._2().get().getAccumulator();
|
||||
for (String o : originalRels) {
|
||||
if (toAddRel.contains(o)) {
|
||||
toAddRel.remove(o);
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Relation> relationList = new ArrayList<>();
|
||||
String projId = c._1();
|
||||
for (Object r : toAddRel) {
|
||||
String rId = (String) r;
|
||||
relationList.add(getRelation(rId, projId, RELATION_RESULT_PROJECT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
|
||||
RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
|
||||
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
|
||||
relationList.add(getRelation(projId, rId, RELATION_PROJECT_RESULT_REL_CLASS, RELATION_RESULTPROJECT_REL_TYPE,
|
||||
RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
|
||||
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
|
||||
|
||||
}
|
||||
return relationList.iterator();
|
||||
}).cache();
|
||||
|
||||
toaddrelations.toJavaRDD().map(r->new ObjectMapper().writeValueAsString(r))
|
||||
.saveAsTextFile(outputPath + "/toupdaterelations");
|
||||
|
||||
new_relations.map(r-> new ObjectMapper().writeValueAsString(r))
|
||||
.saveAsTextFile(outputPath + "/new_relations" );
|
||||
|
||||
all_relations.union(new_relations)
|
||||
.map(r -> new ObjectMapper().writeValueAsString(r))
|
||||
.saveAsTextFile(outputPath + "/relation");
|
||||
|
||||
|
||||
//JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel, relations);
|
||||
|
||||
// JavaPairRDD<String, TypedRow> result_project = relations
|
||||
// .filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||
// .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass())
|
||||
// && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()))
|
||||
// .map(rel ->{
|
||||
//
|
||||
// TypedRow tr = new TypedRow();
|
||||
// tr.setSourceId(rel.getSource());
|
||||
// tr.setTargetId(rel.getTarget());
|
||||
// return tr;
|
||||
// })
|
||||
// .mapToPair(toPair());
|
||||
//
|
||||
// //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result
|
||||
// JavaPairRDD<String, TypedRow> project_result = result_project.join(result_result)
|
||||
// .map(c -> {
|
||||
// String projectId = c._2()._1().getTargetId();
|
||||
// String resultId = c._2()._2().getTargetId();
|
||||
// TypedRow tr = new TypedRow(); tr.setSourceId(projectId); tr.setTargetId(resultId);
|
||||
// return tr;
|
||||
// })
|
||||
// .mapToPair(toPair());
|
||||
//
|
||||
// //relationships from project to result. One Pair for each project => project id list of results related to the project
|
||||
// JavaPairRDD<String, TypedRow> project_results = relations
|
||||
// .filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||
// .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()))
|
||||
// .map(r -> {
|
||||
// TypedRow tr = new TypedRow(); tr.setSourceId(r.getSource()); tr.setTargetId(r.getTarget());
|
||||
// return tr;
|
||||
// })
|
||||
// .mapToPair(toPair())
|
||||
// .reduceByKey((a, b) -> {
|
||||
// if (a == null) {
|
||||
// return b;
|
||||
// }
|
||||
// if (b == null) {
|
||||
// return a;
|
||||
// }
|
||||
// a.addAll(b.getAccumulator());
|
||||
// return a;
|
||||
// });
|
||||
//
|
||||
//
|
||||
//
|
||||
// JavaRDD<Relation> newRels = project_result.join(project_results)
|
||||
// .flatMap(c -> {
|
||||
// String resId = c._2()._1().getTargetId();
|
||||
//
|
||||
// if (c._2()._2().getAccumulator().contains(resId)) {
|
||||
// return null;
|
||||
// }
|
||||
// String progId = c._2()._1().getSourceId();
|
||||
// List<Relation> rels = new ArrayList();
|
||||
//
|
||||
// rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS,
|
||||
// RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||
// PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
|
||||
// rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS,
|
||||
// RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||
// PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
|
||||
// return rels.iterator();
|
||||
// })
|
||||
// .cache();
|
||||
//
|
||||
// newRels.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
// .saveAsTextFile(outputPath + "/relation_new");
|
||||
//
|
||||
// newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
// .saveAsTextFile(outputPath + "/relation");
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -28,7 +28,12 @@ public class SparkResultToCommunityThroughSemRelJob {
|
|||
.toString(SparkResultToCommunityThroughSemRelJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
for(String key : parser.getObjectMap().keySet()){
|
||||
System.out.println(key + " = " + parser.get(key));
|
||||
}
|
||||
|
||||
/* SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
|
@ -152,10 +157,10 @@ public class SparkResultToCommunityThroughSemRelJob {
|
|||
updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
updateForDatasetDataset(toupdatesoftwareresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "software",
|
||||
updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
updateForOtherDataset(toupdatepublicationreresult.toJavaRDD(), other.toJavaRDD(), outputPath, "publication",
|
||||
updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
|
||||
|
@ -197,7 +202,7 @@ public class SparkResultToCommunityThroughSemRelJob {
|
|||
*/
|
||||
}
|
||||
|
||||
private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
||||
/* private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
||||
String query = "SELECT target_id, collect_set(co.id) context_id " +
|
||||
" FROM (SELECT t.id target_id, s.context source_context " +
|
||||
" FROM context_software s " +
|
||||
|
@ -479,5 +484,5 @@ public class SparkResultToCommunityThroughSemRelJob {
|
|||
return tp;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}*/
|
||||
}
|
|
@ -0,0 +1,495 @@
|
|||
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||
|
||||
import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||
import eu.dnetlib.dhp.TypedRow;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
|
||||
public class SparkResultToCommunityThroughSemRelJob {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
||||
.toString(SparkResultToCommunityThroughSemRelJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
for(String key : parser.getObjectMap().keySet()){
|
||||
System.out.println(key + " = " + parser.get(key));
|
||||
}
|
||||
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
||||
|
||||
//final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||
final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
||||
//final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
||||
final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");
|
||||
|
||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||
|
||||
|
||||
JavaRDD<Publication> all_publication_rdd = sc.textFile(inputPath + "/publication")
|
||||
.map(item -> new ObjectMapper().readValue(item, Publication.class))
|
||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
||||
JavaRDD<Publication> publication_rdd = all_publication_rdd
|
||||
.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
||||
|
||||
JavaRDD<Dataset> all_dataset_rdd = sc.textFile(inputPath + "/dataset")
|
||||
.map(item -> new ObjectMapper().readValue(item, Dataset.class))
|
||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
||||
JavaRDD<Dataset> dataset_rdd = all_dataset_rdd
|
||||
.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
||||
|
||||
JavaRDD<OtherResearchProduct> all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
||||
.map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
|
||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
||||
JavaRDD<OtherResearchProduct> orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
||||
|
||||
JavaRDD<Software> all_software_rdd = sc.textFile(inputPath + "/software")
|
||||
.map(item -> new ObjectMapper().readValue(item, Software.class))
|
||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
||||
JavaRDD<Software> software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
||||
|
||||
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
||||
.map(item -> new ObjectMapper().readValue(item, Relation.class))
|
||||
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||
.filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
||||
Encoders.bean(Publication.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
||||
Encoders.bean(Dataset.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
||||
Encoders.bean(OtherResearchProduct.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
||||
Encoders.bean(Software.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
publication.createOrReplaceTempView("publication");
|
||||
relation.createOrReplaceTempView("relation");
|
||||
dataset.createOrReplaceTempView("dataset");
|
||||
software.createOrReplaceTempView("software");
|
||||
other.createOrReplaceTempView("other");
|
||||
|
||||
// org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
|
||||
// publication_context.createOrReplaceTempView("publication_context");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( "SELECT relation.source, " +
|
||||
"publication.context , relation.target " +
|
||||
"FROM publication " +
|
||||
" JOIN relation " +
|
||||
"ON id = source");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
|
||||
dataset_context.createOrReplaceTempView("dataset_context");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> software_context = getContext(spark, "software");
|
||||
software_context.createOrReplaceTempView("software_context");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> other_context = getContext(spark, "other");
|
||||
other_context.createOrReplaceTempView("other_context");
|
||||
|
||||
publication = spark.createDataset(all_publication_rdd.rdd(),
|
||||
Encoders.bean(Publication.class));
|
||||
publication.createOrReplaceTempView("publication");
|
||||
|
||||
dataset = spark.createDataset(all_dataset_rdd.rdd(),
|
||||
Encoders.bean(Dataset.class));
|
||||
dataset.createOrReplaceTempView("dataset");
|
||||
|
||||
other = spark.createDataset(all_orp_rdd.rdd(),
|
||||
Encoders.bean(OtherResearchProduct.class));
|
||||
other.createOrReplaceTempView("other");
|
||||
|
||||
software = spark.createDataset(all_software_rdd.rdd(),
|
||||
Encoders.bean(Software.class));
|
||||
software.createOrReplaceTempView("software");
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
||||
org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
||||
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
|
||||
org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
||||
|
||||
createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
|
||||
updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
|
||||
/*
|
||||
JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
.union(datasets
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
)
|
||||
.union(software
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
)
|
||||
.union(other
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
);
|
||||
|
||||
JavaPairRDD<String, TypedRow> to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId()))
|
||||
.mapToPair(toPair());
|
||||
|
||||
JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
|
||||
updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
//leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato]
|
||||
//per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla
|
||||
*/
|
||||
}
|
||||
|
||||
private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
||||
String query = "SELECT target_id, collect_set(co.id) context_id " +
|
||||
" FROM (SELECT t.id target_id, s.context source_context " +
|
||||
" FROM context_software s " +
|
||||
" JOIN " + table + " t " +
|
||||
" ON s.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, d.context source_context " +
|
||||
" FROM dataset_context d " +
|
||||
" JOIN " + table + " t" +
|
||||
" ON s.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, p.context source_context " +
|
||||
" FROM publication_context p" +
|
||||
" JOIN " + table +" t " +
|
||||
" on p.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, o.context source_context " +
|
||||
" FROM other_context o " +
|
||||
" JOIN " + table + " t " +
|
||||
" ON o.target = t.id) TMP " +
|
||||
" LATERAL VIEW EXPLODE(source_context) MyT as co " +
|
||||
" GROUP BY target_id" ;
|
||||
|
||||
return spark.sql(query);
|
||||
}
|
||||
|
||||
private static JavaRDD<Result> createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
return toupdateresult.map(r -> {
|
||||
List<Context> contextList = new ArrayList();
|
||||
List<String> toAddContext = r.getList(1);
|
||||
for (String cId : toAddContext) {
|
||||
if (communityIdList.contains(cId)) {
|
||||
Context newContext = new Context();
|
||||
newContext.setId(cId);
|
||||
newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
contextList.add(newContext);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (contextList.size() > 0) {
|
||||
Result ret = new Result();
|
||||
ret.setId(r.getString(0));
|
||||
ret.setContext(contextList);
|
||||
return ret;
|
||||
}
|
||||
return null;
|
||||
}).filter(r -> r != null);
|
||||
}
|
||||
|
||||
private static void updateForSoftwareDataset(JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map(r -> (Software) r)
|
||||
.map(s -> new ObjectMapper().writeValueAsString(s))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForDatasetDataset(JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map( r-> (Dataset)r)
|
||||
.map(d -> new ObjectMapper().writeValueAsString(d))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForPublicationDataset(JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map(r -> (Publication)r)
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForOtherDataset(JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map( r -> (OtherResearchProduct)r)
|
||||
.map( o -> new ObjectMapper().writeValueAsString(o))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
|
||||
.map(c -> {
|
||||
if(! c._2()._2().isPresent()){
|
||||
return c._2()._1();
|
||||
}
|
||||
|
||||
List<Object> toAddContext = c._2()._2().get();
|
||||
Set<String> context_set = new HashSet<>();
|
||||
for(Object cId: toAddContext){
|
||||
String id = (String)cId;
|
||||
if (communityIdList.contains(id)){
|
||||
context_set.add(id);
|
||||
}
|
||||
}
|
||||
for (Context context: c._2()._1().getContext()){
|
||||
if(context_set.contains(context)){
|
||||
context_set.remove(context);
|
||||
}
|
||||
}
|
||||
|
||||
List<Context> contextList = context_set.stream().map(co -> {
|
||||
Context newContext = new Context();
|
||||
newContext.setId(co);
|
||||
newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
return newContext;
|
||||
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
if(contextList.size() > 0 ){
|
||||
Result r = new Result();
|
||||
r.setId(c._1());
|
||||
r.setContext(contextList);
|
||||
return r;
|
||||
}
|
||||
return null;
|
||||
}).filter(r -> r != null);
|
||||
|
||||
|
||||
// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
|
||||
// .join(result)
|
||||
// .map(c -> {
|
||||
// List<Object> toAddContext = c._2()._1();
|
||||
// Set<String> context_set = new HashSet<>();
|
||||
// for(Object cId: toAddContext){
|
||||
// String id = (String)cId;
|
||||
// if (communityIdList.contains(id)){
|
||||
// context_set.add(id);
|
||||
// }
|
||||
// }
|
||||
// for (Context context: c._2()._2().getContext()){
|
||||
// if(context_set.contains(context)){
|
||||
// context_set.remove(context);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// List<Context> contextList = context_set.stream().map(co -> {
|
||||
// Context newContext = new Context();
|
||||
// newContext.setId(co);
|
||||
// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
// return newContext;
|
||||
//
|
||||
// }).collect(Collectors.toList());
|
||||
//
|
||||
// if(contextList.size() > 0 ){
|
||||
// Result r = new Result();
|
||||
// r.setId(c._1());
|
||||
// r.setContext(contextList);
|
||||
// return r;
|
||||
// }
|
||||
// return null;
|
||||
// })
|
||||
// .filter(r -> r != null);
|
||||
}
|
||||
|
||||
private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, List<String> communityList,
|
||||
JavaRDD<Software> result, String class_id, String class_name) {
|
||||
return result
|
||||
.mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
|
||||
.map(c -> {
|
||||
Software oaf = c._2()._1();
|
||||
if (c._2()._2().isPresent()) {
|
||||
|
||||
HashSet<String> contexts = new HashSet<>(c._2()._2().get());
|
||||
|
||||
for (Context context : oaf.getContext()) {
|
||||
if (contexts.contains(context.getId())){
|
||||
if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance())
|
||||
.collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
|
||||
context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
|
||||
//community id already in the context of the result. Remove it from the set that has to be added
|
||||
contexts.remove(context.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Context> cc = oaf.getContext();
|
||||
for(String cId: contexts){
|
||||
Context context = new Context();
|
||||
context.setId(cId);
|
||||
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
cc.add(context);
|
||||
}
|
||||
oaf.setContext(cc);
|
||||
|
||||
}
|
||||
return oaf;
|
||||
});
|
||||
}
|
||||
|
||||
private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult, List<String> communityList) {
|
||||
return toupdateresult.mapToPair(c -> {
|
||||
|
||||
List<String> contextList = new ArrayList<>();
|
||||
List<String> contexts = c.getList(1);
|
||||
for (String context : contexts) {
|
||||
if (communityList.contains(context)) {
|
||||
contextList.add(context);
|
||||
}
|
||||
}
|
||||
|
||||
return new Tuple2<>(c.getString(0) ,contextList);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
|
||||
String query = "SELECT relation.source, " + table +".context , relation.target " +
|
||||
"FROM " + table +
|
||||
" JOIN relation " +
|
||||
"ON id = source" ;
|
||||
|
||||
return spark.sql(query);
|
||||
}
|
||||
|
||||
private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
|
||||
Set<String> result_communities = r.getContext()
|
||||
.stream()
|
||||
.map(c -> c.getId())
|
||||
.collect(Collectors.toSet());
|
||||
for (String communityId : result_communities) {
|
||||
if (communityIdList.contains(communityId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
|
||||
results.leftOuterJoin(toupdateresult)
|
||||
.map(p -> {
|
||||
Result r = p._2()._1();
|
||||
if (p._2()._2().isPresent()){
|
||||
Set<String> communityList = p._2()._2().get().getAccumulator();
|
||||
for(Context c: r.getContext()){
|
||||
if (communityList.contains(c.getId())){
|
||||
//verify if the datainfo for this context contains propagation
|
||||
if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
|
||||
c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
|
||||
//community id already in the context of the result. Remove it from the set that has to be added
|
||||
communityList.remove(c.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Context> cc = r.getContext();
|
||||
for(String cId: communityList){
|
||||
Context context = new Context();
|
||||
context.setId(cId);
|
||||
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
|
||||
cc.add(context);
|
||||
}
|
||||
r.setContext(cc);
|
||||
}
|
||||
return r;
|
||||
})
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath+"/"+type);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static TypedRow getTypedRow(List<String> communityIdList, List<Context> context, String id, String type) {
|
||||
Set<String> result_communities = context
|
||||
.stream()
|
||||
.map(c -> c.getId())
|
||||
.collect(Collectors.toSet());
|
||||
TypedRow tp = new TypedRow();
|
||||
tp.setSourceId(id);
|
||||
tp.setType(type);
|
||||
for (String communityId : result_communities) {
|
||||
if (communityIdList.contains(communityId)) {
|
||||
tp.add(communityId);
|
||||
}
|
||||
}
|
||||
if (tp.getAccumulator() != null) {
|
||||
return tp;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,484 @@
|
|||
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||
|
||||
import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||
import eu.dnetlib.dhp.TypedRow;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
|
||||
public class SparkResultToCommunityThroughSemRelJob2 {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
||||
.toString(SparkResultToCommunityThroughSemRelJob2.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final String inputPath = parser.get("sourcePath");
|
||||
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
||||
|
||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||
//final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
||||
final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
||||
//final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");
|
||||
|
||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||
|
||||
|
||||
JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
|
||||
.map(item -> new ObjectMapper().readValue(item, Publication.class));
|
||||
|
||||
// JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
|
||||
// .map(item -> new ObjectMapper().readValue(item, Dataset.class));
|
||||
//
|
||||
// JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
||||
// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
|
||||
//
|
||||
// JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
|
||||
// .map(item -> new ObjectMapper().readValue(item, Software.class));
|
||||
|
||||
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
||||
.map(item -> new ObjectMapper().readValue(item, Relation.class));
|
||||
|
||||
// .filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||
// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
||||
Encoders.bean(Publication.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
// org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
||||
// Encoders.bean(Dataset.class));
|
||||
//
|
||||
// org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
||||
// Encoders.bean(OtherResearchProduct.class));
|
||||
//
|
||||
// org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
||||
// Encoders.bean(Software.class));
|
||||
//
|
||||
// org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||
// Encoders.bean(Relation.class));
|
||||
|
||||
publication.createOrReplaceTempView("publication");
|
||||
relation.createOrReplaceTempView("relation");
|
||||
// relation.createOrReplaceTempView("relation");
|
||||
// dataset.createOrReplaceTempView("dataset");
|
||||
// software.createOrReplaceTempView("software");
|
||||
// other.createOrReplaceTempView("other");
|
||||
|
||||
String communitylist = getConstraintList(" co.id = '", communityIdList);
|
||||
|
||||
String semrellist = getConstraintList(" relClass = '", allowedsemrel );
|
||||
|
||||
|
||||
String query = "Select source, community_context, target " +
|
||||
"from (select id, collect_set(co.id) community_context " +
|
||||
"from publication " +
|
||||
"lateral view explode (context) c as co " +
|
||||
"where datainfo.deletedbyinference = false "+ communitylist +
|
||||
" group by id) p " +
|
||||
"JOIN " +
|
||||
"(select * " +
|
||||
"from relation " +
|
||||
"where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " +
|
||||
"ON p.id = r.source";
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( query);
|
||||
publication_context.createOrReplaceTempView("publication_context");
|
||||
|
||||
//( source, (mes, dh-ch-, ni), target )
|
||||
query = "select target , collect_set(co) " +
|
||||
"from (select target, community_context " +
|
||||
"from publication_context pc join publication p on " +
|
||||
"p.id = pc.source) tmp " +
|
||||
"lateral view explode (community_context) c as co " +
|
||||
"group by target";
|
||||
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
|
||||
|
||||
|
||||
// org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
||||
// org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
||||
// org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
|
||||
// org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
||||
|
||||
// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
|
||||
// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
//
|
||||
// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||
//
|
||||
|
||||
/*
|
||||
JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
.union(datasets
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
)
|
||||
.union(software
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
)
|
||||
.union(other
|
||||
.map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct"))
|
||||
.filter(p -> !(p == null))
|
||||
.mapToPair(toPair())
|
||||
);
|
||||
|
||||
JavaPairRDD<String, TypedRow> to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId()))
|
||||
.mapToPair(toPair());
|
||||
|
||||
JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
||||
|
||||
updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
|
||||
//leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato]
|
||||
//per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla
|
||||
*/
|
||||
}
|
||||
|
||||
private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
||||
String query = "SELECT target_id, collect_set(co.id) context_id " +
|
||||
" FROM (SELECT t.id target_id, s.context source_context " +
|
||||
" FROM context_software s " +
|
||||
" JOIN " + table + " t " +
|
||||
" ON s.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, d.context source_context " +
|
||||
" FROM dataset_context d " +
|
||||
" JOIN " + table + " t" +
|
||||
" ON s.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, p.context source_context " +
|
||||
" FROM publication_context p" +
|
||||
" JOIN " + table +" t " +
|
||||
" on p.target = t.id " +
|
||||
" UNION ALL " +
|
||||
" SELECT t.id target_id, o.context source_context " +
|
||||
" FROM other_context o " +
|
||||
" JOIN " + table + " t " +
|
||||
" ON o.target = t.id) TMP " +
|
||||
" LATERAL VIEW EXPLODE(source_context) MyT as co " +
|
||||
" GROUP BY target_id" ;
|
||||
|
||||
return spark.sql(query);
|
||||
}
|
||||
|
||||
private static JavaRDD<Result> createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
return toupdateresult.map(r -> {
|
||||
List<Context> contextList = new ArrayList();
|
||||
List<String> toAddContext = r.getList(1);
|
||||
for (String cId : toAddContext) {
|
||||
if (communityIdList.contains(cId)) {
|
||||
Context newContext = new Context();
|
||||
newContext.setId(cId);
|
||||
newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
contextList.add(newContext);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (contextList.size() > 0) {
|
||||
Result ret = new Result();
|
||||
ret.setId(r.getString(0));
|
||||
ret.setContext(contextList);
|
||||
return ret;
|
||||
}
|
||||
return null;
|
||||
}).filter(r -> r != null);
|
||||
}
|
||||
|
||||
private static void updateForSoftwareDataset(JavaRDD<Row> toupdateresult, JavaRDD<Software> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map(r -> (Software) r)
|
||||
.map(s -> new ObjectMapper().writeValueAsString(s))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForDatasetDataset(JavaRDD<Row> toupdateresult, JavaRDD<Dataset> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map( r-> (Dataset)r)
|
||||
.map(d -> new ObjectMapper().writeValueAsString(d))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForPublicationDataset(JavaRDD<Row> toupdateresult, JavaRDD<Publication> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map(r -> (Publication)r)
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
private static void updateForOtherDataset(JavaRDD<Row> toupdateresult, JavaRDD<OtherResearchProduct> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
JavaPairRDD<String, Result> tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
|
||||
getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
|
||||
.map( r -> (OtherResearchProduct)r)
|
||||
.map( o -> new ObjectMapper().writeValueAsString(o))
|
||||
.saveAsTextFile(outputPath + "/" + type);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static JavaRDD<Result> getUpdateForResultDataset(JavaRDD<Row> toupdateresult, JavaPairRDD<String, Result> result, String outputPath, String type, String class_id, String class_name, List<String> communityIdList){
|
||||
return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
|
||||
.map(c -> {
|
||||
if(! c._2()._2().isPresent()){
|
||||
return c._2()._1();
|
||||
}
|
||||
|
||||
List<Object> toAddContext = c._2()._2().get();
|
||||
Set<String> context_set = new HashSet<>();
|
||||
for(Object cId: toAddContext){
|
||||
String id = (String)cId;
|
||||
if (communityIdList.contains(id)){
|
||||
context_set.add(id);
|
||||
}
|
||||
}
|
||||
for (Context context: c._2()._1().getContext()){
|
||||
if(context_set.contains(context)){
|
||||
context_set.remove(context);
|
||||
}
|
||||
}
|
||||
|
||||
List<Context> contextList = context_set.stream().map(co -> {
|
||||
Context newContext = new Context();
|
||||
newContext.setId(co);
|
||||
newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
return newContext;
|
||||
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
if(contextList.size() > 0 ){
|
||||
Result r = new Result();
|
||||
r.setId(c._1());
|
||||
r.setContext(contextList);
|
||||
return r;
|
||||
}
|
||||
return null;
|
||||
}).filter(r -> r != null);
|
||||
|
||||
|
||||
// return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))
|
||||
// .join(result)
|
||||
// .map(c -> {
|
||||
// List<Object> toAddContext = c._2()._1();
|
||||
// Set<String> context_set = new HashSet<>();
|
||||
// for(Object cId: toAddContext){
|
||||
// String id = (String)cId;
|
||||
// if (communityIdList.contains(id)){
|
||||
// context_set.add(id);
|
||||
// }
|
||||
// }
|
||||
// for (Context context: c._2()._2().getContext()){
|
||||
// if(context_set.contains(context)){
|
||||
// context_set.remove(context);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// List<Context> contextList = context_set.stream().map(co -> {
|
||||
// Context newContext = new Context();
|
||||
// newContext.setId(co);
|
||||
// newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
// return newContext;
|
||||
//
|
||||
// }).collect(Collectors.toList());
|
||||
//
|
||||
// if(contextList.size() > 0 ){
|
||||
// Result r = new Result();
|
||||
// r.setId(c._1());
|
||||
// r.setContext(contextList);
|
||||
// return r;
|
||||
// }
|
||||
// return null;
|
||||
// })
|
||||
// .filter(r -> r != null);
|
||||
}
|
||||
|
||||
private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, List<String> communityList,
|
||||
JavaRDD<Software> result, String class_id, String class_name) {
|
||||
return result
|
||||
.mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList))
|
||||
.map(c -> {
|
||||
Software oaf = c._2()._1();
|
||||
if (c._2()._2().isPresent()) {
|
||||
|
||||
HashSet<String> contexts = new HashSet<>(c._2()._2().get());
|
||||
|
||||
for (Context context : oaf.getContext()) {
|
||||
if (contexts.contains(context.getId())){
|
||||
if (!context.getDataInfo().stream().map(di -> di.getInferenceprovenance())
|
||||
.collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
|
||||
context.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
|
||||
//community id already in the context of the result. Remove it from the set that has to be added
|
||||
contexts.remove(context.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Context> cc = oaf.getContext();
|
||||
for(String cId: contexts){
|
||||
Context context = new Context();
|
||||
context.setId(cId);
|
||||
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
cc.add(context);
|
||||
}
|
||||
oaf.setContext(cc);
|
||||
|
||||
}
|
||||
return oaf;
|
||||
});
|
||||
}
|
||||
|
||||
private static JavaPairRDD<String, List<String>> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult, List<String> communityList) {
|
||||
return toupdateresult.mapToPair(c -> {
|
||||
|
||||
List<String> contextList = new ArrayList<>();
|
||||
List<String> contexts = c.getList(1);
|
||||
for (String context : contexts) {
|
||||
if (communityList.contains(context)) {
|
||||
contextList.add(context);
|
||||
}
|
||||
}
|
||||
|
||||
return new Tuple2<>(c.getString(0) ,contextList);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
|
||||
String query = "SELECT relation.source, " + table +".context , relation.target " +
|
||||
"FROM " + table +
|
||||
" JOIN relation " +
|
||||
"ON id = source" ;
|
||||
|
||||
return spark.sql(query);
|
||||
}
|
||||
|
||||
private static Boolean relatedToCommunities(Result r, List<String> communityIdList) {
|
||||
Set<String> result_communities = r.getContext()
|
||||
.stream()
|
||||
.map(c -> c.getId())
|
||||
.collect(Collectors.toSet());
|
||||
for (String communityId : result_communities) {
|
||||
if (communityIdList.contains(communityId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
|
||||
results.leftOuterJoin(toupdateresult)
|
||||
.map(p -> {
|
||||
Result r = p._2()._1();
|
||||
if (p._2()._2().isPresent()){
|
||||
Set<String> communityList = p._2()._2().get().getAccumulator();
|
||||
for(Context c: r.getContext()){
|
||||
if (communityList.contains(c.getId())){
|
||||
//verify if the datainfo for this context contains propagation
|
||||
if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
|
||||
c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
|
||||
//community id already in the context of the result. Remove it from the set that has to be added
|
||||
communityList.remove(c.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Context> cc = r.getContext();
|
||||
for(String cId: communityList){
|
||||
Context context = new Context();
|
||||
context.setId(cId);
|
||||
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
|
||||
cc.add(context);
|
||||
}
|
||||
r.setContext(cc);
|
||||
}
|
||||
return r;
|
||||
})
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath+"/"+type);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static TypedRow getTypedRow(List<String> communityIdList, List<Context> context, String id, String type) {
|
||||
Set<String> result_communities = context
|
||||
.stream()
|
||||
.map(c -> c.getId())
|
||||
.collect(Collectors.toSet());
|
||||
TypedRow tp = new TypedRow();
|
||||
tp.setSourceId(id);
|
||||
tp.setType(type);
|
||||
for (String communityId : result_communities) {
|
||||
if (communityIdList.contains(communityId)) {
|
||||
tp.add(communityId);
|
||||
}
|
||||
}
|
||||
if (tp.getAccumulator() != null) {
|
||||
return tp;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -28,5 +28,17 @@
|
|||
"paramLongName":"hive_metastore_uris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"wu",
|
||||
"paramLongName":"writeUpdate",
|
||||
"paramDescription": "true if the update must be writte. No double check if information is already present",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"sg",
|
||||
"paramLongName":"saveGraph",
|
||||
"paramDescription": "true if the new version of the graph must be saved",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -19,4 +19,40 @@
|
|||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<value>15G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<value>6G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2MaxExecutors</name>
|
||||
<value>50</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -24,6 +24,18 @@
|
|||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<description>number of executors used</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>writeUpdate</name>
|
||||
<description>writes the information found for the update. No double check done if the information is already present</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>saveGraph</name>
|
||||
<description>writes new version of the graph after the propagation step</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="CountryPropagation"/>
|
||||
|
@ -41,17 +53,25 @@
|
|||
<name>CountryPropagation</name>
|
||||
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
|
||||
<jar>dhp-propagation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
<spark-opts>
|
||||
--num-executors=${sparkExecutorNumber}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
|
||||
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.dynamicAllocation.enabled=true
|
||||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--whitelist</arg><arg>${whitelist}</arg>
|
||||
<arg>--allowedtypes</arg><arg>${allowedtypes}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
<arg>--writeUpdate</arg><arg>${writeUpdate}</arg>
|
||||
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
Loading…
Reference in New Issue