diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index 6079da3656..1e39d99c3d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -171,7 +171,7 @@ public class Utils implements Serializable { public static List getCommunityIdList(String baseURL) throws IOException { return getValidCommunities(baseURL) .stream() - .map(community -> community.getId()) + .map(CommunityModel::getId) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java index aede9ef05b..ff496bb878 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java @@ -3,11 +3,14 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static java.lang.String.join; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; @@ -45,7 +48,7 @@ public class PrepareResultCommunitySetStep1 { /** * a dataset for example could be linked to more than one publication. For each publication linked to that dataset - * the previous query will produce a row: targetId set of community context the target could possibly inherit with + * the previous query will produce a row: targetId, set of community context the target could possibly inherit. With * the following query there will be a single row for each result linked to more than one result of the result type * currently being used */ @@ -56,6 +59,16 @@ public class PrepareResultCommunitySetStep1 { + "where length(co) > 0 " + "group by resultId"; + private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO = "select target resultId, community_context " + + "from (select id, collect_set(co.id) community_context " + + " from result " + + " lateral view explode (context) c as co " + + " where datainfo.deletedbyinference = false %s " + + " and array_contains(instance.instancetype.classname, 'Patent') group by id) p " + + " JOIN " + + " (select source, target from relation " + + " where datainfo.deletedbyinference = false %s ) r ON p.id = r.source"; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -82,15 +95,20 @@ public class PrepareResultCommunitySetStep1 { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";")) + .map(value -> "'" + value.toLowerCase() + "'") + .toArray(String[]::new)); + log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel)); final String baseURL = parser.get("baseURL"); log.info("baseURL: {}", baseURL); - final List communityIdList = getCommunityList(baseURL); - log.info("communityIdList: {}", new Gson().toJson(communityIdList)); + final String communityIdList = join(",", getCommunityList(baseURL).stream() + .map(value -> "'" + value.toLowerCase() + "'") + .toArray(String[]::new)); + log.info("communityIdList: {}", new Gson().toJson(communityIdList)); final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); @@ -118,10 +136,10 @@ public class PrepareResultCommunitySetStep1 { SparkSession spark, String inputPath, String outputPath, - List allowedsemrel, + String allowedsemrel, Class resultClazz, String resultType, - List communityIdList) { + String communityIdList) { final String inputResultPath = inputPath + "/" + resultType; log.info("Reading Graph table from: {}", inputResultPath); @@ -141,8 +159,15 @@ public class PrepareResultCommunitySetStep1 { String resultContextQuery = String .format( RESULT_CONTEXT_QUERY_TEMPLATE, - getConstraintList(" lower(co.id) = '", communityIdList), - getConstraintList(" lower(relClass) = '", allowedsemrel)); + " lower(co.id) IN " + communityIdList, + " AND lower(relClass) IN " + allowedsemrel); + + String resultContextQueryIsRelatedTo = String + .format( + RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO, + " AND lower(co.id) IN " + communityIdList, + "AND lower(relClass) = '"+ + ModelConstants.IS_RELATED_TO.toLowerCase() + "'"); Dataset result_context = spark.sql(resultContextQuery); result_context.createOrReplaceTempView("result_context"); @@ -154,6 +179,16 @@ public class PrepareResultCommunitySetStep1 { .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(outputResultPath); + + result_context = spark.sql(resultContextQueryIsRelatedTo); + result_context.createOrReplaceTempView("result_context"); + spark + .sql(RESULT_COMMUNITY_LIST_QUERY) + .as(Encoders.bean(ResultCommunityList.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(outputResultPath); } public static List getCommunityList(final String baseURL) throws IOException { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java index a53d3dfe32..9bebc36e58 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -76,22 +77,13 @@ public class PrepareResultCommunitySetStep2 { if (b == null) { return a; } - Set community_set = new HashSet<>(); - a.getCommunityList().stream().forEach(aa -> community_set.add(aa)); - b - .getCommunityList() - .stream() - .forEach( - aa -> { - if (!community_set.contains(aa)) { - a.getCommunityList().add(aa); - community_set.add(aa); - } - }); + Set community_set = new HashSet<>(a.getCommunityList()); + community_set.addAll(b.getCommunityList()); + a.setCommunityList(new ArrayList<>(community_set)); return a; }) .map(Tuple2::_2) - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .map(OBJECT_MAPPER::writeValueAsString) .saveAsTextFile(outputPath, GzipCodec.class); }