forked from D-Net/dnet-hadoop

commit 464eeeec87
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
@@ -24,11 +24,7 @@
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-sql_2.11</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-hive_2.11</artifactId>
-			<scope>test</scope>
-		</dependency>
 
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
@@ -45,10 +41,6 @@
 			<artifactId>dnet-pace-core</artifactId>
 		</dependency>
 
-		<dependency>
-			<groupId>com.jayway.jsonpath</groupId>
-			<artifactId>json-path</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>dom4j</groupId>
 			<artifactId>dom4j</artifactId>
@@ -61,7 +53,7 @@
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
 			<artifactId>dnet-openaire-broker-common</artifactId>
-			<version>[3.0.1,4.0.0)</version>
+			<version>[3.0.2,4.0.0)</version>
 		</dependency>
 
 	</dependencies>

@@ -31,12 +31,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
 import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
 import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
@@ -75,25 +72,38 @@ public class GenerateEventsApplication {
 		log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
 
 		final SparkConf conf = new SparkConf();
-		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-		conf.registerKryoClasses(BrokerConstants.getModelClasses());
+		// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		// conf.registerKryoClasses(BrokerConstants.getModelClasses());
 
-		final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		// TODO UNCOMMENT
+		// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+		final DedupConfig dedupConfig = null;
 
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 
 			removeOutputDir(spark, eventsPath);
 
-			spark
-				.emptyDataset(Encoders.kryo(Event.class))
-				.union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, Software.class, dedupConfig))
-				.union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+			// TODO REMOVE THIS
+			final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
+			final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
+				.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
+				.cache();
+			relatedEntities(projects, rels, RelatedProject.class)
 				.write()
 				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
 				.json(eventsPath);
 
+			// TODO UNCOMMENT THIS
+			// spark
+			// .emptyDataset(Encoders.bean(Event.class))
+			// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
+			// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+			// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+			// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+			// .write()
+			// .mode(SaveMode.Overwrite)
+			// .option("compression", "gzip")
+			// .json(eventsPath);
 		});
 
 	}
@@ -117,45 +127,48 @@ public class GenerateEventsApplication {
 			.toColumn();
 
 		return results
-			.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
+			.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
 			.groupByKey(
 				(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
 			.agg(aggr)
-			.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
-			.filter(ResultGroup::isValid)
+			.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
+			.filter(rg -> rg.getData().size() > 1)
 			.map(
 				(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
-				Encoders.kryo(EventGroup.class))
-			.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
+				Encoders.bean(EventGroup.class))
+			.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
 	}
 
 	private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
 		final SparkSession spark,
 		final String graphPath,
 		final Class<SRC> sourceClass) {
 
 		final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
-		final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
-			spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
-		final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
-		final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
+		// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
+		// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+		// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
+		// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
 
 		final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
 			.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
 			.cache();
 
 		final Dataset<OpenaireBrokerResult> r0 = readPath(
-			spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+			spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
 			.filter(r -> r.getDataInfo().getDeletedbyinference())
-			.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+			.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
 
+		// TODO UNCOMMENT THIS
 		final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
-		final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
-		final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
-		final Dataset<OpenaireBrokerResult> r4 = join(
-			r3, rels, relatedEntities(publications, rels, RelatedProject.class));
-		;
+		// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
+		// RelatedSoftware.class));
+		// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,
+		// RelatedDataset.class));
+		// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels,
+		// RelatedPublication.class));;
 
-		return r4;
+		return r0; // TODO it should be r4
 	}
 
 	private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets,
@@ -165,7 +178,7 @@ public class GenerateEventsApplication {
 			.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
 			.map(
 				t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
-				Encoders.kryo(clazz));
+				Encoders.bean(clazz));
 	}
 
 	private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
@@ -174,14 +187,14 @@ public class GenerateEventsApplication {
 
 		final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
 			.toColumn();
-		;
 
 		return sources
-			.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
+			.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
 			.groupByKey(
 				(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
 			.agg(aggr)
-			.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
+			.map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class));
 
 	}
 
 	public static <R> Dataset<R> readPath(
@@ -195,6 +208,7 @@ public class GenerateEventsApplication {
 	}
 
 	private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
 
 		final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final String conf = isLookUpService

@@ -3,16 +3,18 @@ package eu.dnetlib.dhp.broker.oa.util;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.DocumentHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+
 import eu.dnetlib.broker.objects.OpenaireBrokerResult;
 import eu.dnetlib.broker.objects.TypedValue;
 import eu.dnetlib.dhp.schema.oaf.Author;
@@ -24,6 +26,7 @@ import eu.dnetlib.dhp.schema.oaf.Journal;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.Software;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
@@ -33,133 +36,133 @@ public class ConversionUtils {
 	private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
 
 	public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) {
-		return i.getUrl().stream().map(url -> {
-			return new eu.dnetlib.broker.objects.Instance()
-				.setUrl(url)
-				.setInstancetype(i.getInstancetype().getClassid())
-				.setLicense(BrokerConstants.OPEN_ACCESS)
-				.setHostedby(i.getHostedby().getValue());
-		}).collect(Collectors.toList());
+		if (i == null) {
+			return new ArrayList<>();
+		}
+
+		return mappedList(i.getUrl(), url -> {
+			final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance();
+			res.setUrl(url);
+			res.setInstancetype(classId(i.getInstancetype()));
+			res.setLicense(BrokerConstants.OPEN_ACCESS);
+			res.setHostedby(kvValue(i.getHostedby()));
+			return res;
+		});
 	}
 
 	public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
-		return sp != null ? new TypedValue()
-			.setValue(sp.getValue())
-			.setType(sp.getQualifier().getClassid()) : null;
+		return oafStructPropToBrokerTypedValue(sp);
 	}
 
-	public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
-		return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null;
+	public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
+		return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
 	}
 
 	public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
