This commit is contained in:
Claudio Atzori 2020-07-02 12:45:01 +02:00
commit 1d39f7901c
15 changed files with 433 additions and 16 deletions

View File

@ -59,7 +59,7 @@
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.3,4.0.0)</version>
<version>[3.0.4,4.0.0)</version>
</dependency>
</dependencies>

View File

@ -42,6 +42,7 @@ public class EventFactory {
res.setCreationDate(now);
res.setExpiryDate(calculateExpiryDate(now));
res.setInstantMessage(false);
return res;
}
@ -53,6 +54,7 @@ public class EventFactory {
map.setTargetDatasourceId(target.getCollectedFromId());
map.setTargetDatasourceName(target.getCollectedFromName());
map.setTargetDatasourceType(target.getCollectedFromType());
map.setTargetResultId(target.getOpenaireId());
@ -73,6 +75,7 @@ public class EventFactory {
map.setTrust(updateInfo.getTrust());
map.setProvenanceDatasourceId(source.getCollectedFromId());
map.setProvenanceDatasourceName(source.getCollectedFromName());
map.setProvenanceDatasourceType(source.getCollectedFromType());
map.setProvenanceResultId(source.getOpenaireId());
return map;

View File

@ -13,6 +13,7 @@ public class MappedFields implements Serializable {
private String targetDatasourceId;
private String targetDatasourceName;
private String targetDatasourceType;
private String targetResultId;
private String targetResultTitle;
private long targetDateofacceptance;
@ -21,6 +22,7 @@ public class MappedFields implements Serializable {
private float trust;
private String provenanceDatasourceId;
private String provenanceDatasourceName;
private String provenanceDatasourceType;
private String provenanceResultId;
public String getTargetDatasourceId() {
@ -39,6 +41,14 @@ public class MappedFields implements Serializable {
this.targetDatasourceName = targetDatasourceName;
}
public String getTargetDatasourceType() {
return targetDatasourceType;
}
public void setTargetDatasourceType(final String targetDatasourceType) {
this.targetDatasourceType = targetDatasourceType;
}
public String getTargetResultId() {
return targetResultId;
}
@ -103,6 +113,14 @@ public class MappedFields implements Serializable {
this.provenanceDatasourceName = provenanceDatasourceName;
}
public String getProvenanceDatasourceType() {
return provenanceDatasourceType;
}
public void setProvenanceDatasourceType(final String provenanceDatasourceType) {
this.provenanceDatasourceType = provenanceDatasourceType;
}
public String getProvenanceResultId() {
return provenanceResultId;
}
@ -111,4 +129,8 @@ public class MappedFields implements Serializable {
this.provenanceResultId = provenanceResultId;
}
public static long getSerialversionuid() {
return serialVersionUID;
}
}

View File

@ -3,11 +3,15 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
@ -59,6 +63,15 @@ public class GenerateEventsJob {
final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath);
final Set<String> dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist");
log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ","));
final Set<String> dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist");
log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ","));
final Set<String> dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist");
log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
final SparkConf conf = new SparkConf();
// TODO UNCOMMENT
@ -77,9 +90,12 @@ public class GenerateEventsJob {
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> dataset = groups
.map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
.map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
.map(
g -> EventFinder
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators),
Encoders
.bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
ClusterUtils.save(dataset, eventsPath, Event.class, total);
@ -87,6 +103,22 @@ public class GenerateEventsJob {
}
private static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
final String s = parser.get(key).trim();
final Set<String> res = new HashSet<>();
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
Arrays
.stream(s.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.forEach(res::add);
}
return res;
}
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
return EventFinder

View File

@ -48,6 +48,7 @@ public class IndexOnESJob {
final JavaRDD<String> inputRdd = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.limit(10000) // TODO REMOVE
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD();

View File

@ -0,0 +1,80 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
import scala.Tuple2;
public class JoinStep0Job {
private static final Logger log = LoggerFactory.getLogger(JoinStep0Job.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
JoinStep0Job.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String outputPath = workingPath + "/joinedEntities_step0";
log.info("outputPath: {}", outputPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, outputPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils
.readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
.toColumn();
final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total);
});
}
}

View File

@ -55,7 +55,7 @@ public class JoinStep1Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
.readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class);
final Dataset<RelatedProject> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);

View File

