1
0
Fork 0

Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Sandro La Bruzzo 2020-07-02 12:37:49 +02:00
commit 07f0723fa7
34 changed files with 538 additions and 89 deletions

View File

@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
@ -24,6 +26,10 @@
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId> <artifactId>spark-sql_2.11</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
</dependency>
<dependency> <dependency>

View File

@ -3,14 +3,16 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -18,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.EventGroup;
@ -66,21 +69,35 @@ public class GenerateEventsJob {
ClusterUtils.removeDir(spark, eventsPath); ClusterUtils.removeDir(spark, eventsPath);
final Map<String, LongAccumulator> accumulators = prepareAccumulators(spark.sparkContext());
final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
final Dataset<ResultGroup> groups = ClusterUtils final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/duplicates", ResultGroup.class); .readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> events = groups final Dataset<Event> dataset = groups
.map( .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
Encoders.bean(EventGroup.class)) .map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
events.write().mode(SaveMode.Overwrite).json(eventsPath); ClusterUtils.save(dataset, eventsPath, Event.class, total);
}); });
} }
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
return EventFinder
.getMatchers()
.stream()
.map(UpdateMatcher::accumulatorName)
.distinct()
.collect(Collectors.toMap(s -> s, s -> sc.longAccumulator(s)));
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);

View File

@ -0,0 +1,70 @@
package eu.dnetlib.dhp.broker.oa;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
public class IndexOnESJob {
private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexOnESJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
log.info("index: {}", index);
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final JavaRDD<String> inputRdd = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
}
private static String eventAsJsonString(final Event f) throws JsonProcessingException {
return new ObjectMapper().writeValueAsString(f);
}
}

View File

@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +52,8 @@ public class JoinStep1Job {
ClusterUtils.removeDir(spark, joinedEntitiesPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
@ -61,16 +63,15 @@ public class JoinStep1Job {
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
.toColumn(); .toColumn();
sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(), (MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING()) Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
.json(joinedEntitiesPath);
}); });

View File

@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +52,8 @@ public class JoinStep2Job {
ClusterUtils.removeDir(spark, joinedEntitiesPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
@ -61,16 +63,15 @@ public class JoinStep2Job {
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
.toColumn(); .toColumn();
sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(), (MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING()) Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
.json(joinedEntitiesPath);
}); });

View File

@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +52,8 @@ public class JoinStep3Job {
ClusterUtils.removeDir(spark, joinedEntitiesPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
@ -61,16 +63,15 @@ public class JoinStep3Job {
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
.toColumn(); .toColumn();
sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(), (MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING()) Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
.json(joinedEntitiesPath);
}); });

View File

@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +52,8 @@ public class JoinStep4Job {
ClusterUtils.removeDir(spark, joinedEntitiesPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
@ -61,16 +63,15 @@ public class JoinStep4Job {
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
.toColumn(); .toColumn();
sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(), (MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING()) Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
.json(joinedEntitiesPath);
}); });

View File

@ -10,8 +10,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -57,6 +57,8 @@ public class PrepareGroupsJob {
ClusterUtils.removeDir(spark, groupsPath); ClusterUtils.removeDir(spark, groupsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
final Dataset<OaBrokerMainEntity> results = ClusterUtils final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
@ -67,20 +69,16 @@ public class PrepareGroupsJob {
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
.toColumn(); .toColumn();
final Dataset<ResultGroup> groups = results final Dataset<ResultGroup> dataset = results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey( .groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(), (MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(),
Encoders.STRING()) Encoders.STRING())
.agg(aggr) .agg(aggr)
.map( .map(t -> t._2, Encoders.bean(ResultGroup.class))
(MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1); .filter(rg -> rg.getData().size() > 1);
groups ClusterUtils.save(dataset, groupsPath, ResultGroup.class, total);
.write()
.mode(SaveMode.Overwrite)
.json(groupsPath);
}); });
} }

View File

@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,6 +54,8 @@ public class PrepareRelatedDatasetsJob {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils
.readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class) .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
.filter(d -> !ClusterUtils.isDedupRoot(d.getId())) .filter(d -> !ClusterUtils.isDedupRoot(d.getId()))
@ -67,16 +69,15 @@ public class PrepareRelatedDatasetsJob {
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels final Dataset<RelatedDataset> dataset = rels
.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> { .map(t -> {
final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
rel.getRelDataset().setRelType(t._1.getRelClass()); rel.getRelDataset().setRelType(t._1.getRelClass());
return rel; return rel;
}, Encoders.bean(RelatedDataset.class)) }, Encoders.bean(RelatedDataset.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, relsPath, RelatedDataset.class, total);
.json(relsPath);
}); });

