This commit is contained in:
Alessia Bardi 2020-07-12 19:29:12 +02:00
commit 7e96105947
29 changed files with 550 additions and 206 deletions

View File

@ -57,9 +57,9 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-openaire-broker-common</artifactId> <artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.4,4.0.0)</version> <version>[3.0.0-SNAPSHOT,)</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -18,8 +18,6 @@ import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@ -27,9 +25,6 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
public class GenerateEventsJob { public class GenerateEventsJob {
@ -52,12 +47,6 @@ public class GenerateEventsJob {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final String eventsPath = workingPath + "/events"; final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
@ -72,10 +61,6 @@ public class GenerateEventsJob {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
// TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, eventsPath); ClusterUtils.removeDir(spark, eventsPath);
@ -90,7 +75,7 @@ public class GenerateEventsJob {
final Dataset<Event> dataset = groups final Dataset<Event> dataset = groups
.map( .map(
g -> EventFinder g -> EventFinder
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators), .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
Encoders Encoders
.bean(EventGroup.class)) .bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
@ -112,23 +97,4 @@ public class GenerateEventsJob {
} }
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService
.getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
// dedupConfig.getWf().setConfigurationId("???");
return dedupConfig;
}
} }

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
public class GenerateStatsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexOnESJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String statsPath = parser.get("workingPath") + "/stats";
log.info("stats: {}", statsPath);
final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<DatasourceStats> stats = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(DatasourceStats.class));
ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
});
}
}

View File

@ -7,7 +7,6 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
@ -65,9 +64,7 @@ public class JoinStep2Job {
final Dataset<OaBrokerMainEntity> dataset = sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey( .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));

View File