-		return d != null ? new eu.dnetlib.broker.objects.Dataset()
-			.setOriginalId(d.getOriginalId().get(0))
-			.setTitle(structPropValue(d.getTitle()))
-			.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
-			.setInstances(
-				d
-					.getInstance()
-					.stream()
-					.map(ConversionUtils::oafInstanceToBrokerInstances)
-					.flatMap(List::stream)
-					.collect(Collectors.toList()))
-			.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
-			: null;
+		if (d == null) {
+			return null;
+		}
+
+		final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
+		res.setOriginalId(first(d.getOriginalId()));
+		res.setTitle(structPropValue(d.getTitle()));
+		res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
+		res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+		res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
+		return res;
 	}
 
 	public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
-		return p != null ? new eu.dnetlib.broker.objects.Publication()
-			.setOriginalId(p.getOriginalId().get(0))
-			.setTitle(structPropValue(p.getTitle()))
-			.setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
-			.setInstances(
-				p
-					.getInstance()
-					.stream()
-					.map(ConversionUtils::oafInstanceToBrokerInstances)
-					.flatMap(List::stream)
-					.collect(Collectors.toList()))
-			.setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
-			: null;
+		if (p == null) {
+			return null;
+		}
+
+		final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
+		res.setOriginalId(first(p.getOriginalId()));
+		res.setTitle(structPropValue(p.getTitle()));
+		res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
+		res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+		res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
+
+		return res;
 	}
 
 	public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
+		if (result == null) {
+			return null;
+		}
 
