Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Sandro La Bruzzo 2020-07-17 18:01:30 +02:00
commit 9116d75b3e
56 changed files with 1667 additions and 273 deletions

View File

@ -59,7 +59,6 @@
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-openaire-broker-common</artifactId> <artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.0-SNAPSHOT,)</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -11,6 +11,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory { public class EventFactory {
@ -52,9 +54,11 @@ public class EventFactory {
final OaBrokerMainEntity source = updateInfo.getSource(); final OaBrokerMainEntity source = updateInfo.getSource();
final OaBrokerMainEntity target = updateInfo.getTarget(); final OaBrokerMainEntity target = updateInfo.getTarget();
map.setTargetDatasourceId(target.getCollectedFromId()); final OaBrokerRelatedDatasource targetDs = updateInfo.getTargetDs();
map.setTargetDatasourceName(target.getCollectedFromName());
map.setTargetDatasourceType(target.getCollectedFromType()); map.setTargetDatasourceId(targetDs.getOpenaireId());
map.setTargetDatasourceName(targetDs.getName());
map.setTargetDatasourceType(targetDs.getType());
map.setTargetResultId(target.getOpenaireId()); map.setTargetResultId(target.getOpenaireId());
@ -73,11 +77,19 @@ public class EventFactory {
// PROVENANCE INFO // PROVENANCE INFO
map.setTrust(updateInfo.getTrust()); map.setTrust(updateInfo.getTrust());
map.setProvenanceDatasourceId(source.getCollectedFromId());
map.setProvenanceDatasourceName(source.getCollectedFromName());
map.setProvenanceDatasourceType(source.getCollectedFromType());
map.setProvenanceResultId(source.getOpenaireId()); map.setProvenanceResultId(source.getOpenaireId());
source
.getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.findFirst()
.ifPresent(ds -> {
map.setProvenanceDatasourceId(ds.getOpenaireId());
map.setProvenanceDatasourceName(ds.getName());
map.setProvenanceDatasourceType(ds.getType());
});
return map; return map;
} }

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
public class GenerateStatsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexOnESJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String statsPath = parser.get("workingPath") + "/stats";
log.info("stats: {}", statsPath);
final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<DatasourceStats> stats = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(DatasourceStats.class));
ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
});
}
}

View File

@ -17,8 +17,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasourceAggregator;
import scala.Tuple2; import scala.Tuple2;
public class JoinStep0Job { public class JoinStep0Job {
@ -45,33 +45,33 @@ public class JoinStep0Job {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final String outputPath = workingPath + "/joinedEntities_step0"; final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
log.info("outputPath: {}", outputPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, outputPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils final Dataset<RelatedDatasource> typedRels = ClusterUtils
.readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class); .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
.toColumn(); .toColumn();
final Dataset<OaBrokerMainEntity> dataset = sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING()) .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total); ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
}); });

View File

@ -7,7 +7,6 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
@ -65,9 +64,7 @@ public class JoinStep2Job {
final Dataset<OaBrokerMainEntity> dataset = sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));

View File

@ -9,14 +9,23 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.DatasourceRelationsAccumulator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import scala.Tuple3;
public class PrepareRelatedDatasourcesJob { public class PrepareRelatedDatasourcesJob {
@ -42,7 +51,7 @@ public class PrepareRelatedDatasourcesJob {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/datasources"; final String relsPath = workingPath + "/relatedDatasources";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -53,16 +62,46 @@ public class PrepareRelatedDatasourcesJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils final Dataset<Tuple3<String, String, String>> rels = prepareResultTuples(
.readPath(spark, graphPath + "/datasource", Datasource.class) spark, graphPath, Publication.class)
.map( .union(prepareResultTuples(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()), .union(prepareResultTuples(spark, graphPath, Software.class))
Encoders.bean(SimpleDatasourceInfo.class)); .union(prepareResultTuples(spark, graphPath, OtherResearchProduct.class));
ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total); final Dataset<OaBrokerRelatedDatasource> datasources = ClusterUtils
.readPath(spark, graphPath + "/datasource", Datasource.class)
.map(ConversionUtils::oafDatasourceToBrokerDatasource, Encoders.bean(OaBrokerRelatedDatasource.class));
final Dataset<RelatedDatasource> dataset = rels
.joinWith(datasources, datasources.col("openaireId").equalTo(rels.col("_2")), "inner")
.map(t -> {
final RelatedDatasource r = new RelatedDatasource();
r.setSource(t._1._1());
r.setRelDatasource(t._2);
r.getRelDatasource().setRelType(t._1._3());
return r;
}, Encoders.bean(RelatedDatasource.class));
ClusterUtils.save(dataset, relsPath, RelatedDatasource.class, total);
}); });
} }
private static final Dataset<Tuple3<String, String, String>> prepareResultTuples(final SparkSession spark,
final String graphPath,
final Class<? extends Result> sourceClass) {
return ClusterUtils
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(
r -> DatasourceRelationsAccumulator.calculateTuples(r),
Encoders.bean(DatasourceRelationsAccumulator.class))
.flatMap(
acc -> acc.getRels().iterator(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
}
} }

