indexing, accumulators, limited lists

Michele Artini 2020-06-30 16:17:09 +02:00
parent 6f13673464
commit 59a5421c24
19 changed files with 315 additions and 124 deletions

File: GenerateEventsJob.java

@@ -10,10 +10,8 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,17 +71,17 @@ public class GenerateEventsJob {
            final Map<String, LongAccumulator> accumulators = prepareAccumulators(spark.sparkContext());

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
+
            final Dataset<ResultGroup> groups = ClusterUtils
                .readPath(spark, workingPath + "/duplicates", ResultGroup.class);

-           final Dataset<Event> events = groups
-               .map(
-                   (MapFunction<ResultGroup, EventGroup>) g -> EventFinder
-                       .generateEvents(g, dedupConfig, accumulators),
-                   Encoders.bean(EventGroup.class))
-               .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
-           events.write().mode(SaveMode.Overwrite).json(eventsPath);
+           final Dataset<Event> dataset = groups
+               .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
+               .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
+               .map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
+
+           ClusterUtils.save(dataset, eventsPath, Event.class, total);
        });
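With the total_events accumulator now wired through ClusterUtils.save, the event count is visible on the driver once the write action finishes. A minimal sketch of reading it; the logging line is illustrative and not part of this commit, and LOG stands in for the job's slf4j logger:

    final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
    // ... pipeline ending in ClusterUtils.save(dataset, eventsPath, Event.class, total) ...
    LOG.info("total events written: {}", total.value()); // read only after the save action has completed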

File: IndexOnESJob.java

@@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;

public class IndexOnESJob {
@@ -45,10 +46,8 @@ public class IndexOnESJob {
        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

-       final JavaRDD<String> inputRdd = spark
-           .read()
-           .load(eventsPath)
-           .as(Encoders.bean(Event.class))
+       final JavaRDD<String> inputRdd = ClusterUtils
+           .readPath(spark, eventsPath, Event.class)
            .map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
            .javaRDD();

File: JoinStep1Job.java

@@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep1Job {
            ClusterUtils.removeDir(spark, joinedEntitiesPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep1Job {
            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
                .toColumn();

-           sources
+           final Dataset<OaBrokerMainEntity> dataset = sources
                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                .groupByKey(
                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(),
                    Encoders.STRING())
                .agg(aggr)
-               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(joinedEntitiesPath);
+               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+           ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
        });

File: JoinStep2Job.java

@@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep2Job {
            ClusterUtils.removeDir(spark, joinedEntitiesPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep2Job {
            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
                .toColumn();

-           sources
+           final Dataset<OaBrokerMainEntity> dataset = sources
                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                .groupByKey(
                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
                    Encoders.STRING())
                .agg(aggr)
-               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(joinedEntitiesPath);
+               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+           ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
        });

File: JoinStep3Job.java

@@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep3Job {
            ClusterUtils.removeDir(spark, joinedEntitiesPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep3Job {
            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
                .toColumn();

-           sources
+           final Dataset<OaBrokerMainEntity> dataset = sources
                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                .groupByKey(
                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(),
                    Encoders.STRING())
                .agg(aggr)
-               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(joinedEntitiesPath);
+               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+           ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
        });

File: JoinStep4Job.java

@@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep4Job {
            ClusterUtils.removeDir(spark, joinedEntitiesPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep4Job {
            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
                .toColumn();

-           sources
+           final Dataset<OaBrokerMainEntity> dataset = sources
                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                .groupByKey(
                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(),
                    Encoders.STRING())
                .agg(aggr)
-               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(joinedEntitiesPath);
+               .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+           ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
        });

File: PrepareGroupsJob.java

@@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +57,8 @@ public class PrepareGroupsJob {
            ClusterUtils.removeDir(spark, groupsPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
+
            final Dataset<OaBrokerMainEntity> results = ClusterUtils
                .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
@@ -67,20 +69,16 @@ public class PrepareGroupsJob {
            final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
                .toColumn();

-           final Dataset<ResultGroup> groups = results
+           final Dataset<ResultGroup> dataset = results
                .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
                .groupByKey(
                    (MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(),
                    Encoders.STRING())
                .agg(aggr)
-               .map(
-                   (MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
+               .map(t -> t._2, Encoders.bean(ResultGroup.class))
                .filter(rg -> rg.getData().size() > 1);

-           groups
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(groupsPath);
+           ClusterUtils.save(dataset, groupsPath, ResultGroup.class, total);
        });
    }

File: PrepareRelatedDatasetsJob.java

@@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +54,8 @@ public class PrepareRelatedDatasetsJob {
            ClusterUtils.removeDir(spark, relsPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
            final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils
                .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
                .filter(d -> !ClusterUtils.isDedupRoot(d.getId()))
@@ -67,16 +69,15 @@ public class PrepareRelatedDatasetsJob {
                .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));

-           rels
+           final Dataset<RelatedDataset> dataset = rels
                .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
                .map(t -> {
                    final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
                    rel.getRelDataset().setRelType(t._1.getRelClass());
                    return rel;
-               }, Encoders.bean(RelatedDataset.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(relsPath);
+               }, Encoders.bean(RelatedDataset.class));
+
+           ClusterUtils.save(dataset, relsPath, RelatedDataset.class, total);
        });

File: PrepareRelatedProjectsJob.java

@@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +56,8 @@ public class PrepareRelatedProjectsJob {
            ClusterUtils.removeDir(spark, relsPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
            final Dataset<OaBrokerProject> projects = ClusterUtils
                .readPath(spark, graphPath + "/project", Project.class)
                .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@@ -69,12 +71,12 @@ public class PrepareRelatedProjectsJob {
                .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));

-           rels
+           final Dataset<RelatedProject> dataset = rels
                .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner")
-               .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(relsPath);
+               .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class));
+
+           ClusterUtils.save(dataset, relsPath, RelatedProject.class, total);
        });
    }

File: PrepareRelatedPublicationsJob.java

@@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +55,8 @@ public class PrepareRelatedPublicationsJob {
            ClusterUtils.removeDir(spark, relsPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
            final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
                .readPath(spark, graphPath + "/publication", Publication.class)
                .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@@ -70,16 +72,15 @@ public class PrepareRelatedPublicationsJob {
                .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));

-           rels
+           final Dataset<RelatedPublication> dataset = rels
                .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
                .map(t -> {
                    final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
                    rel.getRelPublication().setRelType(t._1.getRelClass());
                    return rel;
-               }, Encoders.bean(RelatedPublication.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(relsPath);
+               }, Encoders.bean(RelatedPublication.class));
+
+           ClusterUtils.save(dataset, relsPath, RelatedPublication.class, total);
        });

File: PrepareRelatedSoftwaresJob.java

@@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +56,8 @@ public class PrepareRelatedSoftwaresJob {
            ClusterUtils.removeDir(spark, relsPath);

+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
            final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils
                .readPath(spark, graphPath + "/software", Software.class)
                .filter(sw -> !ClusterUtils.isDedupRoot(sw.getId()))
@@ -69,12 +71,11 @@ public class PrepareRelatedSoftwaresJob {
                .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));

-           rels
+           final Dataset<RelatedSoftware> dataset = rels
                .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
-               .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(relsPath);
+               .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class));
+
+           ClusterUtils.save(dataset, relsPath, RelatedSoftware.class, total);
        });

File: PrepareSimpleEntititiesJob.java

@@ -9,8 +9,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,13 +56,14 @@ public class PrepareSimpleEntititiesJob {
            ClusterUtils.removeDir(spark, simpleEntitiesPath);

-           prepareSimpleEntities(spark, graphPath, Publication.class)
+           final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
+           final Dataset<OaBrokerMainEntity> dataset = prepareSimpleEntities(spark, graphPath, Publication.class)
                .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
                .union(prepareSimpleEntities(spark, graphPath, Software.class))
-               .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
-               .write()
-               .mode(SaveMode.Overwrite)
-               .json(simpleEntitiesPath);
+               .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class));
+
+           ClusterUtils.save(dataset, simpleEntitiesPath, OaBrokerMainEntity.class, total);
        });
    }

File: UpdateMatcher.java

@@ -83,8 +83,8 @@ public abstract class UpdateMatcher<T> {
        return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
    }

-   protected boolean isMissing(final String field) {
-       return StringUtils.isBlank(field);
+   protected boolean isMissing(final String s) {
+       return StringUtils.isBlank(s);
    }

    public int getMaxNumber() {
@@ -108,7 +108,7 @@ public abstract class UpdateMatcher<T> {
    }

    public void incrementAccumulator(final Map<String, LongAccumulator> accumulators, final long n) {
-       if (accumulators.containsKey(accumulatorName())) {
+       if (accumulators != null && accumulators.containsKey(accumulatorName())) {
            accumulators.get(accumulatorName()).add(n);
        }
    }
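The added null guard matters because the new unit tests introduced by this commit invoke the matchers without a Spark context, so a null accumulator map (and a null dedup config) must not throw. A minimal sketch of such a call, using the same names as UpdateMatcherTest later in this commit (res and p1..p4 are OaBrokerMainEntity instances):

    final UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
    // null dedupConfig and null accumulator map are now tolerated by incrementAccumulator(...)
    final Collection<UpdateInfo<String>> list = matcher
        .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);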

File: ClusterUtils.java

@@ -4,7 +4,9 @@ package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;

import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,4 +46,20 @@ public class ClusterUtils {
            || s.equals("isSupplementedTo");
    }

+   public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
+       if (acc != null) {
+           acc.add(1);
+       }
+       return o;
+   }
+
+   public static <T> void save(final Dataset<T> dataset, final String path, final Class<T> clazz,
+       final LongAccumulator acc) {
+       dataset
+           .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
+           .write()
+           .mode(SaveMode.Overwrite)
+           .json(path);
+   }
}
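The new save(...) helper centralizes the Overwrite-mode JSON write that each job previously spelled out, counting every record through the optional accumulator on the way. A minimal usage sketch mirroring the call sites above (the accumulator name and path variable are illustrative):

    final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
    // dataset: any Dataset<T> with a bean encoder, built upstream in the job
    ClusterUtils.save(dataset, relsPath, RelatedProject.class, total); // count each record, then write JSON (Overwrite)

Because incrementAccumulator checks for null, callers may also pass null as the accumulator and the write still proceeds.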

File: ConversionUtils.java

@@ -55,7 +55,7 @@ public class ConversionUtils {
            res.setLicense(BrokerConstants.OPEN_ACCESS);
            res.setHostedby(kvValue(i.getHostedby()));
            return res;
-       });
+       }, 20);
    }

    public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) {
@@ -75,8 +75,8 @@
        res.setOpenaireId(d.getId());
        res.setOriginalId(first(d.getOriginalId()));
        res.setTitle(structPropValue(d.getTitle()));
-       res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
-       res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+       res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+       res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
        res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
        return res;
    }
@@ -90,8 +90,8 @@
        res.setOpenaireId(p.getId());
        res.setOriginalId(first(p.getOriginalId()));
        res.setTitle(structPropValue(p.getTitle()));
-       res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
-       res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+       res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+       res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
        res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
        return res;
@@ -107,23 +107,25 @@
        res.setOpenaireId(result.getId());
        res.setOriginalId(first(result.getOriginalId()));
        res.setTypology(classId(result.getResulttype()));
-       res.setTitles(structPropList(result.getTitle()));
-       res.setAbstracts(fieldList(result.getDescription()));
+       res.setTitles(structPropList(result.getTitle(), 10));
+       res.setAbstracts(fieldList(result.getDescription(), 10));
        res.setLanguage(classId(result.getLanguage()));
        res.setSubjects(structPropTypedList(result.getSubject()));
-       res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor));
+       res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor, 30));
        res.setPublicationdate(fieldValue(result.getDateofacceptance()));
        res.setPublisher(fieldValue(result.getPublisher()));
        res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
-       res.setContributor(fieldList(result.getContributor()));
+       res.setContributor(fieldList(result.getContributor(), 20));
        res
            .setJournal(
                result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
        res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
        res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
-       res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
-       res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
-       res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
+       res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+       res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
+       res
+           .setExternalReferences(
+               mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef, 20));
        return res;
    }
@@ -243,18 +245,25 @@
            : null;
    }

-   private static List<String> fieldList(final List<Field<String>> fl) {
+   private static List<String> fieldList(final List<Field<String>> fl, final long maxSize) {
        return fl != null
-           ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
+           ? fl
+               .stream()
+               .map(Field::getValue)
+               .map(s -> StringUtils.abbreviate(s, 3000)) // MAX 3000 CHARS
+               .filter(StringUtils::isNotBlank)
+               .limit(maxSize)
+               .collect(Collectors.toList())
            : new ArrayList<>();
    }

-   private static List<String> structPropList(final List<StructuredProperty> props) {
+   private static List<String> structPropList(final List<StructuredProperty> props, final long maxSize) {
        return props != null
            ? props
                .stream()
                .map(StructuredProperty::getValue)
                .filter(StringUtils::isNotBlank)
+               .limit(maxSize)
                .collect(Collectors.toList())
            : new ArrayList<>();
    }
@@ -271,7 +280,7 @@
            .collect(Collectors.toList());
    }

-   private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
+   private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func, final long maxSize) {
        if (list == null) {
            return new ArrayList<>();
        }
@@ -280,10 +289,12 @@
            .stream()
            .map(func::apply)
            .filter(Objects::nonNull)
+           .limit(maxSize)
            .collect(Collectors.toList());
    }

-   private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
+   private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func,
+       final long maxSize) {
        if (list == null) {
            return new ArrayList<>();
        }
@@ -293,6 +304,7 @@
            .map(func::apply)
            .flatMap(List::stream)
            .filter(Objects::nonNull)
+           .limit(maxSize)
            .collect(Collectors.toList());
    }
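The "limited lists" part of the commit: every list-building helper in ConversionUtils now takes a maxSize and truncates via Stream.limit, and abstracts are additionally shortened with StringUtils.abbreviate before the blank filter. A self-contained sketch of the same pattern, as a hypothetical helper rather than the exact ConversionUtils code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class CappedLists {
        // hypothetical helper mirroring ConversionUtils.mappedList(list, func, maxSize)
        public static <F, T> List<T> cappedList(final List<F> list, final Function<F, T> func, final long maxSize) {
            return list == null
                ? new ArrayList<>()
                : list
                    .stream()
                    .map(func)
                    .filter(Objects::nonNull)
                    .limit(maxSize) // keep at most maxSize mapped entries
                    .collect(Collectors.toList());
        }
    }

Capping the mapped lists bounds the size of the generated broker records regardless of how many PIDs, instances, authors, or references the source entity carries.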

File: workflow.xml (Oozie workflow containing the IndexOnESJob action)

@@ -378,9 +378,9 @@
            <class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
            <jar>dhp-broker-events-${projectVersion}.jar</jar>
            <spark-opts>
-               --executor-cores=${sparkExecutorCores}
                --executor-memory=${sparkExecutorMemory}
                --driver-memory=${sparkDriverMemory}
+               --conf spark.dynamicAllocation.maxExecutors="2"
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

File: workflow.xml (Oozie workflow containing the generate_events action)

@@ -79,7 +79,6 @@
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="generate_events">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
@@ -101,31 +100,6 @@
            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
            <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
        </spark>
-       <ok to="index_es"/>
-       <error to="Kill"/>
-   </action>
-
-   <action name="index_es">
-       <spark xmlns="uri:oozie:spark-action:0.2">
-           <master>yarn</master>
-           <mode>cluster</mode>
-           <name>IndexOnESJob</name>
-           <class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
-           <jar>dhp-broker-events-${projectVersion}.jar</jar>
-           <spark-opts>
-               --executor-cores=${sparkExecutorCores}
-               --executor-memory=${sparkExecutorMemory}
-               --driver-memory=${sparkDriverMemory}
-               --conf spark.extraListeners=${spark2ExtraListeners}
-               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-               --conf spark.sql.shuffle.partitions=3840
-           </spark-opts>
-           <arg>--workingPath</arg><arg>${workingPath}</arg>
-           <arg>--index</arg><arg>${esIndexName}</arg>
-           <arg>--esHost</arg><arg>${esIndexHost}</arg>
-       </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

File: UpdateMatcherTest.java (new file)

@@ -0,0 +1,125 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
class UpdateMatcherTest {
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testSearchUpdatesForRecord_1() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_2() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_3() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.size() == 1);
}
@Test
void testSearchUpdatesForRecord_4() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_5() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
p1.setPublicationdate("2018");
p2.setPublicationdate("2018");
p3.setPublicationdate("2018");
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_6() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
p1.setPublicationdate("2018");
p2.setPublicationdate("2018");
p3.setPublicationdate("2018");
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.size() == 1);
}
}

File: EnrichMissingPublicationDateTest.java (new file)

@@ -0,0 +1,57 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
class EnrichMissingPublicationDateTest {
final EnrichMissingPublicationDate matcher = new EnrichMissingPublicationDate();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testFindDifferences_1() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_2() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.size() == 1);
}
@Test
void testFindDifferences_3() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
target.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_4() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setPublicationdate("2018");
target.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
}