forked from D-Net/dnet-hadoop
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
18b9330312
|
@ -59,7 +59,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>eu.dnetlib</groupId>
|
<groupId>eu.dnetlib</groupId>
|
||||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||||
<version>[3.0.3,4.0.0)</version>
|
<version>[3.0.4,4.0.0)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
|
@ -42,6 +42,7 @@ public class EventFactory {
|
||||||
res.setCreationDate(now);
|
res.setCreationDate(now);
|
||||||
res.setExpiryDate(calculateExpiryDate(now));
|
res.setExpiryDate(calculateExpiryDate(now));
|
||||||
res.setInstantMessage(false);
|
res.setInstantMessage(false);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +54,7 @@ public class EventFactory {
|
||||||
|
|
||||||
map.setTargetDatasourceId(target.getCollectedFromId());
|
map.setTargetDatasourceId(target.getCollectedFromId());
|
||||||
map.setTargetDatasourceName(target.getCollectedFromName());
|
map.setTargetDatasourceName(target.getCollectedFromName());
|
||||||
|
map.setTargetDatasourceType(target.getCollectedFromType());
|
||||||
|
|
||||||
map.setTargetResultId(target.getOpenaireId());
|
map.setTargetResultId(target.getOpenaireId());
|
||||||
|
|
||||||
|
@ -73,6 +75,7 @@ public class EventFactory {
|
||||||
map.setTrust(updateInfo.getTrust());
|
map.setTrust(updateInfo.getTrust());
|
||||||
map.setProvenanceDatasourceId(source.getCollectedFromId());
|
map.setProvenanceDatasourceId(source.getCollectedFromId());
|
||||||
map.setProvenanceDatasourceName(source.getCollectedFromName());
|
map.setProvenanceDatasourceName(source.getCollectedFromName());
|
||||||
|
map.setProvenanceDatasourceType(source.getCollectedFromType());
|
||||||
map.setProvenanceResultId(source.getOpenaireId());
|
map.setProvenanceResultId(source.getOpenaireId());
|
||||||
|
|
||||||
return map;
|
return map;
|
||||||
|
|
|
@ -13,6 +13,7 @@ public class MappedFields implements Serializable {
|
||||||
|
|
||||||
private String targetDatasourceId;
|
private String targetDatasourceId;
|
||||||
private String targetDatasourceName;
|
private String targetDatasourceName;
|
||||||
|
private String targetDatasourceType;
|
||||||
private String targetResultId;
|
private String targetResultId;
|
||||||
private String targetResultTitle;
|
private String targetResultTitle;
|
||||||
private long targetDateofacceptance;
|
private long targetDateofacceptance;
|
||||||
|
@ -21,6 +22,7 @@ public class MappedFields implements Serializable {
|
||||||
private float trust;
|
private float trust;
|
||||||
private String provenanceDatasourceId;
|
private String provenanceDatasourceId;
|
||||||
private String provenanceDatasourceName;
|
private String provenanceDatasourceName;
|
||||||
|
private String provenanceDatasourceType;
|
||||||
private String provenanceResultId;
|
private String provenanceResultId;
|
||||||
|
|
||||||
public String getTargetDatasourceId() {
|
public String getTargetDatasourceId() {
|
||||||
|
@ -39,6 +41,14 @@ public class MappedFields implements Serializable {
|
||||||
this.targetDatasourceName = targetDatasourceName;
|
this.targetDatasourceName = targetDatasourceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getTargetDatasourceType() {
|
||||||
|
return targetDatasourceType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetDatasourceType(final String targetDatasourceType) {
|
||||||
|
this.targetDatasourceType = targetDatasourceType;
|
||||||
|
}
|
||||||
|
|
||||||
public String getTargetResultId() {
|
public String getTargetResultId() {
|
||||||
return targetResultId;
|
return targetResultId;
|
||||||
}
|
}
|
||||||
|
@ -103,6 +113,14 @@ public class MappedFields implements Serializable {
|
||||||
this.provenanceDatasourceName = provenanceDatasourceName;
|
this.provenanceDatasourceName = provenanceDatasourceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getProvenanceDatasourceType() {
|
||||||
|
return provenanceDatasourceType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProvenanceDatasourceType(final String provenanceDatasourceType) {
|
||||||
|
this.provenanceDatasourceType = provenanceDatasourceType;
|
||||||
|
}
|
||||||
|
|
||||||
public String getProvenanceResultId() {
|
public String getProvenanceResultId() {
|
||||||
return provenanceResultId;
|
return provenanceResultId;
|
||||||
}
|
}
|
||||||
|
@ -111,4 +129,8 @@ public class MappedFields implements Serializable {
|
||||||
this.provenanceResultId = provenanceResultId;
|
this.provenanceResultId = provenanceResultId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static long getSerialversionuid() {
|
||||||
|
return serialVersionUID;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,15 @@ package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.SparkContext;
|
import org.apache.spark.SparkContext;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -59,6 +63,15 @@ public class GenerateEventsJob {
|
||||||
final String eventsPath = workingPath + "/events";
|
final String eventsPath = workingPath + "/events";
|
||||||
log.info("eventsPath: {}", eventsPath);
|
log.info("eventsPath: {}", eventsPath);
|
||||||
|
|
||||||
|
final Set<String> dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist");
|
||||||
|
log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ","));
|
||||||
|
|
||||||
|
final Set<String> dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist");
|
||||||
|
log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ","));
|
||||||
|
|
||||||
|
final Set<String> dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist");
|
||||||
|
log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
// TODO UNCOMMENT
|
// TODO UNCOMMENT
|
||||||
|
@ -77,9 +90,12 @@ public class GenerateEventsJob {
|
||||||
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
|
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
|
||||||
|
|
||||||
final Dataset<Event> dataset = groups
|
final Dataset<Event> dataset = groups
|
||||||
.map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
|
.map(
|
||||||
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
|
g -> EventFinder
|
||||||
.map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
|
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators),
|
||||||
|
Encoders
|
||||||
|
.bean(EventGroup.class))
|
||||||
|
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
|
||||||
|
|
||||||
ClusterUtils.save(dataset, eventsPath, Event.class, total);
|
ClusterUtils.save(dataset, eventsPath, Event.class, total);
|
||||||
|
|
||||||
|
@ -87,6 +103,22 @@ public class GenerateEventsJob {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
|
||||||
|
final String s = parser.get(key).trim();
|
||||||
|
|
||||||
|
final Set<String> res = new HashSet<>();
|
||||||
|
|
||||||
|
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
|
||||||
|
Arrays
|
||||||
|
.stream(s.split(","))
|
||||||
|
.map(String::trim)
|
||||||
|
.filter(StringUtils::isNotBlank)
|
||||||
|
.forEach(res::add);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
|
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
|
||||||
|
|
||||||
return EventFinder
|
return EventFinder
|
||||||
|
|
|
@ -48,6 +48,7 @@ public class IndexOnESJob {
|
||||||
|
|
||||||
final JavaRDD<String> inputRdd = ClusterUtils
|
final JavaRDD<String> inputRdd = ClusterUtils
|
||||||
.readPath(spark, eventsPath, Event.class)
|
.readPath(spark, eventsPath, Event.class)
|
||||||
|
.limit(10000) // TODO REMOVE
|
||||||
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
|
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
|
||||||
.javaRDD();
|
.javaRDD();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep0Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep0Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep0Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String graphPath = parser.get("graphPath");
|
||||||
|
log.info("graphPath: {}", graphPath);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String outputPath = workingPath + "/joinedEntities_step0";
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, outputPath);
|
||||||
|
|
||||||
|
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> dataset = sources
|
||||||
|
.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
|
||||||
|
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
||||||
|
|
||||||
|
ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -55,7 +55,7 @@ public class JoinStep1Job {
|
||||||
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
|
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<RelatedProject> typedRels = ClusterUtils
|
final Dataset<RelatedProject> typedRels = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
|
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
|
||||||
|
public class PrepareRelatedDatasourcesJob {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasourcesJob.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
PrepareRelatedDatasourcesJob.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String graphPath = parser.get("graphPath");
|
||||||
|
log.info("graphPath: {}", graphPath);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String relsPath = workingPath + "/datasources";
|
||||||
|
log.info("relsPath: {}", relsPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, relsPath);
|
||||||
|
|
||||||
|
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
|
||||||
|
|
||||||
|
final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils
|
||||||
|
.readPath(spark, graphPath + "/datasource", Datasource.class)
|
||||||
|
.map(
|
||||||
|
ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()),
|
||||||
|
Encoders.bean(SimpleDatasourceInfo.class));
|
||||||
|
|
||||||
|
ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -4,8 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||||
|
@ -38,6 +41,8 @@ import eu.dnetlib.pace.config.DedupConfig;
|
||||||
|
|
||||||
public class EventFinder {
|
public class EventFinder {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(EventFinder.class);
|
||||||
|
|
||||||
private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
|
private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
|
||||||
static {
|
static {
|
||||||
matchers.add(new EnrichMissingAbstract());
|
matchers.add(new EnrichMissingAbstract());
|
||||||
|
@ -69,19 +74,39 @@ public class EventFinder {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static EventGroup generateEvents(final ResultGroup results,
|
public static EventGroup generateEvents(final ResultGroup results,
|
||||||
|
final Set<String> dsIdWhitelist,
|
||||||
|
final Set<String> dsIdBlacklist,
|
||||||
|
final Set<String> dsTypeWhitelist,
|
||||||
final DedupConfig dedupConfig,
|
final DedupConfig dedupConfig,
|
||||||
final Map<String, LongAccumulator> accumulators) {
|
final Map<String, LongAccumulator> accumulators) {
|
||||||
|
|
||||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||||
|
|
||||||
for (final OaBrokerMainEntity target : results.getData()) {
|
for (final OaBrokerMainEntity target : results.getData()) {
|
||||||
for (final UpdateMatcher<?> matcher : matchers) {
|
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
|
||||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
|
for (final UpdateMatcher<?> matcher : matchers) {
|
||||||
|
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return asEventGroup(list);
|
return asEventGroup(list);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean verifyTarget(final OaBrokerMainEntity target,
|
||||||
|
final Set<String> dsIdWhitelist,
|
||||||
|
final Set<String> dsIdBlacklist,
|
||||||
|
final Set<String> dsTypeWhitelist) {
|
||||||
|
|
||||||
|
if (dsIdWhitelist.contains(target.getCollectedFromId())) {
|
||||||
|
return true;
|
||||||
|
} else if (dsIdBlacklist.contains(target.getCollectedFromId())) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return dsTypeWhitelist.contains(target.getCollectedFromType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static EventGroup asEventGroup(final List<UpdateInfo<?>> list) {
|
private static EventGroup asEventGroup(final List<UpdateInfo<?>> list) {
|
||||||
final EventGroup events = new EventGroup();
|
final EventGroup events = new EventGroup();
|
||||||
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
|
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
|
||||||
|
|
|
@ -113,6 +113,7 @@ public final class UpdateInfo<T> {
|
||||||
|
|
||||||
final String provId = getSource().getOpenaireId();
|
final String provId = getSource().getOpenaireId();
|
||||||
final String provRepo = getSource().getCollectedFromName();
|
final String provRepo = getSource().getCollectedFromName();
|
||||||
|
final String provType = getSource().getCollectedFromType();
|
||||||
|
|
||||||
final String provUrl = getSource()
|
final String provUrl = getSource()
|
||||||
.getInstances()
|
.getInstances()
|
||||||
|
@ -122,7 +123,7 @@ public final class UpdateInfo<T> {
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
;
|
;
|
||||||
|
|
||||||
final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
|
final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provType, provUrl);
|
||||||
|
|
||||||
final OaBrokerEventPayload res = new OaBrokerEventPayload();
|
final OaBrokerEventPayload res = new OaBrokerEventPayload();
|
||||||
res.setResult(target);
|
res.setResult(target);
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.spark.sql.Encoder;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class AddDatasourceTypeAggregator
|
||||||
|
extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = 8788588975496014728L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OaBrokerMainEntity zero() {
|
||||||
|
return new OaBrokerMainEntity();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
|
||||||
|
return g;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
|
||||||
|
final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) {
|
||||||
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
|
if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) {
|
||||||
|
res.setCollectedFromType(t._2.getType());
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
|
if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) {
|
||||||
|
return g1;
|
||||||
|
} else {
|
||||||
|
return g2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Encoder<OaBrokerMainEntity> bufferEncoder() {
|
||||||
|
return Encoders.bean(OaBrokerMainEntity.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Encoder<OaBrokerMainEntity> outputEncoder() {
|
||||||
|
return Encoders.bean(OaBrokerMainEntity.class);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class SimpleDatasourceInfo implements Serializable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = 2996609859416024734L;
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
public SimpleDatasourceInfo() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimpleDatasourceInfo(final String id, final String type) {
|
||||||
|
this.id = id;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(final String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(final String type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -16,6 +16,21 @@
|
||||||
<property>
|
<property>
|
||||||
<name>dedupConfProfId</name>
|
<name>dedupConfProfId</name>
|
||||||
<description>the id of a valid Dedup Configuration Profile</description>
|
<description>the id of a valid Dedup Configuration Profile</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>datasourceIdWhitelist</name>
|
||||||
|
<value>-</value>
|
||||||
|
<description>a white list (comma separeted, - for empty list) of datasource ids</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>datasourceTypeWhitelist</name>
|
||||||
|
<value>-</value>
|
||||||
|
<description>a white list (comma separeted, - for empty list) of datasource types</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>datasourceIdBlacklist</name>
|
||||||
|
<value>-</value>
|
||||||
|
<description>a black list (comma separeted, - for empty list) of datasource ids</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>esIndexName</name>
|
<name>esIndexName</name>
|
||||||
|
@ -96,6 +111,7 @@
|
||||||
|
|
||||||
<fork name="start_entities_and_rels">
|
<fork name="start_entities_and_rels">
|
||||||
<path start="prepare_simple_entities"/>
|
<path start="prepare_simple_entities"/>
|
||||||
|
<path start="prepare_related_datasources"/>
|
||||||
<path start="prepare_related_softwares"/>
|
<path start="prepare_related_softwares"/>
|
||||||
<path start="prepare_related_datasets"/>
|
<path start="prepare_related_datasets"/>
|
||||||
<path start="prepare_related_projects"/>
|
<path start="prepare_related_projects"/>
|
||||||
|
@ -126,6 +142,30 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
<action name="prepare_related_datasources">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>PrepareRelatedDatasourcesJob</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasourcesJob</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="wait_entities_and_rels"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<action name="prepare_related_datasets">
|
<action name="prepare_related_datasets">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
@ -223,7 +263,31 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_entities_and_rels" to="join_entities_step1"/>
|
<join name="wait_entities_and_rels" to="join_entities_step0"/>
|
||||||
|
|
||||||
|
<action name="join_entities_step0">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep0</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step1"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
<action name="join_entities_step1">
|
<action name="join_entities_step1">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
@ -365,6 +429,9 @@
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
||||||
|
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
|
||||||
|
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
|
||||||
|
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="index_es"/>
|
<ok to="index_es"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -16,5 +16,23 @@
|
||||||
"paramLongName": "dedupConfProfile",
|
"paramLongName": "dedupConfProfile",
|
||||||
"paramDescription": "the id of a valid Dedup Configuration Profile",
|
"paramDescription": "the id of a valid Dedup Configuration Profile",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "datasourceIdWhitelist",
|
||||||
|
"paramLongName": "datasourceIdWhitelist",
|
||||||
|
"paramDescription": "a white list (comma separeted, - for empty list) of datasource ids",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "datasourceTypeWhitelist",
|
||||||
|
"paramLongName": "datasourceTypeWhitelist",
|
||||||
|
"paramDescription": "a white list (comma separeted, - for empty list) of datasource types",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "datasourceIdBlacklist",
|
||||||
|
"paramLongName": "datasourceIdBlacklist",
|
||||||
|
"paramDescription": "a black list (comma separeted, - for empty list) of datasource ids",
|
||||||
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -73,23 +73,24 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
<start to="generate_events"/>
|
<start to="index_es"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<action name="generate_events">
|
|
||||||
|
<action name="index_es">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>GenerateEventsJob</name>
|
<name>IndexOnESJob</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorMemory}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors="2"
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
@ -97,8 +98,8 @@
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--index</arg><arg>${esIndexName}</arg>
|
||||||
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
<arg>--esHost</arg><arg>${esIndexHost}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
Loading…
Reference in New Issue