View File

@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@ -34,18 +35,19 @@ public abstract class UpdateMatcher<T> {
this.highlightToStringFunction = highlightToStringFunction; this.highlightToStringFunction = highlightToStringFunction;
} }
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res, public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity target,
final OaBrokerRelatedDatasource targetDs,
final Collection<OaBrokerMainEntity> others, final Collection<OaBrokerMainEntity> others,
final Map<String, LongAccumulator> accumulators) { final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
for (final OaBrokerMainEntity source : others) { for (final OaBrokerMainEntity source : others) {
if (source != res) { if (source != target) {
for (final T hl : findDifferences(source, res)) { for (final T hl : findDifferences(source, target)) {
final Topic topic = getTopicFunction().apply(hl); final Topic topic = getTopicFunction().apply(hl);
if (topic != null) { if (topic != null) {
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, target, targetDs,
getCompileHighlightFunction(), getCompileHighlightFunction(),
getHighlightToStringFunction()); getHighlightToStringFunction());

View File

@ -14,6 +14,10 @@ public class BrokerConstants {
public static final String OPEN_ACCESS = "OPEN"; public static final String OPEN_ACCESS = "OPEN";
public static final String IS_MERGED_IN_CLASS = "isMergedIn"; public static final String IS_MERGED_IN_CLASS = "isMergedIn";
public static final String COLLECTED_FROM_REL = "collectedFrom";
public static final String HOSTED_BY_REL = "hostedBy";
public static final float MIN_TRUST = 0.25f; public static final float MIN_TRUST = 0.25f;
public static final float MAX_TRUST = 1.00f; public static final float MAX_TRUST = 1.00f;

View File

@ -22,11 +22,13 @@ import eu.dnetlib.broker.objects.OaBrokerJournal;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.ExternalReference; import eu.dnetlib.dhp.schema.oaf.ExternalReference;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.Instance;
@ -119,8 +121,6 @@ public class ConversionUtils {
res res
.setJournal( .setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
res res
@ -223,6 +223,18 @@ public class ConversionUtils {
return res; return res;
} }
public static final OaBrokerRelatedDatasource oafDatasourceToBrokerDatasource(final Datasource ds) {
if (ds == null) {
return null;
}
final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
res.setOpenaireId(ds.getId());
res.setType(classId(ds.getDatasourcetype()));
return res;
}
private static String first(final List<String> list) { private static String first(final List<String> list) {
return list != null && list.size() > 0 ? list.get(0) : null; return list != null && list.size() > 0 ? list.get(0) : null;
} }

View File

@ -0,0 +1,68 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple3;
public class DatasourceRelationsAccumulator implements Serializable {
/**
*
*/
private static final long serialVersionUID = 3256220670651218957L;
private List<Tuple3<String, String, String>> rels = new ArrayList<>();
public List<Tuple3<String, String, String>> getRels() {
return rels;
}
public void setRels(final List<Tuple3<String, String, String>> rels) {
this.rels = rels;
}
protected void addTuple(final Tuple3<String, String, String> t) {
rels.add(t);
}
public static final DatasourceRelationsAccumulator calculateTuples(final Result r) {
final Set<String> collectedFromSet = r
.getCollectedfrom()
.stream()
.map(kv -> kv.getKey())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.toSet());
final Set<String> hostedBySet = r
.getInstance()
.stream()
.map(i -> i.getHostedby())
.filter(Objects::nonNull)
.filter(kv -> !StringUtils.equalsIgnoreCase(kv.getValue(), "Unknown Repository"))
.map(kv -> kv.getKey())
.filter(StringUtils::isNotBlank)
.distinct()
.filter(id -> !collectedFromSet.contains(id))
.collect(Collectors.toSet());
final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
collectedFromSet
.stream()
.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
.forEach(res::addTuple);
hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
return res;
}
}

View File

@ -11,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@ -80,9 +81,11 @@ public class EventFinder {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) { for (final OaBrokerMainEntity target : results.getData()) {
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
for (final UpdateMatcher<?> matcher : matchers) { for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators)); list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
}
} }
} }
} }
@ -90,17 +93,17 @@ public class EventFinder {
return asEventGroup(list); return asEventGroup(list);
} }
private static boolean verifyTarget(final OaBrokerMainEntity target, private static boolean verifyTarget(final OaBrokerRelatedDatasource target,
final Set<String> dsIdWhitelist, final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist, final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist) { final Set<String> dsTypeWhitelist) {
if (dsIdWhitelist.contains(target.getCollectedFromId())) { if (dsIdWhitelist.contains(target.getOpenaireId())) {
return true; return true;
} else if (dsIdBlacklist.contains(target.getCollectedFromId())) { } else if (dsIdBlacklist.contains(target.getOpenaireId())) {
return false; return false;
} else { } else {
return dsTypeWhitelist.contains(target.getCollectedFromType()); return dsTypeWhitelist.contains(target.getType());
} }
} }

View File

@ -8,6 +8,7 @@ import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
public final class UpdateInfo<T> { public final class UpdateInfo<T> {
@ -20,6 +21,8 @@ public final class UpdateInfo<T> {
private final OaBrokerMainEntity target; private final OaBrokerMainEntity target;
private final OaBrokerRelatedDatasource targetDs;
private final BiConsumer<OaBrokerMainEntity, T> compileHighlight; private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
private final Function<T, String> highlightToString; private final Function<T, String> highlightToString;
@ -28,12 +31,14 @@ public final class UpdateInfo<T> {
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OaBrokerMainEntity target, final OaBrokerMainEntity target,
final OaBrokerRelatedDatasource targetDs,
final BiConsumer<OaBrokerMainEntity, T> compileHighlight, final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
final Function<T, String> highlightToString) { final Function<T, String> highlightToString) {
this.topic = topic; this.topic = topic;
this.highlightValue = highlightValue; this.highlightValue = highlightValue;
this.source = source; this.source = source;
this.target = target; this.target = target;
this.targetDs = targetDs;
this.compileHighlight = compileHighlight; this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString; this.highlightToString = highlightToString;
this.trust = TrustUtils.calculateTrust(source, target); this.trust = TrustUtils.calculateTrust(source, target);
@ -51,6 +56,10 @@ public final class UpdateInfo<T> {
return target; return target;
} }
public OaBrokerRelatedDatasource getTargetDs() {
return targetDs;
}
protected Topic getTopic() { protected Topic getTopic() {
return topic; return topic;
} }
@ -75,8 +84,20 @@ public final class UpdateInfo<T> {
compileHighlight.accept(hl, getHighlightValue()); compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOpenaireId(); final String provId = getSource().getOpenaireId();
final String provRepo = getSource().getCollectedFromName(); final String provRepo = getSource()
final String provType = getSource().getCollectedFromType(); .getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.map(ds -> ds.getName())
.findFirst()
.orElse("");
final String provType = getSource()
.getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.map(ds -> ds.getType())
.findFirst()
.orElse("");
final String provUrl = getSource() final String provUrl = getSource()
.getInstances() .getInstances()

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class DatasourceStats implements Serializable {
/**
*
*/
private static final long serialVersionUID = -282112564184047677L;
private String id;
private String name;
private String type;
private Map<String, Long> topics = new HashMap<>();
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public Map<String, Long> getTopics() {
return topics;
}
public void setTopics(final Map<String, Long> topics) {
this.topics = topics;
}
public void incrementTopic(final String topic, final long inc) {
if (topics.containsKey(topic)) {
topics.put(topic, topics.get(topic) + inc);
} else {
topics.put(topic, inc);
}
}
}

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.broker.model.Event;
public class StatsAggregator extends Aggregator<Event, DatasourceStats, DatasourceStats> {
/**
*
*/
private static final long serialVersionUID = 6652105853037330529L;
@Override
public DatasourceStats zero() {
return new DatasourceStats();
}
@Override
public DatasourceStats reduce(final DatasourceStats stats, final Event e) {
stats.setId(e.getMap().getTargetDatasourceId());
stats.setName(e.getMap().getTargetDatasourceName());
stats.setType(e.getMap().getTargetDatasourceType());
stats.incrementTopic(e.getTopic(), 1l);
return stats;
}
@Override
public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) {
if (StringUtils.isBlank(stats0.getId())) {
stats0.setId(stats1.getId());
stats0.setName(stats1.getName());
stats0.setType(stats1.getType());
}
stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue()));
return stats0;
}
@Override
public Encoder<DatasourceStats> bufferEncoder() {
return Encoders.bean(DatasourceStats.class);
}
@Override
public DatasourceStats finish(final DatasourceStats stats) {
return stats;
}
@Override
public Encoder<DatasourceStats> outputEncoder() {
return Encoders.bean(DatasourceStats.class);
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
public class RelatedDatasource implements Serializable {
/**
*
*/
private static final long serialVersionUID = 3015550240920424010L;
private String source;
private OaBrokerRelatedDatasource relDatasource;
public RelatedDatasource() {
}
public RelatedDatasource(final String source, final OaBrokerRelatedDatasource relDatasource) {
this.source = source;
this.relDatasource = relDatasource;
}
public String getSource() {
return source;
}
public void setSource(final String source) {
this.source = source;
}
public OaBrokerRelatedDatasource getRelDatasource() {
return relDatasource;
}
public void setRelDatasource(final OaBrokerRelatedDatasource relDatasource) {
this.relDatasource = relDatasource;
}
}

View File

@ -7,15 +7,16 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class AddDatasourceTypeAggregator public class RelatedDatasourceAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> { extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity, OaBrokerMainEntity> {
/** /**
* *
*/ */
private static final long serialVersionUID = 8788588975496014728L; private static final long serialVersionUID = -7212121913834713672L;
@Override @Override
public OaBrokerMainEntity zero() { public OaBrokerMainEntity zero() {
@ -29,10 +30,10 @@ public class AddDatasourceTypeAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) { final Tuple2<OaBrokerMainEntity, RelatedDatasource> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) { if (t._2 != null && res.getDatasources().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.setCollectedFromType(t._2.getType()); res.getDatasources().add(t._2.getRelDatasource());
} }
return res; return res;
@ -40,7 +41,15 @@ public class AddDatasourceTypeAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasources().size();
if (availables > 0) {
if (g2.getDatasources().size() <= availables) {
g1.getDatasources().addAll(g2.getDatasources());
} else {
g1.getDatasources().addAll(g2.getDatasources().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;
@ -56,4 +65,5 @@ public class AddDatasourceTypeAggregator
public Encoder<OaBrokerMainEntity> outputEncoder() { public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class); return Encoders.bean(OaBrokerMainEntity.class);
} }
} }

View File

@ -1,40 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
public class SimpleDatasourceInfo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 2996609859416024734L;
private String id;
private String type;
public SimpleDatasourceInfo() {
}
public SimpleDatasourceInfo(final String id, final String type) {
this.id = id;
this.type = type;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
}

View File

@ -448,6 +448,30 @@
<arg>--index</arg><arg>${esIndexName}</arg> <arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark> </spark>
<ok to="stats"/>
<error to="Kill"/>
</action>
<action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -64,19 +64,214 @@
</configuration> </configuration>
</global> </global>
<start to="count"/> <start to="join_entities_step0"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="count"> <action name="join_entities_step0">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Count</name> <name>JoinStep0</name>
<class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class> <class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
</action>
<action name="join_entities_step1">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep1</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
</action>
<action name="join_entities_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep2</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
</action>
<action name="join_entities_step3">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep3</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
</action>
<action name="join_entities_step4">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep4</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
</spark>
<ok to="index_es"/>
<error to="Kill"/>
</action>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="stats"/>
<error to="Kill"/>
</action>
<action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}

View File

@ -8,15 +8,23 @@ import java.util.Collection;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@ExtendWith(MockitoExtension.class)
class UpdateMatcherTest { class UpdateMatcherTest {
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate(); UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
@Mock
private OaBrokerRelatedDatasource targetDs;
@BeforeEach @BeforeEach
void setUp() throws Exception { void setUp() throws Exception {
} }
@ -30,7 +38,7 @@ class UpdateMatcherTest {
final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -46,7 +54,7 @@ class UpdateMatcherTest {
res.setPublicationdate("2018"); res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -62,7 +70,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }
@ -79,7 +87,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -98,7 +106,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -117,7 +125,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }

View File

@ -28,6 +28,8 @@ import eu.dnetlib.pace.config.DedupConfig;
abstract class AbstractSparkAction implements Serializable { abstract class AbstractSparkAction implements Serializable {
protected static final int NUM_PARTITIONS = 1000;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

View File

@ -100,6 +100,11 @@ public class DedupUtility {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType); return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
} }
public static String createBlockStatsPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_blockstats", basePath, actionSetId, entityType);
}
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
throws ISLookUpException, DocumentException { throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);

View File

@ -52,6 +52,7 @@ public class Deduper implements Serializable {
.collect(Collectors.toList()) .collect(Collectors.toList())
.iterator()) .iterator())
.mapToPair(block -> new Tuple2<>(block.getKey(), block)) .mapToPair(block -> new Tuple2<>(block.getKey(), block))
.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize)); .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize))
.filter(b -> b._2().getDocuments().size() > 1);
} }
} }

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
private static Relation ZERO = new Relation();
@Override
public Relation zero() {
return ZERO;
}
@Override
public Relation reduce(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation merge(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation finish(Relation r) {
return r;
}
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.kryo(Relation.class);
}
}

