forked from antonis.lempesis/dnet-hadoop
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
c01efed79b
|
@ -57,9 +57,9 @@
|
|||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||
<version>[3.0.4,4.0.0)</version>
|
||||
<version>[3.0.0-SNAPSHOT,)</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.TypedColumn;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class CheckDuplictedIdsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CheckDuplictedIdsJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
CheckDuplictedIdsJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
final String eventsPath = parser.get("workingPath") + "/events";
|
||||
log.info("eventsPath: {}", eventsPath);
|
||||
|
||||
final String countPath = parser.get("workingPath") + "/counts";
|
||||
log.info("countPath: {}", countPath);
|
||||
|
||||
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
|
||||
|
||||
final LongAccumulator total = spark.sparkContext().longAccumulator("invaild_event_id");
|
||||
|
||||
final TypedColumn<Tuple2<String, Long>, Tuple2<String, Long>> agg = new CountAggregator().toColumn();
|
||||
|
||||
ClusterUtils
|
||||
.readPath(spark, eventsPath, Event.class)
|
||||
.map(e -> new Tuple2<>(e.getEventId(), 1l), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
|
||||
.groupByKey(t -> t._1, Encoders.STRING())
|
||||
.agg(agg)
|
||||
.map(t -> t._2, Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
|
||||
.filter(t -> t._2 > 1)
|
||||
.map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(countPath);
|
||||
;
|
||||
|
||||
}
|
||||
|
||||
private static String eventAsJsonString(final Event f) throws JsonProcessingException {
|
||||
return new ObjectMapper().writeValueAsString(f);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class CountAggregator extends Aggregator<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = 1395935985734672538L;
|
||||
|
||||
@Override
|
||||
public Encoder<Tuple2<String, Long>> bufferEncoder() {
|
||||
return Encoders.tuple(Encoders.STRING(), Encoders.LONG());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<String, Long> finish(final Tuple2<String, Long> arg0) {
|
||||
return arg0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<String, Long> merge(final Tuple2<String, Long> arg0, final Tuple2<String, Long> arg1) {
|
||||
final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1);
|
||||
return new Tuple2<>(s, arg0._2 + arg1._2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<Tuple2<String, Long>> outputEncoder() {
|
||||
return Encoders.tuple(Encoders.STRING(), Encoders.LONG());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<String, Long> reduce(final Tuple2<String, Long> arg0, final Tuple2<String, Long> arg1) {
|
||||
final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1);
|
||||
return new Tuple2<>(s, arg0._2 + arg1._2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple2<String, Long> zero() {
|
||||
return new Tuple2<>(null, 0l);
|
||||
}
|
||||
|
||||
}
|
|
@ -3,8 +3,6 @@ package eu.dnetlib.dhp.broker.oa;
|
|||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -20,8 +18,6 @@ import org.apache.spark.util.LongAccumulator;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
@ -29,9 +25,6 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
|||
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class GenerateEventsJob {
|
||||
|
||||
|
@ -54,30 +47,20 @@ public class GenerateEventsJob {
|
|||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final String dedupConfigProfileId = parser.get("dedupConfProfile");
|
||||
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
|
||||
|
||||
final String eventsPath = workingPath + "/events";
|
||||
log.info("eventsPath: {}", eventsPath);
|
||||
|
||||
final Set<String> dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist");
|
||||
final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist");
|
||||
log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ","));
|
||||
|
||||
final Set<String> dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist");
|
||||
final Set<String> dsTypeWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceTypeWhitelist");
|
||||
log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ","));
|
||||
|
||||
final Set<String> dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist");
|
||||
final Set<String> dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist");
|
||||
log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
// TODO UNCOMMENT
|
||||
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
|
||||
final DedupConfig dedupConfig = null;
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, eventsPath);
|
||||
|
@ -92,7 +75,7 @@ public class GenerateEventsJob {
|
|||
final Dataset<Event> dataset = groups
|
||||
.map(
|
||||
g -> EventFinder
|
||||
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators),
|
||||
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
|
||||
Encoders
|
||||
.bean(EventGroup.class))
|
||||
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
|
||||
|
@ -103,22 +86,6 @@ public class GenerateEventsJob {
|
|||
|
||||
}
|
||||
|
||||
private static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
|
||||
final String s = parser.get(key).trim();
|
||||
|
||||
final Set<String> res = new HashSet<>();
|
||||
|
||||
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
|
||||
Arrays
|
||||
.stream(s.split(","))
|
||||
.map(String::trim)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.forEach(res::add);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
|
||||
|
||||
return EventFinder
|
||||
|
@ -130,23 +97,4 @@ public class GenerateEventsJob {
|
|||
|
||||
}
|
||||
|
||||
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
|
||||
|
||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
final String conf = isLookUpService
|
||||
.getResourceProfileByQuery(
|
||||
String
|
||||
.format(
|
||||
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
|
||||
profId));
|
||||
|
||||
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
|
||||
dedupConfig.getPace().initModel();
|
||||
dedupConfig.getPace().initTranslationMap();
|
||||
// dedupConfig.getWf().setConfigurationId("???");
|
||||
|
||||
return dedupConfig;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class IndexOnESJob {
|
|||
|
||||
final JavaRDD<String> inputRdd = ClusterUtils
|
||||
.readPath(spark, eventsPath, Event.class)
|
||||
.limit(10000) // TODO REMOVE
|
||||
// .limit(10000) // TODO REMOVE
|
||||
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
|
||||
.javaRDD();
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@ import org.apache.spark.util.LongAccumulator;
|
|||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public abstract class UpdateMatcher<T> {
|
||||
|
||||
|
@ -37,7 +36,6 @@ public abstract class UpdateMatcher<T> {
|
|||
|
||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
|
||||
final Collection<OaBrokerMainEntity> others,
|
||||
final DedupConfig dedupConfig,
|
||||
final Map<String, LongAccumulator> accumulators) {
|
||||
|
||||
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
|
||||
|
@ -49,7 +47,7 @@ public abstract class UpdateMatcher<T> {
|
|||
if (topic != null) {
|
||||
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res,
|
||||
getCompileHighlightFunction(),
|
||||
getHighlightToStringFunction(), dedupConfig);
|
||||
getHighlightToStringFunction());
|
||||
|
||||
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
|
||||
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
|
||||
|
|
|
@ -15,7 +15,7 @@ public class EnrichMissingProject extends UpdateMatcher<OaBrokerProject> {
|
|||
super(20,
|
||||
prj -> Topic.ENRICH_MISSING_PROJECT,
|
||||
(p, prj) -> p.getProjects().add(prj),
|
||||
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
|
||||
prj -> prj.getOpenaireId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,11 +18,7 @@ public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
|
|||
super(20,
|
||||
prj -> Topic.ENRICH_MORE_PROJECT,
|
||||
(p, prj) -> p.getProjects().add(prj),
|
||||
prj -> projectAsString(prj));
|
||||
}
|
||||
|
||||
private static String projectAsString(final OaBrokerProject prj) {
|
||||
return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode();
|
||||
prj -> prj.getOpenaireId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -36,13 +32,13 @@ public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
|
|||
final Set<String> existingProjects = target
|
||||
.getProjects()
|
||||
.stream()
|
||||
.map(EnrichMoreProject::projectAsString)
|
||||
.map(p -> p.getOpenaireId())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getProjects()
|
||||
.stream()
|
||||
.filter(p -> !existingProjects.contains(projectAsString(p)))
|
||||
.filter(p -> !existingProjects.contains(p.getOpenaireId()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
|
@ -10,6 +15,7 @@ import org.apache.spark.util.LongAccumulator;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
|
||||
public class ClusterUtils {
|
||||
|
@ -53,7 +59,9 @@ public class ClusterUtils {
|
|||
return o;
|
||||
}
|
||||
|
||||
public static <T> void save(final Dataset<T> dataset, final String path, final Class<T> clazz,
|
||||
public static <T> void save(final Dataset<T> dataset,
|
||||
final String path,
|
||||
final Class<T> clazz,
|
||||
final LongAccumulator acc) {
|
||||
dataset
|
||||
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
|
||||
|
@ -62,4 +70,20 @@ public class ClusterUtils {
|
|||
.json(path);
|
||||
}
|
||||
|
||||
public static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
|
||||
final String s = parser.get(key).trim();
|
||||
|
||||
final Set<String> res = new HashSet<>();
|
||||
|
||||
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
|
||||
Arrays
|
||||
.stream(s.split(","))
|
||||
.map(String::trim)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.forEach(res::add);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -142,6 +142,7 @@ public class ConversionUtils {
|
|||
.filter(pid -> pid.getQualifier().getClassid() != null)
|
||||
.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
|
||||
.map(pid -> pid.getValue())
|
||||
.map(pid -> cleanOrcid(pid))
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.findFirst()
|
||||
.orElse(null) : null;
|
||||
|
@ -149,6 +150,11 @@ public class ConversionUtils {
|
|||
return new OaBrokerAuthor(author.getFullname(), pids);
|
||||
}
|
||||
|
||||
private static String cleanOrcid(final String s) {
|
||||
final String match = "//orcid.org/";
|
||||
return s.contains(match) ? StringUtils.substringAfter(s, match) : s;
|
||||
}
|
||||
|
||||
private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) {
|
||||
if (journal == null) {
|
||||
return null;
|
||||
|
|
|
@ -37,7 +37,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EventFinder {
|
||||
|
||||
|
@ -70,14 +69,12 @@ public class EventFinder {
|
|||
matchers.add(new EnrichMissingDatasetReferences());
|
||||
matchers.add(new EnrichMissingDatasetIsSupplementedTo());
|
||||
matchers.add(new EnrichMissingDatasetIsSupplementedBy());
|
||||
matchers.add(new EnrichMissingAbstract());
|
||||
}
|
||||
|
||||
public static EventGroup generateEvents(final ResultGroup results,
|
||||
final Set<String> dsIdWhitelist,
|
||||
final Set<String> dsIdBlacklist,
|
||||
final Set<String> dsTypeWhitelist,
|
||||
final DedupConfig dedupConfig,
|
||||
final Map<String, LongAccumulator> accumulators) {
|
||||
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
@ -85,7 +82,7 @@ public class EventFinder {
|
|||
for (final OaBrokerMainEntity target : results.getData()) {
|
||||
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
|
||||
for (final UpdateMatcher<?> matcher : matchers) {
|
||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
|
||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,62 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.tree.support.TreeProcessor;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
|
||||
public class TrustUtils {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TrustUtils.class);
|
||||
|
||||
private static DedupConfig dedupConfig;
|
||||
|
||||
static {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
try {
|
||||
dedupConfig = mapper
|
||||
.readValue(
|
||||
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
|
||||
DedupConfig.class);
|
||||
} catch (final IOException e) {
|
||||
log.error("Error loading dedupConfig, e");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected static float calculateTrust(final OaBrokerMainEntity r1, final OaBrokerMainEntity r2) {
|
||||
|
||||
if (dedupConfig == null) {
|
||||
return BrokerConstants.MIN_TRUST;
|
||||
}
|
||||
|
||||
try {
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final MapDocument doc1 = MapDocumentUtil
|
||||
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
|
||||
final MapDocument doc2 = MapDocumentUtil
|
||||
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
|
||||
|
||||
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
|
||||
|
||||
final double threshold = dedupConfig.getWf().getThreshold();
|
||||
|
||||
return TrustUtils.rescale(score, threshold);
|
||||
} catch (final Exception e) {
|
||||
log.error("Error computing score between results", e);
|
||||
return BrokerConstants.MIN_TRUST;
|
||||
}
|
||||
}
|
||||
|
||||
public static float rescale(final double score, final double threshold) {
|
||||
if (score >= BrokerConstants.MAX_TRUST) {
|
||||
return BrokerConstants.MAX_TRUST;
|
||||
|
|
|
@ -4,20 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProvenance;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
import eu.dnetlib.pace.tree.support.TreeProcessor;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
|
||||
public final class UpdateInfo<T> {
|
||||
|
||||
|
@ -35,20 +26,17 @@ public final class UpdateInfo<T> {
|
|||
|
||||
private final float trust;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
|
||||
|
||||
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target,
|
||||
final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
|
||||
final Function<T, String> highlightToString,
|
||||
final DedupConfig dedupConfig) {
|
||||
final Function<T, String> highlightToString) {
|
||||
this.topic = topic;
|
||||
this.highlightValue = highlightValue;
|
||||
this.source = source;
|
||||
this.target = target;
|
||||
this.compileHighlight = compileHighlight;
|
||||
this.highlightToString = highlightToString;
|
||||
this.trust = calculateTrust(dedupConfig, source, target);
|
||||
this.trust = TrustUtils.calculateTrust(source, target);
|
||||
}
|
||||
|
||||
public T getHighlightValue() {
|
||||
|
@ -63,31 +51,6 @@ public final class UpdateInfo<T> {
|
|||
return target;
|
||||
}
|
||||
|
||||
private float calculateTrust(final DedupConfig dedupConfig,
|
||||
final OaBrokerMainEntity r1,
|
||||
final OaBrokerMainEntity r2) {
|
||||
|
||||
if (dedupConfig == null) {
|
||||
return BrokerConstants.MIN_TRUST;
|
||||
}
|
||||
|
||||
try {
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final MapDocument doc1 = MapDocumentUtil
|
||||
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
|
||||
final MapDocument doc2 = MapDocumentUtil
|
||||
.asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
|
||||
|
||||
final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
|
||||
final double threshold = dedupConfig.getWf().getThreshold();
|
||||
|
||||
return TrustUtils.rescale(score, threshold);
|
||||
} catch (final Exception e) {
|
||||
log.error("Error computing score between results", e);
|
||||
return BrokerConstants.MIN_TRUST;
|
||||
}
|
||||
}
|
||||
|
||||
protected Topic getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
{
|
||||
"wf": {
|
||||
|
||||
},
|
||||
"pace": {
|
||||
"clustering": [
|
||||
{
|
||||
"name": "wordssuffixprefix",
|
||||
"fields": [
|
||||
"title"
|
||||
],
|
||||
"params": {
|
||||
"max": "2",
|
||||
"len": "3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "lowercase",
|
||||
"fields": [
|
||||
"doi"
|
||||
],
|
||||
"params": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"decisionTree": {
|
||||
"start": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "doi",
|
||||
"comparator": "exactMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.5,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "layer1",
|
||||
"undefined": "layer1",
|
||||
"ignoreUndefined": "true"
|
||||
},
|
||||
"layer1": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "titleVersionMatch",
|
||||
"weight": 0.9,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
|
||||
}
|
||||
},
|
||||
{
|
||||
"field": "authors",
|
||||
"comparator": "sizeMatch",
|
||||
"weight": 0.9,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.5,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "layer2",
|
||||
"undefined": "layer2",
|
||||
"ignoreUndefined": "true"
|
||||
},
|
||||
"layer2": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "levensteinTitle",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "true",
|
||||
"params": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.99,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "NO_MATCH",
|
||||
"ignoreUndefined": "true"
|
||||
}
|
||||
},
|
||||
"model": [
|
||||
{
|
||||
"name": "doi",
|
||||
"type": "String",
|
||||
"path": "$.pids[?(@.type == 'doi')].value"
|
||||
},
|
||||
{
|
||||
"name": "title",
|
||||
"type": "String",
|
||||
"path": "$.titles",
|
||||
"length": 250,
|
||||
"size": 5
|
||||
},
|
||||
{
|
||||
"name": "authors",
|
||||
"type": "List",
|
||||
"path": "$.creators[*].fullname",
|
||||
"size": 200
|
||||
}
|
||||
],
|
||||
"blacklists": {
|
||||
|
||||
},
|
||||
"synonyms": {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,14 +8,6 @@
|
|||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>the path where the the generated data will be stored</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupConfProfId</name>
|
||||
<description>the id of a valid Dedup Configuration Profile</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>datasourceIdWhitelist</name>
|
||||
|
@ -427,8 +419,6 @@
|
|||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
||||
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
|
||||
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
|
||||
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
|
||||
|
@ -447,7 +437,7 @@
|
|||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.dynamicAllocation.maxExecutors="2"
|
||||
--conf spark.dynamicAllocation.maxExecutors="8"
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
|
|
|
@ -5,18 +5,6 @@
|
|||
"paramDescription": "the path where the generated events will be stored",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "lu",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "the address of the ISLookUpService",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "d",
|
||||
"paramLongName": "dedupConfProfile",
|
||||
"paramDescription": "the id of a valid Dedup Configuration Profile",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "datasourceIdWhitelist",
|
||||
"paramLongName": "datasourceIdWhitelist",
|
||||
|
|
|
@ -9,15 +9,6 @@
|
|||
<name>workingPath</name>
|
||||
<description>the path where the the generated data will be stored</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupConfProfId</name>
|
||||
<description>the id of a valid Dedup Configuration Profile</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -73,33 +64,32 @@
|
|||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="index_es"/>
|
||||
<start to="count"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
|
||||
<action name="index_es">
|
||||
<action name="count">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>IndexOnESJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
|
||||
<name>Count</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.dynamicAllocation.maxExecutors="2"
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--index</arg><arg>${esIndexName}</arg>
|
||||
<arg>--esHost</arg><arg>${esIndexHost}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -30,7 +30,7 @@ class UpdateMatcherTest {
|
|||
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.isEmpty());
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ class UpdateMatcherTest {
|
|||
res.setPublicationdate("2018");
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.isEmpty());
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ class UpdateMatcherTest {
|
|||
p2.setPublicationdate("2018");
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.size() == 1);
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ class UpdateMatcherTest {
|
|||
p2.setPublicationdate("2018");
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.isEmpty());
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ class UpdateMatcherTest {
|
|||
p4.setPublicationdate("2018");
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.isEmpty());
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ class UpdateMatcherTest {
|
|||
p4.setPublicationdate("2018");
|
||||
|
||||
final Collection<UpdateInfo<String>> list = matcher
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
|
||||
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
|
||||
|
||||
assertTrue(list.size() == 1);
|
||||
}
|
||||
|
|
|
@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerAuthor;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
|
||||
public class TrustUtilsTest {
|
||||
|
||||
private static final double THRESHOLD = 0.95;
|
||||
|
@ -64,6 +68,23 @@ public class TrustUtilsTest {
|
|||
verifyValue(2.00, BrokerConstants.MAX_TRUST);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
final OaBrokerMainEntity r1 = new OaBrokerMainEntity();
|
||||
r1.getTitles().add("D-NET Service Package: Data Import");
|
||||
r1.getPids().add(new OaBrokerTypedValue("doi", "123"));
|
||||
r1.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
|
||||
r1.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
|
||||
|
||||
final OaBrokerMainEntity r2 = new OaBrokerMainEntity();
|
||||
r2.getTitles().add("D-NET Service Package: Data Import");
|
||||
// r2.getPids().add(new OaBrokerTypedValue("doi", "123"));
|
||||
r2.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
|
||||
// r2.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
|
||||
|
||||
System.out.println("TRUST: " + TrustUtils.calculateTrust(r1, r2));
|
||||
}
|
||||
|
||||
private void verifyValue(final double originalScore, final float expectedTrust) {
|
||||
final float trust = TrustUtils.rescale(originalScore, THRESHOLD);
|
||||
System.out.println(trust);
|
||||
|
|
|
@ -37,7 +37,7 @@ public class Deduper implements Serializable {
|
|||
public static JavaPairRDD<String, Block> createSortedBlocks(
|
||||
JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
|
||||
final String of = config.getWf().getOrderField();
|
||||
final int maxQueueSize = config.getWf().getGroupMaxSize();
|
||||
final int maxQueueSize = config.getWf().getQueueMaxSize();
|
||||
|
||||
return mapDocs
|
||||
// the reduce is just to be sure that we haven't document with same id
|
||||
|
|
|
@ -45,6 +45,16 @@ public class EntityMergerTest implements Serializable {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
|
||||
List<Tuple2<String, Software>> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class);
|
||||
|
||||
Software merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
|
||||
|
||||
System.out.println(merged.getBestaccessright().getClassid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicationMergerTest() throws InstantiationException, IllegalAccessException {
|
||||
|
||||
|
|
|
@ -182,7 +182,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(3432, orgs_simrel);
|
||||
assertEquals(7152, pubs_simrel);
|
||||
assertEquals(7054, pubs_simrel);
|
||||
assertEquals(344, sw_simrel);
|
||||
assertEquals(458, ds_simrel);
|
||||
assertEquals(6750, orp_simrel);
|
||||
|
@ -234,7 +234,7 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(1276, orgs_mergerel);
|
||||
assertEquals(1442, pubs_mergerel);
|
||||
assertEquals(1440, pubs_mergerel);
|
||||
assertEquals(288, sw_mergerel);
|
||||
assertEquals(472, ds_mergerel);
|
||||
assertEquals(718, orp_mergerel);
|
||||
|
@ -423,7 +423,7 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||
|
||||
assertEquals(4975, relations);
|
||||
assertEquals(4971, relations);
|
||||
|
||||
// check deletedbyinference
|
||||
final Dataset<Relation> mergeRels = spark
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "dataset",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"queueMaxSize" : "800",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"slidingWindowSize" : "80",
|
||||
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true",
|
||||
"idPath" : "$.id",
|
||||
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "otherresearchproduct",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"queueMaxSize" : "800",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"slidingWindowSize" : "80",
|
||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true",
|
||||
"idPath" : "$.id",
|
||||
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
"subEntityType": "resulttype",
|
||||
"subEntityValue": "publication",
|
||||
"orderField": "title",
|
||||
"queueMaxSize": "2000",
|
||||
"queueMaxSize": "800",
|
||||
"groupMaxSize": "100",
|
||||
"maxChildren": "100",
|
||||
"slidingWindowSize": "200",
|
||||
"slidingWindowSize": "80",
|
||||
"rootBuilder": [
|
||||
"result",
|
||||
"resultProject_outcome_isProducedBy",
|
||||
|
@ -29,8 +29,7 @@
|
|||
},
|
||||
"pace": {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree": {
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "software",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"queueMaxSize" : "800",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"slidingWindowSize" : "80",
|
||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true",
|
||||
"idPath" : "$.id",
|
||||
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
|
||||
],
|
||||
"decisionTree": {
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -8,6 +8,7 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -97,7 +98,7 @@ public class CleanGraphSparkJob {
|
|||
.json(outputPath);
|
||||
}
|
||||
|
||||
private static <T extends Oaf> T fixDefaults(T value) {
|
||||
protected static <T extends Oaf> T fixDefaults(T value) {
|
||||
if (value instanceof Datasource) {
|
||||
// nothing to clean here
|
||||
} else if (value instanceof Project) {
|
||||
|
@ -134,11 +135,6 @@ public class CleanGraphSparkJob {
|
|||
.setResourcetype(
|
||||
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
|
||||
}
|
||||
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
|
||||
r
|
||||
.setBestaccessright(
|
||||
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||
}
|
||||
if (Objects.nonNull(r.getInstance())) {
|
||||
for (Instance i : r.getInstance()) {
|
||||
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
|
||||
|
@ -152,6 +148,15 @@ public class CleanGraphSparkJob {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
|
||||
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
|
||||
if (Objects.isNull(bestaccessrights)) {
|
||||
r.setBestaccessright(
|
||||
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||
} else {
|
||||
r.setBestaccessright(bestaccessrights);
|
||||
}
|
||||
}
|
||||
if (Objects.nonNull(r.getAuthor())) {
|
||||
boolean nullRank = r
|
||||
.getAuthor()
|
||||
|
|
|
@ -378,6 +378,10 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
|
||||
protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
|
||||
|
||||
public static Qualifier createBestAccessRights(final List<Instance> instanceList) {
|
||||
return getBestAccessRights(instanceList);
|
||||
}
|
||||
|
||||
protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
|
||||
if (instanceList != null) {
|
||||
final Optional<Qualifier> min = instanceList
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
<workflow-app name="import Claims as Graph" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>reuseContent</name>
|
||||
<value>false</value>
|
||||
<description>should import content from the aggregator or reuse a previous version</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>contentPath</name>
|
||||
<description>path location to store (or reuse) content from the aggregator</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbSchema</name>
|
||||
<value>beta</value>
|
||||
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="ImportDB_claims"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ImportDB_claims">
|
||||
<java>
|
||||
<prepare>
|
||||
<delete path="${contentPath}/db_claims"/>
|
||||
</prepare>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${contentPath}/db_claims</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--action</arg><arg>claims</arg>
|
||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||
</java>
|
||||
<ok to="ImportODF_claims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODF_claims">
|
||||
<java>
|
||||
<prepare>
|
||||
<delete path="${contentPath}/odf_claims"/>
|
||||
</prepare>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${contentPath}/odf_claims</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="ImportOAF_claims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAF_claims">
|
||||
<java>
|
||||
<prepare>
|
||||
<delete path="${contentPath}/oaf_claims"/>
|
||||
</prepare>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${contentPath}/oaf_claims</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,169 +0,0 @@
|
|||
<workflow-app name="import Claims as Graph" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep3</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep1}'/>
|
||||
<mkdir path='${migrationClaimsPathStep1}'/>
|
||||
</fs>
|
||||
<ok to="ImportDBClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportDBClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/db_claims</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-a</arg><arg>claims</arg>
|
||||
</java>
|
||||
<ok to="ImportODFClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/odf_claims</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/oaf_claims</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="ResetClaimEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetClaimEntities">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep2}'/>
|
||||
<mkdir path='${migrationClaimsPathStep2}'/>
|
||||
</fs>
|
||||
<ok to="GenerateClaimEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateClaimEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateClaimEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims</arg>
|
||||
<arg>-t</arg><arg>${migrationClaimsPathStep2}/claim_entities</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</spark>
|
||||
<ok to="ResetClaimGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetClaimGraph">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep3}'/>
|
||||
<mkdir path='${migrationClaimsPathStep3}'/>
|
||||
</fs>
|
||||
<ok to="GenerateClaimGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateClaimGraph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateClaimGraph</name>
|
||||
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationClaimsPathStep2}/claim_entities</arg>
|
||||
<arg>-g</arg><arg>${migrationClaimsPathStep3}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -57,6 +57,8 @@ public class CleaningFunctionTest {
|
|||
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
|
||||
Publication p_in = MAPPER.readValue(json, Publication.class);
|
||||
|
||||
assertNull(p_in.getBestaccessright());
|
||||
|
||||
assertTrue(p_in instanceof Result);
|
||||
assertTrue(p_in instanceof Publication);
|
||||
|
||||
|
@ -84,6 +86,9 @@ public class CleaningFunctionTest {
|
|||
.map(p -> p.getQualifier())
|
||||
.allMatch(q -> pidTerms.contains(q.getClassid())));
|
||||
|
||||
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
||||
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
||||
|
||||
// TODO add more assertions to verity the cleaned values
|
||||
System.out.println(MAPPER.writeValueAsString(p_out));
|
||||
|
||||
|
|
|
@ -185,12 +185,7 @@
|
|||
"surname": ""
|
||||
}
|
||||
],
|
||||
"bestaccessright": {
|
||||
"classid": "CLOSED",
|
||||
"classname": "Closed Access",
|
||||
"schemeid": "dnet:access_modes",
|
||||
"schemename": "dnet:access_modes"
|
||||
},
|
||||
"bestaccessright": null,
|
||||
"collectedfrom": [
|
||||
{
|
||||
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
|
||||
|
|
|
@ -156,6 +156,7 @@ public class PrepareRelationsJob {
|
|||
.parquet(outputPath);
|
||||
}
|
||||
|
||||
// experimental
|
||||
private static void prepareRelationsDataset(
|
||||
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
|
||||
int relPartitions) {
|
||||
|
|
|
@ -134,6 +134,7 @@
|
|||
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
|
||||
<arg>--maxRelations</arg><arg>${maxRelations}</arg>
|
||||
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
|
||||
<arg>--relPartitions</arg><arg>5000</arg>
|
||||
</spark>
|
||||
<ok to="fork_join_related_entities"/>
|
||||
|
|
|
@ -11,6 +11,29 @@
|
|||
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
|
||||
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
|
||||
<CONFIGURATION start="manual">
|
||||
|
||||
<NODE isStart="true" name="setReuseContent" type="SetEnvParameter">
|
||||
<DESCRIPTION>reuse cached content from the aggregation system</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">reuseContent</PARAM>
|
||||
<PARAM function="validValues(['true', 'false'])" managedBy="user" name="parameterValue" required="true" type="string">true</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setContentPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>set the aggregator content path</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">contentPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_aggregator</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setAggregatorGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -62,87 +85,94 @@
|
|||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setCleanedGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the CLEANED graph</DESCRIPTION>
|
||||
<NODE isStart="true" name="setOrcidGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the ORCID enriched graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">cleanedGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/05_graph_cleaned</PARAM>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">orcidGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/05_graph_orcid</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setOrcidGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the ORCID enriched graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">orcidGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/06_graph_orcid</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
<NODE isStart="true" name="setBulkTaggingGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the BULK TAGGED graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">bulkTaggingGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/07_graph_bulktagging</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/06_graph_bulktagging</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setAffiliationGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the AFFILIATION from INSTITUTIONAL REPOS graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">affiliationGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/08_graph_affiliation</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/07_graph_affiliation</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setCommunityOrganizationGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the COMMUNITY from SELECTED SOURCES graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">communityOrganizationGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/09_graph_comunity_organization</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/08_graph_comunity_organization</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setFundingGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the FUNDING from SEMANTIC RELATION graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">fundingGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/10_graph_funding</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/09_graph_funding</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setCommunitySemRelGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the COMMUNITY from SEMANTIC RELATION graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">communitySemRelGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/11_graph_comunity_sem_rel</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/10_graph_comunity_sem_rel</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setCountryGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the COUNTRY enriched graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">countryGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/12_graph_country</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/11_graph_country</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setCleanedGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the CLEANED graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="parameterName" required="true" type="string">cleanedGraphPath</PARAM>
|
||||
<PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/beta_provision/graph/12_graph_cleaned</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setBlacklistedGraphPath" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the target path to store the blacklisted graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -153,6 +183,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setIsLookUpUrl" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the lookup address</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -163,6 +194,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setBulkTaggingPathMap" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the map of paths for the Bulk Tagging</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -173,6 +205,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setPropagationOrganizationCommunityMap" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the map of associations organization, community list for the propagation of community to result through organization</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -185,6 +218,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="setDedupConfig" type="SetEnvParameter">
|
||||
<DESCRIPTION>Set the dedup orchestrator name</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -195,6 +229,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="actionSetsRaw" type="SetEnvParameter">
|
||||
<DESCRIPTION>declares the ActionSet ids to promote in the RAW graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -205,6 +240,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isStart="true" name="actionSetsIIS" type="SetEnvParameter">
|
||||
<DESCRIPTION>declares the ActionSet ids to promote in the INFERRED graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -215,6 +251,7 @@
|
|||
<ARC to="waitConfig"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE isJoin="true" name="waitConfig">
|
||||
<DESCRIPTION>wait configurations</DESCRIPTION>
|
||||
<PARAMETERS/>
|
||||
|
@ -222,6 +259,7 @@
|
|||
<ARC to="aggregatorGraph"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="aggregatorGraph" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -230,7 +268,9 @@
|
|||
<PARAM managedBy="system" name="envParams" required="true" type="string">
|
||||
{
|
||||
'graphOutputPath' : 'aggregatorGraphPath',
|
||||
'isLookupUrl' : 'isLookUpUrl'
|
||||
'isLookupUrl' : 'isLookUpUrl',
|
||||
'reuseContent' : 'reuseContent',
|
||||
'contentPath' : 'contentPath'
|
||||
}
|
||||
</PARAM>
|
||||
<PARAM managedBy="system" name="params" required="true" type="string">
|
||||
|
@ -241,8 +281,6 @@
|
|||
'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus',
|
||||
'postgresUser' : 'dnet',
|
||||
'postgresPassword' : '',
|
||||
'reuseContent' : 'false',
|
||||
'contentPath' : '/tmp/beta_provision/aggregator',
|
||||
'workingDir' : '/tmp/beta_provision/working_dir/aggregator'
|
||||
}
|
||||
</PARAM>
|
||||
|
@ -252,6 +290,7 @@
|
|||
<ARC to="promoteActionsRaw"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="promoteActionsRaw" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>create the RAW graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -289,6 +328,7 @@
|
|||
<ARC to="duplicateScan"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="duplicateScan" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>search for duplicates in the raw graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -314,6 +354,7 @@
|
|||
<ARC to="promoteActionsIIS"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="promoteActionsIIS" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>create the INFERRED graph</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -351,6 +392,7 @@
|
|||
<ARC to="dedupConsistency"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="dedupConsistency" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>mark duplicates as deleted and redistribute the relationships</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -375,41 +417,6 @@
|
|||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="graphCleaning" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
|
||||
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
|
||||
<PARAM managedBy="system" name="envParams" required="true" type="string">
|
||||
{
|
||||
'graphInputPath' : 'consistentGraphPath',
|
||||
'graphOutputPath': 'cleanedGraphPath',
|
||||
'isLookupUrl': 'isLookUpUrl'
|
||||
}
|
||||
</PARAM>
|
||||
<PARAM managedBy="system" name="params" required="true" type="string">
|
||||
{
|
||||
'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app',
|
||||
'workingPath' : '/tmp/beta_provision/working_dir/clean'
|
||||
}
|
||||
</PARAM>
|
||||
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="SKIP_ENRICHMENT"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="SKIP_ENRICHMENT" type="Selection">
|
||||
<DESCRIPTION>Do we skip the graph enrichment steps? (Yes to prepare the graph for the IIS)</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM function="validValues(['YES', 'NO'])" managedBy="user" name="selection" required="true" type="string">NO</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC name="YES" to="success"/>
|
||||
<ARC name="NO" to="orcidPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
<NODE name="orcidPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>propagates ORCID among results linked by allowedsemrels semantic relationships</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -417,7 +424,7 @@
|
|||
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
|
||||
<PARAM managedBy="system" name="envParams" required="true" type="string">
|
||||
{
|
||||
'sourcePath' : 'cleanedGraphPath',
|
||||
'sourcePath' : 'consistentGraphPath',
|
||||
'outputPath': 'orcidGraphPath'
|
||||
}
|
||||
</PARAM>
|
||||
|
@ -435,6 +442,7 @@
|
|||
<ARC to="bulkTagging"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="bulkTagging" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>mark results respecting some rules as belonging to communities</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -460,6 +468,7 @@
|
|||
<ARC to="affiliationPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="affiliationPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>creates relashionships between results and organizations when the organizations are associated to institutional repositories</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -484,6 +493,7 @@
|
|||
<ARC to="communityOrganizationPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="communityOrganizationPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>marks as belonging to communities the result collected from datasources related to the organizations specified in the organizationCommunityMap</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -509,6 +519,7 @@
|
|||
<ARC to="resultProjectPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="resultProjectPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>created relation between projects and results linked to other results trough allowedsemrel semantic relations linked to projects</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -534,6 +545,7 @@
|
|||
<ARC to="communitySemrelPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="communitySemrelPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>tag as belonging to communitites result in in allowedsemrels relation with other result already linked to communities </DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -560,6 +572,7 @@
|
|||
<ARC to="countryPropagation"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="countryPropagation" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>associated to results colleced from allowedtypes and those in the whithelist the country of the organization(s) handling the datasource it is collected from </DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -584,10 +597,36 @@
|
|||
</PARAM>
|
||||
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="graphCleaning"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="graphCleaning" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid</DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
|
||||
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
|
||||
<PARAM managedBy="system" name="envParams" required="true" type="string">
|
||||
{
|
||||
'graphInputPath' : 'countryGraphPath',
|
||||
'graphOutputPath': 'cleanedGraphPath',
|
||||
'isLookupUrl': 'isLookUpUrl'
|
||||
}
|
||||
</PARAM>
|
||||
<PARAM managedBy="system" name="params" required="true" type="string">
|
||||
{
|
||||
'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app',
|
||||
'workingPath' : '/tmp/beta_provision/working_dir/clean'
|
||||
}
|
||||
</PARAM>
|
||||
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
|
||||
</PARAMETERS>
|
||||
<ARCS>
|
||||
<ARC to="blacklistRelations"/>
|
||||
</ARCS>
|
||||
</NODE>
|
||||
|
||||
<NODE name="blacklistRelations" type="SubmitHadoopJob">
|
||||
<DESCRIPTION>removes blacklisted relations </DESCRIPTION>
|
||||
<PARAMETERS>
|
||||
|
@ -595,7 +634,7 @@
|
|||
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
|
||||
<PARAM managedBy="system" name="envParams" required="true" type="string">
|
||||
{
|
||||
'sourcePath' : 'countryGraphPath',
|
||||
'sourcePath' : 'cleanedGraphPath',
|
||||
'outputPath': 'blacklistedGraphPath'
|
||||
}
|
||||
</PARAM>
|
||||
|
|
Loading…
Reference in New Issue