-		return result != null ? new OpenaireBrokerResult()
-			.setOpenaireId(result.getId())
-			.setOriginalId(result.getOriginalId().get(0))
-			.setTypology(result.getResulttype().getClassid())
-			.setTitles(structPropList(result.getTitle()))
-			.setAbstracts(fieldList(result.getDescription()))
-			.setLanguage(result.getLanguage().getClassid())
-			.setSubjects(structPropTypedList(result.getSubject()))
-			.setCreators(
-				result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
-			.setPublicationdate(result.getDateofacceptance().getValue())
-			.setPublisher(fieldValue(result.getPublisher()))
-			.setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
-			.setContributor(fieldList(result.getContributor()))
+		final OpenaireBrokerResult res = new OpenaireBrokerResult();
+
+		res.setOpenaireId(result.getId());
+		res.setOriginalId(first(result.getOriginalId()));
+		res.setTypology(classId(result.getResulttype()));
+		res.setTitles(structPropList(result.getTitle()));
+		res.setAbstracts(fieldList(result.getDescription()));
+		res.setLanguage(classId(result.getLanguage()));
+		res.setSubjects(structPropTypedList(result.getSubject()));
+		res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor));
+		res.setPublicationdate(fieldValue(result.getDateofacceptance()));
+		res.setPublisher(fieldValue(result.getPublisher()));
+		res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
+		res.setContributor(fieldList(result.getContributor()));
+		res
 			.setJournal(
-				result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
-			.setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
-			.setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
-			.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
-			.setInstances(
-				result
-					.getInstance()
-					.stream()
-					.map(ConversionUtils::oafInstanceToBrokerInstances)
-					.flatMap(List::stream)
-					.collect(Collectors.toList()))
-			.setExternalReferences(
-				result
-					.getExternalReference()
-					.stream()
-					.map(ConversionUtils::oafExtRefToBrokerExtRef)
-					.collect(Collectors.toList()))
-			: null;
-	}
+				result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
+		res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
+		res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
+		res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
+		res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+		res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
 
-	private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
-		return list
-			.stream()
-			.map(
-				p -> new TypedValue()
-					.setValue(p.getValue())
-					.setType(p.getQualifier().getClassid()))
-			.collect(Collectors.toList());
+		return res;
 	}
 
 	private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
-		return author != null ? new eu.dnetlib.broker.objects.Author()
-			.setFullname(author.getFullname())
-			.setOrcid(
-				author
-					.getPid()
-					.stream()
-					.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
-					.map(pid -> pid.getValue())
-					.findFirst()
-					.orElse(null))
-			: null;
+		if (author == null) {
+			return null;
+		}
+
+		final String pids = author.getPid() != null ? author
+			.getPid()
+			.stream()
+			.filter(pid -> pid != null)
+			.filter(pid -> pid.getQualifier() != null)
+			.filter(pid -> pid.getQualifier().getClassid() != null)
+			.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
+			.map(pid -> pid.getValue())
+			.filter(StringUtils::isNotBlank)
+			.findFirst()
+			.orElse(null) : null;
+
+		return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids);
 	}
 
 	private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
-		return journal != null ? new eu.dnetlib.broker.objects.Journal()
-			.setName(journal.getName())
-			.setIssn(journal.getIssnPrinted())
-			.setEissn(journal.getIssnOnline())
-			.setLissn(journal.getIssnLinking()) : null;
+		if (journal == null) {
+			return null;
+		}
+
+		final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal();
+		res.setName(journal.getName());
+		res.setIssn(journal.getIssnPrinted());
+		res.setEissn(journal.getIssnOnline());
+		res.setLissn(journal.getIssnLinking());
+
+		return res;
 	}
 
 	private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
-		return ref != null ? new eu.dnetlib.broker.objects.ExternalReference()
-			.setRefidentifier(ref.getRefidentifier())
-			.setSitename(ref.getSitename())
-			.setType(ref.getQualifier().getClassid())
-			.setUrl(ref.getUrl())
-			: null;
+		if (ref == null) {
+			return null;
+		}
+
+		final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference();
+		res.setRefidentifier(ref.getRefidentifier());
+		res.setSitename(ref.getSitename());
+		res.setType(classId(ref.getQualifier()));
+		res.setUrl(ref.getUrl());
+		return res;
 	}
 
 	public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
@@ -167,10 +170,10 @@ public class ConversionUtils {
 			return null;
 		}
 