View File

@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -56,6 +56,8 @@ public class PrepareRelatedProjectsJob {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
final Dataset<OaBrokerProject> projects = ClusterUtils final Dataset<OaBrokerProject> projects = ClusterUtils
.readPath(spark, graphPath + "/project", Project.class) .readPath(spark, graphPath + "/project", Project.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId())) .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@ -69,12 +71,12 @@ public class PrepareRelatedProjectsJob {
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels final Dataset<RelatedProject> dataset = rels
.joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class)) .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, relsPath, RelatedProject.class, total);
.json(relsPath);
}); });
} }

View File

@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,6 +55,8 @@ public class PrepareRelatedPublicationsJob {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
.readPath(spark, graphPath + "/publication", Publication.class) .readPath(spark, graphPath + "/publication", Publication.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId())) .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@ -70,16 +72,15 @@ public class PrepareRelatedPublicationsJob {
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels final Dataset<RelatedPublication> dataset = rels
.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> { .map(t -> {
final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
rel.getRelPublication().setRelType(t._1.getRelClass()); rel.getRelPublication().setRelType(t._1.getRelClass());
return rel; return rel;
}, Encoders.bean(RelatedPublication.class)) }, Encoders.bean(RelatedPublication.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, relsPath, RelatedPublication.class, total);
.json(relsPath);
}); });

View File

@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -56,6 +56,8 @@ public class PrepareRelatedSoftwaresJob {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils
.readPath(spark, graphPath + "/software", Software.class) .readPath(spark, graphPath + "/software", Software.class)
.filter(sw -> !ClusterUtils.isDedupRoot(sw.getId())) .filter(sw -> !ClusterUtils.isDedupRoot(sw.getId()))
@ -69,12 +71,11 @@ public class PrepareRelatedSoftwaresJob {
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels final Dataset<RelatedSoftware> dataset = rels
.joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class)) .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, relsPath, RelatedSoftware.class, total);
.json(relsPath);
}); });

View File

@ -9,8 +9,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -56,13 +56,14 @@ public class PrepareSimpleEntititiesJob {
ClusterUtils.removeDir(spark, simpleEntitiesPath); ClusterUtils.removeDir(spark, simpleEntitiesPath);
prepareSimpleEntities(spark, graphPath, Publication.class) final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> dataset = prepareSimpleEntities(spark, graphPath, Publication.class)
.union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)) .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
.union(prepareSimpleEntities(spark, graphPath, Software.class)) .union(prepareSimpleEntities(spark, graphPath, Software.class))
.union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class)) .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class));
.write()
.mode(SaveMode.Overwrite) ClusterUtils.save(dataset, simpleEntitiesPath, OaBrokerMainEntity.class, total);
.json(simpleEntitiesPath);
}); });
} }

View File