View File

@ -0,0 +1,126 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
public class SparkBlockStats extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkBlockStats.class);
public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkBlockStats.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkBlockStats(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
public Long computeComparisons(Long blockSize, Long slidingWindowSize) {
if (slidingWindowSize >= blockSize)
return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
else {
return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
}
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Creating blockstats for: '{}'", subEntity);
final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(numPartitions)
.mapToPair(
(PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
});
// create blocks for deduplication
JavaRDD<BlockStats> blockStats = Deduper
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(numPartitions)
.map(b -> asBlockStats(dedupConf, b));
// save the blockstats in the workingdir
spark
.createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
}
private BlockStats asBlockStats(DedupConfig dedupConf, Tuple2<String, Block> b) {
return new BlockStats(
b._1(),
(long) b._2().getDocuments().size(),
computeComparisons(
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()));
}
}

View File

@ -5,11 +5,13 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge; import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
@ -75,7 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
int cut = Optional
.ofNullable(parser.get("cutConnectedComponent"))
.map(Integer::valueOf)
.orElse(0);
log.info("connected component cut: '{}'", cut);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
@ -100,8 +106,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD<Edge<String>> edgeRdd = spark final RDD<Edge<String>> edgeRdd = spark
.read() .read()
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.as(Encoders.bean(Relation.class)) .map(
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
Encoders.bean(Relation.class))
.javaRDD() .javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd(); .rdd();
@ -109,7 +117,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
.createDataset( .createDataset(
GraphProcessor GraphProcessor
.findCCs(vertexes.rdd(), edgeRdd, maxIterations) .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
.toJavaRDD() .toJavaRDD()
.filter(k -> k.getDocIds().size() > 1) .filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf)) .flatMap(cc -> ccToMergeRel(cc, dedupConf))
@ -117,6 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
Encoders.bean(Relation.class)); Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
} }
} }

