re-implemented to fix issue on not serializable Set<String> variable

This commit is contained in:
Miriam Baglioni 2020-07-30 16:43:43 +02:00
parent ef8e5957b5
commit 57c87b7653
1 changed files with 56 additions and 55 deletions

View File

@ -3,24 +3,12 @@ package eu.dnetlib.dhp.oa.graph.dump.graph;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable; import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,7 +22,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.Provenance; import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Node; import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType; import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class SparkOrganizationRelation implements Serializable { public class SparkOrganizationRelation implements Serializable {
@ -67,43 +54,57 @@ public class SparkOrganizationRelation implements Serializable {
log.info("organization map : {}", new Gson().toJson(organizationMap)); log.info("organization map : {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
AtomicReference<Set<String>> relationSet = null;
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
writeRelations(spark, extractRelation(spark, inputPath, organizationMap), outputPath, organizationMap); extractRelation(spark, inputPath, organizationMap, outputPath);
}); });
} }
private static void writeRelations(SparkSession spark, Set<String> rels, String outputPath, private static void extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap,
OrganizationMap organizationMap) { String outputPath) {
Dataset<Relation> relationDataset = Utils.readPath(spark, inputPath, Relation.class);
relationDataset.createOrReplaceTempView("relation");
Set<String> organizationSet = organizationMap.keySet();
List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>(); List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
rels.forEach(oId -> { Dataset<MergedRels> mergedRelsDataset = spark
organizationMap.get(oId).forEach(community -> { .sql(
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation direct = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); "SELECT target organizationId, source representativeId " +
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation inverse = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); "FROM relation " +
String id = Utils.getContextId(community); "WHERE datainfo.deletedbyinference = false " +
direct.setSource(Node.newInstance(id, "context")); "AND relclass = 'merges' " +
direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2)))); "AND substr(source, 1, 2) = '20'")
direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); .as(Encoders.bean(MergedRels.class));
direct.setProvenance(Provenance.newInstance("Harvested", "0.9"));
relList.add(direct);
inverse.setTarget(Node.newInstance(id, "context"));
inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
inverse.setProvenance(Provenance.newInstance("Harvested", "0.9"));
relList.add(inverse);
mergedRelsDataset.map((MapFunction<MergedRels, MergedRels>) mergedRels -> {
if (organizationMap.containsKey(mergedRels.getOrganizationId())) {
return mergedRels;
}
return null;
}, Encoders.bean(MergedRels.class))
.filter(Objects::nonNull)
.collectAsList()
.forEach(mergedRels -> {
String oId = mergedRels.getOrganizationId();
organizationSet.remove(oId);
organizationMap
.get(oId)
.forEach(community -> addRelations(relList, community, mergedRels.getRepresentativeId()));
}); });
}); organizationSet
.forEach(
oId -> organizationMap
.get(oId)
.forEach(community -> addRelations(relList, community, oId)));
spark spark
.createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) .createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
@ -111,31 +112,31 @@ public class SparkOrganizationRelation implements Serializable {
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
}
private static Set<String> extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap) {
Dataset<Relation> tmp = Utils.readPath(spark, inputPath, Relation.class);
Set<String> organizationSet = organizationMap.keySet();
Set<String> toCreateRels = new HashSet<>();
tmp.foreach((ForeachFunction<Relation>) relation -> {
Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
if (odInfo.isPresent()) {
if (!odInfo.get().getDeletedbyinference()) {
if (relation.getRelClass().equals(ModelConstants.MERGES)) {
String oId = relation.getTarget();
if (organizationSet.contains(oId)) {
organizationSet.remove(oId);
toCreateRels.add(relation.getSource());
}
}
}
}
});
toCreateRels.addAll(organizationSet);
return toCreateRels;
} }
private static void addRelations(List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList, String community,
String organization) {
String id = Utils.getContextId(community);
log.info("create relation for organization: {}", organization);
relList
.add(
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
.newInstance(
Node.newInstance(id, Constants.CONTEXT_ENTITY),
Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance.newInstance(Constants.USER_CLAIM, Constants.DEFAULT_TRUST)));
relList
.add(
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
.newInstance(
Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
Node.newInstance(id, Constants.CONTEXT_ENTITY),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance.newInstance(Constants.USER_CLAIM, Constants.DEFAULT_TRUST)));
}
} }