@ -0,0 +1,68 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class PrepareRelatedDatasourcesJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasourcesJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareRelatedDatasourcesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/datasources";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils
.readPath(spark, graphPath + "/datasource", Datasource.class)
.map(
ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()),
Encoders.bean(SimpleDatasourceInfo.class));
ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total);
});
}
}

View File

@ -4,8 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory;
@ -38,6 +41,8 @@ import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder {
private static final Logger log = LoggerFactory.getLogger(EventFinder.class);
private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
static {
matchers.add(new EnrichMissingAbstract());
@ -69,19 +74,39 @@ public class EventFinder {
}
public static EventGroup generateEvents(final ResultGroup results,
final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) {
for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
}
}
}
return asEventGroup(list);
}
private static boolean verifyTarget(final OaBrokerMainEntity target,
final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist) {
if (dsIdWhitelist.contains(target.getCollectedFromId())) {
return true;
} else if (dsIdBlacklist.contains(target.getCollectedFromId())) {
return false;
} else {
return dsTypeWhitelist.contains(target.getCollectedFromType());
}
}
private static EventGroup asEventGroup(final List<UpdateInfo<?>> list) {
final EventGroup events = new EventGroup();
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);

View File

@ -113,6 +113,7 @@ public final class UpdateInfo<T> {
final String provId = getSource().getOpenaireId();
final String provRepo = getSource().getCollectedFromName();
final String provType = getSource().getCollectedFromType();
final String provUrl = getSource()
.getInstances()
@ -122,7 +123,7 @@ public final class UpdateInfo<T> {
.orElse(null);
;
final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provType, provUrl);
final OaBrokerEventPayload res = new OaBrokerEventPayload();
res.setResult(target);

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
public class AddDatasourceTypeAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
private static final long serialVersionUID = 8788588975496014728L;
@Override
public OaBrokerMainEntity zero() {
return new OaBrokerMainEntity();
}
@Override
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) {
res.setCollectedFromType(t._2.getType());
}
return res;
}
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) {
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OaBrokerMainEntity> bufferEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
}

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
public class SimpleDatasourceInfo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 2996609859416024734L;
private String id;
private String type;
public SimpleDatasourceInfo() {
}
public SimpleDatasourceInfo(final String id, final String type) {
this.id = id;
this.type = type;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
}

View File

@ -16,6 +16,21 @@
<property>
<name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description>
</property>
<property>
<name>datasourceIdWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>datasourceTypeWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource types</description>
</property>
<property>
<name>datasourceIdBlacklist</name>
<value>-</value>
<description>a black list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>esIndexName</name>
@ -96,6 +111,7 @@
<fork name="start_entities_and_rels">
<path start="prepare_simple_entities"/>
<path start="prepare_related_datasources"/>
<path start="prepare_related_softwares"/>
<path start="prepare_related_datasets"/>
<path start="prepare_related_projects"/>
@ -125,6 +141,30 @@
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_datasources">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedDatasourcesJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasourcesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_datasets">
@ -223,7 +263,31 @@
<error to="Kill"/>
</action>
<join name="wait_entities_and_rels" to="join_entities_step1"/>
<join name="wait_entities_and_rels" to="join_entities_step0"/>
<action name="join_entities_step0">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep0</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
</action>
<action name="join_entities_step1">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -365,6 +429,9 @@
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
</spark>
<ok to="index_es"/>
<error to="Kill"/>

View File

@ -16,5 +16,23 @@
"paramLongName": "dedupConfProfile",
"paramDescription": "the id of a valid Dedup Configuration Profile",
"paramRequired": true
},
{
"paramName": "datasourceIdWhitelist",
"paramLongName": "datasourceIdWhitelist",
"paramDescription": "a white list (comma separeted, - for empty list) of datasource ids",
"paramRequired": true
},
{
"paramName": "datasourceTypeWhitelist",
"paramLongName": "datasourceTypeWhitelist",
"paramDescription": "a white list (comma separeted, - for empty list) of datasource types",
"paramRequired": true
},
{
"paramName": "datasourceIdBlacklist",
"paramLongName": "datasourceIdBlacklist",
"paramDescription": "a black list (comma separeted, - for empty list) of datasource ids",
"paramRequired": true
}
]

View File

@ -73,23 +73,24 @@
</configuration>
</global>
<start to="generate_events"/>
<start to="index_es"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="generate_events">
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="2"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -97,8 +98,8 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>