forked from D-Net/dnet-hadoop
merge upstream
This commit is contained in:
commit
44e1c40c42
|
@ -59,7 +59,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>eu.dnetlib.dhp</groupId>
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||||
<version>[3.0.0,)</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
|
@ -11,6 +11,8 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
|
|
||||||
public class EventFactory {
|
public class EventFactory {
|
||||||
|
@ -52,9 +54,11 @@ public class EventFactory {
|
||||||
final OaBrokerMainEntity source = updateInfo.getSource();
|
final OaBrokerMainEntity source = updateInfo.getSource();
|
||||||
final OaBrokerMainEntity target = updateInfo.getTarget();
|
final OaBrokerMainEntity target = updateInfo.getTarget();
|
||||||
|
|
||||||
map.setTargetDatasourceId(target.getCollectedFromId());
|
final OaBrokerRelatedDatasource targetDs = updateInfo.getTargetDs();
|
||||||
map.setTargetDatasourceName(target.getCollectedFromName());
|
|
||||||
map.setTargetDatasourceType(target.getCollectedFromType());
|
map.setTargetDatasourceId(targetDs.getOpenaireId());
|
||||||
|
map.setTargetDatasourceName(targetDs.getName());
|
||||||
|
map.setTargetDatasourceType(targetDs.getType());
|
||||||
|
|
||||||
map.setTargetResultId(target.getOpenaireId());
|
map.setTargetResultId(target.getOpenaireId());
|
||||||
|
|
||||||
|
@ -73,11 +77,19 @@ public class EventFactory {
|
||||||
|
|
||||||
// PROVENANCE INFO
|
// PROVENANCE INFO
|
||||||
map.setTrust(updateInfo.getTrust());
|
map.setTrust(updateInfo.getTrust());
|
||||||
map.setProvenanceDatasourceId(source.getCollectedFromId());
|
|
||||||
map.setProvenanceDatasourceName(source.getCollectedFromName());
|
|
||||||
map.setProvenanceDatasourceType(source.getCollectedFromType());
|
|
||||||
map.setProvenanceResultId(source.getOpenaireId());
|
map.setProvenanceResultId(source.getOpenaireId());
|
||||||
|
|
||||||
|
source
|
||||||
|
.getDatasources()
|
||||||
|
.stream()
|
||||||
|
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
|
||||||
|
.findFirst()
|
||||||
|
.ifPresent(ds -> {
|
||||||
|
map.setProvenanceDatasourceId(ds.getOpenaireId());
|
||||||
|
map.setProvenanceDatasourceName(ds.getName());
|
||||||
|
map.setProvenanceDatasourceType(ds.getType());
|
||||||
|
});
|
||||||
|
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,8 @@ import org.slf4j.LoggerFactory;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasourceAggregator;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class JoinStep0Job {
|
public class JoinStep0Job {
|
||||||
|
@ -45,33 +45,33 @@ public class JoinStep0Job {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
final String outputPath = workingPath + "/joinedEntities_step0";
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
ClusterUtils.removeDir(spark, outputPath);
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
|
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils
|
final Dataset<RelatedDatasource> typedRels = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class);
|
.readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
|
||||||
|
|
||||||
final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
|
||||||
.toColumn();
|
.toColumn();
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> dataset = sources
|
final Dataset<OaBrokerMainEntity> dataset = sources
|
||||||
.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
|
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||||
.agg(aggr)
|
.agg(aggr)
|
||||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
||||||
|
|
||||||
ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total);
|
ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -9,14 +9,23 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
|
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.DatasourceRelationsAccumulator;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
|
import scala.Tuple3;
|
||||||
|
|
||||||
public class PrepareRelatedDatasourcesJob {
|
public class PrepareRelatedDatasourcesJob {
|
||||||
|
|
||||||
|
@ -42,7 +51,7 @@ public class PrepareRelatedDatasourcesJob {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
final String relsPath = workingPath + "/datasources";
|
final String relsPath = workingPath + "/relatedDatasources";
|
||||||
log.info("relsPath: {}", relsPath);
|
log.info("relsPath: {}", relsPath);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
|
@ -53,16 +62,46 @@ public class PrepareRelatedDatasourcesJob {
|
||||||
|
|
||||||
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
|
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
|
||||||
|
|
||||||
final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils
|
final Dataset<Tuple3<String, String, String>> rels = prepareResultTuples(
|
||||||
.readPath(spark, graphPath + "/datasource", Datasource.class)
|
spark, graphPath, Publication.class)
|
||||||
.map(
|
.union(prepareResultTuples(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
|
||||||
ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()),
|
.union(prepareResultTuples(spark, graphPath, Software.class))
|
||||||
Encoders.bean(SimpleDatasourceInfo.class));
|
.union(prepareResultTuples(spark, graphPath, OtherResearchProduct.class));
|
||||||
|
|
||||||
ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total);
|
final Dataset<OaBrokerRelatedDatasource> datasources = ClusterUtils
|
||||||
|
.readPath(spark, graphPath + "/datasource", Datasource.class)
|
||||||
|
.map(ConversionUtils::oafDatasourceToBrokerDatasource, Encoders.bean(OaBrokerRelatedDatasource.class));
|
||||||
|
|
||||||
|
final Dataset<RelatedDatasource> dataset = rels
|
||||||
|
.joinWith(datasources, datasources.col("openaireId").equalTo(rels.col("_2")), "inner")
|
||||||
|
.map(t -> {
|
||||||
|
final RelatedDatasource r = new RelatedDatasource();
|
||||||
|
r.setSource(t._1._1());
|
||||||
|
r.setRelDatasource(t._2);
|
||||||
|
r.getRelDatasource().setRelType(t._1._3());
|
||||||
|
return r;
|
||||||
|
}, Encoders.bean(RelatedDatasource.class));
|
||||||
|
|
||||||
|
ClusterUtils.save(dataset, relsPath, RelatedDatasource.class, total);
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Dataset<Tuple3<String, String, String>> prepareResultTuples(final SparkSession spark,
|
||||||
|
final String graphPath,
|
||||||
|
final Class<? extends Result> sourceClass) {
|
||||||
|
|
||||||
|
return ClusterUtils
|
||||||
|
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||||
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
|
.map(
|
||||||
|
r -> DatasourceRelationsAccumulator.calculateTuples(r),
|
||||||
|
Encoders.bean(DatasourceRelationsAccumulator.class))
|
||||||
|
.flatMap(
|
||||||
|
acc -> acc.getRels().iterator(),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
|
|
||||||
|
@ -34,18 +35,19 @@ public abstract class UpdateMatcher<T> {
|
||||||
this.highlightToStringFunction = highlightToStringFunction;
|
this.highlightToStringFunction = highlightToStringFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
|
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity target,
|
||||||
|
final OaBrokerRelatedDatasource targetDs,
|
||||||
final Collection<OaBrokerMainEntity> others,
|
final Collection<OaBrokerMainEntity> others,
|
||||||
final Map<String, LongAccumulator> accumulators) {
|
final Map<String, LongAccumulator> accumulators) {
|
||||||
|
|
||||||
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
|
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
|
||||||
|
|
||||||
for (final OaBrokerMainEntity source : others) {
|
for (final OaBrokerMainEntity source : others) {
|
||||||
if (source != res) {
|
if (source != target) {
|
||||||
for (final T hl : findDifferences(source, res)) {
|
for (final T hl : findDifferences(source, target)) {
|
||||||
final Topic topic = getTopicFunction().apply(hl);
|
final Topic topic = getTopicFunction().apply(hl);
|
||||||
if (topic != null) {
|
if (topic != null) {
|
||||||
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res,
|
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, target, targetDs,
|
||||||
getCompileHighlightFunction(),
|
getCompileHighlightFunction(),
|
||||||
getHighlightToStringFunction());
|
getHighlightToStringFunction());
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,10 @@ public class BrokerConstants {
|
||||||
public static final String OPEN_ACCESS = "OPEN";
|
public static final String OPEN_ACCESS = "OPEN";
|
||||||
public static final String IS_MERGED_IN_CLASS = "isMergedIn";
|
public static final String IS_MERGED_IN_CLASS = "isMergedIn";
|
||||||
|
|
||||||
|
public static final String COLLECTED_FROM_REL = "collectedFrom";
|
||||||
|
|
||||||
|
public static final String HOSTED_BY_REL = "hostedBy";
|
||||||
|
|
||||||
public static final float MIN_TRUST = 0.25f;
|
public static final float MIN_TRUST = 0.25f;
|
||||||
public static final float MAX_TRUST = 1.00f;
|
public static final float MAX_TRUST = 1.00f;
|
||||||
|
|
||||||
|
|
|
@ -22,11 +22,13 @@ import eu.dnetlib.broker.objects.OaBrokerJournal;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
|
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
|
@ -119,8 +121,6 @@ public class ConversionUtils {
|
||||||
res
|
res
|
||||||
.setJournal(
|
.setJournal(
|
||||||
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
|
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
|
||||||
res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
|
|
||||||
res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
|
|
||||||
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
|
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
|
||||||
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
||||||
res
|
res
|
||||||
|
@ -223,6 +223,18 @@ public class ConversionUtils {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final OaBrokerRelatedDatasource oafDatasourceToBrokerDatasource(final Datasource ds) {
|
||||||
|
if (ds == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
|
||||||
|
res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
|
||||||
|
res.setOpenaireId(ds.getId());
|
||||||
|
res.setType(classId(ds.getDatasourcetype()));
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
private static String first(final List<String> list) {
|
private static String first(final List<String> list) {
|
||||||
return list != null && list.size() > 0 ? list.get(0) : null;
|
return list != null && list.size() > 0 ? list.get(0) : null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa.util;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import scala.Tuple3;
|
||||||
|
|
||||||
|
public class DatasourceRelationsAccumulator implements Serializable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = 3256220670651218957L;
|
||||||
|
|
||||||
|
private List<Tuple3<String, String, String>> rels = new ArrayList<>();
|
||||||
|
|
||||||
|
public List<Tuple3<String, String, String>> getRels() {
|
||||||
|
return rels;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRels(final List<Tuple3<String, String, String>> rels) {
|
||||||
|
this.rels = rels;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addTuple(final Tuple3<String, String, String> t) {
|
||||||
|
rels.add(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final DatasourceRelationsAccumulator calculateTuples(final Result r) {
|
||||||
|
|
||||||
|
final Set<String> collectedFromSet = r
|
||||||
|
.getCollectedfrom()
|
||||||
|
.stream()
|
||||||
|
.map(kv -> kv.getKey())
|
||||||
|
.filter(StringUtils::isNotBlank)
|
||||||
|
.distinct()
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
final Set<String> hostedBySet = r
|
||||||
|
.getInstance()
|
||||||
|
.stream()
|
||||||
|
.map(i -> i.getHostedby())
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.filter(kv -> !StringUtils.equalsIgnoreCase(kv.getValue(), "Unknown Repository"))
|
||||||
|
.map(kv -> kv.getKey())
|
||||||
|
.filter(StringUtils::isNotBlank)
|
||||||
|
.distinct()
|
||||||
|
.filter(id -> !collectedFromSet.contains(id))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
|
||||||
|
collectedFromSet
|
||||||
|
.stream()
|
||||||
|
.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
|
||||||
|
.forEach(res::addTuple);
|
||||||
|
hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -11,6 +11,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
||||||
|
@ -80,9 +81,11 @@ public class EventFinder {
|
||||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||||
|
|
||||||
for (final OaBrokerMainEntity target : results.getData()) {
|
for (final OaBrokerMainEntity target : results.getData()) {
|
||||||
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
|
for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
|
||||||
|
if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
|
||||||
for (final UpdateMatcher<?> matcher : matchers) {
|
for (final UpdateMatcher<?> matcher : matchers) {
|
||||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators));
|
list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,17 +93,17 @@ public class EventFinder {
|
||||||
return asEventGroup(list);
|
return asEventGroup(list);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean verifyTarget(final OaBrokerMainEntity target,
|
private static boolean verifyTarget(final OaBrokerRelatedDatasource target,
|
||||||
final Set<String> dsIdWhitelist,
|
final Set<String> dsIdWhitelist,
|
||||||
final Set<String> dsIdBlacklist,
|
final Set<String> dsIdBlacklist,
|
||||||
final Set<String> dsTypeWhitelist) {
|
final Set<String> dsTypeWhitelist) {
|
||||||
|
|
||||||
if (dsIdWhitelist.contains(target.getCollectedFromId())) {
|
if (dsIdWhitelist.contains(target.getOpenaireId())) {
|
||||||
return true;
|
return true;
|
||||||
} else if (dsIdBlacklist.contains(target.getCollectedFromId())) {
|
} else if (dsIdBlacklist.contains(target.getOpenaireId())) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return dsTypeWhitelist.contains(target.getCollectedFromType());
|
return dsTypeWhitelist.contains(target.getType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerProvenance;
|
import eu.dnetlib.broker.objects.OaBrokerProvenance;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.dhp.broker.model.Topic;
|
import eu.dnetlib.dhp.broker.model.Topic;
|
||||||
|
|
||||||
public final class UpdateInfo<T> {
|
public final class UpdateInfo<T> {
|
||||||
|
@ -20,6 +21,8 @@ public final class UpdateInfo<T> {
|
||||||
|
|
||||||
private final OaBrokerMainEntity target;
|
private final OaBrokerMainEntity target;
|
||||||
|
|
||||||
|
private final OaBrokerRelatedDatasource targetDs;
|
||||||
|
|
||||||
private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
|
private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
|
||||||
|
|
||||||
private final Function<T, String> highlightToString;
|
private final Function<T, String> highlightToString;
|
||||||
|
@ -28,12 +31,14 @@ public final class UpdateInfo<T> {
|
||||||
|
|
||||||
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
|
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
|
||||||
final OaBrokerMainEntity target,
|
final OaBrokerMainEntity target,
|
||||||
|
final OaBrokerRelatedDatasource targetDs,
|
||||||
final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
|
final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
|
||||||
final Function<T, String> highlightToString) {
|
final Function<T, String> highlightToString) {
|
||||||
this.topic = topic;
|
this.topic = topic;
|
||||||
this.highlightValue = highlightValue;
|
this.highlightValue = highlightValue;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
|
this.targetDs = targetDs;
|
||||||
this.compileHighlight = compileHighlight;
|
this.compileHighlight = compileHighlight;
|
||||||
this.highlightToString = highlightToString;
|
this.highlightToString = highlightToString;
|
||||||
this.trust = TrustUtils.calculateTrust(source, target);
|
this.trust = TrustUtils.calculateTrust(source, target);
|
||||||
|
@ -51,6 +56,10 @@ public final class UpdateInfo<T> {
|
||||||
return target;
|
return target;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public OaBrokerRelatedDatasource getTargetDs() {
|
||||||
|
return targetDs;
|
||||||
|
}
|
||||||
|
|
||||||
protected Topic getTopic() {
|
protected Topic getTopic() {
|
||||||
return topic;
|
return topic;
|
||||||
}
|
}
|
||||||
|
@ -75,8 +84,20 @@ public final class UpdateInfo<T> {
|
||||||
compileHighlight.accept(hl, getHighlightValue());
|
compileHighlight.accept(hl, getHighlightValue());
|
||||||
|
|
||||||
final String provId = getSource().getOpenaireId();
|
final String provId = getSource().getOpenaireId();
|
||||||
final String provRepo = getSource().getCollectedFromName();
|
final String provRepo = getSource()
|
||||||
final String provType = getSource().getCollectedFromType();
|
.getDatasources()
|
||||||
|
.stream()
|
||||||
|
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
|
||||||
|
.map(ds -> ds.getName())
|
||||||
|
.findFirst()
|
||||||
|
.orElse("");
|
||||||
|
final String provType = getSource()
|
||||||
|
.getDatasources()
|
||||||
|
.stream()
|
||||||
|
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
|
||||||
|
.map(ds -> ds.getType())
|
||||||
|
.findFirst()
|
||||||
|
.orElse("");
|
||||||
|
|
||||||
final String provUrl = getSource()
|
final String provUrl = getSource()
|
||||||
.getInstances()
|
.getInstances()
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
|
|
||||||
|
public class RelatedDatasource implements Serializable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = 3015550240920424010L;
|
||||||
|
|
||||||
|
private String source;
|
||||||
|
private OaBrokerRelatedDatasource relDatasource;
|
||||||
|
|
||||||
|
public RelatedDatasource() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RelatedDatasource(final String source, final OaBrokerRelatedDatasource relDatasource) {
|
||||||
|
this.source = source;
|
||||||
|
this.relDatasource = relDatasource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSource() {
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSource(final String source) {
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OaBrokerRelatedDatasource getRelDatasource() {
|
||||||
|
return relDatasource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRelDatasource(final OaBrokerRelatedDatasource relDatasource) {
|
||||||
|
this.relDatasource = relDatasource;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -7,15 +7,16 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class AddDatasourceTypeAggregator
|
public class RelatedDatasourceAggregator
|
||||||
extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> {
|
extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity, OaBrokerMainEntity> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private static final long serialVersionUID = 8788588975496014728L;
|
private static final long serialVersionUID = -7212121913834713672L;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity zero() {
|
public OaBrokerMainEntity zero() {
|
||||||
|
@ -29,10 +30,10 @@ public class AddDatasourceTypeAggregator
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
|
||||||
final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) {
|
final Tuple2<OaBrokerMainEntity, RelatedDatasource> t) {
|
||||||
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) {
|
if (t._2 != null && res.getDatasources().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
|
||||||
res.setCollectedFromType(t._2.getType());
|
res.getDatasources().add(t._2.getRelDatasource());
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
|
||||||
|
@ -40,7 +41,15 @@ public class AddDatasourceTypeAggregator
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) {
|
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
||||||
|
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasources().size();
|
||||||
|
if (availables > 0) {
|
||||||
|
if (g2.getDatasources().size() <= availables) {
|
||||||
|
g1.getDatasources().addAll(g2.getDatasources());
|
||||||
|
} else {
|
||||||
|
g1.getDatasources().addAll(g2.getDatasources().subList(0, availables));
|
||||||
|
}
|
||||||
|
}
|
||||||
return g1;
|
return g1;
|
||||||
} else {
|
} else {
|
||||||
return g2;
|
return g2;
|
||||||
|
@ -56,4 +65,5 @@ public class AddDatasourceTypeAggregator
|
||||||
public Encoder<OaBrokerMainEntity> outputEncoder() {
|
public Encoder<OaBrokerMainEntity> outputEncoder() {
|
||||||
return Encoders.bean(OaBrokerMainEntity.class);
|
return Encoders.bean(OaBrokerMainEntity.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,40 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
public class SimpleDatasourceInfo implements Serializable {
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private static final long serialVersionUID = 2996609859416024734L;
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
private String type;
|
|
||||||
|
|
||||||
public SimpleDatasourceInfo() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public SimpleDatasourceInfo(final String id, final String type) {
|
|
||||||
this.id = id;
|
|
||||||
this.type = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setId(final String id) {
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getType() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setType(final String type) {
|
|
||||||
this.type = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -448,6 +448,30 @@
|
||||||
<arg>--index</arg><arg>${esIndexName}</arg>
|
<arg>--index</arg><arg>${esIndexName}</arg>
|
||||||
<arg>--esHost</arg><arg>${esIndexHost}</arg>
|
<arg>--esHost</arg><arg>${esIndexHost}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="stats"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="stats">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>GenerateStatsJob</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -64,13 +64,208 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
<start to="stats"/>
|
<start to="join_entities_step0"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="join_entities_step0">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep0</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step1"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step1">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep1</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step2"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step2">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep2</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step3"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step3">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep3</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step4"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step4">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep4</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="prepare_groups"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="prepare_groups">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>PrepareGroupsJob</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="generate_events"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="generate_events">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>GenerateEventsJob</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
|
||||||
|
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
|
||||||
|
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="index_es"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="index_es">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>IndexOnESJob</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors="8"
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--index</arg><arg>${esIndexName}</arg>
|
||||||
|
<arg>--esHost</arg><arg>${esIndexHost}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="stats"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
<action name="stats">
|
<action name="stats">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
|
|
@ -8,15 +8,23 @@ import java.util.Collection;
|
||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
class UpdateMatcherTest {
|
class UpdateMatcherTest {
|
||||||
|
|
||||||
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
|
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private OaBrokerRelatedDatasource targetDs;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() throws Exception {
|
void setUp() throws Exception {
|
||||||
}
|
}
|
||||||
|
@ -30,7 +38,7 @@ class UpdateMatcherTest {
|
||||||
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
|
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.isEmpty());
|
assertTrue(list.isEmpty());
|
||||||
}
|
}
|
||||||
|
@ -46,7 +54,7 @@ class UpdateMatcherTest {
|
||||||
res.setPublicationdate("2018");
|
res.setPublicationdate("2018");
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.isEmpty());
|
assertTrue(list.isEmpty());
|
||||||
}
|
}
|
||||||
|
@ -62,7 +70,7 @@ class UpdateMatcherTest {
|
||||||
p2.setPublicationdate("2018");
|
p2.setPublicationdate("2018");
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.size() == 1);
|
assertTrue(list.size() == 1);
|
||||||
}
|
}
|
||||||
|
@ -79,7 +87,7 @@ class UpdateMatcherTest {
|
||||||
p2.setPublicationdate("2018");
|
p2.setPublicationdate("2018");
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.isEmpty());
|
assertTrue(list.isEmpty());
|
||||||
}
|
}
|
||||||
|
@ -98,7 +106,7 @@ class UpdateMatcherTest {
|
||||||
p4.setPublicationdate("2018");
|
p4.setPublicationdate("2018");
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.isEmpty());
|
assertTrue(list.isEmpty());
|
||||||
}
|
}
|
||||||
|
@ -117,7 +125,7 @@ class UpdateMatcherTest {
|
||||||
p4.setPublicationdate("2018");
|
p4.setPublicationdate("2018");
|
||||||
|
|
||||||
final Collection<UpdateInfo<String>> list = matcher
|
final Collection<UpdateInfo<String>> list = matcher
|
||||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
.searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
|
||||||
|
|
||||||
assertTrue(list.size() == 1);
|
assertTrue(list.size() == 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import eu.dnetlib.pace.config.DedupConfig;
|
||||||
|
|
||||||
abstract class AbstractSparkAction implements Serializable {
|
abstract class AbstractSparkAction implements Serializable {
|
||||||
|
|
||||||
|
protected static final int NUM_PARTITIONS = 1000;
|
||||||
|
|
||||||
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
|
|
|
@ -100,6 +100,11 @@ public class DedupUtility {
|
||||||
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
|
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String createBlockStatsPath(
|
||||||
|
final String basePath, final String actionSetId, final String entityType) {
|
||||||
|
return String.format("%s/%s/%s_blockstats", basePath, actionSetId, entityType);
|
||||||
|
}
|
||||||
|
|
||||||
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
|
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
|
||||||
throws ISLookUpException, DocumentException {
|
throws ISLookUpException, DocumentException {
|
||||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
|
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class Deduper implements Serializable {
|
||||||
.collect(Collectors.toList())
|
.collect(Collectors.toList())
|
||||||
.iterator())
|
.iterator())
|
||||||
.mapToPair(block -> new Tuple2<>(block.getKey(), block))
|
.mapToPair(block -> new Tuple2<>(block.getKey(), block))
|
||||||
.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize));
|
.reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize))
|
||||||
|
.filter(b -> b._2().getDocuments().size() > 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import org.apache.spark.sql.Encoder;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
|
||||||
|
|
||||||
|
private static Relation ZERO = new Relation();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Relation zero() {
|
||||||
|
return ZERO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Relation reduce(Relation b, Relation a) {
|
||||||
|
return mergeRel(b, a);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Relation merge(Relation b, Relation a) {
|
||||||
|
return mergeRel(b, a);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Relation finish(Relation r) {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Relation mergeRel(Relation b, Relation a) {
|
||||||
|
if (Objects.equals(b, ZERO)) {
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
if (Objects.equals(a, ZERO)) {
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
b.mergeFrom(a);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Encoder<Relation> bufferEncoder() {
|
||||||
|
return Encoders.kryo(Relation.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Encoder<Relation> outputEncoder() {
|
||||||
|
return Encoders.kryo(Relation.class);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,126 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.dom4j.DocumentException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.oa.dedup.model.Block;
|
||||||
|
import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
|
||||||
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
import eu.dnetlib.pace.config.DedupConfig;
|
||||||
|
import eu.dnetlib.pace.model.MapDocument;
|
||||||
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class SparkBlockStats extends AbstractSparkAction {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SparkBlockStats.class);
|
||||||
|
|
||||||
|
public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
|
||||||
|
super(parser, spark);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkBlockStats.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
new SparkBlockStats(parser, getSparkSession(conf))
|
||||||
|
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long computeComparisons(Long blockSize, Long slidingWindowSize) {
|
||||||
|
|
||||||
|
if (slidingWindowSize >= blockSize)
|
||||||
|
return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
|
||||||
|
else {
|
||||||
|
return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ISLookUpService isLookUpService)
|
||||||
|
throws DocumentException, IOException, ISLookUpException {
|
||||||
|
|
||||||
|
// read oozie parameters
|
||||||
|
final String graphBasePath = parser.get("graphBasePath");
|
||||||
|
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||||
|
final String actionSetId = parser.get("actionSetId");
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
final int numPartitions = Optional
|
||||||
|
.ofNullable(parser.get("numPartitions"))
|
||||||
|
.map(Integer::valueOf)
|
||||||
|
.orElse(NUM_PARTITIONS);
|
||||||
|
|
||||||
|
log.info("graphBasePath: '{}'", graphBasePath);
|
||||||
|
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||||
|
log.info("actionSetId: '{}'", actionSetId);
|
||||||
|
log.info("workingPath: '{}'", workingPath);
|
||||||
|
|
||||||
|
// for each dedup configuration
|
||||||
|
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
|
||||||
|
|
||||||
|
final String subEntity = dedupConf.getWf().getSubEntityValue();
|
||||||
|
log.info("Creating blockstats for: '{}'", subEntity);
|
||||||
|
|
||||||
|
final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
|
||||||
|
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaPairRDD<String, MapDocument> mapDocuments = sc
|
||||||
|
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||||
|
.repartition(numPartitions)
|
||||||
|
.mapToPair(
|
||||||
|
(PairFunction<String, String, MapDocument>) s -> {
|
||||||
|
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
|
||||||
|
return new Tuple2<>(d.getIdentifier(), d);
|
||||||
|
});
|
||||||
|
|
||||||
|
// create blocks for deduplication
|
||||||
|
JavaRDD<BlockStats> blockStats = Deduper
|
||||||
|
.createSortedBlocks(mapDocuments, dedupConf)
|
||||||
|
.repartition(numPartitions)
|
||||||
|
.map(b -> asBlockStats(dedupConf, b));
|
||||||
|
|
||||||
|
// save the blockstats in the workingdir
|
||||||
|
spark
|
||||||
|
.createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.save(outputPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private BlockStats asBlockStats(DedupConfig dedupConf, Tuple2<String, Block> b) {
|
||||||
|
return new BlockStats(
|
||||||
|
b._1(),
|
||||||
|
(long) b._2().getDocuments().size(),
|
||||||
|
computeComparisons(
|
||||||
|
(long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -5,11 +5,13 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
import org.apache.spark.graphx.Edge;
|
import org.apache.spark.graphx.Edge;
|
||||||
import org.apache.spark.rdd.RDD;
|
import org.apache.spark.rdd.RDD;
|
||||||
|
@ -75,7 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||||
final String actionSetId = parser.get("actionSetId");
|
final String actionSetId = parser.get("actionSetId");
|
||||||
|
int cut = Optional
|
||||||
|
.ofNullable(parser.get("cutConnectedComponent"))
|
||||||
|
.map(Integer::valueOf)
|
||||||
|
.orElse(0);
|
||||||
|
log.info("connected component cut: '{}'", cut);
|
||||||
log.info("graphBasePath: '{}'", graphBasePath);
|
log.info("graphBasePath: '{}'", graphBasePath);
|
||||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||||
log.info("actionSetId: '{}'", actionSetId);
|
log.info("actionSetId: '{}'", actionSetId);
|
||||||
|
@ -100,8 +106,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
|
|
||||||
final RDD<Edge<String>> edgeRdd = spark
|
final RDD<Edge<String>> edgeRdd = spark
|
||||||
.read()
|
.read()
|
||||||
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
|
.textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
|
||||||
.as(Encoders.bean(Relation.class))
|
.map(
|
||||||
|
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
|
||||||
|
Encoders.bean(Relation.class))
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
|
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
|
||||||
.rdd();
|
.rdd();
|
||||||
|
@ -109,7 +117,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
final Dataset<Relation> mergeRels = spark
|
final Dataset<Relation> mergeRels = spark
|
||||||
.createDataset(
|
.createDataset(
|
||||||
GraphProcessor
|
GraphProcessor
|
||||||
.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
|
.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.filter(k -> k.getDocIds().size() > 1)
|
.filter(k -> k.getDocIds().size() > 1)
|
||||||
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
|
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
|
||||||
|
@ -117,6 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
Encoders.bean(Relation.class));
|
Encoders.bean(Relation.class));
|
||||||
|
|
||||||
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
|
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
@ -48,13 +49,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
|
||||||
conf
|
|
||||||
.registerKryoClasses(
|
|
||||||
new Class[] {
|
|
||||||
MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
|
|
||||||
});
|
|
||||||
|
|
||||||
new SparkCreateSimRels(parser, getSparkSession(conf))
|
new SparkCreateSimRels(parser, getSparkSession(conf))
|
||||||
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
||||||
}
|
}
|
||||||
|
@ -68,7 +62,12 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||||
final String actionSetId = parser.get("actionSetId");
|
final String actionSetId = parser.get("actionSetId");
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
|
final int numPartitions = Optional
|
||||||
|
.ofNullable(parser.get("numPartitions"))
|
||||||
|
.map(Integer::valueOf)
|
||||||
|
.orElse(NUM_PARTITIONS);
|
||||||
|
|
||||||
|
log.info("numPartitions: '{}'", numPartitions);
|
||||||
log.info("graphBasePath: '{}'", graphBasePath);
|
log.info("graphBasePath: '{}'", graphBasePath);
|
||||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||||
log.info("actionSetId: '{}'", actionSetId);
|
log.info("actionSetId: '{}'", actionSetId);
|
||||||
|
@ -88,6 +87,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
|
|
||||||
JavaPairRDD<String, MapDocument> mapDocuments = sc
|
JavaPairRDD<String, MapDocument> mapDocuments = sc
|
||||||
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
|
||||||
|
.repartition(numPartitions)
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
(PairFunction<String, String, MapDocument>) s -> {
|
(PairFunction<String, String, MapDocument>) s -> {
|
||||||
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
|
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
|
||||||
|
@ -95,19 +95,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
});
|
});
|
||||||
|
|
||||||
// create blocks for deduplication
|
// create blocks for deduplication
|
||||||
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
|
JavaPairRDD<String, Block> blocks = Deduper
|
||||||
|
.createSortedBlocks(mapDocuments, dedupConf)
|
||||||
|
.repartition(numPartitions);
|
||||||
|
|
||||||
// create relations by comparing only elements in the same group
|
// create relations by comparing only elements in the same group
|
||||||
JavaRDD<Relation> relations = Deduper
|
Deduper
|
||||||
.computeRelations(sc, blocks, dedupConf)
|
.computeRelations(sc, blocks, dedupConf)
|
||||||
.map(t -> createSimRel(t._1(), t._2(), entity));
|
.map(t -> createSimRel(t._1(), t._2(), entity))
|
||||||
|
.repartition(numPartitions)
|
||||||
// save the simrel in the workingdir
|
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
|
||||||
spark
|
.saveAsTextFile(outputPath);
|
||||||
.createDataset(relations.rdd(), Encoders.bean(Relation.class))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Append)
|
|
||||||
.save(outputPath);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
import static org.apache.spark.sql.functions.col;
|
import static org.apache.spark.sql.functions.col;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
@ -95,7 +99,24 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
||||||
FieldType.TARGET,
|
FieldType.TARGET,
|
||||||
getDeletedFn());
|
getDeletedFn());
|
||||||
|
|
||||||
save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite);
|
save(
|
||||||
|
distinctRelations(
|
||||||
|
newRels
|
||||||
|
.union(updated)
|
||||||
|
.union(mergeRels)
|
||||||
|
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
|
||||||
|
outputRelationPath, SaveMode.Overwrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
|
||||||
|
return rels
|
||||||
|
.filter(getRelationFilterFunction())
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Relation, String>) r -> String
|
||||||
|
.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(new RelationAggregator().toColumn())
|
||||||
|
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<Relation> processDataset(
|
private static Dataset<Relation> processDataset(
|
||||||
|
@ -112,6 +133,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
|
||||||
.map(mapFn, Encoders.bean(Relation.class));
|
.map(mapFn, Encoders.bean(Relation.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FilterFunction<Relation> getRelationFilterFunction() {
|
||||||
|
return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource()) ||
|
||||||
|
StringUtils.isNotBlank(r.getTarget()) ||
|
||||||
|
StringUtils.isNotBlank(r.getRelClass()) ||
|
||||||
|
StringUtils.isNotBlank(r.getSubRelType()) ||
|
||||||
|
StringUtils.isNotBlank(r.getRelClass());
|
||||||
|
}
|
||||||
|
|
||||||
private static MapFunction<String, Relation> patchRelFn() {
|
private static MapFunction<String, Relation> patchRelFn() {
|
||||||
return value -> {
|
return value -> {
|
||||||
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
|
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
|
||||||
|
|
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||||
|
@ -18,12 +19,17 @@ public class ConnectedComponent implements Serializable {
|
||||||
private Set<String> docIds;
|
private Set<String> docIds;
|
||||||
private String ccId;
|
private String ccId;
|
||||||
|
|
||||||
public ConnectedComponent() {
|
public ConnectedComponent(Set<String> docIds, final int cut) {
|
||||||
}
|
|
||||||
|
|
||||||
public ConnectedComponent(Set<String> docIds) {
|
|
||||||
this.docIds = docIds;
|
this.docIds = docIds;
|
||||||
createID();
|
createID();
|
||||||
|
if (cut > 0 && docIds.size() > cut) {
|
||||||
|
this.docIds = docIds
|
||||||
|
.stream()
|
||||||
|
.filter(s -> !ccId.equalsIgnoreCase(s))
|
||||||
|
.limit(cut - 1)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
this.docIds.add(ccId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String createID() {
|
public String createID() {
|
||||||
|
@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
|
||||||
public String getMin() {
|
public String getMin() {
|
||||||
|
|
||||||
final StringBuilder min = new StringBuilder();
|
final StringBuilder min = new StringBuilder();
|
||||||
|
|
||||||
docIds
|
docIds
|
||||||
.forEach(
|
.forEach(
|
||||||
i -> {
|
i -> {
|
||||||
|
|
|
@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
|
||||||
|
|
||||||
object GraphProcessor {
|
object GraphProcessor {
|
||||||
|
|
||||||
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
|
def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
|
||||||
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
|
||||||
val cc = graph.connectedComponents(maxIterations).vertices
|
val cc = graph.connectedComponents(maxIterations).vertices
|
||||||
|
|
||||||
|
@ -22,15 +22,15 @@ object GraphProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
val connectedComponents = joinResult.groupByKey()
|
val connectedComponents = joinResult.groupByKey()
|
||||||
.map[ConnectedComponent](cc => asConnectedComponent(cc))
|
.map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
|
||||||
connectedComponents
|
connectedComponents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
|
def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
|
||||||
val docs = group._2.toSet[String]
|
val docs = group._2.toSet[String]
|
||||||
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
|
val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
|
||||||
connectedComponent
|
connectedComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.dedup.model;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class BlockStats implements Serializable {
|
||||||
|
|
||||||
|
private String key; // key of the block
|
||||||
|
private Long size; // number of elements in the block
|
||||||
|
private Long comparisons; // number of comparisons in the block
|
||||||
|
|
||||||
|
public BlockStats() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockStats(String key, Long size, Long comparisons) {
|
||||||
|
this.key = key;
|
||||||
|
this.size = size;
|
||||||
|
this.comparisons = comparisons;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKey(String key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getSize() {
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSize(Long size) {
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getComparisons() {
|
||||||
|
return comparisons;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setComparisons(Long comparisons) {
|
||||||
|
this.comparisons = comparisons;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "la",
|
||||||
|
"paramLongName": "isLookUpUrl",
|
||||||
|
"paramDescription": "address for the LookUp",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "asi",
|
||||||
|
"paramLongName": "actionSetId",
|
||||||
|
"paramDescription": "action set identifier (name of the orchestrator)",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "i",
|
||||||
|
"paramLongName": "graphBasePath",
|
||||||
|
"paramDescription": "the base path of the raw graph",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "w",
|
||||||
|
"paramLongName": "workingPath",
|
||||||
|
"paramDescription": "path of the working directory",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "np",
|
||||||
|
"paramLongName": "numPartitions",
|
||||||
|
"paramDescription": "number of partitions for the similarity relations intermediate phases",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
]
|
|
@ -17,6 +17,12 @@
|
||||||
"paramDescription": "the url for the lookup service",
|
"paramDescription": "the url for the lookup service",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"paramName": "cc",
|
||||||
|
"paramLongName": "cutConnectedComponent",
|
||||||
|
"paramDescription": "the number of maximum elements that belongs to a connected components",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"paramName": "w",
|
"paramName": "w",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingPath",
|
||||||
|
|
|
@ -22,5 +22,11 @@
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingPath",
|
||||||
"paramDescription": "path of the working directory",
|
"paramDescription": "path of the working directory",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "np",
|
||||||
|
"paramLongName": "numPartitions",
|
||||||
|
"paramDescription": "number of partitions for the similarity relations intermediate phases",
|
||||||
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -20,6 +20,10 @@
|
||||||
<name>dedupGraphPath</name>
|
<name>dedupGraphPath</name>
|
||||||
<description>path for the output graph</description>
|
<description>path for the output graph</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>cutConnectedComponent</name>
|
||||||
|
<description>max number of elements in a connected component</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -106,10 +110,11 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||||
<arg>--w</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--numPartitions</arg><arg>8000</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="CreateMergeRel"/>
|
<ok to="CreateMergeRel"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -132,10 +137,11 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--w</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||||
|
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="CreateDedupRecord"/>
|
<ok to="CreateDedupRecord"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -158,10 +164,10 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--w</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--la</arg><arg>${isLookUpUrl}</arg>
|
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||||
<arg>--asi</arg><arg>${actionSetId}</arg>
|
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="UpdateEntity"/>
|
<ok to="UpdateEntity"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -184,9 +190,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--i</arg><arg>${graphBasePath}</arg>
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--w</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--o</arg><arg>${dedupGraphPath}</arg>
|
<arg>--dedupGraphPath</arg><arg>${dedupGraphPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="copyRelations"/>
|
<ok to="copyRelations"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,108 @@
|
||||||
|
<workflow-app name="Create dedup blocks" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>graphBasePath</name>
|
||||||
|
<description>the raw graph base path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>isLookUpUrl</name>
|
||||||
|
<description>the address of the lookUp service</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>actionSetId</name>
|
||||||
|
<description>id of the actionSet</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>numPartitions</name>
|
||||||
|
<description>number of partitions for the similarity relations intermediate phases</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<description>memory for driver process</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<description>number of cores used by single executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozieActionShareLibForSpark2</name>
|
||||||
|
<description>oozie action sharelib for spark 2.*</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||||
|
<description>spark 2.* extra listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||||
|
<description>spark 2.* sql query execution listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<description>spark 2.* yarn history server address</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<description>spark 2.* event log dir location</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="CreateBlockStats"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="CreateBlockStats">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Create deduplication blocks</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.dedup.SparkBlockStats</class>
|
||||||
|
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||||
|
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||||
|
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingDir}</arg>
|
||||||
|
<arg>--numPartitions</arg><arg>${numPartitions}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -1,17 +1,17 @@
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"paramName": "i",
|
"paramName": "i",
|
||||||
"paramLongName": "graphBasePath",
|
"paramLongName": "graphBasePath",
|
||||||
"paramDescription": "the base path of raw graph",
|
"paramDescription": "the base path of raw graph",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "w",
|
"paramName": "w",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingPath",
|
||||||
"paramDescription": "the working directory path",
|
"paramDescription": "the working directory path",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "o",
|
"paramName": "o",
|
||||||
"paramLongName": "dedupGraphPath",
|
"paramLongName": "dedupGraphPath",
|
||||||
"paramDescription": "the path of the dedup graph",
|
"paramDescription": "the path of the dedup graph",
|
||||||
|
|
|
@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import static java.nio.file.Files.createTempDirectory;
|
import static java.nio.file.Files.createTempDirectory;
|
||||||
|
|
||||||
|
import static org.apache.spark.sql.functions.col;
|
||||||
|
import static org.apache.spark.sql.functions.count;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.mockito.Mockito.lenient;
|
import static org.mockito.Mockito.lenient;
|
||||||
|
|
||||||
|
@ -11,6 +13,9 @@ import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -71,11 +77,13 @@ public class SparkDedupTest implements Serializable {
|
||||||
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
||||||
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
conf.set("spark.sql.shuffle.partitions", "200");
|
||||||
spark = SparkSession
|
spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(SparkDedupTest.class.getSimpleName())
|
.appName(SparkDedupTest.class.getSimpleName())
|
||||||
.master("local[*]")
|
.master("local[*]")
|
||||||
.config(new SparkConf())
|
.config(conf)
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
|
|
||||||
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
@ -152,37 +160,42 @@ public class SparkDedupTest implements Serializable {
|
||||||
parser
|
parser
|
||||||
.parseArgument(
|
.parseArgument(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-i",
|
"-i", testGraphBasePath,
|
||||||
testGraphBasePath,
|
"-asi", testActionSetId,
|
||||||
"-asi",
|
"-la", "lookupurl",
|
||||||
testActionSetId,
|
"-w", testOutputBasePath,
|
||||||
"-la",
|
"-np", "50"
|
||||||
"lookupurl",
|
|
||||||
"-w",
|
|
||||||
testOutputBasePath
|
|
||||||
});
|
});
|
||||||
|
|
||||||
new SparkCreateSimRels(parser, spark).run(isLookUpService);
|
new SparkCreateSimRels(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
long orgs_simrel = spark
|
long orgs_simrel = spark
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
long pubs_simrel = spark
|
long pubs_simrel = spark
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
|
||||||
.count();
|
.count();
|
||||||
long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
|
|
||||||
|
|
||||||
long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count();
|
long sw_simrel = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long ds_simrel = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
|
||||||
|
.count();
|
||||||
|
|
||||||
long orp_simrel = spark
|
long orp_simrel = spark
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(3432, orgs_simrel);
|
assertEquals(3432, orgs_simrel);
|
||||||
assertEquals(7054, pubs_simrel);
|
assertEquals(7152, pubs_simrel);
|
||||||
assertEquals(344, sw_simrel);
|
assertEquals(344, sw_simrel);
|
||||||
assertEquals(458, ds_simrel);
|
assertEquals(458, ds_simrel);
|
||||||
assertEquals(6750, orp_simrel);
|
assertEquals(6750, orp_simrel);
|
||||||
|
@ -190,6 +203,101 @@ public class SparkDedupTest implements Serializable {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(2)
|
@Order(2)
|
||||||
|
public void cutMergeRelsTest() throws Exception {
|
||||||
|
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkCreateMergeRels.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||||
|
parser
|
||||||
|
.parseArgument(
|
||||||
|
new String[] {
|
||||||
|
"-i",
|
||||||
|
testGraphBasePath,
|
||||||
|
"-asi",
|
||||||
|
testActionSetId,
|
||||||
|
"-la",
|
||||||
|
"lookupurl",
|
||||||
|
"-w",
|
||||||
|
testOutputBasePath,
|
||||||
|
"-cc",
|
||||||
|
"3"
|
||||||
|
});
|
||||||
|
|
||||||
|
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
|
long orgs_mergerel = spark
|
||||||
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||||
|
.as(Encoders.bean(Relation.class))
|
||||||
|
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").alias("cnt"))
|
||||||
|
.select("source", "cnt")
|
||||||
|
.where("cnt > 3")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long pubs_mergerel = spark
|
||||||
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||||
|
.as(Encoders.bean(Relation.class))
|
||||||
|
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").alias("cnt"))
|
||||||
|
.select("source", "cnt")
|
||||||
|
.where("cnt > 3")
|
||||||
|
.count();
|
||||||
|
long sw_mergerel = spark
|
||||||
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||||
|
.as(Encoders.bean(Relation.class))
|
||||||
|
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").alias("cnt"))
|
||||||
|
.select("source", "cnt")
|
||||||
|
.where("cnt > 3")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long ds_mergerel = spark
|
||||||
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||||
|
.as(Encoders.bean(Relation.class))
|
||||||
|
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").alias("cnt"))
|
||||||
|
.select("source", "cnt")
|
||||||
|
.where("cnt > 3")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long orp_mergerel = spark
|
||||||
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
|
||||||
|
.as(Encoders.bean(Relation.class))
|
||||||
|
.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").alias("cnt"))
|
||||||
|
.select("source", "cnt")
|
||||||
|
.where("cnt > 3")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
assertEquals(0, orgs_mergerel);
|
||||||
|
assertEquals(0, pubs_mergerel);
|
||||||
|
assertEquals(0, sw_mergerel);
|
||||||
|
assertEquals(0, ds_mergerel);
|
||||||
|
assertEquals(0, orp_mergerel);
|
||||||
|
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel"));
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
|
||||||
|
FileUtils
|
||||||
|
.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(3)
|
||||||
public void createMergeRelsTest() throws Exception {
|
public void createMergeRelsTest() throws Exception {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -225,8 +333,10 @@ public class SparkDedupTest implements Serializable {
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||||
.count();
|
.count();
|
||||||
|
long ds_mergerel = spark
|
||||||
long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count();
|
.read()
|
||||||
|
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
|
||||||
|
.count();
|
||||||
|
|
||||||
long orp_mergerel = spark
|
long orp_mergerel = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -234,14 +344,14 @@ public class SparkDedupTest implements Serializable {
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(1276, orgs_mergerel);
|
assertEquals(1276, orgs_mergerel);
|
||||||
assertEquals(1440, pubs_mergerel);
|
assertEquals(1442, pubs_mergerel);
|
||||||
assertEquals(288, sw_mergerel);
|
assertEquals(288, sw_mergerel);
|
||||||
assertEquals(472, ds_mergerel);
|
assertEquals(472, ds_mergerel);
|
||||||
assertEquals(718, orp_mergerel);
|
assertEquals(718, orp_mergerel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(3)
|
@Order(4)
|
||||||
public void createDedupRecordTest() throws Exception {
|
public void createDedupRecordTest() throws Exception {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -288,7 +398,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(4)
|
@Order(5)
|
||||||
public void updateEntityTest() throws Exception {
|
public void updateEntityTest() throws Exception {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -404,7 +514,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(5)
|
@Order(6)
|
||||||
public void propagateRelationTest() throws Exception {
|
public void propagateRelationTest() throws Exception {
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -423,7 +533,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
|
|
||||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||||
|
|
||||||
assertEquals(4971, relations);
|
assertEquals(4866, relations);
|
||||||
|
|
||||||
// check deletedbyinference
|
// check deletedbyinference
|
||||||
final Dataset<Relation> mergeRels = spark
|
final Dataset<Relation> mergeRels = spark
|
||||||
|
@ -454,7 +564,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(6)
|
@Order(7)
|
||||||
public void testRelations() throws Exception {
|
public void testRelations() throws Exception {
|
||||||
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
|
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
|
||||||
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
|
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
|
import static java.nio.file.Files.createTempDirectory;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.lenient;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class SparkStatsTest implements Serializable {
|
||||||
|
|
||||||
|
@Mock(serializable = true)
|
||||||
|
ISLookUpService isLookUpService;
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
private static JavaSparkContext jsc;
|
||||||
|
|
||||||
|
private static String testGraphBasePath;
|
||||||
|
private static String testOutputBasePath;
|
||||||
|
private static final String testActionSetId = "test-orchestrator";
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void cleanUp() throws IOException, URISyntaxException {
|
||||||
|
|
||||||
|
testGraphBasePath = Paths
|
||||||
|
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
|
||||||
|
.toFile()
|
||||||
|
.getAbsolutePath();
|
||||||
|
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
|
||||||
|
.toAbsolutePath()
|
||||||
|
.toString();
|
||||||
|
|
||||||
|
FileUtils.deleteDirectory(new File(testOutputBasePath));
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
conf.set("spark.sql.shuffle.partitions", "200");
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkDedupTest.class.getSimpleName())
|
||||||
|
.master("local[*]")
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
|
||||||
|
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException, ISLookUpException {
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
|
||||||
|
.thenReturn(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkDedupTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void createBlockStatsTest() throws Exception {
|
||||||
|
|
||||||
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkCreateSimRels.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
|
||||||
|
parser
|
||||||
|
.parseArgument(
|
||||||
|
new String[] {
|
||||||
|
"-i", testGraphBasePath,
|
||||||
|
"-asi", testActionSetId,
|
||||||
|
"-la", "lookupurl",
|
||||||
|
"-w", testOutputBasePath
|
||||||
|
});
|
||||||
|
|
||||||
|
new SparkBlockStats(parser, spark).run(isLookUpService);
|
||||||
|
|
||||||
|
long orgs_blocks = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long pubs_blocks = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long sw_blocks = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long ds_blocks = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
long orp_blocks = spark
|
||||||
|
.read()
|
||||||
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
|
||||||
|
.count();
|
||||||
|
|
||||||
|
assertEquals(121, orgs_blocks);
|
||||||
|
assertEquals(110, pubs_blocks);
|
||||||
|
assertEquals(21, sw_blocks);
|
||||||
|
assertEquals(67, ds_blocks);
|
||||||
|
assertEquals(55, orp_blocks);
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,10 +6,10 @@
|
||||||
"subEntityType" : "resulttype",
|
"subEntityType" : "resulttype",
|
||||||
"subEntityValue" : "dataset",
|
"subEntityValue" : "dataset",
|
||||||
"orderField" : "title",
|
"orderField" : "title",
|
||||||
"queueMaxSize" : "800",
|
"queueMaxSize" : "100",
|
||||||
"groupMaxSize" : "100",
|
"groupMaxSize" : "100",
|
||||||
"maxChildren" : "100",
|
"maxChildren" : "100",
|
||||||
"slidingWindowSize" : "80",
|
"slidingWindowSize" : "100",
|
||||||
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||||
"includeChildren" : "true",
|
"includeChildren" : "true",
|
||||||
"idPath" : "$.id",
|
"idPath" : "$.id",
|
||||||
|
@ -17,7 +17,8 @@
|
||||||
},
|
},
|
||||||
"pace" : {
|
"pace" : {
|
||||||
"clustering" : [
|
"clustering" : [
|
||||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||||
|
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||||
],
|
],
|
||||||
"decisionTree" : {
|
"decisionTree" : {
|
||||||
|
|
|
@ -6,10 +6,10 @@
|
||||||
"subEntityType" : "resulttype",
|
"subEntityType" : "resulttype",
|
||||||
"subEntityValue" : "otherresearchproduct",
|
"subEntityValue" : "otherresearchproduct",
|
||||||
"orderField" : "title",
|
"orderField" : "title",
|
||||||
"queueMaxSize" : "800",
|
"queueMaxSize" : "100",
|
||||||
"groupMaxSize" : "100",
|
"groupMaxSize" : "100",
|
||||||
"maxChildren" : "100",
|
"maxChildren" : "100",
|
||||||
"slidingWindowSize" : "80",
|
"slidingWindowSize" : "100",
|
||||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||||
"includeChildren" : "true",
|
"includeChildren" : "true",
|
||||||
"idPath" : "$.id",
|
"idPath" : "$.id",
|
||||||
|
@ -17,7 +17,8 @@
|
||||||
},
|
},
|
||||||
"pace" : {
|
"pace" : {
|
||||||
"clustering" : [
|
"clustering" : [
|
||||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||||
|
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||||
],
|
],
|
||||||
"decisionTree" : {
|
"decisionTree" : {
|
||||||
|
|
|
@ -6,10 +6,10 @@
|
||||||
"subEntityType": "resulttype",
|
"subEntityType": "resulttype",
|
||||||
"subEntityValue": "publication",
|
"subEntityValue": "publication",
|
||||||
"orderField": "title",
|
"orderField": "title",
|
||||||
"queueMaxSize": "800",
|
"queueMaxSize": "100",
|
||||||
"groupMaxSize": "100",
|
"groupMaxSize": "100",
|
||||||
"maxChildren": "100",
|
"maxChildren": "100",
|
||||||
"slidingWindowSize": "80",
|
"slidingWindowSize": "100",
|
||||||
"rootBuilder": [
|
"rootBuilder": [
|
||||||
"result",
|
"result",
|
||||||
"resultProject_outcome_isProducedBy",
|
"resultProject_outcome_isProducedBy",
|
||||||
|
@ -29,7 +29,8 @@
|
||||||
},
|
},
|
||||||
"pace": {
|
"pace": {
|
||||||
"clustering" : [
|
"clustering" : [
|
||||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||||
|
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||||
],
|
],
|
||||||
"decisionTree": {
|
"decisionTree": {
|
||||||
|
|
|
@ -6,10 +6,10 @@
|
||||||
"subEntityType" : "resulttype",
|
"subEntityType" : "resulttype",
|
||||||
"subEntityValue" : "software",
|
"subEntityValue" : "software",
|
||||||
"orderField" : "title",
|
"orderField" : "title",
|
||||||
"queueMaxSize" : "800",
|
"queueMaxSize" : "100",
|
||||||
"groupMaxSize" : "100",
|
"groupMaxSize" : "100",
|
||||||
"maxChildren" : "100",
|
"maxChildren" : "100",
|
||||||
"slidingWindowSize" : "80",
|
"slidingWindowSize" : "100",
|
||||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||||
"includeChildren" : "true",
|
"includeChildren" : "true",
|
||||||
"idPath" : "$.id",
|
"idPath" : "$.id",
|
||||||
|
@ -17,8 +17,9 @@
|
||||||
},
|
},
|
||||||
"pace" : {
|
"pace" : {
|
||||||
"clustering" : [
|
"clustering" : [
|
||||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||||
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
|
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||||
|
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||||
],
|
],
|
||||||
"decisionTree": {
|
"decisionTree": {
|
||||||
"start": {
|
"start": {
|
||||||
|
|
|
@ -116,23 +116,10 @@ public class CreateRelatedEntitiesJob_phase1 {
|
||||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
|
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
|
||||||
.cache();
|
.cache();
|
||||||
|
|
||||||
final String relatedEntityPath = outputPath + "_relatedEntity";
|
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
|
||||||
readPathEntity(spark, inputEntityPath, clazz)
|
|
||||||
.filter("dataInfo.invisible == false")
|
.filter("dataInfo.invisible == false")
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
|
(MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
|
||||||
Encoders.kryo(RelatedEntity.class))
|
|
||||||
.repartition(5000)
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.parquet(relatedEntityPath);
|
|
||||||
|
|
||||||
Dataset<Tuple2<String, RelatedEntity>> entities = spark
|
|
||||||
.read()
|
|
||||||
.load(relatedEntityPath)
|
|
||||||
.as(Encoders.kryo(RelatedEntity.class))
|
|
||||||
.map(
|
|
||||||
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
|
|
||||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
|
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
|
||||||
.cache();
|
.cache();
|
||||||
|
|
||||||
|
@ -184,13 +171,16 @@ public class CreateRelatedEntitiesJob_phase1 {
|
||||||
re.setDateofacceptance(getValue(result.getDateofacceptance()));
|
re.setDateofacceptance(getValue(result.getDateofacceptance()));
|
||||||
re.setPublisher(getValue(result.getPublisher()));
|
re.setPublisher(getValue(result.getPublisher()));
|
||||||
re.setResulttype(result.getResulttype());
|
re.setResulttype(result.getResulttype());
|
||||||
|
if (Objects.nonNull(result.getInstance())) {
|
||||||
re
|
re
|
||||||
.setInstances(
|
.setInstances(
|
||||||
result
|
result
|
||||||
.getInstance()
|
.getInstance()
|
||||||
.stream()
|
.stream()
|
||||||
|
.filter(Objects::nonNull)
|
||||||
.limit(ProvisionConstants.MAX_INSTANCES)
|
.limit(ProvisionConstants.MAX_INSTANCES)
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
// TODO still to be mapped
|
// TODO still to be mapped
|
||||||
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
||||||
|
|
|
@ -329,7 +329,7 @@ public class XmlRecordFactory implements Serializable {
|
||||||
.stream()
|
.stream()
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
|
.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toCollection(HashSet::new)));
|
||||||
}
|
}
|
||||||
if (r.getEmbargoenddate() != null) {
|
if (r.getEmbargoenddate() != null) {
|
||||||
metadata
|
metadata
|
||||||
|
@ -370,7 +370,7 @@ public class XmlRecordFactory implements Serializable {
|
||||||
.stream()
|
.stream()
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue()))
|
.map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue()))
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toCollection(HashSet::new)));
|
||||||
}
|
}
|
||||||
if (r.getFormat() != null) {
|
if (r.getFormat() != null) {
|
||||||
metadata
|
metadata
|
||||||
|
|
|
@ -578,10 +578,18 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
|
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="to_solr_index"/>
|
<ok to="should_index"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
<decision name="should_index">
|
||||||
|
<switch>
|
||||||
|
<case to="to_solr_index">${wf:conf('shouldIndex') eq 'true'}</case>
|
||||||
|
<case to="End">${wf:conf('shouldIndex') eq 'false'}</case>
|
||||||
|
<default to="to_solr_index"/>
|
||||||
|
</switch>
|
||||||
|
</decision>
|
||||||
|
|
||||||
<action name="to_solr_index">
|
<action name="to_solr_index">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
|
7
pom.xml
7
pom.xml
|
@ -323,6 +323,12 @@
|
||||||
<version>[2.0.0,3.0.0)</version>
|
<version>[2.0.0,3.0.0)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
|
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||||
|
<version>${dnet.openaire.broker.common}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.cxf</groupId>
|
<groupId>org.apache.cxf</groupId>
|
||||||
<artifactId>cxf-rt-transports-http</artifactId>
|
<artifactId>cxf-rt-transports-http</artifactId>
|
||||||
|
@ -618,5 +624,6 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
|
<dnet.openaire.broker.common>3.1.0</dnet.openaire.broker.common>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
Loading…
Reference in New Issue