package eu.dnetlib.dhp.broker.oa;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
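
/**
 * Spark job that generates the broker events from the materialized OpenAIRE graph.
 * <p>
 * For each result type in {@link BrokerConstants#RESULT_CLASSES} the results are expanded with their related
 * entities, grouped with their duplicates through the "isMergedIn" relations and passed to {@link EventFinder}; the
 * resulting events are saved under {@code eventsPath} as gzip-compressed JSON.
 */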
public class GenerateEventsApplication {

    private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
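
    /**
     * Entry point: parses the job arguments (graphPath, eventsPath, isLookupUrl, dedupConfProfile and the optional
     * isSparkSessionManaged flag), loads the dedup configuration from the IS lookup service and generates the events
     * for every supported result type.
     */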
    public static void main(final String[] args) throws Exception {
        // the parameters file must declare graphPath, eventsPath, isLookupUrl and dedupConfProfile
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    GenerateEventsApplication.class
                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String graphPath = parser.get("graphPath");
        log.info("graphPath: {}", graphPath);

        final String eventsPath = parser.get("eventsPath");
        log.info("eventsPath: {}", eventsPath);

        final String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", isLookupUrl);

        final String dedupConfigProfileId = parser.get("dedupConfProfile");
        log.info("dedupConfigProfileId: {}", dedupConfigProfileId);

        final SparkConf conf = new SparkConf();

        final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);

        runWithSparkSession(conf, isSparkSessionManaged, spark -> {

            removeOutputDir(spark, eventsPath);

            // Dataset.union is not in-place: accumulate the events generated for each result type
            Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));

            for (final Class<? extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
                all = all.union(generateEvents(spark, graphPath, r1, dedupConfig));
            }

            all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath);
        });

    }
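
    /** Deletes the given HDFS path, so that the events can be rewritten from scratch. */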
    private static void removeOutputDir(final SparkSession spark, final String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
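
    /**
     * Generates the events for a single result type: the results, enriched with their related entities, are joined
     * with the "isMergedIn" relations, grouped by the representative record they are merged into and handed to
     * {@link EventFinder}.
     */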
    private static <SRC extends Result> Dataset<Event> generateEvents(
        final SparkSession spark,
        final String graphPath,
        final Class<SRC> sourceClass,
        final DedupConfig dedupConfig) {

        final Dataset<ResultWithRelations> results = expandResultsWithRelations(spark, graphPath, sourceClass);

        final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
            .filter((FilterFunction<Relation>) r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));

        final TypedColumn<Tuple2<ResultWithRelations, Relation>, ResultGroup> aggr = new ResultAggregator()
            .toColumn();

        return results
            .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
            .groupByKey(
                (MapFunction<Tuple2<ResultWithRelations, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
            .agg(aggr)
            .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
            .filter((FilterFunction<ResultGroup>) ResultGroup::isValid)
            .map(
                (MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
                Encoders.kryo(EventGroup.class))
            .flatMap(
                (FlatMapFunction<EventGroup, Event>) group -> group.getData().iterator(),
                Encoders.kryo(Event.class));
    }
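
    /**
     * Reads the results of the given type, skips the records deleted by inference and decorates each result with the
     * projects, software, datasets and publications it is related to (the "isMergedIn" relations are excluded).
     */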
    private static <SRC extends Result> Dataset<ResultWithRelations> expandResultsWithRelations(
        final SparkSession spark,
        final String graphPath,
        final Class<SRC> sourceClass) {

        final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
        final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
            spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
        final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
        final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);

        final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
            .filter((FilterFunction<Relation>) r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));

        // keep only the results that have NOT been deleted by inference
        final Dataset<ResultWithRelations> r0 = readPath(
            spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
                .filter((FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference())
                .map(
                    (MapFunction<Result, ResultWithRelations>) r -> new ResultWithRelations(r),
                    Encoders.kryo(ResultWithRelations.class));

        // each result type is wrapped by its own Related* class (assumed to be available in the withRels package,
        // next to RelatedProject)
        final Dataset<ResultWithRelations> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
        final Dataset<ResultWithRelations> r2 = join(
            r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class));
        final Dataset<ResultWithRelations> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class));
        final Dataset<ResultWithRelations> r4 = join(
            r3, rels, relatedEntities(publications, rels, RelatedPublication.class));

        return r4;
    }
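
    /**
     * Joins the relations with their target entities and wraps every match into the requested Related* class through
     * {@link RelatedEntityFactory}.
     */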
    private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets,
        final Dataset<Relation> rels,
        final Class<RT> clazz) {
        return rels
            .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
            .map(
                (MapFunction<Tuple2<Relation, T>, RT>) t -> RelatedEntityFactory
                    .newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
                Encoders.kryo(clazz));
    }
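
    /**
     * Left-joins the results with the typed related entities and folds each group back into a single
     * {@link ResultWithRelations} using {@link ResultWithRelationsAggregator}.
     */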
    private static <T> Dataset<ResultWithRelations> join(final Dataset<ResultWithRelations> sources,
        final Dataset<Relation> rels,
        final Dataset<T> typedRels) {

        final TypedColumn<Tuple2<ResultWithRelations, T>, ResultWithRelations> aggr = new ResultWithRelationsAggregator<T>()
            .toColumn();

        return sources
            .joinWith(
                typedRels,
                // the Related* wrappers are assumed to expose the source id of the original relation as "source"
                sources.col("result.id").equalTo(typedRels.col("source")),
                "left_outer")
            .groupByKey(
                (MapFunction<Tuple2<ResultWithRelations, T>, String>) t -> t._1.getResult().getId(), Encoders.STRING())
            .agg(aggr)
            .map(
                (MapFunction<Tuple2<String, ResultWithRelations>, ResultWithRelations>) t -> t._2,
                Encoders.kryo(ResultWithRelations.class));
    }
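
    /** Reads a JSON-lines text file from the given path, mapping each line to the given bean class. */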
    public static <R> Dataset<R> readPath(
        final SparkSession spark,
        final String inputPath,
        final Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }
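
    /**
     * Retrieves the DEDUPLICATION section of the resource profile identified by profId from the IS lookup service and
     * parses it into a {@link DedupConfig}.
     */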
    private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
        final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);

        final String conf = isLookUpService
            .getResourceProfileByQuery(
                String
                    .format(
                        "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
                        profId));

        final DedupConfig dedupConfig = OBJECT_MAPPER.readValue(conf, DedupConfig.class);
        dedupConfig.getPace().initModel();
        dedupConfig.getPace().initTranslationMap();
        // dedupConfig.getWf().setConfigurationId("???");

        return dedupConfig;
    }
}