From 57c87b765359ee30010daffca76b644514c65d39 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 30 Jul 2020 16:43:43 +0200
Subject: [PATCH] re-implemented to fix issue on not serializable Set variable

---
 .../dump/graph/SparkOrganizationRelation.java | 111 +++++++++---------
 1 file changed, 56 insertions(+), 55 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
index 0105fd871..09a70e9f7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
@@ -3,24 +3,12 @@ package eu.dnetlib.dhp.oa.graph.dump.graph;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +22,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
 import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
 import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
 public class SparkOrganizationRelation implements Serializable {
@@ -67,43 +54,57 @@ public class SparkOrganizationRelation implements Serializable {
 		log.info("organization map : {}", new Gson().toJson(organizationMap));
 
 		SparkConf conf = new SparkConf();
-		AtomicReference<Set<String>> relationSet = null;
 
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath);
-				writeRelations(spark, extractRelation(spark, inputPath, organizationMap), outputPath, organizationMap);
+				extractRelation(spark, inputPath, organizationMap, outputPath);
 			});
 
 	}
 
-	private static void writeRelations(SparkSession spark, Set<String> rels, String outputPath,
-		OrganizationMap organizationMap) {
+	private static void extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap,
+		String outputPath) {
+		Dataset<Relation> relationDataset = Utils.readPath(spark, inputPath, Relation.class);
+
+		relationDataset.createOrReplaceTempView("relation");
+
+		Set<String> organizationSet = organizationMap.keySet();
 		List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
 
-		rels.forEach(oId -> {
-			organizationMap.get(oId).forEach(community -> {
-				eu.dnetlib.dhp.schema.dump.oaf.graph.Relation direct = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
-				eu.dnetlib.dhp.schema.dump.oaf.graph.Relation inverse = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
-				String id = Utils.getContextId(community);
-				direct.setSource(Node.newInstance(id,
-					"context"));
-				direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
-				direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
-				direct.setProvenance(Provenance.newInstance("Harvested", "0.9"));
-				relList.add(direct);
-				inverse.setTarget(Node.newInstance(id, "context"));
-				inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
-				inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
-				inverse.setProvenance(Provenance.newInstance("Harvested", "0.9"));
-				relList.add(inverse);
+		Dataset<MergedRels> mergedRelsDataset = spark
+			.sql(
+				"SELECT target organizationId, source representativeId " +
+					"FROM relation " +
+					"WHERE datainfo.deletedbyinference = false " +
+					"AND relclass = 'merges' " +
+					"AND substr(source, 1, 2) = '20'")
+			.as(Encoders.bean(MergedRels.class));
+
+		mergedRelsDataset.map((MapFunction<MergedRels, MergedRels>) mergedRels -> {
+			if (organizationMap.containsKey(mergedRels.getOrganizationId())) {
+				return mergedRels;
+			}
+			return null;
+		}, Encoders.bean(MergedRels.class))
+			.filter(Objects::nonNull)
+			.collectAsList()
+			.forEach(mergedRels -> {
+				String oId = mergedRels.getOrganizationId();
+				organizationSet.remove(oId);
+				organizationMap
+					.get(oId)
+					.forEach(community -> addRelations(relList, community, mergedRels.getRepresentativeId()));
 			});
-		});
+
+		organizationSet
+			.forEach(
+				oId -> organizationMap
					.get(oId)
+					.forEach(community -> addRelations(relList, community, oId)));
 
 		spark
 			.createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
@@ -111,31 +112,31 @@ public class SparkOrganizationRelation implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
+
 	}
 
-	private static Set<String> extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap) {
-		Dataset<Relation> tmp = Utils.readPath(spark, inputPath, Relation.class);
-		Set<String> organizationSet = organizationMap.keySet();
-		Set<String> toCreateRels = new HashSet<>();
+	private static void addRelations(List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList, String community,
+		String organization) {
 
-		tmp.foreach((ForeachFunction<Relation>) relation -> {
-			Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
-			if (odInfo.isPresent()) {
-				if (!odInfo.get().getDeletedbyinference()) {
-					if (relation.getRelClass().equals(ModelConstants.MERGES)) {
-						String oId = relation.getTarget();
-						if (organizationSet.contains(oId)) {
-							organizationSet.remove(oId);
-							toCreateRels.add(relation.getSource());
-						}
-					}
-				}
-			}
-		});
-
-		toCreateRels.addAll(organizationSet);
-		return toCreateRels;
+		String id = Utils.getContextId(community);
+		log.info("create relation for organization: {}", organization);
+		relList
+			.add(
+				eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
+					.newInstance(
+						Node.newInstance(id, Constants.CONTEXT_ENTITY),
+						Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
+						RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
+						Provenance.newInstance(Constants.USER_CLAIM, Constants.DEFAULT_TRUST)));
+		relList
+			.add(
+				eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
+					.newInstance(
+						Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
+						Node.newInstance(id, Constants.CONTEXT_ENTITY),
+						RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
+						Provenance.newInstance(Constants.USER_CLAIM, Constants.DEFAULT_TRUST)));
 	}
 
 }