forked from D-Net/dnet-hadoop
[CommunityPatents] extends the community propagation to also consider results of type patent linked with isRelatedTo semantics
This commit is contained in:
parent cf7d9a32ab
commit e5b04e61ff
Utils.java
@@ -171,7 +171,7 @@ public class Utils implements Serializable {
 	public static List<String> getCommunityIdList(String baseURL) throws IOException {
 		return getValidCommunities(baseURL)
 			.stream()
-			.map(community -> community.getId())
+			.map(CommunityModel::getId)
 			.collect(Collectors.toList());
 	}

PrepareResultCommunitySetStep1.java
@@ -3,11 +3,14 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel;

 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static java.lang.String.join;

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;

+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.*;
@@ -45,7 +48,7 @@ public class PrepareResultCommunitySetStep1 {

 	/**
 	 * a dataset for example could be linked to more than one publication. For each publication linked to that dataset
-	 * the previous query will produce a row: targetId set of community context the target could possibly inherit with
+	 * the previous query will produce a row: targetId, set of community context the target could possibly inherit. With
 	 * the following query there will be a single row for each result linked to more than one result of the result type
 	 * currently being used
 	 */
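A concrete illustration of the collapsing step this comment describes, with hypothetical identifiers: suppose dataset D1 is related to publications P1 and P2, each carrying its own community contexts.

	// Rows produced by the previous query, one per linked publication:
	//   targetId = D1, community_context = [covid-19]          (inherited via P1)
	//   targetId = D1, community_context = [dh-ch, enermaps]   (inherited via P2)
	// After the grouping query there is a single row per target:
	//   targetId = D1, community_context = [covid-19, dh-ch, enermaps]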
@@ -56,6 +59,16 @@ public class PrepareResultCommunitySetStep1 {
 		+ "where length(co) > 0 "
 		+ "group by resultId";

+	private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO = "select target resultId, community_context "
+		+ "from (select id, collect_set(co.id) community_context "
+		+ "      from result "
+		+ "      lateral view explode (context) c as co "
+		+ "      where datainfo.deletedbyinference = false %s "
+		+ "      and array_contains(instance.instancetype.classname, 'Patent') group by id) p "
+		+ "      JOIN "
+		+ "      (select source, target from relation "
+		+ "      where datainfo.deletedbyinference = false %s ) r ON p.id = r.source";
+
 	public static void main(String[] args) throws Exception {
 		String jsonConfiguration = IOUtils
 			.toString(
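For readability, this is roughly the statement the new template yields once String.format fills the two %s placeholders (see the query construction further down): the first slot receives the community filter, the second the relation-class filter. The community ids below are hypothetical, and the parentheses around the IN list are added here so the sketch reads as valid Spark SQL.

	select target resultId, community_context
	from (select id, collect_set(co.id) community_context
	      from result
	      lateral view explode (context) c as co
	      where datainfo.deletedbyinference = false
	        AND lower(co.id) IN ('covid-19','enermaps')
	        and array_contains(instance.instancetype.classname, 'Patent')
	      group by id) p
	JOIN (select source, target from relation
	      where datainfo.deletedbyinference = false
	        AND lower(relClass) = 'isrelatedto') r ON p.id = r.source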
@@ -82,15 +95,20 @@ public class PrepareResultCommunitySetStep1 {
 		SparkConf conf = new SparkConf();
 		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

-		final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
+		final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";"))
+			.map(value -> "'" + value.toLowerCase() + "'")
+			.toArray(String[]::new));

 		log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));

 		final String baseURL = parser.get("baseURL");
 		log.info("baseURL: {}", baseURL);

-		final List<String> communityIdList = getCommunityList(baseURL);
-		log.info("communityIdList: {}", new Gson().toJson(communityIdList));
+		final String communityIdList = join(",", getCommunityList(baseURL).stream()
+			.map(value -> "'" + value.toLowerCase() + "'")
+			.toArray(String[]::new));
+
+		log.info("communityIdList: {}", new Gson().toJson(communityIdList));

 		final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
 		log.info("resultType: {}", resultType);
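Both parameters are now pre-rendered as comma-separated, quoted, lower-cased SQL literals instead of travelling as List<String>. A minimal standalone sketch of that transformation (class and method names are hypothetical):

	import static java.lang.String.join;

	import java.util.Arrays;

	public class InListDemo {

		// "IsSupplementedBy;IsSupplementTo" -> "'issupplementedby','issupplementto'"
		static String toSqlLiterals(String semicolonSeparated) {
			return join(",", Arrays.stream(semicolonSeparated.split(";"))
				.map(value -> "'" + value.toLowerCase() + "'")
				.toArray(String[]::new));
		}

		public static void main(String[] args) {
			System.out.println(toSqlLiterals("IsSupplementedBy;IsSupplementTo"));
			// prints: 'issupplementedby','issupplementto'
		}
	}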
@@ -118,10 +136,10 @@ public class PrepareResultCommunitySetStep1 {
 		SparkSession spark,
 		String inputPath,
 		String outputPath,
-		List<String> allowedsemrel,
+		String allowedsemrel,
 		Class<R> resultClazz,
 		String resultType,
-		List<String> communityIdList) {
+		String communityIdList) {

 		final String inputResultPath = inputPath + "/" + resultType;
 		log.info("Reading Graph table from: {}", inputResultPath);
@@ -141,8 +159,15 @@ public class PrepareResultCommunitySetStep1 {
 		String resultContextQuery = String
 			.format(
 				RESULT_CONTEXT_QUERY_TEMPLATE,
-				getConstraintList(" lower(co.id) = '", communityIdList),
-				getConstraintList(" lower(relClass) = '", allowedsemrel));
+				" lower(co.id) IN " + communityIdList,
+				" AND lower(relClass) IN " + allowedsemrel);

+		String resultContextQueryIsRelatedTo = String
+			.format(
+				RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO,
+				" AND lower(co.id) IN " + communityIdList,
+				"AND lower(relClass) = '" +
+					ModelConstants.IS_RELATED_TO.toLowerCase() + "'");
+
 		Dataset<Row> result_context = spark.sql(resultContextQuery);
 		result_context.createOrReplaceTempView("result_context");
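The switch from getConstraintList to direct concatenation changes the shape of the generated filter. A hedged before/after sketch of the community constraint (ids hypothetical; getConstraintList is assumed to emit an OR-chain of equality tests, as in PropagationConstant):

	// Before, getConstraintList(" lower(co.id) = '", communityIdList) presumably expanded to:
	//   and ( lower(co.id) = 'covid-19' OR lower(co.id) = 'enermaps')
	// After, " lower(co.id) IN " + communityIdList concatenates the pre-quoted list:
	//   lower(co.id) IN 'covid-19','enermaps'
	// Note that Spark SQL expects parentheses around an IN list,
	// i.e. lower(co.id) IN ('covid-19','enermaps').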
@@ -154,6 +179,16 @@ public class PrepareResultCommunitySetStep1 {
 			.option("compression", "gzip")
 			.mode(SaveMode.Overwrite)
 			.json(outputResultPath);
+
+		result_context = spark.sql(resultContextQueryIsRelatedTo);
+		result_context.createOrReplaceTempView("result_context");
+		spark
+			.sql(RESULT_COMMUNITY_LIST_QUERY)
+			.as(Encoders.bean(ResultCommunityList.class))
+			.write()
+			.option("compression", "gzip")
+			.mode(SaveMode.Append)
+			.json(outputResultPath);
 	}

 	public static List<String> getCommunityList(final String baseURL) throws IOException {
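The patent pass reuses the same temp view name and RESULT_COMMUNITY_LIST_QUERY, but writes with SaveMode.Append so its rows land next to those of the first pass in the same output path. A self-contained sketch of that Overwrite-then-Append pattern (path and values illustrative):

	import org.apache.spark.sql.SaveMode;
	import org.apache.spark.sql.SparkSession;

	public class AppendDemo {
		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder()
				.appName("append-demo").master("local").getOrCreate();
			// First write replaces whatever is at the path.
			spark.range(3).write().mode(SaveMode.Overwrite).json("/tmp/append-demo");
			// Second write adds part files beside the existing ones.
			spark.range(3, 5).write().mode(SaveMode.Append).json("/tmp/append-demo");
			// Reading the path back now yields ids 0..4 from both writes.
			spark.read().json("/tmp/append-demo").show();
			spark.stop();
		}
	}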
PrepareResultCommunitySetStep2.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel;
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;

@@ -76,22 +77,13 @@ public class PrepareResultCommunitySetStep2 {
 					if (b == null) {
 						return a;
 					}
-					Set<String> community_set = new HashSet<>();
-					a.getCommunityList().stream().forEach(aa -> community_set.add(aa));
-					b
-						.getCommunityList()
-						.stream()
-						.forEach(
-							aa -> {
-								if (!community_set.contains(aa)) {
-									a.getCommunityList().add(aa);
-									community_set.add(aa);
-								}
-							});
+					Set<String> community_set = new HashSet<>(a.getCommunityList());
+					community_set.addAll(b.getCommunityList());
+					a.setCommunityList(new ArrayList<>(community_set));
 					return a;
 				})
 				.map(Tuple2::_2)
-				.map(r -> OBJECT_MAPPER.writeValueAsString(r))
+				.map(OBJECT_MAPPER::writeValueAsString)
 				.saveAsTextFile(outputPath, GzipCodec.class);
 		}

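The reduce function now unions the two community lists through a HashSet instead of the manual contains()/add() loop, deduplicating in one pass. A standalone sketch of the resulting merge semantics (class name hypothetical):

	import java.util.ArrayList;
	import java.util.Arrays;
	import java.util.HashSet;
	import java.util.List;
	import java.util.Set;

	public class MergeDemo {

		// Same union logic as the reduce above: keep each community id exactly once.
		static List<String> merge(List<String> a, List<String> b) {
			Set<String> communitySet = new HashSet<>(a);
			communitySet.addAll(b);
			return new ArrayList<>(communitySet);
		}

		public static void main(String[] args) {
			System.out.println(merge(
				Arrays.asList("covid-19", "dh-ch"),
				Arrays.asList("dh-ch", "enermaps")));
			// e.g. [covid-19, dh-ch, enermaps] (HashSet does not guarantee order)
		}
	}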