-		final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
-			.setTitle(fieldValue(p.getTitle()))
-			.setAcronym(fieldValue(p.getAcronym()))
-			.setCode(fieldValue(p.getCode()));
+		final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
+		res.setTitle(fieldValue(p.getTitle()));
+		res.setAcronym(fieldValue(p.getAcronym()));
+		res.setCode(fieldValue(p.getCode()));
 
 		final String ftree = fieldValue(p.getFundingtree());
 		if (StringUtils.isNotBlank(ftree)) {
@@ -188,12 +191,25 @@ public class ConversionUtils {
 	}
 
 	public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
-		return sw != null ? new eu.dnetlib.broker.objects.Software()
-			.setName(structPropValue(sw.getTitle()))
-			.setDescription(fieldValue(sw.getDescription()))
-			.setRepository(fieldValue(sw.getCodeRepositoryUrl()))
-			.setLandingPage(fieldValue(sw.getDocumentationUrl()))
-			: null;
+		if (sw == null) {
+			return null;
+		}
+
+		final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software();
+		res.setName(structPropValue(sw.getTitle()));
+		res.setDescription(fieldValue(sw.getDescription()));
+		res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
+		res.setLandingPage(fieldValue(sw.getDocumentationUrl()));
+
+		return res;
+	}
+
+	private static String first(final List<String> list) {
+		return list != null && list.size() > 0 ? list.get(0) : null;
+	}
+
+	private static String kvValue(final KeyValue kv) {
+		return kv != null ? kv.getValue() : null;
 	}
 
 	private static String fieldValue(final Field<String> f) {
@@ -205,6 +221,10 @@ public class ConversionUtils {
 			: null;
 	}
 
+	private static String classId(final Qualifier q) {
+		return q != null ? q.getClassid() : null;
+	}
+
 	private static String structPropValue(final List<StructuredProperty> props) {
 		return props != null
 			? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null)
@@ -226,4 +246,55 @@ public class ConversionUtils {
 				.collect(Collectors.toList())
 			: new ArrayList<>();
 	}
 
+	private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
+		if (list == null) {
+			return new ArrayList<>();
+		}
+
+		return list
+			.stream()
+			.map(ConversionUtils::oafStructPropToBrokerTypedValue)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toList());
+	}
+
+	private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
+		if (list == null) {
+			return new ArrayList<>();
+		}
+
+		return list
+			.stream()
+			.map(func::apply)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toList());
+	}
+
+	private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
+		if (list == null) {
+			return new ArrayList<>();
+		}
+
+		return list
+			.stream()
+			.map(func::apply)
+			.flatMap(List::stream)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toList());
+	}
+
+	private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
+		if (list == null) {
+			return null;
+		}
+
+		return list
+			.stream()
+			.map(func::apply)
+			.filter(Objects::nonNull)
+			.findFirst()
+			.orElse(null);
+	}
+
 }

@@ -63,8 +63,14 @@ public final class UpdateInfo<T> {
 		return target;
 	}
 
-	private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1,
+	private float calculateTrust(final DedupConfig dedupConfig,
+		final OpenaireBrokerResult r1,
 		final OpenaireBrokerResult r2) {
+
+		if (dedupConfig == null) {
+			return BrokerConstants.MIN_TRUST;
+		}
+
 		try {
 			final ObjectMapper objectMapper = new ObjectMapper();
 			final MapDocument doc1 = MapDocumentUtil
@@ -116,13 +122,15 @@ public final class UpdateInfo<T> {
 			.orElse(null);
 		;
 
-		final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);
+		final Provenance provenance = new Provenance(provId, provRepo, provUrl);
 
-		return new OpenAireEventPayload()
-			.setPublication(target)
-			.setHighlight(hl)
-			.setTrust(trust)
-			.setProvenance(provenance);
+		final OpenAireEventPayload res = new OpenAireEventPayload();
+		res.setResult(target);
+		res.setHighlight(hl);
+		res.setTrust(trust);
+		res.setProvenance(provenance);
+
+		return res;
 	}
 
 }

@@ -23,12 +23,14 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup, ResultGroup> {
 
 	@Override
 	public ResultGroup reduce(final ResultGroup group, final Tuple2<OpenaireBrokerResult, Relation> t) {
-		return group.addElement(t._1);
+		group.getData().add(t._1);
+		return group;
 	}
 
 	@Override
 	public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) {
-		return g1.addGroup(g2);
+		g1.getData().addAll(g2.getData());
+		return g1;
 	}
 
 	@Override
