From f206ff42d6ebec290ce3cf9c3222c10a02313ce6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 20 Oct 2023 15:49:41 +0200 Subject: [PATCH] modified code to use the the API. Removing not needed parameters. Rewritten the code to exploit the parallel stream on the entity types --- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 83 +++---- .../dhp/bulktag/community/Community.java | 9 +- .../community/CommunityConfiguration.java | 8 +- .../CommunityConfigurationFactory.java | 12 +- .../dhp/bulktag/community/Constraint.java | 17 +- .../dhp/bulktag/community/ResultTagger.java | 26 +- .../community/SelectionConstraints.java | 2 + .../PrepareResultCommunity2.java | 225 ++++++++++++++++++ 8 files changed, 301 insertions(+), 81 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 3186ed5c0..68c740dd5 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -9,7 +9,6 @@ import java.util.*; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -21,8 +20,11 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bulktag.community.*; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Result; @@ -53,50 +55,38 @@ public class SparkBulkTagJob { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - Boolean isTest = Optional - .ofNullable(parser.get("isTest")) - .map(Boolean::valueOf) - .orElse(Boolean.FALSE); - log.info("isTest: {} ", isTest); - final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final boolean production = Boolean.valueOf(parser.get("production")); + log.info("production: {}", production); + ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class); log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); - - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("saveGraph: {}", saveGraph); - - Class resultClazz = (Class) Class.forName(resultClassName); - SparkConf conf = new SparkConf(); CommunityConfiguration cc; - String taggingConf = parser.get("taggingConf"); + String taggingConf = Optional + .ofNullable(parser.get("taggingConf")) + .map(String::valueOf) + .orElse(null); - if (isTest) { + if (taggingConf != null) { cc = CommunityConfigurationFactory.newInstance(taggingConf); } else { - cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl")); + cc = Utils.getCommunityConfiguration(production); } runWithSparkSession( conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, outputPath); extendCommunityConfigurationForEOSC(spark, inputPath, cc); - execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); + execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc); }); } @@ -105,10 +95,7 @@ public class SparkBulkTagJob { Dataset datasources = readPath( spark, inputPath - .substring( - 0, - inputPath.lastIndexOf("/")) - + "/datasource", + + "datasource", Datasource.class) .filter((FilterFunction) ds -> isOKDatasource(ds)) .map((MapFunction) ds -> ds.getId(), Encoders.STRING()); @@ -116,10 +103,10 @@ public class SparkBulkTagJob { Map>> dsm = cc.getEoscDatasourceMap(); for (String ds : datasources.collectAsList()) { - final String dsId = ds.substring(3); - if (!dsm.containsKey(dsId)) { + // final String dsId = ds.substring(3); + if (!dsm.containsKey(ds)) { ArrayList> eoscList = new ArrayList<>(); - dsm.put(dsId, eoscList); + dsm.put(ds, eoscList); } } @@ -141,22 +128,30 @@ public class SparkBulkTagJob { String inputPath, String outputPath, ProtoMap protoMappingParams, - Class resultClazz, CommunityConfiguration communityConfiguration) { - ResultTagger resultTagger = new ResultTagger(); - readPath(spark, inputPath, resultClazz) - .map(patchResult(), Encoders.bean(resultClazz)) - .filter(Objects::nonNull) - .map( - (MapFunction) value -> resultTagger - .enrichContextCriteria( - value, communityConfiguration, protoMappingParams), - Encoders.bean(resultClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + ModelSupport.entityTypes + .keySet() + .parallelStream() + .filter(e -> ModelSupport.isResult(e)) + .forEach(e -> { + removeOutputDir(spark, outputPath + e.name()); + ResultTagger resultTagger = new ResultTagger(); + Class resultClazz = ModelSupport.entityTypes.get(e); + readPath(spark, inputPath + e.name(), resultClazz) + .map(patchResult(), Encoders.bean(resultClazz)) + .filter(Objects::nonNull) + .map( + (MapFunction) value -> resultTagger + .enrichContextCriteria( + value, communityConfiguration, protoMappingParams), + Encoders.bean(resultClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + e.name()); + }); + } public static Dataset readPath( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java index b44376e22..9cd3a8f82 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.bulktag.community; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import com.google.gson.Gson; @@ -13,7 +14,7 @@ public class Community implements Serializable { private String id; private List subjects = new ArrayList<>(); private List providers = new ArrayList<>(); - private List zenodoCommunities = new ArrayList<>(); + private List zenodoCommunities = new ArrayList<>(); private SelectionConstraints constraints = new SelectionConstraints(); private SelectionConstraints removeConstraints = new SelectionConstraints(); @@ -26,7 +27,7 @@ public class Community implements Serializable { return !getSubjects().isEmpty() || !getProviders().isEmpty() || !getZenodoCommunities().isEmpty() - || getConstraints().getCriteria() != null; + || (Optional.ofNullable(getConstraints()).isPresent() && getConstraints().getCriteria() != null); } public String getId() { @@ -53,11 +54,11 @@ public class Community implements Serializable { this.providers = providers; } - public List getZenodoCommunities() { + public List getZenodoCommunities() { return zenodoCommunities; } - public void setZenodoCommunities(List zenodoCommunities) { + public void setZenodoCommunities(List zenodoCommunities) { this.zenodoCommunities = zenodoCommunities; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java index e061ccd5e..a658c7ff5 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java @@ -81,7 +81,7 @@ public class CommunityConfiguration implements Serializable { this.removeConstraintsMap = removeConstraintsMap; } - CommunityConfiguration(final Map communities) { + public CommunityConfiguration(final Map communities) { this.communities = communities; init(); } @@ -117,10 +117,10 @@ public class CommunityConfiguration implements Serializable { add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap); } // get zenodo communities - for (ZenodoCommunity zc : c.getZenodoCommunities()) { + for (String zc : c.getZenodoCommunities()) { add( - zc.getZenodoCommunityId(), - new Pair<>(id, zc.getSelCriteria()), + zc, + new Pair<>(id, null), zenodocommunityMap); } selectionConstraintsMap.put(id, c.getConstraints()); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java index 7b9e03ef6..955ca3856 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java @@ -143,16 +143,16 @@ public class CommunityConfigurationFactory { return providerList; } - private static List parseZenodoCommunities(final Node node) { + private static List parseZenodoCommunities(final Node node) { final List list = node.selectNodes("./zenodocommunities/zenodocommunity"); - final List zenodoCommunityList = new ArrayList<>(); + final List zenodoCommunityList = new ArrayList<>(); for (Node n : list) { - ZenodoCommunity zc = new ZenodoCommunity(); - zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText()); - zc.setSelCriteria(n.selectSingleNode("./selcriteria")); +// ZenodoCommunity zc = new ZenodoCommunity(); +// zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText()); +// zc.setSelCriteria(n.selectSingleNode("./selcriteria")); - zenodoCommunityList.add(zc); + zenodoCommunityList.add(n.selectSingleNode("./zenodoid").getText()); } log.info("size of the zenodo community list " + zenodoCommunityList.size()); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java index ed58cc14d..48d9be7cd 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java @@ -4,6 +4,8 @@ package eu.dnetlib.dhp.bulktag.community; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; +import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore; + import eu.dnetlib.dhp.bulktag.criteria.Selection; import eu.dnetlib.dhp.bulktag.criteria.VerbResolver; @@ -11,7 +13,8 @@ public class Constraint implements Serializable { private String verb; private String field; private String value; -// private String element; + + @JsonIgnore private Selection selection; public String getVerb() { @@ -38,10 +41,8 @@ public class Constraint implements Serializable { this.value = value; } - public void setSelection(Selection sel) { - selection = sel; - } + @JsonIgnore public void setSelection(VerbResolver resolver) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { @@ -52,11 +53,5 @@ public class Constraint implements Serializable { return selection.apply(metadata); } -// public String getElement() { -// return element; -// } -// -// public void setElement(String element) { -// this.element = element; -// } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 5f62c10f4..3b231a52d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -85,16 +85,18 @@ public class ResultTagger implements Serializable { conf .getRemoveConstraintsMap() .keySet() - .forEach(communityId -> { - if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null && - conf - .getRemoveConstraintsMap() - .get(communityId) - .getCriteria() - .stream() - .anyMatch(crit -> crit.verifyCriteria(param))) - removeCommunities.add(communityId); - }); + .forEach( + communityId -> { + if (conf.getRemoveConstraintsMap().keySet().contains(communityId) && + conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null && + conf + .getRemoveConstraintsMap() + .get(communityId) + .getCriteria() + .stream() + .anyMatch(crit -> crit.verifyCriteria(param))) + removeCommunities.add(communityId); + }); // communities contains all the communities to be added as context for the result final Set communities = new HashSet<>(); @@ -124,10 +126,10 @@ public class ResultTagger implements Serializable { if (Objects.nonNull(result.getInstance())) { for (Instance i : result.getInstance()) { if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) { - collfrom.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|")); + collfrom.add(i.getCollectedfrom().getKey()); } if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) { - hostdby.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|")); + hostdby.add(i.getHostedby().getKey()); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java index c7dcce812..57cc658fc 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java @@ -7,11 +7,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import eu.dnetlib.dhp.bulktag.criteria.VerbResolver; +@JsonAutoDetect public class SelectionConstraints implements Serializable { private List criteria; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java new file mode 100644 index 000000000..96523c502 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java @@ -0,0 +1,225 @@ + +package eu.dnetlib.dhp.resulttocommunityfromorganization; + +/** + * @author miriam.baglioni + * @Date 16/10/23 + */ +/** + * @author miriam.baglioni + * @Date 16/10/23 + */ +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.PropagationConstant.OBJECT_MAPPER; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; + +import eu.dnetlib.dhp.api.Utils; +import eu.dnetlib.dhp.api.model.CommunityEntityMap; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +public class PrepareResultCommunity2 { + + private static final Logger log = LoggerFactory + .getLogger(eu.dnetlib.dhp.resulttocommunityfromorganization.PrepareResultCommunitySet.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + eu.dnetlib.dhp.resulttocommunityfromorganization.PrepareResultCommunitySet.class + .getResourceAsStream( + "/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final boolean production = Boolean.valueOf(parser.get("production")); + log.info("production: {}", production); + + final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(production); + log.info("organizationMap: {}", new Gson().toJson(organizationMap)); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareInfo(spark, inputPath, outputPath, organizationMap); + }); + } + + private static void prepareInfo( + SparkSession spark, + String inputPath, + String outputPath, + CommunityEntityMap organizationMap) { + + final StructType structureSchema = new StructType() + .add( + "dataInfo", new StructType() + .add("deletedbyinference", DataTypes.BooleanType) + .add("invisible", DataTypes.BooleanType)) + .add("source", DataTypes.StringType) + .add("target", DataTypes.StringType) + .add("relClass", DataTypes.StringType); + + readPath(spark, inputPath, Relation.class) + .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); + + Dataset resultOrganization = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(inputPath) + .filter( + "dataInfo.deletedbyinference != true " + + "and relClass == '" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") + .select( + new Column("source").as("resultId"), + new Column("target").as("organizationId")); + + resultOrganization + .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); + + Dataset organizationOrganization = spark + .read() + .schema(structureSchema) + .json(inputPath) + .filter( + "dataInfo.deletedbyinference != true " + + "and relClass == '" + ModelConstants.MERGES + "'") + .select( + new Column("source").as("masterOrganization"), + new Column("target").as("duplicateOrganization")); + + resultOrganization + .joinWith( + organizationOrganization, resultOrganization + .col("organizationId") + .equalTo(organizationOrganization.col("masterOrganization")), + "left") + .groupByKey( + (MapFunction, String>) t2 -> (String) t2._1().getAs("resultId"), Encoders.STRING()) + .mapGroups((MapGroupsFunction, ResultCommunityList>) (k, v) -> { + ResultCommunityList rcl = new ResultCommunityList(); + rcl.setResultId(k); + ArrayList cl = new ArrayList<>(); + Tuple2 first = v.next(); + cl.addAll(organizationMap.get(first._1().getAs("organizationId"))); + if (Optional.ofNullable(first._2()).isPresent()) { + cl.addAll(organizationMap.get(first._2().getAs(("duplicateOrganization")))); + } + v.forEachRemaining(o -> cl.addAll(organizationMap.get(o._2().getAs("duplicateOrganization")))); + if (cl.size() == 0) + return null; + rcl.setCommunityList(new ArrayList<>(cl.stream().distinct().collect(Collectors.toList()))); + return rcl; + }, Encoders.bean(ResultCommunityList.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + +// Dataset relation = readPath(spark, inputPath, Relation.class) +// .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() ); +// +// relation.createOrReplaceTempView("relation"); +// +// String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges " +// + "FROM (SELECT source, target " +// + " FROM relation " +// + " AND lower(relClass) = '" +// + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +// + "') result_organization " +// + "LEFT JOIN (SELECT source, collect_set(target) org_set " +// + " FROM relation " +// + " AND lower(relClass) = '" +// + ModelConstants.MERGES.toLowerCase() +// + "' " +// + " GROUP BY source) organization_organization " +// + "ON result_organization.target = organization_organization.source "; +// +// Dataset result_organizationset = spark +// .sql(query) +// .as(Encoders.bean(ResultOrganizations.class)); +// +// result_organizationset +// .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class)) +// .filter(Objects::nonNull) +// .toJavaRDD() +// .mapToPair(value -> new Tuple2<>(value.getResultId(), value)) +// .reduceByKey((a, b) -> { +// ArrayList cl = a.getCommunityList(); +// b.getCommunityList().stream().forEach(s -> { +// if (!cl.contains(s)) { +// cl.add(s); +// } +// }); +// a.setCommunityList(cl); +// return a; +// }) +// .map(value -> OBJECT_MAPPER.writeValueAsString(value._2())) +// .saveAsTextFile(outputPath, GzipCodec.class); + } + + private static MapFunction mapResultCommunityFn( + CommunityEntityMap organizationMap) { + return value -> { + String rId = value.getResultId(); + Optional> orgs = Optional.ofNullable(value.getMerges()); + String oTarget = value.getOrgId(); + Set communitySet = new HashSet<>(); + if (organizationMap.containsKey(oTarget)) { + communitySet.addAll(organizationMap.get(oTarget)); + } + if (orgs.isPresent()) + for (String oId : orgs.get()) { + if (organizationMap.containsKey(oId)) { + communitySet.addAll(organizationMap.get(oId)); + } + } + if (!communitySet.isEmpty()) { + ResultCommunityList rcl = new ResultCommunityList(); + rcl.setResultId(rId); + ArrayList communityList = new ArrayList<>(); + communityList.addAll(communitySet); + rcl.setCommunityList(communityList); + return rcl; + } + return null; + }; + } +}