package eu.dnetlib.dhp.resulttoorganizationfromsemrel;

import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.io.Serializable;
import java.util.*;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
/**
 * Searches for all the associations between results and organizations already existing in the graph.
 * It also builds the parent/child hierarchy among organizations and selects the leaf organizations,
 * i.e. those that are not parent of any other organization, as the starting set for the propagation.
 */
public class PrepareInfo implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class);

	// associates each organization with the set of all its parent organizations
	private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " +
		"FROM relation " +
		"WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() +
		"' and datainfo.deletedbyinference = false " +
		"GROUP BY target";
	// associates each result with the set of organizations it is affiliated with
	private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
		"FROM relation " +
		"WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +
		"' and datainfo.deletedbyinference = false " +
		"GROUP BY source";
	public static void main(String[] args) throws Exception {

		String jsonConfiguration = IOUtils
			.toString(
				SparkResultToOrganizationFromIstRepoJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

		parser.parseArgument(args);

		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String graphPath = parser.get("graphPath");
		log.info("graphPath: {}", graphPath);

		final String leavesPath = parser.get("leavesPath");
		log.info("leavesPath: {}", leavesPath);

		final String childParentPath = parser.get("childParentPath");
		log.info("childParentPath: {}", childParentPath);

		final String resultOrganizationPath = parser.get("resultOrgPath");
		log.info("resultOrganizationPath: {}", resultOrganizationPath);

		final String relationPath = parser.get("relationPath");
		log.info("relationPath: {}", relationPath);

		SparkConf conf = new SparkConf();
		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
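		// runs the preparation in a Hive-enabled Spark session, whose lifecycle is handled
		// according to the isSparkSessionManaged flag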
		runWithSparkHiveSession(
			conf,
			isSparkSessionManaged,
			spark -> prepareInfo(
				spark,
				graphPath,
				childParentPath,
				leavesPath,
				resultOrganizationPath,
				relationPath));
	}
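	/**
	 * Reads the relations of the graph and materialises the inputs needed by the propagation:
	 * the child -> parent organizations map, the result -> affiliated organizations map, the
	 * subset of hasAuthorInstitution relations and the set of leaf organizations used as the
	 * starting point of the first iteration.
	 */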
	private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
		String currentIterationPath, String resultOrganizationPath, String relationPath) {
		Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
		relation.createOrReplaceTempView("relation");
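		// writes, for each organization (key), the set of its parent organizations (valueSet)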
		spark
			.sql(ORGANIZATION_ORGANIZATION_QUERY)
			.as(Encoders.bean(KeyValueSet.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(childParentOrganizationPath);
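		// writes, for each result (key), the set of organizations it is affiliated with (valueSet)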
		spark
			.sql(RESULT_ORGANIZATION_QUERY)
			.as(Encoders.bean(KeyValueSet.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(resultOrganizationPath);
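		// saves the subset of relations with relClass hasAuthorInstitution to a dedicated path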
		relation
			.filter(
				(FilterFunction<Relation>) r -> r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(relationPath);
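		// computes the leaves of the organization hierarchy: organizations that have a parent
		// but are never parent of any other organization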
		Dataset<String> children = spark
			.sql(
				"Select distinct target as child from relation where " +
					"lower(relclass)='" + ModelConstants.IS_PARENT_OF.toLowerCase() +
					"' and datainfo.deletedbyinference = false")
			.as(Encoders.STRING());

		Dataset<String> parent = spark
			.sql(
				"Select distinct source as parent from relation " +
					"where lower(relclass)='" + ModelConstants.IS_PARENT_OF.toLowerCase() +
					"' and datainfo.deletedbyinference = false")
			.as(Encoders.STRING());
		// keeps from the left join only the entities with no right-hand side, i.e. the organizations
		// that are not parent of any other organization: the leaves. Saves them for the first iteration
		children
			.joinWith(parent, children.col("child").equalTo(parent.col("parent")), "left")
			.map((MapFunction<Tuple2<String, String>, String>) value -> {
				if (Optional.ofNullable(value._2()).isPresent()) {
					return null;
				}

				return value._1();
			}, Encoders.STRING())
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.json(currentIterationPath);
	}
}