View File

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -48,13 +49,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args); parser.parseArgument(args);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf
.registerKryoClasses(
new Class[] {
MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
});
new SparkCreateSimRels(parser, getSparkSession(conf)) new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
} }
@ -68,7 +62,12 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId); log.info("actionSetId: '{}'", actionSetId);
@ -88,6 +87,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaPairRDD<String, MapDocument> mapDocuments = sc JavaPairRDD<String, MapDocument> mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.repartition(numPartitions)
.mapToPair( .mapToPair(
(PairFunction<String, String, MapDocument>) s -> { (PairFunction<String, String, MapDocument>) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@ -95,19 +95,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
}); });
// create blocks for deduplication // create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); JavaPairRDD<String, Block> blocks = Deduper
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(numPartitions);
// create relations by comparing only elements in the same group // create relations by comparing only elements in the same group
JavaRDD<Relation> relations = Deduper Deduper
.computeRelations(sc, blocks, dedupConf) .computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity)); .map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
// save the simrel in the workingdir .map(r -> OBJECT_MAPPER.writeValueAsString(r))
spark .saveAsTextFile(outputPath);
.createDataset(relations.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.save(outputPath);
} }
} }

View File

@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -95,7 +99,24 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET, FieldType.TARGET,
getDeletedFn()); getDeletedFn());
save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite); save(
distinctRelations(
newRels
.union(updated)
.union(mergeRels)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
outputRelationPath, SaveMode.Overwrite);
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
.groupByKey(
(MapFunction<Relation, String>) r -> String
.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }
private static Dataset<Relation> processDataset( private static Dataset<Relation> processDataset(
@ -112,6 +133,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map(mapFn, Encoders.bean(Relation.class)); .map(mapFn, Encoders.bean(Relation.class));
} }
private FilterFunction<Relation> getRelationFilterFunction() {
return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelClass()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
}
private static MapFunction<String, Relation> patchRelFn() { private static MapFunction<String, Relation> patchRelFn() {
return value -> { return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonIgnore;
@ -18,12 +19,17 @@ public class ConnectedComponent implements Serializable {
private Set<String> docIds; private Set<String> docIds;
private String ccId; private String ccId;
public ConnectedComponent() { public ConnectedComponent(Set<String> docIds, final int cut) {
}
public ConnectedComponent(Set<String> docIds) {
this.docIds = docIds; this.docIds = docIds;
createID(); createID();
if (cut > 0 && docIds.size() > cut) {
this.docIds = docIds
.stream()
.filter(s -> !ccId.equalsIgnoreCase(s))
.limit(cut - 1)
.collect(Collectors.toSet());
this.docIds.add(ccId);
}
} }
public String createID() { public String createID() {
@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
public String getMin() { public String getMin() {
final StringBuilder min = new StringBuilder(); final StringBuilder min = new StringBuilder();
docIds docIds
.forEach( .forEach(
i -> { i -> {

View File

@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor { object GraphProcessor {
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = { def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices val cc = graph.connectedComponents(maxIterations).vertices
@ -22,15 +22,15 @@ object GraphProcessor {
} }
} }
val connectedComponents = joinResult.groupByKey() val connectedComponents = joinResult.groupByKey()
.map[ConnectedComponent](cc => asConnectedComponent(cc)) .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
connectedComponents connectedComponents
} }
def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = { def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String] val docs = group._2.toSet[String]
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs)); val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
connectedComponent connectedComponent
} }

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.oa.dedup.model;
import java.io.Serializable;
public class BlockStats implements Serializable {
private String key; // key of the block
private Long size; // number of elements in the block
private Long comparisons; // number of comparisons in the block
public BlockStats() {
}
public BlockStats(String key, Long size, Long comparisons) {
this.key = key;
this.size = size;
this.comparisons = comparisons;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Long getSize() {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public Long getComparisons() {
return comparisons;
}
public void setComparisons(Long comparisons) {
this.comparisons = comparisons;
}
}

View File

@ -0,0 +1,32 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
}
]

View File

@ -17,6 +17,12 @@
"paramDescription": "the url for the lookup service", "paramDescription": "the url for the lookup service",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "cc",
"paramLongName": "cutConnectedComponent",
"paramDescription": "the number of maximum elements that belongs to a connected components",
"paramRequired": false
},
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workingPath", "paramLongName": "workingPath",

View File

@ -22,5 +22,11 @@
"paramLongName": "workingPath", "paramLongName": "workingPath",
"paramDescription": "path of the working directory", "paramDescription": "path of the working directory",
"paramRequired": true "paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
} }
] ]

View File

@ -20,6 +20,10 @@
<name>dedupGraphPath</name> <name>dedupGraphPath</name>
<description>path for the output graph</description> <description>path for the output graph</description>
</property> </property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -106,10 +110,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="CreateMergeRel"/> <ok to="CreateMergeRel"/>
<error to="Kill"/> <error to="Kill"/>
@ -132,10 +137,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark> </spark>
<ok to="CreateDedupRecord"/> <ok to="CreateDedupRecord"/>
<error to="Kill"/> <error to="Kill"/>
@ -158,10 +164,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--actionSetId</arg><arg>${actionSetId}</arg>
</spark> </spark>
<ok to="UpdateEntity"/> <ok to="UpdateEntity"/>
<error to="Kill"/> <error to="Kill"/>
@ -184,9 +190,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg> <arg>--dedupGraphPath</arg><arg>${dedupGraphPath}</arg>
</spark> </spark>
<ok to="copyRelations"/> <ok to="copyRelations"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,108 @@
<workflow-app name="Create dedup blocks" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>numPartitions</name>
<description>number of partitions for the similarity relations intermediate phases</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="CreateBlockStats"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateBlockStats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create deduplication blocks</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkBlockStats</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -47,12 +47,13 @@ public class EntityMergerTest implements Serializable {
@Test @Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException { public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
List<Tuple2<String, Software>> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class); List<Tuple2<String, Software>> softwares = readSample(
testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory Software merged = DedupRecordFactory
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class); .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
System.out.println(merged.getBestaccessright().getClassid()); assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
} }
@Test @Test

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -71,11 +77,13 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession spark = SparkSession
.builder() .builder()
.appName(SparkDedupTest.class.getSimpleName()) .appName(SparkDedupTest.class.getSimpleName())
.master("local[*]") .master("local[*]")
.config(new SparkConf()) .config(conf)
.getOrCreate(); .getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -152,37 +160,42 @@ public class SparkDedupTest implements Serializable {
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", "-i", testGraphBasePath,
testGraphBasePath, "-asi", testActionSetId,
"-asi", "-la", "lookupurl",
testActionSetId, "-w", testOutputBasePath,
"-la", "-np", "50"
"lookupurl",
"-w",
testOutputBasePath
}); });
new SparkCreateSimRels(parser, spark).run(isLookUpService); new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark long orgs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count(); .count();
long pubs_simrel = spark long pubs_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count(); .count();
long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count(); long sw_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark long orp_simrel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count(); .count();
assertEquals(3432, orgs_simrel); assertEquals(3432, orgs_simrel);
assertEquals(7054, pubs_simrel); assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel); assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel); assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel); assertEquals(6750, orp_simrel);
@ -190,6 +203,101 @@ public class SparkDedupTest implements Serializable {
@Test @Test
@Order(2) @Order(2)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-cc",
"3"
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long pubs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long sw_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long ds_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
long orp_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
.groupBy("source")
.agg(count("target").alias("cnt"))
.select("source", "cnt")
.where("cnt > 3")
.count();
assertEquals(0, orgs_mergerel);
assertEquals(0, pubs_mergerel);
assertEquals(0, sw_mergerel);
assertEquals(0, ds_mergerel);
assertEquals(0, orp_mergerel);
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
FileUtils
.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception { public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -225,8 +333,10 @@ public class SparkDedupTest implements Serializable {
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count(); .count();
long ds_mergerel = spark
long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count(); .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.count();
long orp_mergerel = spark long orp_mergerel = spark
.read() .read()
@ -234,14 +344,14 @@ public class SparkDedupTest implements Serializable {
.count(); .count();
assertEquals(1276, orgs_mergerel); assertEquals(1276, orgs_mergerel);
assertEquals(1440, pubs_mergerel); assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel); assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel); assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel); assertEquals(718, orp_mergerel);
} }
@Test @Test
@Order(3) @Order(4)
public void createDedupRecordTest() throws Exception { public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -288,7 +398,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(4) @Order(5)
public void updateEntityTest() throws Exception { public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -404,7 +514,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(5) @Order(6)
public void propagateRelationTest() throws Exception { public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -423,7 +533,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4971, relations); assertEquals(4866, relations);
// check deletedbyinference // check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
@ -454,7 +564,7 @@ public class SparkDedupTest implements Serializable {
} }
@Test @Test
@Order(6) @Order(7)
public void testRelations() throws Exception { public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);

View File

@ -0,0 +1,177 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class SparkStatsTest implements Serializable {
@Mock(serializable = true)
ISLookUpService isLookUpService;
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static final String testActionSetId = "test-orchestrator";
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
testGraphBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
}
@Test
public void createBlockStatsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath
});
new SparkBlockStats(parser, spark).run(isLookUpService);
long orgs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
.count();
long pubs_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
.count();
long sw_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
.count();
long ds_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
.count();
long orp_blocks = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count();
assertEquals(121, orgs_blocks);
assertEquals(110, pubs_blocks);
assertEquals(21, sw_blocks);
assertEquals(67, ds_blocks);
assertEquals(55, orp_blocks);
}
}

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "dataset", "subEntityValue" : "dataset",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "otherresearchproduct", "subEntityValue" : "otherresearchproduct",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,7 +17,8 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree" : { "decisionTree" : {