@ -12,6 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
@ -36,7 +37,8 @@ public abstract class UpdateMatcher<T> {
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res, public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
final Collection<OaBrokerMainEntity> others, final Collection<OaBrokerMainEntity> others,
final DedupConfig dedupConfig) { final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
@ -67,9 +69,10 @@ public abstract class UpdateMatcher<T> {
if (values.isEmpty()) { if (values.isEmpty()) {
return new ArrayList<>(); return new ArrayList<>();
} else if (values.size() > maxNumber) { } else if (values.size() > maxNumber) {
System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName()); incrementAccumulator(accumulators, maxNumber);
return values.subList(0, maxNumber); return values.subList(0, maxNumber);
} else { } else {
incrementAccumulator(accumulators, values.size());
return values; return values;
} }
} }
@ -80,8 +83,8 @@ public abstract class UpdateMatcher<T> {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0)); return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
} }
protected boolean isMissing(final String field) { protected boolean isMissing(final String s) {
return StringUtils.isBlank(field); return StringUtils.isBlank(s);
} }
public int getMaxNumber() { public int getMaxNumber() {
@ -100,4 +103,14 @@ public abstract class UpdateMatcher<T> {
return highlightToStringFunction; return highlightToStringFunction;
} }
public String accumulatorName() {
return "event_matcher_" + getClass().getSimpleName().toLowerCase();
}
public void incrementAccumulator(final Map<String, LongAccumulator> accumulators, final long n) {
if (accumulators != null && accumulators.containsKey(accumulatorName())) {
accumulators.get(accumulatorName()).add(n);
}
}
} }

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets; package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBrokerRelatedDataset> { public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBrokerRelatedDataset> {
@ -25,6 +27,10 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBroke
protected final List<OaBrokerRelatedDataset> findDifferences(final OaBrokerMainEntity source, protected final List<OaBrokerRelatedDataset> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getDatasets().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingDatasets = target final Set<String> existingDatasets = target
.getDatasets() .getDatasets()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> { public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
@ -27,6 +29,10 @@ public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingProjects = target final Set<String> existingProjects = target
.getProjects() .getProjects()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications; package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaBrokerRelatedPublication> { public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaBrokerRelatedPublication> {
@ -27,6 +29,10 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaB
final OaBrokerMainEntity source, final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getPublications().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingPublications = target final Set<String> existingPublications = target
.getPublications() .getPublications()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> { public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
@ -24,6 +26,10 @@ public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
final OaBrokerMainEntity source, final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getSoftwares().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingSoftwares = source final Set<String> existingSoftwares = source
.getSoftwares() .getSoftwares()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -11,6 +12,7 @@ import eu.dnetlib.broker.objects.OaBrokerAuthor;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> { public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> {
@ -25,6 +27,10 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> {
protected List<OaBrokerAuthor> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerAuthor> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getCreators().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingOrcids = target final Set<String> existingOrcids = target
.getCreators() .getCreators()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -23,6 +24,11 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<OaBrokerInstance> {
@Override @Override
protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getInstances().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final long count = target final long count = target
.getInstances() .getInstances()
.stream() .stream()

View File

@ -22,9 +22,8 @@ public class EnrichMissingPid extends UpdateMatcher<OaBrokerTypedValue> {
@Override @Override
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
final long count = target.getPids().size();
if (count > 0) { if (target.getPids().size() > 0) {
return Arrays.asList(); return Arrays.asList();
} }

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> { public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> {
@ -22,6 +24,11 @@ public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> {
@Override @Override
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getSubjects().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingSubject = target final Set<String> existingSubject = target
.getSubjects() .getSubjects()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -23,6 +24,11 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<OaBrokerInstance> {
@Override @Override
protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getInstances().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> urls = target final Set<String> urls = target
.getInstances() .getInstances()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> { public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> {
@ -22,6 +24,11 @@ public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> {
@Override @Override
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getPids().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingPids = target final Set<String> existingPids = target
.getPids() .getPids()
.stream() .stream()

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple; package eu.dnetlib.dhp.broker.oa.matchers.simple;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> { public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
@ -23,6 +25,10 @@ public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getSubjects().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingSubjects = target final Set<String> existingSubjects = target
.getSubjects() .getSubjects()
.stream() .stream()

View File

@ -19,6 +19,10 @@ public class BrokerConstants {
public static final int MAX_NUMBER_OF_RELS = 20; public static final int MAX_NUMBER_OF_RELS = 20;
public static final int MAX_STRING_SIZE = 3000;
public static final int MAX_LIST_SIZE = 50;
public static Class<?>[] getModelClasses() { public static Class<?>[] getModelClasses() {
final Set<Class<?>> list = new HashSet<>(); final Set<Class<?>> list = new HashSet<>();
list.addAll(Arrays.asList(ModelSupport.getOafModelClasses())); list.addAll(Arrays.asList(ModelSupport.getOafModelClasses()));

View File

@ -4,7 +4,9 @@ package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -44,4 +46,20 @@ public class ClusterUtils {
|| s.equals("isSupplementedTo"); || s.equals("isSupplementedTo");
} }
public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
if (acc != null) {
acc.add(1);
}
return o;
}
public static <T> void save(final Dataset<T> dataset, final String path, final Class<T> clazz,
final LongAccumulator acc) {
dataset
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.json(path);
}
} }

View File

@ -123,7 +123,8 @@ public class ConversionUtils {
res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef)); res
.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
return res; return res;
} }
@ -245,7 +246,13 @@ public class ConversionUtils {
private static List<String> fieldList(final List<Field<String>> fl) { private static List<String> fieldList(final List<Field<String>> fl) {
return fl != null return fl != null
? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) ? fl
.stream()
.map(Field::getValue)
.map(s -> StringUtils.abbreviate(s, BrokerConstants.MAX_STRING_SIZE))
.filter(StringUtils::isNotBlank)
.limit(BrokerConstants.MAX_LIST_SIZE)
.collect(Collectors.toList())
: new ArrayList<>(); : new ArrayList<>();
} }
@ -255,6 +262,7 @@ public class ConversionUtils {
.stream() .stream()
.map(StructuredProperty::getValue) .map(StructuredProperty::getValue)
.filter(StringUtils::isNotBlank) .filter(StringUtils::isNotBlank)
.limit(BrokerConstants.MAX_LIST_SIZE)
.collect(Collectors.toList()) .collect(Collectors.toList())
: new ArrayList<>(); : new ArrayList<>();
} }
@ -280,6 +288,7 @@ public class ConversionUtils {
.stream() .stream()
.map(func::apply) .map(func::apply)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.limit(BrokerConstants.MAX_LIST_SIZE)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@ -293,6 +302,7 @@ public class ConversionUtils {
.map(func::apply) .map(func::apply)
.flatMap(List::stream) .flatMap(List::stream)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.limit(BrokerConstants.MAX_LIST_SIZE)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -3,6 +3,9 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.model.EventFactory;
@ -35,7 +38,7 @@ import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder { public class EventFinder {
private static List<UpdateMatcher<?>> matchers = new ArrayList<>(); private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
static { static {
matchers.add(new EnrichMissingAbstract()); matchers.add(new EnrichMissingAbstract());
matchers.add(new EnrichMissingAuthorOrcid()); matchers.add(new EnrichMissingAuthorOrcid());
@ -47,7 +50,7 @@ public class EventFinder {
matchers.add(new EnrichMorePid()); matchers.add(new EnrichMorePid());
matchers.add(new EnrichMoreSubject()); matchers.add(new EnrichMoreSubject());
// // Advanced matchers // Advanced matchers
matchers.add(new EnrichMissingProject()); matchers.add(new EnrichMissingProject());
matchers.add(new EnrichMoreProject()); matchers.add(new EnrichMoreProject());
matchers.add(new EnrichMissingSoftware()); matchers.add(new EnrichMissingSoftware());
@ -65,12 +68,14 @@ public class EventFinder {
matchers.add(new EnrichMissingAbstract()); matchers.add(new EnrichMissingAbstract());
} }
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { public static EventGroup generateEvents(final ResultGroup results,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) { for (final OaBrokerMainEntity target : results.getData()) {
for (final UpdateMatcher<?> matcher : matchers) { for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig)); list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
} }
} }
@ -83,4 +88,8 @@ public class EventFinder {
return events; return events;
} }
public static List<UpdateMatcher<?>> getMatchers() {
return matchers;
}
} }

View File

@ -17,7 +17,14 @@
<name>dedupConfProfId</name> <name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description> <description>the id of a valid Dedup Configuration Profile</description>
</property> </property>
<property>
<name>esIndexName</name>
<description>the elasticsearch index name</description>
</property>
<property>
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -359,6 +366,31 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark> </spark>
<ok to="index_es"/>
<error to="Kill"/>
</action>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="2"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -0,0 +1,20 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramRequired": true
},
{
"paramName": "idx",
"paramLongName": "index",
"paramDescription": "the ES index",
"paramRequired": true
},
{
"paramName": "es",
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
}
]

View File

@ -1,4 +1,4 @@
<workflow-app name="create broker events" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="create broker events - partial" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
@ -79,7 +79,6 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="generate_events"> <action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -0,0 +1,125 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
class UpdateMatcherTest {
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testSearchUpdatesForRecord_1() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_2() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_3() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.size() == 1);
}
@Test
void testSearchUpdatesForRecord_4() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_5() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
res.setPublicationdate("2018");
p1.setPublicationdate("2018");
p2.setPublicationdate("2018");
p3.setPublicationdate("2018");
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.isEmpty());
}
@Test
void testSearchUpdatesForRecord_6() {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
p1.setPublicationdate("2018");
p2.setPublicationdate("2018");
p3.setPublicationdate("2018");
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
assertTrue(list.size() == 1);
}
}

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
class EnrichMissingPublicationDateTest {
final EnrichMissingPublicationDate matcher = new EnrichMissingPublicationDate();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testFindDifferences_1() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_2() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.size() == 1);
}
@Test
void testFindDifferences_3() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
target.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_4() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setPublicationdate("2018");
target.setPublicationdate("2018");
final List<String> list = matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
}