2020-11-25 14:55:32 +01:00
|
|
|
|
2022-01-19 12:24:52 +01:00
|
|
|
package eu.dnetlib.dhp.oa.merge;
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2022-01-19 12:30:52 +01:00
|
|
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
2023-08-24 21:48:07 +02:00
|
|
|
import static org.apache.spark.sql.functions.col;
|
|
|
|
import static org.apache.spark.sql.functions.when;
|
2022-01-19 12:30:52 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
import java.util.Map;
|
2022-01-19 12:30:52 +01:00
|
|
|
import java.util.Optional;
|
2023-08-24 21:48:07 +02:00
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
import java.util.concurrent.ForkJoinPool;
|
2022-01-19 12:30:52 +01:00
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
2022-01-19 12:24:52 +01:00
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
|
import org.apache.spark.SparkConf;
|
|
|
|
import org.apache.spark.api.java.function.MapFunction;
|
2023-08-24 21:48:07 +02:00
|
|
|
import org.apache.spark.api.java.function.ReduceFunction;
|
2022-01-19 12:24:52 +01:00
|
|
|
import org.apache.spark.sql.*;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2022-01-19 12:30:52 +01:00
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
|
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
2023-08-24 21:48:07 +02:00
|
|
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
2022-01-19 12:30:52 +01:00
|
|
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
2023-08-24 21:48:07 +02:00
|
|
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
2022-01-19 12:30:52 +01:00
|
|
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
|
|
|
import scala.Tuple2;
|
2022-01-19 12:24:52 +01:00
|
|
|
|
2020-11-25 14:55:32 +01:00
|
|
|
/**
|
|
|
|
* Groups the graph content by entity identifier to ensure ID uniqueness
|
|
|
|
*/
|
2021-07-28 11:34:19 +02:00
|
|
|
public class GroupEntitiesSparkJob {
|
2020-11-25 14:55:32 +01:00
|
|
|
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
|
|
|
|
|
2023-10-06 12:31:17 +02:00
|
|
|
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
|
2020-11-25 14:55:32 +01:00
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
|
|
|
String jsonConfiguration = IOUtils
|
|
|
|
.toString(
|
|
|
|
GroupEntitiesSparkJob.class
|
|
|
|
.getResourceAsStream(
|
2022-01-19 17:13:21 +01:00
|
|
|
"/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json"));
|
2020-11-25 14:55:32 +01:00
|
|
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
|
|
parser.parseArgument(args);
|
|
|
|
|
|
|
|
Boolean isSparkSessionManaged = Optional
|
|
|
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
|
|
|
.map(Boolean::valueOf)
|
|
|
|
.orElse(Boolean.TRUE);
|
|
|
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
|
|
|
|
|
|
String graphInputPath = parser.get("graphInputPath");
|
|
|
|
log.info("graphInputPath: {}", graphInputPath);
|
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
String checkpointPath = parser.get("checkpointPath");
|
|
|
|
log.info("checkpointPath: {}", checkpointPath);
|
|
|
|
|
2020-11-25 14:55:32 +01:00
|
|
|
String outputPath = parser.get("outputPath");
|
|
|
|
log.info("outputPath: {}", outputPath);
|
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
|
|
|
|
log.info("filterInvisible: {}", filterInvisible);
|
|
|
|
|
2020-11-25 14:55:32 +01:00
|
|
|
SparkConf conf = new SparkConf();
|
|
|
|
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
|
|
|
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
|
|
|
|
|
|
|
runWithSparkSession(
|
|
|
|
conf,
|
|
|
|
isSparkSessionManaged,
|
|
|
|
spark -> {
|
2023-10-17 07:54:01 +02:00
|
|
|
HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
|
2023-08-24 21:48:07 +02:00
|
|
|
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
|
2020-11-25 14:55:32 +01:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void groupEntities(
|
|
|
|
SparkSession spark,
|
|
|
|
String inputPath,
|
2023-08-24 21:48:07 +02:00
|
|
|
String checkpointPath,
|
|
|
|
String outputPath,
|
|
|
|
boolean filterInvisible) {
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
|
|
|
|
String entity = e.getKey().name();
|
|
|
|
Class<? extends OafEntity> entityClass = e.getValue();
|
|
|
|
String entityInputPath = inputPath + "/" + entity;
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
|
|
|
|
continue;
|
2020-11-25 14:55:32 +01:00
|
|
|
}
|
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
allEntities = allEntities
|
|
|
|
.union(
|
|
|
|
((Dataset<OafEntity>) spark
|
|
|
|
.read()
|
|
|
|
.schema(Encoders.bean(entityClass).schema())
|
|
|
|
.json(entityInputPath)
|
|
|
|
.filter("length(id) > 0")
|
|
|
|
.as(Encoders.bean(entityClass)))
|
|
|
|
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
|
2020-11-25 14:55:32 +01:00
|
|
|
}
|
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
Dataset<?> groupedEntities = allEntities
|
|
|
|
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
|
|
|
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
|
|
|
|
.map(
|
|
|
|
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
|
|
|
|
t._2().getClass().getName(), t._2()),
|
|
|
|
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
|
|
|
|
|
|
|
|
// pivot on "_1" (classname of the entity)
|
2023-10-06 12:31:17 +02:00
|
|
|
// created columns containing only entities of the same class
|
2023-08-24 21:48:07 +02:00
|
|
|
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
|
|
|
|
String entity = e.getKey().name();
|
|
|
|
Class<? extends OafEntity> entityClass = e.getValue();
|
|
|
|
|
|
|
|
groupedEntities = groupedEntities
|
|
|
|
.withColumn(
|
|
|
|
entity,
|
|
|
|
when(col("_1").equalTo(entityClass.getName()), col("_2")));
|
2020-11-25 14:55:32 +01:00
|
|
|
}
|
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
groupedEntities
|
|
|
|
.drop("_1", "_2")
|
|
|
|
.write()
|
|
|
|
.mode(SaveMode.Overwrite)
|
|
|
|
.option("compression", "gzip")
|
|
|
|
.save(checkpointPath);
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
|
2020-11-25 14:55:32 +01:00
|
|
|
|
2023-08-24 21:48:07 +02:00
|
|
|
ModelSupport.entityTypes
|
|
|
|
.entrySet()
|
2020-11-25 14:55:32 +01:00
|
|
|
.stream()
|
2023-08-24 21:48:07 +02:00
|
|
|
.map(e -> parPool.submit(() -> {
|
|
|
|
String entity = e.getKey().name();
|
|
|
|
Class<? extends OafEntity> entityClass = e.getValue();
|
|
|
|
|
|
|
|
spark
|
|
|
|
.read()
|
|
|
|
.load(checkpointPath)
|
|
|
|
.select(col(entity).as("value"))
|
|
|
|
.filter("value IS NOT NULL")
|
|
|
|
.as(OAFENTITY_KRYO_ENC)
|
|
|
|
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
|
|
|
|
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
|
|
|
|
.write()
|
|
|
|
.mode(SaveMode.Overwrite)
|
|
|
|
.option("compression", "gzip")
|
|
|
|
.json(outputPath + "/" + entity);
|
|
|
|
}))
|
|
|
|
.collect(Collectors.toList())
|
|
|
|
.forEach(t -> {
|
|
|
|
try {
|
|
|
|
t.get();
|
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
});
|
2020-11-25 14:55:32 +01:00
|
|
|
}
|
|
|
|
}
|