@ -17,7 +17,6 @@ import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.pace.config.DedupConfig;
public abstract class UpdateMatcher<T> { public abstract class UpdateMatcher<T> {
@ -37,7 +36,6 @@ public abstract class UpdateMatcher<T> {
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res, public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
final Collection<OaBrokerMainEntity> others, final Collection<OaBrokerMainEntity> others,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) { final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
@ -49,7 +47,7 @@ public abstract class UpdateMatcher<T> {
if (topic != null) { if (topic != null) {
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res,
getCompileHighlightFunction(), getCompileHighlightFunction(),
getHighlightToStringFunction(), dedupConfig); getHighlightToStringFunction());
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {

View File

@ -37,7 +37,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder { public class EventFinder {
@ -76,7 +75,6 @@ public class EventFinder {
final Set<String> dsIdWhitelist, final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist, final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist, final Set<String> dsTypeWhitelist,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) { final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
@ -84,7 +82,7 @@ public class EventFinder {
for (final OaBrokerMainEntity target : results.getData()) { for (final OaBrokerMainEntity target : results.getData()) {
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
for (final UpdateMatcher<?> matcher : matchers) { for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators));
} }
} }
} }

View File

@ -1,8 +1,62 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.tree.support.TreeProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
public class TrustUtils { public class TrustUtils {
private static final Logger log = LoggerFactory.getLogger(TrustUtils.class);
private static DedupConfig dedupConfig;
static {
final ObjectMapper mapper = new ObjectMapper();
try {
dedupConfig = mapper
.readValue(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
DedupConfig.class);
} catch (final IOException e) {
log.error("Error loading dedupConfig, e");
}
}
protected static float calculateTrust(final OaBrokerMainEntity r1, final OaBrokerMainEntity r2) {
if (dedupConfig == null) {
return BrokerConstants.MIN_TRUST;
}
try {
final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
final MapDocument doc2 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
final double threshold = dedupConfig.getWf().getThreshold();
return TrustUtils.rescale(score, threshold);
} catch (final Exception e) {
log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST;
}
}
public static float rescale(final double score, final double threshold) { public static float rescale(final double score, final double threshold) {
if (score >= BrokerConstants.MAX_TRUST) { if (score >= BrokerConstants.MAX_TRUST) {
return BrokerConstants.MAX_TRUST; return BrokerConstants.MAX_TRUST;

View File

@ -4,20 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.tree.support.TreeProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
public final class UpdateInfo<T> { public final class UpdateInfo<T> {
@ -35,20 +26,17 @@ public final class UpdateInfo<T> {
private final float trust; private final float trust;
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OaBrokerMainEntity target, final OaBrokerMainEntity target,
final BiConsumer<OaBrokerMainEntity, T> compileHighlight, final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
final Function<T, String> highlightToString, final Function<T, String> highlightToString) {
final DedupConfig dedupConfig) {
this.topic = topic; this.topic = topic;
this.highlightValue = highlightValue; this.highlightValue = highlightValue;
this.source = source; this.source = source;
this.target = target; this.target = target;
this.compileHighlight = compileHighlight; this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString; this.highlightToString = highlightToString;
this.trust = calculateTrust(dedupConfig, source, target); this.trust = TrustUtils.calculateTrust(source, target);
} }
public T getHighlightValue() { public T getHighlightValue() {
@ -63,31 +51,6 @@ public final class UpdateInfo<T> {
return target; return target;
} }
private float calculateTrust(final DedupConfig dedupConfig,
final OaBrokerMainEntity r1,
final OaBrokerMainEntity r2) {
if (dedupConfig == null) {
return BrokerConstants.MIN_TRUST;
}
try {
final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
final MapDocument doc2 = MapDocumentUtil
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
final double threshold = dedupConfig.getWf().getThreshold();
return TrustUtils.rescale(score, threshold);
} catch (final Exception e) {
log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST;
}
}
protected Topic getTopic() { protected Topic getTopic() {
return topic; return topic;
} }

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class DatasourceStats implements Serializable {
/**
*
*/
private static final long serialVersionUID = -282112564184047677L;
private String id;
private String name;
private String type;
private Map<String, Long> topics = new HashMap<>();
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
public Map<String, Long> getTopics() {
return topics;
}
public void setTopics(final Map<String, Long> topics) {
this.topics = topics;
}
public void incrementTopic(final String topic, final long inc) {
if (topics.containsKey(topic)) {
topics.put(topic, topics.get(topic) + inc);
} else {
topics.put(topic, inc);
}
}
}

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.broker.model.Event;
public class StatsAggregator extends Aggregator<Event, DatasourceStats, DatasourceStats> {
/**
*
*/
private static final long serialVersionUID = 6652105853037330529L;
@Override
public DatasourceStats zero() {
return new DatasourceStats();
}
@Override
public DatasourceStats reduce(final DatasourceStats stats, final Event e) {
stats.setId(e.getMap().getTargetDatasourceId());
stats.setName(e.getMap().getTargetDatasourceName());
stats.setType(e.getMap().getTargetDatasourceType());
stats.incrementTopic(e.getTopic(), 1l);
return stats;
}
@Override
public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) {
if (StringUtils.isBlank(stats0.getId())) {
stats0.setId(stats1.getId());
stats0.setName(stats1.getName());
stats0.setType(stats1.getType());
}
stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue()));
return stats0;
}
@Override
public Encoder<DatasourceStats> bufferEncoder() {
return Encoders.bean(DatasourceStats.class);
}
@Override
public DatasourceStats finish(final DatasourceStats stats) {
return stats;
}
@Override
public Encoder<DatasourceStats> outputEncoder() {
return Encoders.bean(DatasourceStats.class);
}
}

View File

@ -0,0 +1,122 @@
{
"wf": {
},
"pace": {
"clustering": [
{
"name": "wordssuffixprefix",
"fields": [
"title"
],
"params": {
"max": "2",
"len": "3"
}
},
{
"name": "lowercase",
"fields": [
"doi"
],
"params": {
}
}
],
"decisionTree": {
"start": {
"fields": [
{
"field": "doi",
"comparator": "exactMatch",
"weight": 1.0,
"countIfUndefined": "false",
"params": {
}
}
],
"threshold": 0.5,
"aggregation": "AVG",
"positive": "MATCH",
"negative": "layer1",
"undefined": "layer1",
"ignoreUndefined": "true"
},
"layer1": {
"fields": [
{
"field": "title",
"comparator": "titleVersionMatch",
"weight": 0.9,
"countIfUndefined": "false",
"params": {
}
},
{
"field": "authors",
"comparator": "sizeMatch",
"weight": 0.9,
"countIfUndefined": "false",
"params": {
}
}
],
"threshold": 0.5,
"aggregation": "AVG",
"positive": "MATCH",
"negative": "layer2",
"undefined": "layer2",
"ignoreUndefined": "true"
},
"layer2": {
"fields": [
{
"field": "title",
"comparator": "levensteinTitle",
"weight": 1.0,
"countIfUndefined": "true",
"params": {
}
}
],
"threshold": 0.99,
"aggregation": "AVG",
"positive": "MATCH",
"negative": "NO_MATCH",
"undefined": "NO_MATCH",
"ignoreUndefined": "true"
}
},
"model": [
{
"name": "doi",
"type": "String",
"path": "$.pids[?(@.type == 'doi')].value"
},
{
"name": "title",
"type": "String",
"path": "$.titles",
"length": 250,
"size": 5
},
{
"name": "authors",
"type": "List",
"path": "$.creators[*].fullname",
"size": 200
}
],
"blacklists": {
},
"synonyms": {
}
}
}

View File

@ -8,14 +8,6 @@
<property> <property>
<name>workingPath</name> <name>workingPath</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data will be stored</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description>
</property> </property>
<property> <property>
<name>datasourceIdWhitelist</name> <name>datasourceIdWhitelist</name>
@ -427,8 +419,6 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg> <arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg> <arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg> <arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>

View File

@ -5,18 +5,6 @@
"paramDescription": "the path where the generated events will be stored", "paramDescription": "the path where the generated events will be stored",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "lu",
"paramLongName": "isLookupUrl",
"paramDescription": "the address of the ISLookUpService",
"paramRequired": true
},
{
"paramName": "d",
"paramLongName": "dedupConfProfile",
"paramDescription": "the id of a valid Dedup Configuration Profile",
"paramRequired": true
},
{ {
"paramName": "datasourceIdWhitelist", "paramName": "datasourceIdWhitelist",
"paramLongName": "datasourceIdWhitelist", "paramLongName": "datasourceIdWhitelist",

View File

@ -9,15 +9,6 @@
<name>workingPath</name> <name>workingPath</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data will be stored</description>
</property> </property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -73,19 +64,19 @@
</configuration> </configuration>
</global> </global>
<start to="count"/> <start to="stats"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="count"> <action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Count</name> <name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class> <class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}

View File

@ -30,7 +30,7 @@ class UpdateMatcherTest {
final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -46,7 +46,7 @@ class UpdateMatcherTest {
res.setPublicationdate("2018"); res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -62,7 +62,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }
@ -79,7 +79,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -98,7 +98,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -117,7 +117,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }

View File

@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerAuthor;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
public class TrustUtilsTest { public class TrustUtilsTest {
private static final double THRESHOLD = 0.95; private static final double THRESHOLD = 0.95;
@ -64,6 +68,23 @@ public class TrustUtilsTest {
verifyValue(2.00, BrokerConstants.MAX_TRUST); verifyValue(2.00, BrokerConstants.MAX_TRUST);
} }
@Test
public void test() throws Exception {
final OaBrokerMainEntity r1 = new OaBrokerMainEntity();
r1.getTitles().add("D-NET Service Package: Data Import");
r1.getPids().add(new OaBrokerTypedValue("doi", "123"));
r1.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
r1.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
final OaBrokerMainEntity r2 = new OaBrokerMainEntity();
r2.getTitles().add("D-NET Service Package: Data Import");
// r2.getPids().add(new OaBrokerTypedValue("doi", "123"));
r2.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
// r2.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
System.out.println("TRUST: " + TrustUtils.calculateTrust(r1, r2));
}
private void verifyValue(final double originalScore, final float expectedTrust) { private void verifyValue(final double originalScore, final float expectedTrust) {
final float trust = TrustUtils.rescale(originalScore, THRESHOLD); final float trust = TrustUtils.rescale(originalScore, THRESHOLD);
System.out.println(trust); System.out.println(trust);

View File

@ -47,10 +47,11 @@ public class EntityMergerTest implements Serializable {
@Test @Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException { public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
List<Tuple2<String, Software>> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class); List<Tuple2<String, Software>> softwares = readSample(
testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory Software merged = DedupRecordFactory
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class); .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE"); assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
} }

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -97,7 +98,7 @@ public class CleanGraphSparkJob {
.json(outputPath); .json(outputPath);
} }
private static <T extends Oaf> T fixDefaults(T value) { protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) { if (value instanceof Datasource) {
// nothing to clean here // nothing to clean here
} else if (value instanceof Project) { } else if (value instanceof Project) {
@ -134,11 +135,6 @@ public class CleanGraphSparkJob {
.setResourcetype( .setResourcetype(
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
} }
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
}
if (Objects.nonNull(r.getInstance())) { if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) { for (Instance i : r.getInstance()) {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
@ -152,6 +148,16 @@ public class CleanGraphSparkJob {
} }
} }
} }
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
}
}
if (Objects.nonNull(r.getAuthor())) { if (Objects.nonNull(r.getAuthor())) {
boolean nullRank = r boolean nullRank = r
.getAuthor() .getAuthor()

View File

@ -378,6 +378,10 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info); protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
public static Qualifier createBestAccessRights(final List<Instance> instanceList) {
return getBestAccessRights(instanceList);
}
protected static Qualifier getBestAccessRights(final List<Instance> instanceList) { protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
if (instanceList != null) { if (instanceList != null) {
final Optional<Qualifier> min = instanceList final Optional<Qualifier> min = instanceList

View File

@ -16,6 +16,11 @@
<name>postgresPassword</name> <name>postgresPassword</name>
<description>the password postgres</description> <description>the password postgres</description>
</property> </property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>the address of the lookUp service</description> <description>the address of the lookUp service</description>
@ -93,6 +98,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
</java> </java>
<ok to="ImportDB_claims"/> <ok to="ImportDB_claims"/>
<error to="Kill"/> <error to="Kill"/>
@ -109,6 +115,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--action</arg><arg>claims</arg> <arg>--action</arg><arg>claims</arg>
</java> </java>
<ok to="End"/> <ok to="End"/>

View File

@ -57,6 +57,8 @@ public class CleaningFunctionTest {
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")); String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
Publication p_in = MAPPER.readValue(json, Publication.class); Publication p_in = MAPPER.readValue(json, Publication.class);
assertNull(p_in.getBestaccessright());
assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication); assertTrue(p_in instanceof Publication);
@ -84,6 +86,9 @@ public class CleaningFunctionTest {
.map(p -> p.getQualifier()) .map(p -> p.getQualifier())
.allMatch(q -> pidTerms.contains(q.getClassid()))); .allMatch(q -> pidTerms.contains(q.getClassid())));
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
// TODO add more assertions to verity the cleaned values // TODO add more assertions to verity the cleaned values
System.out.println(MAPPER.writeValueAsString(p_out)); System.out.println(MAPPER.writeValueAsString(p_out));

View File

@ -185,12 +185,7 @@
"surname": "" "surname": ""
} }
], ],
"bestaccessright": { "bestaccessright": null,
"classid": "CLOSED",
"classname": "Closed Access",
"schemeid": "dnet:access_modes",
"schemename": "dnet:access_modes"
},
"collectedfrom": [ "collectedfrom": [
{ {
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747", "key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",

View File

@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache(); .cache();
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz) final String relatedEntityPath = outputPath + "_relatedEntity";
readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz), (MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.kryo(RelatedEntity.class)) Encoders.kryo(RelatedEntity.class))
.repartition(5000)
.write()
.mode(SaveMode.Overwrite)
.parquet(relatedEntityPath);
Dataset<Tuple2<String, RelatedEntity>> entities = spark
.read()
.load(relatedEntityPath)
.as(Encoders.kryo(RelatedEntity.class))
.map( .map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e), (MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity; Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) { if (result.getTitle() != null && !result.getTitle().isEmpty()) {
re.setTitle(result.getTitle().stream().findFirst().get()); final StructuredProperty title = result.getTitle().stream().findFirst().get();
title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
re.setTitle(title);
} }
re.setDateofacceptance(getValue(result.getDateofacceptance())); re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher())); re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype()); re.setResulttype(result.getResulttype());
re.setInstances(result.getInstance()); re
.setInstances(
result
.getInstance()
.stream()
.limit(ProvisionConstants.MAX_INSTANCES)
.collect(Collectors.toList()));
// TODO still to be mapped // TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));

View File

@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_EXTERNAL_ENTITIES = 50;
private static final int MAX_AUTHORS = 200;
private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
private static final int MAX_TITLE_LENGTH = 5000;
private static final int MAX_ABSTRACT_LENGTH = 100000;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r List<ExternalReference> refs = r
.getExternalReference() .getExternalReference()
.stream() .stream()
.limit(MAX_EXTERNAL_ENTITIES) .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setExternalReference(refs); r.setExternalReference(refs);
} }
if (r.getAuthor() != null) { if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList(); List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) { for (Author a : r.getAuthor()) {
a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < MAX_AUTHORS || hasORCID(a)) { if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a); authors.add(a);
} }
} }
@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(d -> { .map(d -> {
d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d; return d;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(t -> { .map(t -> {
t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t; return t;
}) })
.limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setTitle(titles); r.setTitle(titles);
} }

View File

@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>()); .orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter); log.info("relationFilter: {}", relationFilter);
int maxRelations = Optional int sourceMaxRelations = Optional
.ofNullable(parser.get("maxRelations")) .ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf) .map(Integer::valueOf)
.orElse(MAX_RELS); .orElse(MAX_RELS);
log.info("maxRelations: {}", maxRelations); log.info("sourceMaxRelations: {}", sourceMaxRelations);
int targetMaxRelations = Optional
.ofNullable(parser.get("targetMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
prepareRelationsRDD( prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
relPartitions);
}); });
} }
@ -129,31 +136,40 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations * @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations * @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field * @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
* @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD * @param relPartitions number of partitions for the output RDD
*/ */
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
Set<String> relationFilter, int maxRelations, int relPartitions) { Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
// group by SOURCE and apply limit JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false) .filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
JavaRDD<Relation> pruned = pruneRels(
pruneRels(
rels,
sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
spark
.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
.repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
int relPartitions, Function<Relation, String> idFn) {
return rels
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1) .groupBy(Tuple2::_1)
.map(Tuple2::_2) .map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations)) .map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator) .flatMap(Iterable::iterator)
.map(Tuple2::_2) .map(Tuple2::_2);
.rdd();
spark
.createDataset(bySource, Encoders.bean(Relation.class))
.repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
} }
// experimental // experimental

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.oa.provision;
public class ProvisionConstants {
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
public static final int MAX_TITLE_LENGTH = 5000;
public static final int MAX_TITLES = 10;
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_INSTANCES = 10;
}