@@ -38,13 +40,13 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup, ResultGroup> {
 
 	@Override
 	public Encoder<ResultGroup> bufferEncoder() {
-		return Encoders.kryo(ResultGroup.class);
+		return Encoders.bean(ResultGroup.class);
 	}
 
 	@Override
 	public Encoder<ResultGroup> outputEncoder() {
-		return Encoders.kryo(ResultGroup.class);
+		return Encoders.bean(ResultGroup.class);
 	}
 

@@ -14,23 +14,14 @@ public class ResultGroup implements Serializable {
 	 */
 	private static final long serialVersionUID = -3360828477088669296L;
 
-	private final List<OpenaireBrokerResult> data = new ArrayList<>();
+	private List<OpenaireBrokerResult> data = new ArrayList<>();
 
 	public List<OpenaireBrokerResult> getData() {
 		return data;
 	}
 
-	public ResultGroup addElement(final OpenaireBrokerResult elem) {
-		data.add(elem);
-		return this;
+	public void setData(final List<OpenaireBrokerResult> data) {
+		this.data = data;
 	}
 
-	public ResultGroup addGroup(final ResultGroup group) {
-		data.addAll(group.getData());
-		return this;
-	}
-
-	public boolean isValid() {
-		return data.size() > 1;
-	}
 }

@@ -58,12 +58,12 @@ public class OpenaireBrokerResultAggregator<T>
 
 	@Override
 	public Encoder<OpenaireBrokerResult> bufferEncoder() {
-		return Encoders.kryo(OpenaireBrokerResult.class);
+		return Encoders.bean(OpenaireBrokerResult.class);
 	}
 
 	@Override
 	public Encoder<OpenaireBrokerResult> outputEncoder() {
-		return Encoders.kryo(OpenaireBrokerResult.class);
+		return Encoders.bean(OpenaireBrokerResult.class);
 	}
 
 }

@@ -11,9 +11,12 @@ public class RelatedDataset implements Serializable {
 	 *
 	 */
 	private static final long serialVersionUID = 774487705184038324L;
-	private final String source;
-	private final String relType;
-	private final Dataset relDataset;
+	private String source;
+	private String relType;
+	private Dataset relDataset;
+
+	public RelatedDataset() {
+	}
 
 	public RelatedDataset(final String source, final String relType, final Dataset relDataset) {
 		this.source = source;
@@ -25,12 +28,24 @@ public class RelatedDataset implements Serializable {
 		return source;
 	}
 
+	public void setSource(final String source) {
+		this.source = source;
+	}
+
 	public String getRelType() {
 		return relType;
 	}
 
+	public void setRelType(final String relType) {
+		this.relType = relType;
+	}
+
 	public Dataset getRelDataset() {
 		return relDataset;
 	}
 
+	public void setRelDataset(final Dataset relDataset) {
+		this.relDataset = relDataset;
+	}
+
 }

@@ -12,9 +12,12 @@ public class RelatedProject implements Serializable {
 	 */
 	private static final long serialVersionUID = 4941437626549329870L;
 
-	private final String source;
-	private final String relType;
-	private final Project relProject;
+	private String source;
+	private String relType;
+	private Project relProject;
+
+	public RelatedProject() {
+	}
 
 	public RelatedProject(final String source, final String relType, final Project relProject) {
 		this.source = source;
@@ -26,12 +29,24 @@ public class RelatedProject implements Serializable {
 		return source;
 	}
 
+	public void setSource(final String source) {
+		this.source = source;
+	}
+
 	public String getRelType() {
 		return relType;
 	}
 
+	public void setRelType(final String relType) {
+		this.relType = relType;
+	}
+
 	public Project getRelProject() {
 		return relProject;
 	}
 
+	public void setRelProject(final Project relProject) {
+		this.relProject = relProject;
+	}
+
 }

@@ -12,9 +12,12 @@ public class RelatedPublication implements Serializable {
 	 */
 	private static final long serialVersionUID = 9021609640411395128L;
 
-	private final String source;
-	private final String relType;
-	private final Publication relPublication;
+	private String source;
+	private String relType;
+	private Publication relPublication;
+
+	public RelatedPublication() {
+	}
 
 	public RelatedPublication(final String source, final String relType, final Publication relPublication) {
 		this.source = source;
@@ -26,12 +29,24 @@ public class RelatedPublication implements Serializable {
 		return source;
 	}
 
+	public void setSource(final String source) {
+		this.source = source;
+	}
+
 	public String getRelType() {
 		return relType;
 	}
 
+	public void setRelType(final String relType) {
+		this.relType = relType;
+	}
+
 	public Publication getRelPublication() {
 		return relPublication;
 	}
 
+	public void setRelPublication(final Publication relPublication) {
+		this.relPublication = relPublication;
+	}
+
 }

@@ -11,9 +11,12 @@ public class RelatedSoftware implements Serializable {
 	 *
 	 */
 	private static final long serialVersionUID = 7573383356943300157L;
-	private final String source;
-	private final String relType;
-	private final Software relSoftware;
+	private String source;
+	private String relType;
+	private Software relSoftware;
+
+	public RelatedSoftware() {
+	}
 
 	public RelatedSoftware(final String source, final String relType, final Software relSoftware) {
 		this.source = source;
@@ -25,12 +28,24 @@ public class RelatedSoftware implements Serializable {
 		return source;
 	}
 
+	public void setSource(final String source) {
+		this.source = source;
+	}
+
 	public String getRelType() {
 		return relType;
 	}
 
+	public void setRelType(final String relType) {
+		this.relType = relType;
+	}
+
 	public Software getRelSoftware() {
 		return relSoftware;
 	}
 
+	public void setRelSoftware(final Software relSoftware) {
+		this.relSoftware = relSoftware;
+	}
+
 }

@@ -80,20 +80,32 @@
 	</kill>
 
 	<action name="generate_events">
-		<java>
-			<prepare>
-				<delete path="${eventsOutputPath}"/>
-			</prepare>
-			<main-class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</main-class>
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>GenerateEvents</name>
+			<class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
 			<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg>
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
 			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
-		</java>
+		</spark>
 		<ok to="End"/>
 		<error to="Kill"/>
 	</action>
 
 
 	<end name="End"/>
 
 </workflow-app>

@@ -3,9 +3,13 @@ package eu.dnetlib.dhp.oa.graph.clean;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.io.BufferedInputStream;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -19,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -84,12 +90,83 @@ public class CleanGraphSparkJob {
 
 		readTableFromPath(spark, inputPath, clazz)
 			.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
+			.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
 	}
 
+	private static <T extends Oaf> T fixDefaults(T value) {
+		if (value instanceof Datasource) {
+			// nothing to clean here
+		} else if (value instanceof Project) {
+			// nothing to clean here
+		} else if (value instanceof Organization) {
+			Organization o = (Organization) value;
+			if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
+				o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
+			}
+		} else if (value instanceof Relation) {
+			// nothing to clean here
+		} else if (value instanceof Result) {
+
+			Result r = (Result) value;
+			if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
+				r
+					.setLanguage(
+						qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
+			}
+			if (Objects.nonNull(r.getSubject())) {
+				r
+					.setSubject(
+						r
+							.getSubject()
+							.stream()
+							.filter(Objects::nonNull)
+							.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
+							.filter(sp -> Objects.nonNull(sp.getQualifier()))
+							.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
+							.collect(Collectors.toList()));
+			}
+			if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
+				r
+					.setResourcetype(
+						qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
+			}
+			if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
+				r
+					.setBestaccessright(
+						qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
+			}
+			if (Objects.nonNull(r.getInstance())) {
+				for (Instance i : r.getInstance()) {
+					if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
+						i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
+					}
+				}
+			}
+
+			if (value instanceof Publication) {
+
+			} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
+
+			} else if (value instanceof OtherResearchProduct) {
+
+			} else if (value instanceof Software) {
+
+			}
+		}
+
+		return value;
+	}
+
+	private static Qualifier qualifier(String classid, String classname, String scheme) {
+		return OafMapperUtils
+			.qualifier(
+				classid, classname, scheme, scheme);
+	}
+
 	private static <T extends Oaf> Dataset<T> readTableFromPath(
 		SparkSession spark, String inputEntityPath, Class<T> clazz) {
 

@@ -4,10 +4,13 @@ package eu.dnetlib.dhp.oa.graph.clean;
 import java.io.Serializable;
 import java.util.HashMap;
 
+import org.apache.commons.lang3.StringUtils;
+
 import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Country;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 
 public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
 
@@ -18,23 +21,24 @@ public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
 	 */
 	public static CleaningRuleMap create(VocabularyGroup vocabularies) {
 		CleaningRuleMap mapping = new CleaningRuleMap();
-		mapping.put(Qualifier.class, o -> {
-			Qualifier q = (Qualifier) o;
-			if (vocabularies.vocabularyExists(q.getSchemeid())) {
-				Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
-				q.setClassid(newValue.getClassid());
-				q.setClassname(newValue.getClassname());
+		mapping.put(Qualifier.class, o -> cleanQualifier(vocabularies, (Qualifier) o));
+		mapping.put(Country.class, o -> {
+			final Country c = (Country) o;
+			if (StringUtils.isBlank(c.getSchemeid())) {
+				c.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
+				c.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
 			}
-		});
-		mapping.put(StructuredProperty.class, o -> {
-			StructuredProperty sp = (StructuredProperty) o;
-			// TODO implement a policy
-			/*
-			 * if (StringUtils.isBlank(sp.getValue())) { sp.setValue(null); sp.setQualifier(null); sp.setDataInfo(null);
-			 * }
-			 */
+			cleanQualifier(vocabularies, c);
 		});
 		return mapping;
 	}
 
+	private static <Q extends Qualifier> void cleanQualifier(VocabularyGroup vocabularies, Q q) {
+		if (vocabularies.vocabularyExists(q.getSchemeid())) {
+			Qualifier newValue = vocabularies.lookup(q.getSchemeid(), q.getClassid());
+			q.setClassid(newValue.getClassid());
+			q.setClassname(newValue.getClassname());
+		}
+	}
+
 }

@@ -122,7 +122,11 @@ public class VocabularyGroup implements Serializable {
 	}
 
 	public boolean vocabularyExists(final String vocId) {
-		return vocs.containsKey(vocId.toLowerCase());
+		return Optional
+			.ofNullable(vocId)
+			.map(String::toLowerCase)
+			.map(id -> vocs.containsKey(id))
+			.orElse(false);
 	}
 
 	private void addSynonyms(final String vocId, final String termId, final String syn) {

@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
@@ -56,6 +57,9 @@ public class CleaningFunctionTest {
 		String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
 		Publication p_in = MAPPER.readValue(json, Publication.class);
 
+		assertTrue(p_in instanceof Result);
+		assertTrue(p_in instanceof Publication);
+
 		Publication p_out = OafCleaner.apply(p_in, mapping);
 
 		assertNotNull(p_out);
@@ -63,6 +67,9 @@ public class CleaningFunctionTest {
 		assertEquals("und", p_out.getLanguage().getClassid());
 		assertEquals("Undetermined", p_out.getLanguage().getClassname());
 
+		assertEquals("DE", p_out.getCountry().get(0).getClassid());
+		assertEquals("Germany", p_out.getCountry().get(0).getClassname());
+
 		assertEquals("0018", p_out.getInstance().get(0).getInstancetype().getClassid());
 		assertEquals("Annotation", p_out.getInstance().get(0).getInstancetype().getClassname());
 

@@ -202,6 +202,12 @@
 	"contributor": [
 	],
 	"country": [
+		{
+			"classid": "DE",
+			"classname": "DE",
+			"schemeid": "dnet:countries",
+			"schemename": "dnet:countries"
+		}
 	],
 	"coverage": [
 	],