forked from D-Net/dnet-hadoop
Added preparation classes before actual propagation
This commit is contained in:
parent
72c63a326e
commit
fbf5c27c27
|
@ -0,0 +1,152 @@
|
||||||
|
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
|
||||||
|
public class PrepareResultCommunitySetStep1 {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep1.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_preparecommunitytoresult_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
final String resultClassName = parser.get("resultTableName");
|
||||||
|
log.info("resultTableName: {}", resultClassName);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
|
|
||||||
|
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||||
|
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
||||||
|
|
||||||
|
final String isLookupUrl = parser.get("isLookupUrl");
|
||||||
|
log.info("isLookupUrl: {}", isLookupUrl);
|
||||||
|
|
||||||
|
final List<String> communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl);
|
||||||
|
log.info("communityIdList: {}", new Gson().toJson(communityIdList));
|
||||||
|
|
||||||
|
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
||||||
|
log.info("resultType: {}", resultType);
|
||||||
|
|
||||||
|
|
||||||
|
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||||
|
|
||||||
|
|
||||||
|
runWithSparkHiveSession(conf, isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
if (isTest(parser)) {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
}
|
||||||
|
prepareInfo(spark, inputPath, outputPath, allowedsemrel, resultClazz, resultType,
|
||||||
|
communityIdList);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <R extends Result> void prepareInfo(SparkSession spark, String inputPath, String outputPath,
|
||||||
|
List<String> allowedsemrel, Class<R> resultClazz, String resultType,
|
||||||
|
List<String> communityIdList) {
|
||||||
|
//read the relation table and the table related to the result it is using
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
|
||||||
|
relation.createOrReplaceTempView("relation");
|
||||||
|
|
||||||
|
log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
|
||||||
|
Dataset<R> result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz);
|
||||||
|
|
||||||
|
result.createOrReplaceTempView("result");
|
||||||
|
|
||||||
|
getPossibleResultcommunityAssociation(spark, allowedsemrel, outputPath + "/" + resultType, communityIdList);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void getPossibleResultcommunityAssociation(SparkSession spark, List<String> allowedsemrel, String outputPath,
|
||||||
|
List<String> communityIdList) {
|
||||||
|
|
||||||
|
String communitylist = getConstraintList(" co.id = '", communityIdList);
|
||||||
|
String semrellist = getConstraintList(" relClass = '", allowedsemrel );
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
associates to each result the set of community contexts they are associated to
|
||||||
|
select id, collect_set(co.id) community_context " +
|
||||||
|
" from result " +
|
||||||
|
" lateral view explode (context) c as co " +
|
||||||
|
" where datainfo.deletedbyinference = false "+ communitylist +
|
||||||
|
" group by id
|
||||||
|
|
||||||
|
associates to each target of a relation with allowed semantics the set of community context it could possibly
|
||||||
|
inherit from the source of the relation
|
||||||
|
*/
|
||||||
|
String query = "Select target resultId, community_context " +
|
||||||
|
"from (select id, collect_set(co.id) community_context " +
|
||||||
|
" from result " +
|
||||||
|
" lateral view explode (context) c as co " +
|
||||||
|
" where datainfo.deletedbyinference = false "+ communitylist +
|
||||||
|
" group by id) p " +
|
||||||
|
"JOIN " +
|
||||||
|
"(select source, target " +
|
||||||
|
"from relation " +
|
||||||
|
"where datainfo.deletedbyinference = false " + semrellist + ") r " +
|
||||||
|
"ON p.id = r.source";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> result_context = spark.sql( query);
|
||||||
|
result_context.createOrReplaceTempView("result_context");
|
||||||
|
|
||||||
|
//( target, (mes, dh-ch-, ni))
|
||||||
|
/*
|
||||||
|
a dataset for example could be linked to more than one publication. For each publication linked to that dataset
|
||||||
|
the previous query will produce a row: targetId set of community context the te=arget could possibly inherit
|
||||||
|
with the following query there will be a single row for each result linked to more than one result of the result type
|
||||||
|
currently being used
|
||||||
|
*/
|
||||||
|
query = "select resultId , collect_set(co) communityList " +
|
||||||
|
"from result_context " +
|
||||||
|
"lateral view explode (community_context) c as co " +
|
||||||
|
"where length(co) > 0 " +
|
||||||
|
"group by resultId";
|
||||||
|
|
||||||
|
spark.sql(query)
|
||||||
|
.as(Encoders.bean(ResultCommunityList.class))
|
||||||
|
.toJavaRDD()
|
||||||
|
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
||||||
|
.saveAsTextFile(outputPath, GzipCodec.class);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.orcidtoresultfromsemrel.ResultOrcidList;
|
||||||
|
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
|
||||||
|
public class PrepareResultCommunitySetStep2 {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep2.class);
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String jsonConfiguration = IOUtils.toString(PrepareResultCommunitySetStep2.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_prepareresulttocommunity2_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
if (isTest(parser)) {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
}
|
||||||
|
mergeInfo(spark, inputPath, outputPath);
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
|
||||||
|
Dataset<ResultCommunityList> resultOrcidAssocCommunityList = readResultCommunityList(spark, inputPath + "/publication")
|
||||||
|
.union(readResultCommunityList(spark, inputPath + "/dataset"))
|
||||||
|
.union(readResultCommunityList(spark, inputPath + "/otherresearchproduct"))
|
||||||
|
.union(readResultCommunityList(spark, inputPath + "/software"));
|
||||||
|
|
||||||
|
resultOrcidAssocCommunityList
|
||||||
|
.toJavaRDD()
|
||||||
|
.mapToPair(r -> new Tuple2<>(r.getResultId(), r))
|
||||||
|
.reduceByKey((a, b) -> {
|
||||||
|
if (a == null) {
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
if (b == null) {
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
Set<String> community_set = new HashSet<>();
|
||||||
|
|
||||||
|
a.getCommunityList().stream().forEach(aa -> community_set.add(aa));
|
||||||
|
|
||||||
|
b.getCommunityList().stream().forEach(aa -> {
|
||||||
|
if (!community_set.contains(aa)) {
|
||||||
|
a.getCommunityList().add(aa);
|
||||||
|
community_set.add(aa);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return a;
|
||||||
|
})
|
||||||
|
.map(c -> c._2())
|
||||||
|
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
||||||
|
.saveAsTextFile(outputPath, GzipCodec.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Dataset<ResultCommunityList> readResultCommunityList(SparkSession spark, String relationPath) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(relationPath)
|
||||||
|
.map(value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), Encoders.bean(ResultCommunityList.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package eu.dnetlib.dhp.communitytoresultthroughsemrel;
|
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.QueryInformationSystem;
|
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||||
|
@ -33,7 +33,8 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
System.out.println(key + " = " + parser.get(key));
|
System.out.println(key + " = " + parser.get(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* SparkConf conf = new SparkConf();
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
final SparkSession spark = SparkSession
|
final SparkSession spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
|
@ -45,7 +46,7 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
final String inputPath = parser.get("sourcePath");
|
final String inputPath = parser.get("sourcePath");
|
||||||
final String outputPath = "/tmp/provision/propagation/communitytoresultthroughsemrel";
|
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
||||||
|
|
||||||
//final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
//final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||||
final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
||||||
|
@ -104,8 +105,14 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
software.createOrReplaceTempView("software");
|
software.createOrReplaceTempView("software");
|
||||||
other.createOrReplaceTempView("other");
|
other.createOrReplaceTempView("other");
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
|
// org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
|
||||||
publication_context.createOrReplaceTempView("publication_context");
|
// publication_context.createOrReplaceTempView("publication_context");
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( "SELECT relation.source, " +
|
||||||
|
"publication.context , relation.target " +
|
||||||
|
"FROM publication " +
|
||||||
|
" JOIN relation " +
|
||||||
|
"ON id = source");
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
|
org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
|
||||||
dataset_context.createOrReplaceTempView("dataset_context");
|
dataset_context.createOrReplaceTempView("dataset_context");
|
||||||
|
@ -202,7 +209,7 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
/* private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
private static org.apache.spark.sql.Dataset<Row> getUpdateCommunitiesForTable(SparkSession spark, String table){
|
||||||
String query = "SELECT target_id, collect_set(co.id) context_id " +
|
String query = "SELECT target_id, collect_set(co.id) context_id " +
|
||||||
" FROM (SELECT t.id target_id, s.context source_context " +
|
" FROM (SELECT t.id target_id, s.context source_context " +
|
||||||
" FROM context_software s " +
|
" FROM context_software s " +
|
||||||
|
@ -223,7 +230,7 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
" FROM other_context o " +
|
" FROM other_context o " +
|
||||||
" JOIN " + table + " t " +
|
" JOIN " + table + " t " +
|
||||||
" ON o.target = t.id) TMP " +
|
" ON o.target = t.id) TMP " +
|
||||||
" LATERAL VIEW EXPLORE(source_context) MyT as co " +
|
" LATERAL VIEW EXPLODE(source_context) MyT as co " +
|
||||||
" GROUP BY target_id" ;
|
" GROUP BY target_id" ;
|
||||||
|
|
||||||
return spark.sql(query);
|
return spark.sql(query);
|
||||||
|
@ -413,7 +420,7 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
|
|
||||||
|
|
||||||
private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
|
private static org.apache.spark.sql.Dataset<Row> getContext(SparkSession spark, String table){
|
||||||
String query = "SELECT source, context , target " +
|
String query = "SELECT relation.source, " + table +".context , relation.target " +
|
||||||
"FROM " + table +
|
"FROM " + table +
|
||||||
" JOIN relation " +
|
" JOIN relation " +
|
||||||
"ON id = source" ;
|
"ON id = source" ;
|
||||||
|
@ -484,5 +491,5 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
return tp;
|
return tp;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}*/
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.QueryInformationSystem;
|
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||||
import eu.dnetlib.dhp.TypedRow;
|
import eu.dnetlib.dhp.TypedRow;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
@ -21,24 +21,19 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
|
||||||
public class SparkResultToCommunityThroughSemRelJob {
|
public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
||||||
.toString(SparkResultToCommunityThroughSemRelJob.class
|
.toString(SparkResultToCommunityThroughSemRelJob2.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
for(String key : parser.getObjectMap().keySet()){
|
|
||||||
System.out.println(key + " = " + parser.get(key));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
final SparkSession spark = SparkSession
|
final SparkSession spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName())
|
.appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName())
|
||||||
.master(parser.get("master"))
|
.master(parser.get("master"))
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.enableHiveSupport()
|
.enableHiveSupport()
|
||||||
|
@ -48,128 +43,157 @@ public class SparkResultToCommunityThroughSemRelJob {
|
||||||
final String inputPath = parser.get("sourcePath");
|
final String inputPath = parser.get("sourcePath");
|
||||||
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
||||||
|
|
||||||
//final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||||
final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
//final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
||||||
//final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
||||||
final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");
|
//final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");
|
||||||
|
|
||||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||||
|
|
||||||
|
|
||||||
JavaRDD<Publication> all_publication_rdd = sc.textFile(inputPath + "/publication")
|
JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
|
||||||
.map(item -> new ObjectMapper().readValue(item, Publication.class))
|
.map(item -> new ObjectMapper().readValue(item, Publication.class));
|
||||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
|
||||||
JavaRDD<Publication> publication_rdd = all_publication_rdd
|
|
||||||
.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
|
||||||
|
|
||||||
JavaRDD<Dataset> all_dataset_rdd = sc.textFile(inputPath + "/dataset")
|
System.out.println(publication_rdd.count());
|
||||||
.map(item -> new ObjectMapper().readValue(item, Dataset.class))
|
// JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
|
||||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
// .map(item -> new ObjectMapper().readValue(item, Dataset.class));
|
||||||
JavaRDD<Dataset> dataset_rdd = all_dataset_rdd
|
//
|
||||||
.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
// JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
||||||
|
// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
|
||||||
JavaRDD<OtherResearchProduct> all_orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
//
|
||||||
.map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
|
// JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
|
||||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
// .map(item -> new ObjectMapper().readValue(item, Software.class));
|
||||||
JavaRDD<OtherResearchProduct> orp_rdd = all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
|
||||||
|
|
||||||
JavaRDD<Software> all_software_rdd = sc.textFile(inputPath + "/software")
|
|
||||||
.map(item -> new ObjectMapper().readValue(item, Software.class))
|
|
||||||
.filter(p -> !p.getDataInfo().getDeletedbyinference()).cache();
|
|
||||||
JavaRDD<Software> software_rdd = all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache();
|
|
||||||
|
|
||||||
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
||||||
.map(item -> new ObjectMapper().readValue(item, Relation.class))
|
.map(item -> new ObjectMapper().readValue(item, Relation.class));
|
||||||
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
|
||||||
.filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
|
System.out.println(relation_rdd.count());
|
||||||
|
|
||||||
|
// .filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||||
|
// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
|
||||||
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
||||||
Encoders.bean(Publication.class));
|
Encoders.bean(Publication.class));
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
|
||||||
Encoders.bean(Dataset.class));
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
|
||||||
Encoders.bean(OtherResearchProduct.class));
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
|
||||||
Encoders.bean(Software.class));
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||||
Encoders.bean(Relation.class));
|
Encoders.bean(Relation.class));
|
||||||
|
|
||||||
|
// org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
||||||
|
// Encoders.bean(Dataset.class));
|
||||||
|
//
|
||||||
|
// org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
||||||
|
// Encoders.bean(OtherResearchProduct.class));
|
||||||
|
//
|
||||||
|
// org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
||||||
|
// Encoders.bean(Software.class));
|
||||||
|
//
|
||||||
|
// org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||||
|
// Encoders.bean(Relation.class));
|
||||||
|
|
||||||
publication.createOrReplaceTempView("publication");
|
publication.createOrReplaceTempView("publication");
|
||||||
relation.createOrReplaceTempView("relation");
|
relation.createOrReplaceTempView("relation");
|
||||||
dataset.createOrReplaceTempView("dataset");
|
// relation.createOrReplaceTempView("relation");
|
||||||
software.createOrReplaceTempView("software");
|
// dataset.createOrReplaceTempView("dataset");
|
||||||
other.createOrReplaceTempView("other");
|
// software.createOrReplaceTempView("software");
|
||||||
|
// other.createOrReplaceTempView("other");
|
||||||
|
|
||||||
// org.apache.spark.sql.Dataset<Row> publication_context = getContext(spark, "publication");
|
String communitylist = getConstraintList(" co.id = '", communityIdList);
|
||||||
// publication_context.createOrReplaceTempView("publication_context");
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( "SELECT relation.source, " +
|
String semrellist = getConstraintList(" relClass = '", allowedsemrel );
|
||||||
"publication.context , relation.target " +
|
|
||||||
"FROM publication " +
|
|
||||||
" JOIN relation " +
|
|
||||||
"ON id = source");
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> dataset_context = getContext(spark, "dataset");
|
|
||||||
dataset_context.createOrReplaceTempView("dataset_context");
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> software_context = getContext(spark, "software");
|
|
||||||
software_context.createOrReplaceTempView("software_context");
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> other_context = getContext(spark, "other");
|
|
||||||
other_context.createOrReplaceTempView("other_context");
|
|
||||||
|
|
||||||
publication = spark.createDataset(all_publication_rdd.rdd(),
|
|
||||||
Encoders.bean(Publication.class));
|
|
||||||
publication.createOrReplaceTempView("publication");
|
|
||||||
|
|
||||||
dataset = spark.createDataset(all_dataset_rdd.rdd(),
|
|
||||||
Encoders.bean(Dataset.class));
|
|
||||||
dataset.createOrReplaceTempView("dataset");
|
|
||||||
|
|
||||||
other = spark.createDataset(all_orp_rdd.rdd(),
|
|
||||||
Encoders.bean(OtherResearchProduct.class));
|
|
||||||
other.createOrReplaceTempView("other");
|
|
||||||
|
|
||||||
software = spark.createDataset(all_software_rdd.rdd(),
|
|
||||||
Encoders.bean(Software.class));
|
|
||||||
software.createOrReplaceTempView("software");
|
|
||||||
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
String query = "Select source, community_context, target " +
|
||||||
org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
"from (select id, collect_set(co.id) community_context " +
|
||||||
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
|
"from publication " +
|
||||||
org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
"lateral view explode (context) c as co " +
|
||||||
|
"where datainfo.deletedbyinference = false "+ communitylist +
|
||||||
createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
" group by id) p " +
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
"JOIN " +
|
||||||
|
"(select * " +
|
||||||
createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
"from relation " +
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
"where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " +
|
||||||
|
"ON p.id = r.source";
|
||||||
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
|
|
||||||
createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
|
|
||||||
|
|
||||||
updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
org.apache.spark.sql.Dataset<Row> publication_context = spark.sql( query);
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
publication_context.createOrReplaceTempView("publication_context");
|
||||||
|
|
||||||
updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
//( source, (mes, dh-ch-, ni), target )
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
query = "select target , collect_set(co) " +
|
||||||
|
"from (select target, community_context " +
|
||||||
|
"from publication_context pc join publication p on " +
|
||||||
|
"p.id = pc.source) tmp " +
|
||||||
|
"lateral view explode (community_context) c as co " +
|
||||||
|
"group by target";
|
||||||
|
|
||||||
updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
|
|
||||||
updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
|
||||||
|
|
||||||
|
System.out.println(toupdatepublicationreresult.count());
|
||||||
|
|
||||||
|
toupdatepublicationreresult.toJavaRDD()
|
||||||
|
.map(r -> {
|
||||||
|
TypedRow tp = new TypedRow();
|
||||||
|
tp.setSourceId(r.getString(0));
|
||||||
|
r.getList(1).stream().forEach(c -> tp.add((String)c));
|
||||||
|
return tp;
|
||||||
|
})
|
||||||
|
.map(tr -> new ObjectMapper().writeValueAsString(tr))
|
||||||
|
.saveAsTextFile(outputPath + "/community2semrelonpublication");
|
||||||
|
// toupdatepublicationreresult.toJavaRDD().flatMap(c -> {
|
||||||
|
//
|
||||||
|
// String source = c.getString(0);
|
||||||
|
// List<Relation> relation_list = new ArrayList<>();
|
||||||
|
// c.getList(1).stream()
|
||||||
|
// .forEach(res -> {
|
||||||
|
// Relation r = new Relation();
|
||||||
|
// r.setSource(source);
|
||||||
|
// r.setTarget((String)res);
|
||||||
|
// r.setRelClass("produces");
|
||||||
|
// relation_list.add(r);
|
||||||
|
// r = new Relation();
|
||||||
|
// r.setSource((String)res);
|
||||||
|
// r.setTarget(source);
|
||||||
|
// r.setRelClass("isProducedBy");
|
||||||
|
// relation_list.add(r);
|
||||||
|
// });
|
||||||
|
// return relation_list.iterator();
|
||||||
|
// }).map(tr -> new ObjectMapper().writeValueAsString(tr))
|
||||||
|
// .saveAsTextFile(outputPath + "/community2semrel");
|
||||||
|
//
|
||||||
|
|
||||||
|
// org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
||||||
|
// org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
||||||
|
// org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
|
||||||
|
// org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
||||||
|
|
||||||
|
// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
|
||||||
|
// createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
|
||||||
|
// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
||||||
|
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
//
|
||||||
|
|
||||||
/*
|
/*
|
||||||
JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
|
JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publication
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||||
|
|
||||||
import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import eu.dnetlib.dhp.QueryInformationSystem;
|
import eu.dnetlib.dhp.QueryInformationSystem;
|
||||||
import eu.dnetlib.dhp.TypedRow;
|
import eu.dnetlib.dhp.TypedRow;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
@ -21,11 +21,11 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
|
||||||
public class SparkResultToCommunityThroughSemRelJob2 {
|
public class SparkResultToCommunityThroughSemRelJob3 {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
|
||||||
.toString(SparkResultToCommunityThroughSemRelJob2.class
|
.toString(SparkResultToCommunityThroughSemRelJob3.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
final SparkSession spark = SparkSession
|
final SparkSession spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName())
|
.appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName())
|
||||||
.master(parser.get("master"))
|
.master(parser.get("master"))
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.enableHiveSupport()
|
.enableHiveSupport()
|
||||||
|
@ -44,9 +44,8 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
|
||||||
|
|
||||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||||
//final List<String> allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo");
|
|
||||||
final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
|
||||||
//final List<String> communityIdList = QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp");
|
|
||||||
|
|
||||||
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
|
||||||
|
|
||||||
|
@ -54,21 +53,18 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
|
JavaRDD<Publication> publication_rdd = sc.textFile(inputPath + "/publication")
|
||||||
.map(item -> new ObjectMapper().readValue(item, Publication.class));
|
.map(item -> new ObjectMapper().readValue(item, Publication.class));
|
||||||
|
|
||||||
// JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
|
JavaRDD<Dataset> dataset_rdd = sc.textFile(inputPath + "/dataset")
|
||||||
// .map(item -> new ObjectMapper().readValue(item, Dataset.class));
|
.map(item -> new ObjectMapper().readValue(item, Dataset.class));
|
||||||
//
|
|
||||||
// JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
JavaRDD<OtherResearchProduct> orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
|
||||||
// .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
|
.map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
|
||||||
//
|
|
||||||
// JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
|
JavaRDD<Software> software_rdd = sc.textFile(inputPath + "/software")
|
||||||
// .map(item -> new ObjectMapper().readValue(item, Software.class));
|
.map(item -> new ObjectMapper().readValue(item, Software.class));
|
||||||
|
|
||||||
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
JavaRDD<Relation> relation_rdd = sc.textFile(inputPath + "/relation")
|
||||||
.map(item -> new ObjectMapper().readValue(item, Relation.class));
|
.map(item -> new ObjectMapper().readValue(item, Relation.class));
|
||||||
|
|
||||||
// .filter(r -> !r.getDataInfo().getDeletedbyinference())
|
|
||||||
// .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache();
|
|
||||||
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(publication_rdd.rdd(),
|
||||||
Encoders.bean(Publication.class));
|
Encoders.bean(Publication.class));
|
||||||
|
@ -76,24 +72,21 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
||||||
Encoders.bean(Relation.class));
|
Encoders.bean(Relation.class));
|
||||||
|
|
||||||
// org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Dataset> dataset = spark.createDataset(dataset_rdd.rdd(),
|
||||||
// Encoders.bean(Dataset.class));
|
Encoders.bean(Dataset.class));
|
||||||
//
|
|
||||||
// org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
org.apache.spark.sql.Dataset<OtherResearchProduct> other = spark.createDataset(orp_rdd.rdd(),
|
||||||
// Encoders.bean(OtherResearchProduct.class));
|
Encoders.bean(OtherResearchProduct.class));
|
||||||
//
|
|
||||||
// org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
org.apache.spark.sql.Dataset<Software> software = spark.createDataset(software_rdd.rdd(),
|
||||||
// Encoders.bean(Software.class));
|
Encoders.bean(Software.class));
|
||||||
//
|
|
||||||
// org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(),
|
|
||||||
// Encoders.bean(Relation.class));
|
|
||||||
|
|
||||||
publication.createOrReplaceTempView("publication");
|
publication.createOrReplaceTempView("publication");
|
||||||
relation.createOrReplaceTempView("relation");
|
relation.createOrReplaceTempView("relation");
|
||||||
// relation.createOrReplaceTempView("relation");
|
dataset.createOrReplaceTempView("dataset");
|
||||||
// dataset.createOrReplaceTempView("dataset");
|
software.createOrReplaceTempView("software");
|
||||||
// software.createOrReplaceTempView("software");
|
other.createOrReplaceTempView("other");
|
||||||
// other.createOrReplaceTempView("other");
|
|
||||||
|
|
||||||
String communitylist = getConstraintList(" co.id = '", communityIdList);
|
String communitylist = getConstraintList(" co.id = '", communityIdList);
|
||||||
|
|
||||||
|
@ -109,7 +102,7 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
"JOIN " +
|
"JOIN " +
|
||||||
"(select * " +
|
"(select * " +
|
||||||
"from relation " +
|
"from relation " +
|
||||||
"where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " +
|
"where datainfo.deletedbyinference = false " + semrellist + ") r " +
|
||||||
"ON p.id = r.source";
|
"ON p.id = r.source";
|
||||||
|
|
||||||
|
|
||||||
|
@ -127,37 +120,33 @@ public class SparkResultToCommunityThroughSemRelJob2 {
|
||||||
|
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
|
org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = spark.sql(query);
|
||||||
|
org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
||||||
|
org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
||||||
|
org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
||||||
|
|
||||||
|
createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
||||||
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
|
||||||
// org.apache.spark.sql.Dataset<Row> toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
|
createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
||||||
// org.apache.spark.sql.Dataset<Row> toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
// org.apache.spark.sql.Dataset<Row> toupdatepublicationreresult = getUpdateCommunitiesForTable(spark, "publication");
|
|
||||||
// org.apache.spark.sql.Dataset<Row> toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
|
|
||||||
|
|
||||||
// createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
|
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
//
|
|
||||||
// createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
|
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
|
|
||||||
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
|
||||||
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
|
|
||||||
// createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
//
|
|
||||||
//
|
updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
||||||
// updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
//
|
updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
||||||
// updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
//
|
updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
||||||
// updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
//
|
updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
||||||
// updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
|
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
||||||
// PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
|
|
||||||
//
|
//
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName":"s",
|
||||||
|
"paramLongName":"sourcePath",
|
||||||
|
"paramDescription": "the path of the sequencial file to read",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"ocm",
|
||||||
|
"paramLongName":"organizationtoresultcommunitymap",
|
||||||
|
"paramDescription": "the map for the association organization communities",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"h",
|
||||||
|
"paramLongName":"hive_metastore_uris",
|
||||||
|
"paramDescription": "the hive metastore uris",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"wu",
|
||||||
|
"paramLongName":"writeUpdate",
|
||||||
|
"paramDescription": "true if the update must be writte. No double check if information is already present",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"sg",
|
||||||
|
"paramLongName":"saveGraph",
|
||||||
|
"paramDescription": "true if the new version of the graph must be saved",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
|
||||||
|
]
|
Loading…
Reference in New Issue