View File

@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
private static final Map<String, Integer> weights = Maps.newHashMap(); private static final Map<String, Integer> weights = Maps.newHashMap();
static { static {
weights.put("outcome", 0); weights.put("participation", 0);
weights.put("supplement", 1);
weights.put("review", 2);
weights.put("citation", 3);
weights.put("affiliation", 4);
weights.put("relationship", 5);
weights.put("publicationDataset", 6);
weights.put("similarity", 7);
weights.put("provision", 8); weights.put("outcome", 1);
weights.put("participation", 9); weights.put("affiliation", 2);
weights.put("dedup", 10); weights.put("dedup", 3);
weights.put("publicationDataset", 4);
weights.put("citation", 5);
weights.put("supplement", 6);
weights.put("review", 7);
weights.put("relationship", 8);
weights.put("provision", 9);
weights.put("similarity", 10);
} }
private static final long serialVersionUID = 3232323; private static final long serialVersionUID = 3232323;

View File

@ -30,9 +30,16 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "mr", "paramName": "smr",
"paramLongName": "maxRelations", "paramLongName": "sourceMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity", "paramDescription": "maximum number of relations allowed for a each entity grouping by source",
"paramRequired": false
},
{
"paramName": "tmr",
"paramLongName": "targetMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false "paramRequired": false
} }
] ]