View File

@ -6,10 +6,10 @@
"subEntityType": "resulttype", "subEntityType": "resulttype",
"subEntityValue": "publication", "subEntityValue": "publication",
"orderField": "title", "orderField": "title",
"queueMaxSize": "800", "queueMaxSize": "100",
"groupMaxSize": "100", "groupMaxSize": "100",
"maxChildren": "100", "maxChildren": "100",
"slidingWindowSize": "80", "slidingWindowSize": "100",
"rootBuilder": [ "rootBuilder": [
"result", "result",
"resultProject_outcome_isProducedBy", "resultProject_outcome_isProducedBy",
@ -29,7 +29,8 @@
}, },
"pace": { "pace": {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {

View File

@ -6,10 +6,10 @@
"subEntityType" : "resulttype", "subEntityType" : "resulttype",
"subEntityValue" : "software", "subEntityValue" : "software",
"orderField" : "title", "orderField" : "title",
"queueMaxSize" : "800", "queueMaxSize" : "100",
"groupMaxSize" : "100", "groupMaxSize" : "100",
"maxChildren" : "100", "maxChildren" : "100",
"slidingWindowSize" : "80", "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true", "includeChildren" : "true",
"idPath" : "$.id", "idPath" : "$.id",
@ -17,8 +17,9 @@
}, },
"pace" : { "pace" : {
"clustering" : [ "clustering" : [
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } } { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
], ],
"decisionTree": { "decisionTree": {
"start": { "start": {

View File

@ -8,7 +8,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -24,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -151,7 +151,8 @@ public class CleanGraphSparkJob {
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) { if (Objects.isNull(bestaccessrights)) {
r.setBestaccessright( r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else { } else {
r.setBestaccessright(bestaccessrights); r.setBestaccessright(bestaccessrights);

View File

@ -16,6 +16,11 @@
<name>postgresPassword</name> <name>postgresPassword</name>
<description>the password postgres</description> <description>the password postgres</description>
</property> </property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>the address of the lookUp service</description> <description>the address of the lookUp service</description>
@ -93,6 +98,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
</java> </java>
<ok to="ImportDB_claims"/> <ok to="ImportDB_claims"/>
<error to="Kill"/> <error to="Kill"/>
@ -109,6 +115,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--action</arg><arg>claims</arg> <arg>--action</arg><arg>claims</arg>
</java> </java>
<ok to="End"/> <ok to="End"/>

View File

@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -118,10 +119,7 @@ public class CreateRelatedEntitiesJob_phase1 {
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz) Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz), (MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
Encoders.kryo(RelatedEntity.class))
.map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.cache(); .cache();
@ -165,13 +163,24 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity; Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) { if (result.getTitle() != null && !result.getTitle().isEmpty()) {
re.setTitle(result.getTitle().stream().findFirst().get()); final StructuredProperty title = result.getTitle().stream().findFirst().get();
title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
re.setTitle(title);
} }
re.setDateofacceptance(getValue(result.getDateofacceptance())); re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher())); re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype()); re.setResulttype(result.getResulttype());
re.setInstances(result.getInstance()); if (Objects.nonNull(result.getInstance())) {
re
.setInstances(
result
.getInstance()
.stream()
.filter(Objects::nonNull)
.limit(ProvisionConstants.MAX_INSTANCES)
.collect(Collectors.toList()));
}
// TODO still to be mapped // TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));

View File

@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_EXTERNAL_ENTITIES = 50;
private static final int MAX_AUTHORS = 200;
private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
private static final int MAX_TITLE_LENGTH = 5000;
private static final int MAX_ABSTRACT_LENGTH = 100000;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r List<ExternalReference> refs = r
.getExternalReference() .getExternalReference()
.stream() .stream()
.limit(MAX_EXTERNAL_ENTITIES) .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setExternalReference(refs); r.setExternalReference(refs);
} }
if (r.getAuthor() != null) { if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList(); List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) { for (Author a : r.getAuthor()) {
a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < MAX_AUTHORS || hasORCID(a)) { if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a); authors.add(a);
} }
} }
@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(d -> { .map(d -> {
d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d; return d;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(t -> { .map(t -> {
t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t; return t;
}) })
.limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setTitle(titles); r.setTitle(titles);
} }

View File

@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>()); .orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter); log.info("relationFilter: {}", relationFilter);
int maxRelations = Optional int sourceMaxRelations = Optional
.ofNullable(parser.get("maxRelations")) .ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf) .map(Integer::valueOf)
.orElse(MAX_RELS); .orElse(MAX_RELS);
log.info("maxRelations: {}", maxRelations); log.info("sourceMaxRelations: {}", sourceMaxRelations);
int targetMaxRelations = Optional
.ofNullable(parser.get("targetMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
prepareRelationsRDD( prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
relPartitions);
}); });
} }
@ -129,31 +136,40 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations * @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations * @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field * @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
* @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD * @param relPartitions number of partitions for the output RDD
*/ */
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
Set<String> relationFilter, int maxRelations, int relPartitions) { Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
// group by SOURCE and apply limit JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false) .filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
JavaRDD<Relation> pruned = pruneRels(
pruneRels(
rels,
sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
spark
.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
.repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
int relPartitions, Function<Relation, String> idFn) {
return rels
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1) .groupBy(Tuple2::_1)
.map(Tuple2::_2) .map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations)) .map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator) .flatMap(Iterable::iterator)
.map(Tuple2::_2) .map(Tuple2::_2);
.rdd();
spark
.createDataset(bySource, Encoders.bean(Relation.class))
.repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
} }
// experimental // experimental

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.oa.provision;
public class ProvisionConstants {
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
public static final int MAX_TITLE_LENGTH = 5000;
public static final int MAX_TITLES = 10;
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_INSTANCES = 10;
}

