diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java deleted file mode 100644 index cc90feac2..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ /dev/null @@ -1,690 +0,0 @@ -package eu.dnetlib.dhp.resulttocommunityfromsemrel; - -import static eu.dnetlib.dhp.PropagationConstant.*; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.QueryInformationSystem; -import eu.dnetlib.dhp.TypedRow; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import java.util.*; -import java.util.stream.Collectors; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; - -public class SparkResultToCommunityThroughSemRelJob { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = - new ArgumentApplicationParser( - IOUtils.toString( - SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream( - "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); - parser.parseArgument(args); - - for (String key : parser.getObjectMap().keySet()) { - System.out.println(key + " = " + parser.get(key)); - } - - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = - SparkSession.builder() - .appName(SparkResultToCommunityThroughSemRelJob.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; - - // final List allowedsemrel = - // Arrays.asList(parser.get("allowedsemrels").split(";")); - final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); - // final List communityIdList = - // QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - final List communityIdList = - QueryInformationSystem.getCommunityList( - "http://beta.services.openaire.eu:8280/is/services/isLookUp"); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD all_publication_rdd = - sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .cache(); - JavaRDD publication_rdd = - all_publication_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD all_dataset_rdd = - sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .cache(); - JavaRDD dataset_rdd = - all_dataset_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD all_orp_rdd = - sc.textFile(inputPath 
+ "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .cache(); - JavaRDD orp_rdd = - all_orp_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD all_software_rdd = - sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .cache(); - JavaRDD software_rdd = - all_software_rdd.filter(p -> relatedToCommunities(p, communityIdList)).cache(); - - JavaRDD relation_rdd = - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter( - r -> - allowedsemrel.contains(r.getRelClass()) - && RELATION_RESULTRESULT_REL_TYPE.equals( - r.getRelType())) - .cache(); - - org.apache.spark.sql.Dataset publication = - spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); - - org.apache.spark.sql.Dataset dataset = - spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class)); - - org.apache.spark.sql.Dataset other = - spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = - spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class)); - - org.apache.spark.sql.Dataset relation = - spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); - - publication.createOrReplaceTempView("publication"); - relation.createOrReplaceTempView("relation"); - dataset.createOrReplaceTempView("dataset"); - software.createOrReplaceTempView("software"); - other.createOrReplaceTempView("other"); - - // org.apache.spark.sql.Dataset publication_context = getContext(spark, - // "publication"); - // publication_context.createOrReplaceTempView("publication_context"); - - org.apache.spark.sql.Dataset publication_context = - spark.sql( - "SELECT relation.source, " - + "publication.context , relation.target " - + "FROM publication " - + " JOIN relation " - + "ON id = source"); - - org.apache.spark.sql.Dataset dataset_context = getContext(spark, "dataset"); - dataset_context.createOrReplaceTempView("dataset_context"); - - org.apache.spark.sql.Dataset software_context = getContext(spark, "software"); - software_context.createOrReplaceTempView("software_context"); - - org.apache.spark.sql.Dataset other_context = getContext(spark, "other"); - other_context.createOrReplaceTempView("other_context"); - - publication = - spark.createDataset(all_publication_rdd.rdd(), Encoders.bean(Publication.class)); - publication.createOrReplaceTempView("publication"); - - dataset = spark.createDataset(all_dataset_rdd.rdd(), Encoders.bean(Dataset.class)); - dataset.createOrReplaceTempView("dataset"); - - other = spark.createDataset(all_orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); - other.createOrReplaceTempView("other"); - - software = spark.createDataset(all_software_rdd.rdd(), Encoders.bean(Software.class)); - software.createOrReplaceTempView("software"); - - org.apache.spark.sql.Dataset toupdatesoftwareresult = - getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = - getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdatepublicationreresult = - getUpdateCommunitiesForTable(spark, "publication"); - org.apache.spark.sql.Dataset toupdateotherresult = - getUpdateCommunitiesForTable(spark, "other"); - 
- createUpdateForResultDatasetWrite( - toupdatesoftwareresult.toJavaRDD(), - outputPath, - "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - toupdatedatasetresult.toJavaRDD(), - outputPath, - "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - toupdatepublicationreresult.toJavaRDD(), - outputPath, - "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - toupdateotherresult.toJavaRDD(), - outputPath, - "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForDatasetDataset( - toupdatedatasetresult.toJavaRDD(), - dataset.toJavaRDD(), - outputPath, - "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForOtherDataset( - toupdateotherresult.toJavaRDD(), - other.toJavaRDD(), - outputPath, - "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForSoftwareDataset( - toupdatesoftwareresult.toJavaRDD(), - software.toJavaRDD(), - outputPath, - "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForPublicationDataset( - toupdatepublicationreresult.toJavaRDD(), - publication.toJavaRDD(), - outputPath, - "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - /* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); - - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, 
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] - //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla - */ - } - - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable( - SparkSession spark, String table) { - String query = - "SELECT target_id, collect_set(co.id) context_id " - + " FROM (SELECT t.id target_id, s.context source_context " - + " FROM context_software s " - + " JOIN " - + table - + " t " - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, d.context source_context " - + " FROM dataset_context d " - + " JOIN " - + table - + " t" - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, p.context source_context " - + " FROM publication_context p" - + " JOIN " - + table - + " t " - + " on p.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, o.context source_context " - + " FROM other_context o " - + " JOIN " - + table - + " t " - + " ON o.target = t.id) TMP " - + " LATERAL VIEW EXPLODE(source_context) MyT as co " - + " GROUP BY target_id"; - - return spark.sql(query); - } - - private static JavaRDD createUpdateForResultDatasetWrite( - JavaRDD toupdateresult, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return toupdateresult - .map( - r -> { - List contextList = new ArrayList(); - List toAddContext = r.getList(1); - for (String cId : toAddContext) { - if (communityIdList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - contextList.add(newContext); - } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }) - .filter(r -> r != null); - } - - private static void updateForSoftwareDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForDatasetDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Dataset) r) - .map(d -> new ObjectMapper().writeValueAsString(d)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForPublicationDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = 
result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Publication) r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForOtherDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (OtherResearchProduct) r) - .map(o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static JavaRDD getUpdateForResultDataset( - JavaRDD toupdateresult, - JavaPairRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return result.leftOuterJoin( - toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map( - c -> { - if (!c._2()._2().isPresent()) { - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for (Object cId : toAddContext) { - String id = (String) cId; - if (communityIdList.contains(id)) { - context_set.add(id); - } - } - for (Context context : c._2()._1().getContext()) { - if (context_set.contains(context)) { - context_set.remove(context); - } - } - - List contextList = - context_set.stream() - .map( - co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - return newContext; - }) - .collect(Collectors.toList()); - - if (contextList.size() > 0) { - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }) - .filter(r -> r != null); - - // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) - // .join(result) - // .map(c -> { - // List toAddContext = c._2()._1(); - // Set context_set = new HashSet<>(); - // for(Object cId: toAddContext){ - // String id = (String)cId; - // if (communityIdList.contains(id)){ - // context_set.add(id); - // } - // } - // for (Context context: c._2()._2().getContext()){ - // if(context_set.contains(context)){ - // context_set.remove(context); - // } - // } - // - // List contextList = context_set.stream().map(co -> { - // Context newContext = new Context(); - // newContext.setId(co); - // - // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, - // class_name))); - // return newContext; - // - // }).collect(Collectors.toList()); - // - // if(contextList.size() > 0 ){ - // Result r = new Result(); - // r.setId(c._1()); - // r.setContext(contextList); - // return r; - // } - // return null; - // }) - // .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset( - JavaRDD toupdateresult, - List communityList, - JavaRDD result, - String class_id, - String class_name) { - return result.mapToPair(s -> new Tuple2<>(s.getId(), s)) - .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map( - c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if 
(contexts.contains(context.getId())) { - if (!context.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - context.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name)); - // community id already in the context of the result. - // Remove it from the set that has to be added - contexts.remove(context.getId()); - } - } - } - List cc = oaf.getContext(); - for (String cId : contexts) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - cc.add(context); - } - oaf.setContext(cc); - } - return oaf; - }); - } - - private static JavaPairRDD> getStringResultJavaPairRDD( - JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair( - c -> { - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0), contextList); - }); - } - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { - String query = - "SELECT relation.source, " - + table - + ".context , relation.target " - + "FROM " - + table - + " JOIN relation " - + "ON id = source"; - - return spark.sql(query); - } - - private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = - r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - return true; - } - } - return false; - } - - private static void updateResult( - JavaPairRDD results, - JavaPairRDD toupdateresult, - String outputPath, - String type) { - results.leftOuterJoin(toupdateresult) - .map( - p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()) { - Set communityList = p._2()._2().get().getAccumulator(); - for (Context c : r.getContext()) { - if (communityList.contains(c.getId())) { - // verify if the datainfo for this context contains - // propagation - if (!c.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - c.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - // community id already in the context of the result. 
- // Remove it from the set that has to be added - communityList.remove(c.getId()); - } - } - } - List cc = r.getContext(); - for (String cId : communityList) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static TypedRow getTypedRow( - List communityIdList, List context, String id, String type) { - Set result_communities = - context.stream().map(c -> c.getId()).collect(Collectors.toSet()); - TypedRow tp = new TypedRow(); - tp.setSourceId(id); - tp.setType(type); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - tp.add(communityId); - } - } - if (tp.getAccumulator() != null) { - return tp; - } - return null; - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java deleted file mode 100644 index 3a0904404..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob2.java +++ /dev/null @@ -1,682 +0,0 @@ -package eu.dnetlib.dhp.resulttocommunityfromsemrel; - -import static eu.dnetlib.dhp.PropagationConstant.*; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.QueryInformationSystem; -import eu.dnetlib.dhp.TypedRow; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import java.util.*; -import java.util.stream.Collectors; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; - -public class SparkResultToCommunityThroughSemRelJob2 { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = - new ArgumentApplicationParser( - IOUtils.toString( - SparkResultToCommunityThroughSemRelJob2.class.getResourceAsStream( - "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); - parser.parseArgument(args); - - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = - SparkSession.builder() - .appName(SparkResultToCommunityThroughSemRelJob2.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; - - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - // final List allowedsemrel = Arrays.asList("isSupplementedBy", "isSupplementTo"); - final List communityIdList = - 
QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - // final List communityIdList = - // QueryInformationSystem.getCommunityList("http://beta.services.openaire.eu:8280/is/services/isLookUp"); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD publication_rdd = - sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - - System.out.println(publication_rdd.count()); - // JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset") - // .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - // - // JavaRDD orp_rdd = sc.textFile(inputPath + - // "/otherresearchproduct") - // .map(item -> new ObjectMapper().readValue(item, - // OtherResearchProduct.class)); - // - // JavaRDD software_rdd = sc.textFile(inputPath + "/software") - // .map(item -> new ObjectMapper().readValue(item, Software.class)); - - JavaRDD relation_rdd = - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)); - - System.out.println(relation_rdd.count()); - - // .filter(r -> !r.getDataInfo().getDeletedbyinference()) - // .filter(r -> allowedsemrel.contains(r.getRelClass()) && - // RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())).cache(); - - org.apache.spark.sql.Dataset publication = - spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); - - org.apache.spark.sql.Dataset relation = - spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); - - // org.apache.spark.sql.Dataset dataset = - // spark.createDataset(dataset_rdd.rdd(), - // Encoders.bean(Dataset.class)); - // - // org.apache.spark.sql.Dataset other = - // spark.createDataset(orp_rdd.rdd(), - // Encoders.bean(OtherResearchProduct.class)); - // - // org.apache.spark.sql.Dataset software = - // spark.createDataset(software_rdd.rdd(), - // Encoders.bean(Software.class)); - // - // org.apache.spark.sql.Dataset relation = - // spark.createDataset(relation_rdd.rdd(), - // Encoders.bean(Relation.class)); - - publication.createOrReplaceTempView("publication"); - relation.createOrReplaceTempView("relation"); - // relation.createOrReplaceTempView("relation"); - // dataset.createOrReplaceTempView("dataset"); - // software.createOrReplaceTempView("software"); - // other.createOrReplaceTempView("other"); - - String communitylist = getConstraintList(" co.id = '", communityIdList); - - String semrellist = getConstraintList(" relClass = '", allowedsemrel); - - String query = - "Select source, community_context, target " - + "from (select id, collect_set(co.id) community_context " - + "from publication " - + "lateral view explode (context) c as co " - + "where datainfo.deletedbyinference = false " - + communitylist - + " group by id) p " - + "JOIN " - + "(select * " - + "from relation " - + "where datainfo.deletedbyinference = false and (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r " - + "ON p.id = r.source"; - - org.apache.spark.sql.Dataset publication_context = spark.sql(query); - publication_context.createOrReplaceTempView("publication_context"); - - // ( source, (mes, dh-ch-, ni), target ) - query = - "select target , collect_set(co) " - + "from (select target, community_context " - + "from publication_context pc join publication p on " - + "p.id = pc.source) tmp " - + "lateral view explode (community_context) c as co " - + "group by target"; - - org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); - - 
System.out.println(toupdatepublicationreresult.count()); - - toupdatepublicationreresult - .toJavaRDD() - .map( - r -> { - TypedRow tp = new TypedRow(); - tp.setSourceId(r.getString(0)); - r.getList(1).stream().forEach(c -> tp.add((String) c)); - return tp; - }) - .map(tr -> new ObjectMapper().writeValueAsString(tr)) - .saveAsTextFile(outputPath + "/community2semrelonpublication"); - // toupdatepublicationreresult.toJavaRDD().flatMap(c -> { - // - // String source = c.getString(0); - // List relation_list = new ArrayList<>(); - // c.getList(1).stream() - // .forEach(res -> { - // Relation r = new Relation(); - // r.setSource(source); - // r.setTarget((String)res); - // r.setRelClass("produces"); - // relation_list.add(r); - // r = new Relation(); - // r.setSource((String)res); - // r.setTarget(source); - // r.setRelClass("isProducedBy"); - // relation_list.add(r); - // }); - // return relation_list.iterator(); - // }).map(tr -> new ObjectMapper().writeValueAsString(tr)) - // .saveAsTextFile(outputPath + "/community2semrel"); - // - - // org.apache.spark.sql.Dataset toupdatesoftwareresult = - // getUpdateCommunitiesForTable(spark, "software"); - // org.apache.spark.sql.Dataset toupdatedatasetresult = - // getUpdateCommunitiesForTable(spark, "dataset"); - // org.apache.spark.sql.Dataset toupdatepublicationreresult = - // getUpdateCommunitiesForTable(spark, "publication"); - // org.apache.spark.sql.Dataset toupdateotherresult = - // getUpdateCommunitiesForTable(spark, "other"); - - // createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, - // "software_update", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - // createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, - // "dataset_update", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - // createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), - // outputPath, "publication_update", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - - // createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, - // "other_update", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - // - // updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), - // outputPath, "dataset", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - // updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), - // outputPath, "otherresearchproduct", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - // updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), - // outputPath, "software", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - // updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), - // publication.toJavaRDD(), outputPath, "publication", - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - // PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList); - // - - /* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, 
p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); - - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] - //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. 
Se non ci sono aggiungerli, altrimenti nulla - */ - } - - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable( - SparkSession spark, String table) { - String query = - "SELECT target_id, collect_set(co.id) context_id " - + " FROM (SELECT t.id target_id, s.context source_context " - + " FROM context_software s " - + " JOIN " - + table - + " t " - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, d.context source_context " - + " FROM dataset_context d " - + " JOIN " - + table - + " t" - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, p.context source_context " - + " FROM publication_context p" - + " JOIN " - + table - + " t " - + " on p.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, o.context source_context " - + " FROM other_context o " - + " JOIN " - + table - + " t " - + " ON o.target = t.id) TMP " - + " LATERAL VIEW EXPLODE(source_context) MyT as co " - + " GROUP BY target_id"; - - return spark.sql(query); - } - - private static JavaRDD createUpdateForResultDatasetWrite( - JavaRDD toupdateresult, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return toupdateresult - .map( - r -> { - List contextList = new ArrayList(); - List toAddContext = r.getList(1); - for (String cId : toAddContext) { - if (communityIdList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - contextList.add(newContext); - } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }) - .filter(r -> r != null); - } - - private static void updateForSoftwareDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForDatasetDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Dataset) r) - .map(d -> new ObjectMapper().writeValueAsString(d)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForPublicationDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Publication) r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForOtherDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD 
tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (OtherResearchProduct) r) - .map(o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static JavaRDD getUpdateForResultDataset( - JavaRDD toupdateresult, - JavaPairRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return result.leftOuterJoin( - toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map( - c -> { - if (!c._2()._2().isPresent()) { - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for (Object cId : toAddContext) { - String id = (String) cId; - if (communityIdList.contains(id)) { - context_set.add(id); - } - } - for (Context context : c._2()._1().getContext()) { - if (context_set.contains(context)) { - context_set.remove(context); - } - } - - List contextList = - context_set.stream() - .map( - co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - return newContext; - }) - .collect(Collectors.toList()); - - if (contextList.size() > 0) { - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }) - .filter(r -> r != null); - - // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) - // .join(result) - // .map(c -> { - // List toAddContext = c._2()._1(); - // Set context_set = new HashSet<>(); - // for(Object cId: toAddContext){ - // String id = (String)cId; - // if (communityIdList.contains(id)){ - // context_set.add(id); - // } - // } - // for (Context context: c._2()._2().getContext()){ - // if(context_set.contains(context)){ - // context_set.remove(context); - // } - // } - // - // List contextList = context_set.stream().map(co -> { - // Context newContext = new Context(); - // newContext.setId(co); - // - // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, - // class_name))); - // return newContext; - // - // }).collect(Collectors.toList()); - // - // if(contextList.size() > 0 ){ - // Result r = new Result(); - // r.setId(c._1()); - // r.setContext(contextList); - // return r; - // } - // return null; - // }) - // .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset( - JavaRDD toupdateresult, - List communityList, - JavaRDD result, - String class_id, - String class_name) { - return result.mapToPair(s -> new Tuple2<>(s.getId(), s)) - .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map( - c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())) { - if (!context.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - context.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name)); - // community id already in the context of the result. 
- // Remove it from the set that has to be added - contexts.remove(context.getId()); - } - } - } - List cc = oaf.getContext(); - for (String cId : contexts) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - cc.add(context); - } - oaf.setContext(cc); - } - return oaf; - }); - } - - private static JavaPairRDD> getStringResultJavaPairRDD( - JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair( - c -> { - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0), contextList); - }); - } - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { - String query = - "SELECT relation.source, " - + table - + ".context , relation.target " - + "FROM " - + table - + " JOIN relation " - + "ON id = source"; - - return spark.sql(query); - } - - private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = - r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - return true; - } - } - return false; - } - - private static void updateResult( - JavaPairRDD results, - JavaPairRDD toupdateresult, - String outputPath, - String type) { - results.leftOuterJoin(toupdateresult) - .map( - p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()) { - Set communityList = p._2()._2().get().getAccumulator(); - for (Context c : r.getContext()) { - if (communityList.contains(c.getId())) { - // verify if the datainfo for this context contains - // propagation - if (!c.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - c.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - // community id already in the context of the result. 
- // Remove it from the set that has to be added - communityList.remove(c.getId()); - } - } - } - List cc = r.getContext(); - for (String cId : communityList) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static TypedRow getTypedRow( - List communityIdList, List context, String id, String type) { - Set result_communities = - context.stream().map(c -> c.getId()).collect(Collectors.toSet()); - TypedRow tp = new TypedRow(); - tp.setSourceId(id); - tp.setType(type); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - tp.add(communityId); - } - } - if (tp.getAccumulator() != null) { - return tp; - } - return null; - } -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java deleted file mode 100644 index 9c5e6c3b3..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob3.java +++ /dev/null @@ -1,660 +0,0 @@ -package eu.dnetlib.dhp.resulttocommunityfromsemrel; - -import static eu.dnetlib.dhp.PropagationConstant.*; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.QueryInformationSystem; -import eu.dnetlib.dhp.TypedRow; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import java.util.*; -import java.util.stream.Collectors; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; - -public class SparkResultToCommunityThroughSemRelJob3 { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = - new ArgumentApplicationParser( - IOUtils.toString( - SparkResultToCommunityThroughSemRelJob3.class.getResourceAsStream( - "/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"))); - parser.parseArgument(args); - - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = - SparkSession.builder() - .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel"; - - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - - final List communityIdList = - QueryInformationSystem.getCommunityList(parser.get("isLookupUrl")); - - createOutputDirs(outputPath, 
FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD publication_rdd = - sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - - JavaRDD dataset_rdd = - sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - - JavaRDD orp_rdd = - sc.textFile(inputPath + "/otherresearchproduct") - .map( - item -> - new ObjectMapper() - .readValue(item, OtherResearchProduct.class)); - - JavaRDD software_rdd = - sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - - JavaRDD relation_rdd = - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)); - - org.apache.spark.sql.Dataset publication = - spark.createDataset(publication_rdd.rdd(), Encoders.bean(Publication.class)); - - org.apache.spark.sql.Dataset relation = - spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset dataset = - spark.createDataset(dataset_rdd.rdd(), Encoders.bean(Dataset.class)); - - org.apache.spark.sql.Dataset other = - spark.createDataset(orp_rdd.rdd(), Encoders.bean(OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = - spark.createDataset(software_rdd.rdd(), Encoders.bean(Software.class)); - - publication.createOrReplaceTempView("publication"); - relation.createOrReplaceTempView("relation"); - dataset.createOrReplaceTempView("dataset"); - software.createOrReplaceTempView("software"); - other.createOrReplaceTempView("other"); - - String communitylist = getConstraintList(" co.id = '", communityIdList); - - String semrellist = getConstraintList(" relClass = '", allowedsemrel); - - String query = - "Select source, community_context, target " - + "from (select id, collect_set(co.id) community_context " - + "from publication " - + "lateral view explode (context) c as co " - + "where datainfo.deletedbyinference = false " - + communitylist - + " group by id) p " - + "JOIN " - + "(select * " - + "from relation " - + "where datainfo.deletedbyinference = false " - + semrellist - + ") r " - + "ON p.id = r.source"; - - org.apache.spark.sql.Dataset publication_context = spark.sql(query); - publication_context.createOrReplaceTempView("publication_context"); - - // ( source, (mes, dh-ch-, ni), target ) - query = - "select target , collect_set(co) " - + "from (select target, community_context " - + "from publication_context pc join publication p on " - + "p.id = pc.source) tmp " - + "lateral view explode (community_context) c as co " - + "group by target"; - - org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query); - org.apache.spark.sql.Dataset toupdatesoftwareresult = - getUpdateCommunitiesForTable(spark, "software"); - org.apache.spark.sql.Dataset toupdatedatasetresult = - getUpdateCommunitiesForTable(spark, "dataset"); - org.apache.spark.sql.Dataset toupdateotherresult = - getUpdateCommunitiesForTable(spark, "other"); - - createUpdateForResultDatasetWrite( - toupdatesoftwareresult.toJavaRDD(), - outputPath, - "software_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - toupdatedatasetresult.toJavaRDD(), - outputPath, - "dataset_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - 
toupdatepublicationreresult.toJavaRDD(), - outputPath, - "publication_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - createUpdateForResultDatasetWrite( - toupdateotherresult.toJavaRDD(), - outputPath, - "other_update", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForDatasetDataset( - toupdatedatasetresult.toJavaRDD(), - dataset.toJavaRDD(), - outputPath, - "dataset", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForOtherDataset( - toupdateotherresult.toJavaRDD(), - other.toJavaRDD(), - outputPath, - "otherresearchproduct", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForSoftwareDataset( - toupdatesoftwareresult.toJavaRDD(), - software.toJavaRDD(), - outputPath, - "software", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - - updateForPublicationDataset( - toupdatepublicationreresult.toJavaRDD(), - publication.toJavaRDD(), - outputPath, - "publication", - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - communityIdList); - // - - /* - JavaPairRDD resultLinkedToCommunities = publication - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(software - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ) - .union(other - .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct")) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - ); - - JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId())) - .mapToPair(toPair()); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME); - //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato] - //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. 
Se non ci sono aggiungerli, altrimenti nulla - */ - } - - private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable( - SparkSession spark, String table) { - String query = - "SELECT target_id, collect_set(co.id) context_id " - + " FROM (SELECT t.id target_id, s.context source_context " - + " FROM context_software s " - + " JOIN " - + table - + " t " - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, d.context source_context " - + " FROM dataset_context d " - + " JOIN " - + table - + " t" - + " ON s.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, p.context source_context " - + " FROM publication_context p" - + " JOIN " - + table - + " t " - + " on p.target = t.id " - + " UNION ALL " - + " SELECT t.id target_id, o.context source_context " - + " FROM other_context o " - + " JOIN " - + table - + " t " - + " ON o.target = t.id) TMP " - + " LATERAL VIEW EXPLODE(source_context) MyT as co " - + " GROUP BY target_id"; - - return spark.sql(query); - } - - private static JavaRDD createUpdateForResultDatasetWrite( - JavaRDD toupdateresult, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return toupdateresult - .map( - r -> { - List contextList = new ArrayList(); - List toAddContext = r.getList(1); - for (String cId : toAddContext) { - if (communityIdList.contains(cId)) { - Context newContext = new Context(); - newContext.setId(cId); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - contextList.add(newContext); - } - } - - if (contextList.size() > 0) { - Result ret = new Result(); - ret.setId(r.getString(0)); - ret.setContext(contextList); - return ret; - } - return null; - }) - .filter(r -> r != null); - } - - private static void updateForSoftwareDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Software) r) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForDatasetDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Dataset) r) - .map(d -> new ObjectMapper().writeValueAsString(d)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForPublicationDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (Publication) r) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static void updateForOtherDataset( - JavaRDD toupdateresult, - JavaRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - JavaPairRDD 
tmp = result.mapToPair(r -> new Tuple2(r.getId(), r)); - getUpdateForResultDataset( - toupdateresult, - tmp, - outputPath, - type, - class_id, - class_name, - communityIdList) - .map(r -> (OtherResearchProduct) r) - .map(o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static JavaRDD getUpdateForResultDataset( - JavaRDD toupdateresult, - JavaPairRDD result, - String outputPath, - String type, - String class_id, - String class_name, - List communityIdList) { - return result.leftOuterJoin( - toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1)))) - .map( - c -> { - if (!c._2()._2().isPresent()) { - return c._2()._1(); - } - - List toAddContext = c._2()._2().get(); - Set context_set = new HashSet<>(); - for (Object cId : toAddContext) { - String id = (String) cId; - if (communityIdList.contains(id)) { - context_set.add(id); - } - } - for (Context context : c._2()._1().getContext()) { - if (context_set.contains(context)) { - context_set.remove(context); - } - } - - List contextList = - context_set.stream() - .map( - co -> { - Context newContext = new Context(); - newContext.setId(co); - newContext.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - return newContext; - }) - .collect(Collectors.toList()); - - if (contextList.size() > 0) { - Result r = new Result(); - r.setId(c._1()); - r.setContext(contextList); - return r; - } - return null; - }) - .filter(r -> r != null); - - // return toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))) - // .join(result) - // .map(c -> { - // List toAddContext = c._2()._1(); - // Set context_set = new HashSet<>(); - // for(Object cId: toAddContext){ - // String id = (String)cId; - // if (communityIdList.contains(id)){ - // context_set.add(id); - // } - // } - // for (Context context: c._2()._2().getContext()){ - // if(context_set.contains(context)){ - // context_set.remove(context); - // } - // } - // - // List contextList = context_set.stream().map(co -> { - // Context newContext = new Context(); - // newContext.setId(co); - // - // newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, - // class_name))); - // return newContext; - // - // }).collect(Collectors.toList()); - // - // if(contextList.size() > 0 ){ - // Result r = new Result(); - // r.setId(c._1()); - // r.setContext(contextList); - // return r; - // } - // return null; - // }) - // .filter(r -> r != null); - } - - private static JavaRDD createUpdateForSoftwareDataset( - JavaRDD toupdateresult, - List communityList, - JavaRDD result, - String class_id, - String class_name) { - return result.mapToPair(s -> new Tuple2<>(s.getId(), s)) - .leftOuterJoin(getStringResultJavaPairRDD(toupdateresult, communityList)) - .map( - c -> { - Software oaf = c._2()._1(); - if (c._2()._2().isPresent()) { - - HashSet contexts = new HashSet<>(c._2()._2().get()); - - for (Context context : oaf.getContext()) { - if (contexts.contains(context.getId())) { - if (!context.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - context.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name)); - // community id already in the context of the result. 
- // Remove it from the set that has to be added - contexts.remove(context.getId()); - } - } - } - List cc = oaf.getContext(); - for (String cId : contexts) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - class_id, - class_name))); - cc.add(context); - } - oaf.setContext(cc); - } - return oaf; - }); - } - - private static JavaPairRDD> getStringResultJavaPairRDD( - JavaRDD toupdateresult, List communityList) { - return toupdateresult.mapToPair( - c -> { - List contextList = new ArrayList<>(); - List contexts = c.getList(1); - for (String context : contexts) { - if (communityList.contains(context)) { - contextList.add(context); - } - } - - return new Tuple2<>(c.getString(0), contextList); - }); - } - - private static org.apache.spark.sql.Dataset getContext(SparkSession spark, String table) { - String query = - "SELECT relation.source, " - + table - + ".context , relation.target " - + "FROM " - + table - + " JOIN relation " - + "ON id = source"; - - return spark.sql(query); - } - - private static Boolean relatedToCommunities(Result r, List communityIdList) { - Set result_communities = - r.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - return true; - } - } - return false; - } - - private static void updateResult( - JavaPairRDD results, - JavaPairRDD toupdateresult, - String outputPath, - String type) { - results.leftOuterJoin(toupdateresult) - .map( - p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()) { - Set communityList = p._2()._2().get().getAccumulator(); - for (Context c : r.getContext()) { - if (communityList.contains(c.getId())) { - // verify if the datainfo for this context contains - // propagation - if (!c.getDataInfo().stream() - .map(di -> di.getInferenceprovenance()) - .collect(Collectors.toSet()) - .contains(PROPAGATION_DATA_INFO_TYPE)) { - c.getDataInfo() - .add( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)); - // community id already in the context of the result. - // Remove it from the set that has to be added - communityList.remove(c.getId()); - } - } - } - List cc = r.getContext(); - for (String cId : communityList) { - Context context = new Context(); - context.setId(cId); - context.setDataInfo( - Arrays.asList( - getDataInfo( - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, - PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type); - } - - private static TypedRow getTypedRow( - List communityIdList, List context, String id, String type) { - Set result_communities = - context.stream().map(c -> c.getId()).collect(Collectors.toSet()); - TypedRow tp = new TypedRow(); - tp.setSourceId(id); - tp.setType(type); - for (String communityId : result_communities) { - if (communityIdList.contains(communityId)) { - tp.add(communityId); - } - } - if (tp.getAccumulator() != null) { - return tp; - } - return null; - } -}
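The propagation core shared by the three deleted jobs is easier to follow with the Spark SQL pair reflowed. The sketch below is illustrative, not part of the patch: the relClass disjunction is the variant Job2 hardcodes, Job3 derives the same shape from allowedsemrels via getConstraintList, and the inlined community constraint is a hypothetical rendering of getConstraintList(" co.id = '", communityIdList), whose real output lives in PropagationConstant and is not shown in this diff.

    public class PropagationQuerySketch {
        public static void main(String[] args) {
            // Hypothetical rendering of the communitylist constraint; the real
            // string is produced by getConstraintList(" co.id = '", communityIdList).
            String communitylist = "AND (co.id = 'dh-ch' OR co.id = 'mes') ";

            // Step 1: results already tagged with a community context, joined to
            // their allowed semantic relations (relClass pair as hardcoded in Job2).
            String step1 =
                    "SELECT source, community_context, target "
                  + "FROM (SELECT id, collect_set(co.id) community_context "
                  + "        FROM publication "
                  + "        LATERAL VIEW EXPLODE (context) c AS co "
                  + "       WHERE datainfo.deletedbyinference = false "
                  + communitylist
                  + "       GROUP BY id) p "
                  + "JOIN (SELECT * FROM relation "
                  + "       WHERE datainfo.deletedbyinference = false "
                  + "         AND (relClass = 'isSupplementedBy' OR relClass = 'isSupplementTo')) r "
                  + "  ON p.id = r.source";

            // Step 2: regroup the exploded community ids by relation target,
            // yielding (target, set of community ids to propagate).
            String step2 =
                    "SELECT target, collect_set(co) "
                  + "FROM (SELECT target, community_context "
                  + "        FROM publication_context pc JOIN publication p ON p.id = pc.source) tmp "
                  + "LATERAL VIEW EXPLODE (community_context) c AS co "
                  + "GROUP BY target";

            System.out.println(step1);
            System.out.println(step2);
        }
    }

Two oddities in the related getUpdateCommunitiesForTable query stand out once reflowed: it selects FROM context_software although the temp view is registered as software_context, and the dataset_context branch joins ON s.target = t.id where the alias in scope is d; both appear to be copy-paste slips that would make the query fail or misbind.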
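The merge step that all the updateFor*Dataset variants funnel into (getUpdateForResultDataset) reduces to: keep only the proposed community ids that are known to the information system, drop the ids the result already carries, and wrap the rest in Context entries tagged with the propagation DataInfo. A minimal, self-contained sketch follows, with simplified stand-in types rather than the eu.dnetlib.dhp.schema.oaf beans and without the DataInfo wiring. Note that the deleted code tests context_set.contains(context) with a Context object against a set of String ids, so its already-present check appears never to match, whereas the createUpdateForSoftwareDataset variant correctly compares context.getId(); the sketch follows the latter.

    import java.util.*;
    import java.util.stream.Collectors;

    public class ContextMergeSketch {

        // Simplified stand-in for eu.dnetlib.dhp.schema.oaf.Context.
        static class Context {
            private final String id;
            Context(String id) { this.id = id; }
            String getId() { return id; }
        }

        /**
         * Community contexts to append to a result: proposed ids reached through
         * allowed semantic relations, minus unknown communities and minus ids
         * the result already carries.
         */
        static List<Context> newContexts(
                Collection<String> proposed,
                Collection<Context> existing,
                Collection<String> communityIdList) {

            Set<String> toAdd = proposed.stream()
                    .filter(communityIdList::contains)            // drop unknown communities
                    .collect(Collectors.toCollection(HashSet::new));

            existing.forEach(c -> toAdd.remove(c.getId()));       // drop ids already present

            // In the real jobs each new Context would also carry the propagation
            // DataInfo via getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name).
            return toAdd.stream().map(Context::new).collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Context> added = newContexts(
                    Arrays.asList("dh-ch", "mes", "unknown"),
                    Collections.singletonList(new Context("mes")),
                    Arrays.asList("dh-ch", "mes", "ni"));
            added.forEach(c -> System.out.println(c.getId()));    // prints: dh-ch
        }
    }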
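Finally, the four per-type updateFor*Dataset methods in each deleted job differ only in the concrete result class and the output directory, so a generic write helper is enough to express all of them. This is a hypothetical sketch, not part of the deleted API:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.JavaRDD;

    class UpdateWriterSketch {
        // Serializes each merged result to JSON and writes one directory per type,
        // as the deleted updateFor*Dataset methods do after the merge join.
        static <R> void writeUpdated(JavaRDD<R> updated, String outputPath, String type) {
            final ObjectMapper mapper = new ObjectMapper(); // ObjectMapper is Serializable
            updated.map(mapper::writeValueAsString)
                   .saveAsTextFile(outputPath + "/" + type);
        }
    }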