View File

@ -18,8 +18,12 @@
<description>filter applied reading relations (by relClass)</description> <description>filter applied reading relations (by relClass)</description>
</property> </property>
<property> <property>
<name>maxRelations</name> <name>sourceMaxRelations</name>
<description>maximum number of relations allowed for a each entity</description> <description>maximum number of relations allowed for a each entity grouping by source</description>
</property>
<property>
<name>targetMaxRelations</name>
<description>maximum number of relations allowed for a each entity grouping by target</description>
</property> </property>
<property> <property>
<name>otherDsTypeId</name> <name>otherDsTypeId</name>
@ -133,7 +137,8 @@
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--maxRelations</arg><arg>${maxRelations}</arg> <arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
<arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg> <arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--relPartitions</arg><arg>5000</arg> <arg>--relPartitions</arg><arg>5000</arg>
</spark> </spark>
@ -166,7 +171,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -193,7 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -220,7 +225,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -247,7 +252,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -274,7 +279,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -301,7 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -328,7 +333,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg> <arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@ -367,7 +372,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
@ -395,7 +400,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
@ -423,7 +428,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
@ -451,7 +456,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
@ -479,7 +484,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=8000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
@ -507,7 +512,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
@ -535,7 +540,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
@ -607,5 +612,4 @@
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>