View File

@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
private static final Map<String, Integer> weights = Maps.newHashMap(); private static final Map<String, Integer> weights = Maps.newHashMap();
static { static {
weights.put("outcome", 0); weights.put("participation", 0);
weights.put("supplement", 1);
weights.put("review", 2);
weights.put("citation", 3);
weights.put("affiliation", 4);
weights.put("relationship", 5);
weights.put("publicationDataset", 6);
weights.put("similarity", 7);
weights.put("provision", 8); weights.put("outcome", 1);
weights.put("participation", 9); weights.put("affiliation", 2);
weights.put("dedup", 10); weights.put("dedup", 3);
weights.put("publicationDataset", 4);
weights.put("citation", 5);
weights.put("supplement", 6);
weights.put("review", 7);
weights.put("relationship", 8);
weights.put("provision", 9);
weights.put("similarity", 10);
} }
private static final long serialVersionUID = 3232323; private static final long serialVersionUID = 3232323;

View File

@ -329,7 +329,7 @@ public class XmlRecordFactory implements Serializable {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue())) .map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
.collect(Collectors.toList())); .collect(Collectors.toCollection(HashSet::new)));
} }
if (r.getEmbargoenddate() != null) { if (r.getEmbargoenddate() != null) {
metadata metadata
@ -370,7 +370,7 @@ public class XmlRecordFactory implements Serializable {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue())) .map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue()))
.collect(Collectors.toList())); .collect(Collectors.toCollection(HashSet::new)));
} }
if (r.getFormat() != null) { if (r.getFormat() != null) {
metadata metadata

View File

@ -30,9 +30,16 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "mr", "paramName": "smr",
"paramLongName": "maxRelations", "paramLongName": "sourceMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity", "paramDescription": "maximum number of relations allowed for a each entity grouping by source",
"paramRequired": false
},
{
"paramName": "tmr",
"paramLongName": "targetMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false "paramRequired": false
} }
] ]

