dnet-hadoop/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java


package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.function.Consumer;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
import eu.dnetlib.dhp.schema.oaf.Relation;

/**
 * Creates new relations between context entities and the organizations whose products are associated to the
 * context. It produces relations such as: organization <-> isRelatedTo <-> context.
 */
public class SparkOrganizationRelation implements Serializable {

private static final Logger log = LoggerFactory.getLogger(SparkOrganizationRelation.class);
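
// Parses the job parameters, deserializes the organization -> communities map, and triggers the dump of the
// context/organization relations.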
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkOrganizationRelation.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/complete/input_organization_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
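
// When the parameter is missing, the Spark session is assumed to be managed by this job (defaults to true).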
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final OrganizationMap organizationMap = new Gson()
.fromJson(parser.get("organizationCommunityMap"), OrganizationMap.class);
final String serializedOrganizationMap = new Gson().toJson(organizationMap);
log.info("organization map : {}", serializedOrganizationMap);

final String communityMapPath = parser.get("communityMapPath");
log.info("communityMapPath: {}", communityMapPath);
SparkConf conf = new SparkConf();
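
// Runs within a (possibly externally managed) Spark session: cleans the output directory, then dumps the relations.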
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extractRelation(spark, inputPath, organizationMap, outputPath, communityMapPath);
});
}

private static void extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap,
String outputPath, String communityMapPath) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
Dataset<Relation> relationDataset = Utils.readPath(spark, inputPath, Relation.class);
relationDataset.createOrReplaceTempView("relation");
List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
Dataset<MergedRels> mergedRelsDataset = spark
.sql(
"SELECT target organizationId, source representativeId " +
"FROM relation " +
"WHERE datainfo.deletedbyinference = false " +
"AND relclass = 'merges' " +
"AND substr(source, 1, 2) = '20'")
.as(Encoders.bean(MergedRels.class));
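
// Keeps only the merged organizations that appear in the organization-community map; the consumer below turns
// each of them into context relations that point to the representative record.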
mergedRelsDataset.map((MapFunction<MergedRels, MergedRels>) mergedRels -> {
if (organizationMap.containsKey(mergedRels.getOrganizationId())) {
return mergedRels;
}
return null;
}, Encoders.bean(MergedRels.class))
.filter(Objects::nonNull)
.collectAsList()
.forEach(getMergedRelsConsumer(organizationMap, relList, communityMap));
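
// The consumer removes every organization it handles from the map, so the entries left here were not matched
// by any "merges" relation: they are linked to their communities' contexts through their own identifier.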
organizationMap
.keySet()
.forEach(
oId -> organizationMap
.get(oId)
.forEach(community -> {
if (communityMap.containsKey(community)) {
addRelations(relList, community, oId);
}
}));
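
// Writes the collected relations as gzip-compressed JSON, overwriting any previous dump.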
spark
.createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
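
// Builds a consumer that, for every merged organization associated with at least one dumped community, adds the
// context relations using the identifier of the representative record, then drops the organization from the map
// so that the leftover pass above does not process it again.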
@NotNull
private static Consumer<MergedRels> getMergedRelsConsumer(OrganizationMap organizationMap,
List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList, CommunityMap communityMap) {
return mergedRels -> {
String oId = mergedRels.getOrganizationId();
organizationMap
.get(oId)
.forEach(community -> {
if (communityMap.containsKey(community)) {
addRelations(relList, community, mergedRels.getRepresentativeId());
}
});
organizationMap.remove(oId);
};
}
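
// Adds both directions of the link: context -> organization and organization -> context, typed IS_RELATED_TO
// and carrying user-claim provenance.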
private static void addRelations(List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList, String community,
String organization) {
String id = Utils.getContextId(community);
log.info("create relation for organization: {}", organization);
relList
.add(
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
.newInstance(
Node.newInstance(id, Constants.CONTEXT_ENTITY),
Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.Constants.USER_CLAIM,
eu.dnetlib.dhp.oa.graph.dump.Constants.DEFAULT_TRUST)));
relList
.add(
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
.newInstance(
Node.newInstance(organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2))),
Node.newInstance(id, Constants.CONTEXT_ENTITY),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.Constants.USER_CLAIM,
eu.dnetlib.dhp.oa.graph.dump.Constants.DEFAULT_TRUST)));
}
}