View File

@ -18,8 +18,12 @@
<description>filter applied reading relations (by relClass)</description> <description>filter applied reading relations (by relClass)</description>
</property> </property>
<property> <property>
<name>maxRelations</name> <name>sourceMaxRelations</name>
<description>maximum number of relations allowed for a each entity</description> <description>maximum number of relations allowed for a each entity grouping by source</description>
</property>
<property>
<name>targetMaxRelations</name>
<description>maximum number of relations allowed for a each entity grouping by target</description>
</property> </property>
<property> <property>
<name>otherDsTypeId</name> <name>otherDsTypeId</name>
@ -133,7 +137,8 @@
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--maxRelations</arg><arg>${maxRelations}</arg> <arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
<arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg> <arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--relPartitions</arg><arg>5000</arg> <arg>--relPartitions</arg><arg>5000</arg>
</spark> </spark>
@ -166,7 +171,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -193,7 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -220,7 +225,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -247,7 +252,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -274,7 +279,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -301,7 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -328,7 +333,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -367,7 +372,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
@ -395,7 +400,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
@ -423,7 +428,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
@ -451,7 +456,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
@ -479,7 +484,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=8000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
@ -507,7 +512,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
@ -535,7 +540,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
@ -573,10 +578,18 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg> <arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
</spark> </spark>
<ok to="to_solr_index"/> <ok to="should_index"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<decision name="should_index">
<switch>
<case to="to_solr_index">${wf:conf('shouldIndex') eq 'true'}</case>
<case to="End">${wf:conf('shouldIndex') eq 'false'}</case>
<default to="to_solr_index"/>
</switch>
</decision>
<action name="to_solr_index"> <action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -607,5 +620,4 @@
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -323,6 +323,12 @@
<version>[2.0.0,3.0.0)</version> <version>[2.0.0,3.0.0)</version>
</dependency> </dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-openaire-broker-common</artifactId>
<version>${dnet.openaire.broker.common}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.cxf</groupId> <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId> <artifactId>cxf-rt-transports-http</artifactId>
@ -618,5 +624,6 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dnet.openaire.broker.common>3.1.0</dnet.openaire.broker.common>
</properties> </properties>
</project> </project>