forked from D-Net/dnet-hadoop
refactoring
This commit is contained in:
parent
f9fc64ffaf
commit
16c7a18435
|
@ -53,7 +53,7 @@
|
|||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||
<version>[3.0.2,4.0.0)</version>
|
||||
<version>[3.0.3,4.0.0)</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -11,7 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
|
||||
public class EventFactory {
|
||||
|
@ -49,8 +49,8 @@ public class EventFactory {
|
|||
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
|
||||
final Map<String, Object> map = new HashMap<>();
|
||||
|
||||
final OpenaireBrokerResult source = updateInfo.getSource();
|
||||
final OpenaireBrokerResult target = updateInfo.getTarget();
|
||||
final OaBrokerMainEntity source = updateInfo.getSource();
|
||||
final OaBrokerMainEntity target = updateInfo.getTarget();
|
||||
|
||||
map.put("target_datasource_id", target.getCollectedFromId());
|
||||
map.put("target_datasource_name", target.getCollectedFromName());
|
||||
|
|
|
@ -18,27 +18,20 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
@ -48,8 +41,6 @@ public class GenerateEventsApplication {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
|
@ -86,11 +77,11 @@ public class GenerateEventsApplication {
|
|||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
removeOutputDir(spark, eventsPath);
|
||||
ClusterUtils.removeDir(spark, eventsPath);
|
||||
|
||||
// TODO REMOVE THIS
|
||||
|
||||
relatedProjects(spark, graphPath)
|
||||
expandResultsWithRelations(spark, graphPath, Publication.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(eventsPath);
|
||||
|
@ -110,28 +101,25 @@ public class GenerateEventsApplication {
|
|||
|
||||
}
|
||||
|
||||
private static void removeOutputDir(final SparkSession spark, final String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<Event> generateEvents(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Dataset<OpenaireBrokerResult> results = expandResultsWithRelations(spark, graphPath, sourceClass);
|
||||
final Dataset<OaBrokerMainEntity> results = expandResultsWithRelations(spark, graphPath, sourceClass);
|
||||
|
||||
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
final Dataset<Relation> mergedRels = ClusterUtils
|
||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final TypedColumn<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup> aggr = new ResultAggregator()
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
|
||||
.toColumn();
|
||||
|
||||
return results
|
||||
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
|
||||
.filter(rg -> rg.getData().size() > 1)
|
||||
|
@ -141,7 +129,7 @@ public class GenerateEventsApplication {
|
|||
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
|
||||
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass) {
|
||||
|
@ -151,116 +139,35 @@ public class GenerateEventsApplication {
|
|||
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
||||
// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||
.cache();
|
||||
|
||||
final Dataset<OpenaireBrokerResult> r0 = readPath(
|
||||
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
|
||||
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
|
||||
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
|
||||
|
||||
// TODO UNCOMMENT THIS
|
||||
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedProjects(spark, graphPath));
|
||||
// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedDataset(spark, graphPath));
|
||||
// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedPublications(spark, graphPath));
|
||||
// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedSoftwares(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r1 = join(r0, relatedProjects(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r2 = join(r1, relatedDataset(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r3 = join(r2, relatedPublications(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r4 = join(r3, relatedSoftwares(spark, graphPath));
|
||||
|
||||
return r1; // TODO it should be r4
|
||||
return r0; // TODO it should be r4
|
||||
}
|
||||
|
||||
private static Dataset<RelatedProject> relatedProjects(final SparkSession spark, final String graphPath) {
|
||||
|
||||
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
|
||||
|
||||
return rels
|
||||
.joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedProject(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafProjectToBrokerProject(t._2)),
|
||||
Encoders.bean(RelatedProject.class));
|
||||
}
|
||||
|
||||
private static Dataset<RelatedDataset> relatedDataset(final SparkSession spark, final String graphPath) {
|
||||
|
||||
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
|
||||
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
return rels
|
||||
.joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedDataset(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafDatasetToBrokerDataset(t._2)),
|
||||
Encoders.bean(RelatedDataset.class));
|
||||
}
|
||||
|
||||
private static Dataset<RelatedSoftware> relatedSoftwares(final SparkSession spark, final String graphPath) {
|
||||
|
||||
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
return rels
|
||||
.joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedSoftware(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
|
||||
Encoders.bean(RelatedSoftware.class));
|
||||
}
|
||||
|
||||
private static Dataset<RelatedPublication> relatedPublications(final SparkSession spark, final String graphPath) {
|
||||
|
||||
final Dataset<Publication> pubs = readPath(spark, graphPath + "/publication", Publication.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
return rels
|
||||
.joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedPublication(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafPublicationToBrokerPublication(t._2)),
|
||||
Encoders.bean(RelatedPublication.class));
|
||||
}
|
||||
|
||||
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
|
||||
final Dataset<Relation> rels,
|
||||
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
|
||||
final Dataset<T> typedRels) {
|
||||
|
||||
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
|
||||
.toColumn();
|
||||
|
||||
return sources
|
||||
.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
|
||||
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class));
|
||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
||||
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
final SparkSession spark,
|
||||
final String inputPath,
|
||||
final Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
|
||||
|
||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedDatasets {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedDatasets.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, relsPath);
|
||||
|
||||
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = ClusterUtils
|
||||
.readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||
|
||||
final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
rels
|
||||
.joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedDataset(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafDatasetToBrokerDataset(t._2)),
|
||||
Encoders.bean(RelatedDataset.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(relsPath);
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedProjects {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedProjects.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, relsPath);
|
||||
|
||||
final Dataset<Project> projects = ClusterUtils.readPath(spark, graphPath + "/project", Project.class);
|
||||
|
||||
final Dataset<Relation> rels = ClusterUtils
|
||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
|
||||
|
||||
rels
|
||||
.joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedProject(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafProjectToBrokerProject(t._2)),
|
||||
Encoders.bean(RelatedProject.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(relsPath);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedPublications {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedPublications.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, relsPath);
|
||||
|
||||
final Dataset<Publication> pubs = ClusterUtils
|
||||
.readPath(spark, graphPath + "/publication", Publication.class);
|
||||
|
||||
final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
rels
|
||||
.joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedPublication(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafPublicationToBrokerPublication(t._2)),
|
||||
Encoders.bean(RelatedPublication.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(relsPath);
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class GenerateRelatedSoftwares {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedSoftwares.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, relsPath);
|
||||
final Dataset<Software> softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
|
||||
|
||||
final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
|
||||
|
||||
rels
|
||||
.joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> new RelatedSoftware(
|
||||
t._1.getSource(),
|
||||
t._1.getRelType(),
|
||||
ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
|
||||
Encoders.bean(RelatedSoftware.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(relsPath);
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class GenerateSimpleEntitities {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateSimpleEntitities.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String simpleEntitiesPath = parser.get("simpleEntitiesPath");
|
||||
log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, simpleEntitiesPath);
|
||||
|
||||
expandResultsWithRelations(spark, graphPath, Publication.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(simpleEntitiesPath);
|
||||
|
||||
// TODO UNCOMMENT THIS
|
||||
// spark
|
||||
// .emptyDataset(Encoders.bean(Event.class))
|
||||
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
|
||||
// .write()
|
||||
// .mode(SaveMode.Overwrite)
|
||||
// .option("compression", "gzip")
|
||||
// .json(eventsPath);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass) {
|
||||
|
||||
return ClusterUtils
|
||||
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
|
||||
}
|
||||
|
||||
}
|
|
@ -12,7 +12,7 @@ import java.util.function.Function;
|
|||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
@ -21,11 +21,11 @@ public abstract class UpdateMatcher<T> {
|
|||
|
||||
private final boolean multipleUpdate;
|
||||
private final Function<T, Topic> topicFunction;
|
||||
private final BiConsumer<OpenaireBrokerResult, T> compileHighlightFunction;
|
||||
private final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction;
|
||||
private final Function<T, String> highlightToStringFunction;
|
||||
|
||||
public UpdateMatcher(final boolean multipleUpdate, final Function<T, Topic> topicFunction,
|
||||
final BiConsumer<OpenaireBrokerResult, T> compileHighlightFunction,
|
||||
final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction,
|
||||
final Function<T, String> highlightToStringFunction) {
|
||||
this.multipleUpdate = multipleUpdate;
|
||||
this.topicFunction = topicFunction;
|
||||
|
@ -33,13 +33,13 @@ public abstract class UpdateMatcher<T> {
|
|||
this.highlightToStringFunction = highlightToStringFunction;
|
||||
}
|
||||
|
||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OpenaireBrokerResult res,
|
||||
final Collection<OpenaireBrokerResult> others,
|
||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
|
||||
final Collection<OaBrokerMainEntity> others,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
|
||||
|
||||
for (final OpenaireBrokerResult source : others) {
|
||||
for (final OaBrokerMainEntity source : others) {
|
||||
if (source != res) {
|
||||
for (final T hl : findDifferences(source, res)) {
|
||||
final Topic topic = getTopicFunction().apply(hl);
|
||||
|
@ -68,7 +68,7 @@ public abstract class UpdateMatcher<T> {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract List<T> findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target);
|
||||
protected abstract List<T> findDifferences(OaBrokerMainEntity source, OaBrokerMainEntity target);
|
||||
|
||||
protected static boolean isMissing(final List<String> list) {
|
||||
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
|
||||
|
@ -86,7 +86,7 @@ public abstract class UpdateMatcher<T> {
|
|||
return topicFunction;
|
||||
}
|
||||
|
||||
public BiConsumer<OpenaireBrokerResult, T> getCompileHighlightFunction() {
|
||||
public BiConsumer<OaBrokerMainEntity, T> getCompileHighlightFunction() {
|
||||
return compileHighlightFunction;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,13 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.Dataset;
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public abstract class AbstractEnrichMissingDataset
|
||||
extends UpdateMatcher<Dataset> {
|
||||
public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBrokerRelatedDataset> {
|
||||
|
||||
public AbstractEnrichMissingDataset(final Topic topic) {
|
||||
super(true,
|
||||
|
@ -23,14 +22,14 @@ public abstract class AbstractEnrichMissingDataset
|
|||
protected abstract boolean filterByType(String relType);
|
||||
|
||||
@Override
|
||||
protected final List<Dataset> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected final List<OaBrokerRelatedDataset> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
final Set<String> existingDatasets = target
|
||||
.getDatasets()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(Dataset::getOriginalId)
|
||||
.map(OaBrokerRelatedDataset::getOriginalId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
|
|
|
@ -4,12 +4,12 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.Project;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMissingProject extends UpdateMatcher<Project> {
|
||||
public class EnrichMissingProject extends UpdateMatcher<OaBrokerProject> {
|
||||
|
||||
public EnrichMissingProject() {
|
||||
super(true,
|
||||
|
@ -19,7 +19,7 @@ public class EnrichMissingProject extends UpdateMatcher<Project> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<Project> findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
|
||||
if (target.getProjects().isEmpty()) {
|
||||
return source.getProjects();
|
||||
} else {
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.Project;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMoreProject extends UpdateMatcher<Project> {
|
||||
public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
|
||||
|
||||
public EnrichMoreProject() {
|
||||
super(true,
|
||||
|
@ -19,13 +19,13 @@ public class EnrichMoreProject extends UpdateMatcher<Project> {
|
|||
prj -> projectAsString(prj));
|
||||
}
|
||||
|
||||
private static String projectAsString(final Project prj) {
|
||||
private static String projectAsString(final OaBrokerProject prj) {
|
||||
return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<eu.dnetlib.broker.objects.Project> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
final Set<String> existingProjects = target
|
||||
.getProjects()
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.Publication;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<Publication> {
|
||||
public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaBrokerRelatedPublication> {
|
||||
|
||||
public AbstractEnrichMissingPublication(final Topic topic) {
|
||||
super(true,
|
||||
|
@ -23,15 +23,15 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<Pub
|
|||
protected abstract boolean filterByType(String relType);
|
||||
|
||||
@Override
|
||||
protected final List<eu.dnetlib.broker.objects.Publication> findDifferences(
|
||||
final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected final List<OaBrokerRelatedPublication> findDifferences(
|
||||
final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
final Set<String> existingPublications = target
|
||||
.getPublications()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(Publication::getOriginalId)
|
||||
.map(OaBrokerRelatedPublication::getOriginalId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
|
|
|
@ -4,12 +4,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMissingSoftware
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Software> {
|
||||
extends UpdateMatcher<OaBrokerRelatedSoftware> {
|
||||
|
||||
public EnrichMissingSoftware() {
|
||||
super(true,
|
||||
|
@ -19,9 +20,9 @@ public class EnrichMissingSoftware
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<eu.dnetlib.broker.objects.Software> findDifferences(
|
||||
final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerRelatedSoftware> findDifferences(
|
||||
final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
if (target.getSoftwares().isEmpty()) {
|
||||
return source.getSoftwares();
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.Software;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMoreSoftware extends UpdateMatcher<Software> {
|
||||
public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
|
||||
|
||||
public EnrichMoreSoftware() {
|
||||
super(true,
|
||||
|
@ -20,14 +20,14 @@ public class EnrichMoreSoftware extends UpdateMatcher<Software> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<eu.dnetlib.broker.objects.Software> findDifferences(
|
||||
final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerRelatedSoftware> findDifferences(
|
||||
final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
final Set<String> existingSoftwares = source
|
||||
.getSoftwares()
|
||||
.stream()
|
||||
.map(Software::getName)
|
||||
.map(OaBrokerRelatedSoftware::getName)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return target
|
||||
|
|
|
@ -5,7 +5,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
|
@ -19,7 +19,7 @@ public class EnrichMissingAbstract extends UpdateMatcher<String> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<String> findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
|
||||
protected List<String> findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
|
||||
if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) {
|
||||
return Arrays.asList(source.getAbstracts().get(0));
|
||||
} else {
|
||||
|
|
|
@ -7,12 +7,12 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.broker.objects.Author;
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerAuthor;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Author> {
|
||||
public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> {
|
||||
|
||||
public EnrichMissingAuthorOrcid() {
|
||||
super(true,
|
||||
|
@ -22,13 +22,13 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<Author> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<Author> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerAuthor> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
final Set<String> existingOrcids = target
|
||||
.getCreators()
|
||||
.stream()
|
||||
.map(Author::getOrcid)
|
||||
.map(OaBrokerAuthor::getOrcid)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
|
|
|
@ -5,28 +5,28 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.Instance;
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
|
||||
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {
|
||||
public class EnrichMissingOpenAccess extends UpdateMatcher<OaBrokerInstance> {
|
||||
|
||||
public EnrichMissingOpenAccess() {
|
||||
super(true,
|
||||
i -> Topic.ENRICH_MISSING_OA_VERSION,
|
||||
(p, i) -> p.getInstances().add(i),
|
||||
Instance::getUrl);
|
||||
OaBrokerInstance::getUrl);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Instance> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final long count = target
|
||||
.getInstances()
|
||||
.stream()
|
||||
.map(Instance::getLicense)
|
||||
.map(OaBrokerInstance::getLicense)
|
||||
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
|
||||
.count();
|
||||
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.TypedValue;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMissingPid extends UpdateMatcher<TypedValue> {
|
||||
public class EnrichMissingPid extends UpdateMatcher<OaBrokerTypedValue> {
|
||||
|
||||
public EnrichMissingPid() {
|
||||
super(true,
|
||||
|
@ -20,8 +20,8 @@ public class EnrichMissingPid extends UpdateMatcher<TypedValue> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final long count = target.getPids().size();
|
||||
|
||||
if (count > 0) {
|
||||
|
|
|
@ -5,7 +5,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
|
@ -19,8 +19,8 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<String> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<String> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
|
||||
if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) {
|
||||
return Arrays.asList(source.getPublicationdate());
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.TypedValue;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMissingSubject extends UpdateMatcher<TypedValue> {
|
||||
public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> {
|
||||
|
||||
public EnrichMissingSubject() {
|
||||
super(true,
|
||||
|
@ -20,8 +20,8 @@ public class EnrichMissingSubject extends UpdateMatcher<TypedValue> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final Set<String> existingSubject = target
|
||||
.getSubjects()
|
||||
.stream()
|
||||
|
@ -35,7 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher<TypedValue> {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static String subjectAsString(final TypedValue s) {
|
||||
private static String subjectAsString(final OaBrokerTypedValue s) {
|
||||
return s.getType() + "::" + s.getValue();
|
||||
}
|
||||
|
||||
|
|
|
@ -5,24 +5,24 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.Instance;
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
|
||||
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {
|
||||
public class EnrichMoreOpenAccess extends UpdateMatcher<OaBrokerInstance> {
|
||||
|
||||
public EnrichMoreOpenAccess() {
|
||||
super(true,
|
||||
i -> Topic.ENRICH_MORE_OA_VERSION,
|
||||
(p, i) -> p.getInstances().add(i),
|
||||
Instance::getUrl);
|
||||
OaBrokerInstance::getUrl);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Instance> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final Set<String> urls = target
|
||||
.getInstances()
|
||||
.stream()
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.TypedValue;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMorePid extends UpdateMatcher<TypedValue> {
|
||||
public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> {
|
||||
|
||||
public EnrichMorePid() {
|
||||
super(true,
|
||||
|
@ -20,8 +20,8 @@ public class EnrichMorePid extends UpdateMatcher<TypedValue> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final Set<String> existingPids = target
|
||||
.getPids()
|
||||
.stream()
|
||||
|
@ -35,7 +35,7 @@ public class EnrichMorePid extends UpdateMatcher<TypedValue> {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static String pidAsString(final TypedValue pid) {
|
||||
private static String pidAsString(final OaBrokerTypedValue pid) {
|
||||
return pid.getType() + "::" + pid.getValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.TypedValue;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
|
||||
public class EnrichMoreSubject extends UpdateMatcher<TypedValue> {
|
||||
public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
|
||||
|
||||
public EnrichMoreSubject() {
|
||||
super(true,
|
||||
|
@ -20,8 +20,8 @@ public class EnrichMoreSubject extends UpdateMatcher<TypedValue> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target) {
|
||||
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target) {
|
||||
final Set<String> existingSubjects = target
|
||||
.getSubjects()
|
||||
.stream()
|
||||
|
@ -35,7 +35,7 @@ public class EnrichMoreSubject extends UpdateMatcher<TypedValue> {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static String subjectAsString(final TypedValue s) {
|
||||
private static String subjectAsString(final OaBrokerTypedValue s) {
|
||||
return s.getType() + "::" + s.getValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
|
||||
public class ClusterUtils {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void removeDir(final SparkSession spark, final String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
final SparkSession spark,
|
||||
final String inputPath,
|
||||
final Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
}
|
|
@ -15,8 +15,16 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.TypedValue;
|
||||
import eu.dnetlib.broker.objects.OaBrokerAuthor;
|
||||
import eu.dnetlib.broker.objects.OaBrokerExternalReference;
|
||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerJournal;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
|
||||
|
@ -35,13 +43,13 @@ public class ConversionUtils {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
|
||||
|
||||
public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) {
|
||||
public static List<OaBrokerInstance> oafInstanceToBrokerInstances(final Instance i) {
|
||||
if (i == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
return mappedList(i.getUrl(), url -> {
|
||||
final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance();
|
||||
final OaBrokerInstance res = new OaBrokerInstance();
|
||||
res.setUrl(url);
|
||||
res.setInstancetype(classId(i.getInstancetype()));
|
||||
res.setLicense(BrokerConstants.OPEN_ACCESS);
|
||||
|
@ -50,20 +58,21 @@ public class ConversionUtils {
|
|||
});
|
||||
}
|
||||
|
||||
public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
|
||||
public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) {
|
||||
return oafStructPropToBrokerTypedValue(sp);
|
||||
}
|
||||
|
||||
public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
|
||||
return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
|
||||
public static OaBrokerTypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
|
||||
return sp != null ? new OaBrokerTypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
|
||||
public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) {
|
||||
if (d == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
|
||||
final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
|
||||
res.setOpenaireId(d.getId());
|
||||
res.setOriginalId(first(d.getOriginalId()));
|
||||
res.setTitle(structPropValue(d.getTitle()));
|
||||
res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
|
||||
|
@ -72,12 +81,13 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
|
||||
public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) {
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
|
||||
final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
|
||||
res.setOpenaireId(p.getId());
|
||||
res.setOriginalId(first(p.getOriginalId()));
|
||||
res.setTitle(structPropValue(p.getTitle()));
|
||||
res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
|
||||
|
@ -87,12 +97,12 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
|
||||
public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) {
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final OpenaireBrokerResult res = new OpenaireBrokerResult();
|
||||
final OaBrokerMainEntity res = new OaBrokerMainEntity();
|
||||
|
||||
res.setOpenaireId(result.getId());
|
||||
res.setOriginalId(first(result.getOriginalId()));
|
||||
|
@ -118,7 +128,7 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
|
||||
private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
|
||||
if (author == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -135,15 +145,15 @@ public class ConversionUtils {
|
|||
.findFirst()
|
||||
.orElse(null) : null;
|
||||
|
||||
return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids);
|
||||
return new OaBrokerAuthor(author.getFullname(), pids);
|
||||
}
|
||||
|
||||
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
|
||||
private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) {
|
||||
if (journal == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal();
|
||||
final OaBrokerJournal res = new OaBrokerJournal();
|
||||
res.setName(journal.getName());
|
||||
res.setIssn(journal.getIssnPrinted());
|
||||
res.setEissn(journal.getIssnOnline());
|
||||
|
@ -152,12 +162,12 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
|
||||
private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
|
||||
if (ref == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference();
|
||||
final OaBrokerExternalReference res = new OaBrokerExternalReference();
|
||||
res.setRefidentifier(ref.getRefidentifier());
|
||||
res.setSitename(ref.getSitename());
|
||||
res.setType(classId(ref.getQualifier()));
|
||||
|
@ -165,12 +175,13 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
|
||||
public static final OaBrokerProject oafProjectToBrokerProject(final Project p) {
|
||||
if (p == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
|
||||
final OaBrokerProject res = new OaBrokerProject();
|
||||
res.setOpenaireId(p.getId());
|
||||
res.setTitle(fieldValue(p.getTitle()));
|
||||
res.setAcronym(fieldValue(p.getAcronym()));
|
||||
res.setCode(fieldValue(p.getCode()));
|
||||
|
@ -190,12 +201,13 @@ public class ConversionUtils {
|
|||
return res;
|
||||
}
|
||||
|
||||
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
|
||||
public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) {
|
||||
if (sw == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software();
|
||||
final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
|
||||
res.setOpenaireId(sw.getId());
|
||||
res.setName(structPropValue(sw.getTitle()));
|
||||
res.setDescription(fieldValue(sw.getDescription()));
|
||||
res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
|
||||
|
@ -247,7 +259,7 @@ public class ConversionUtils {
|
|||
: new ArrayList<>();
|
||||
}
|
||||
|
||||
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
|
||||
private static List<OaBrokerTypedValue> structPropTypedList(final List<StructuredProperty> list) {
|
||||
if (list == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
||||
|
@ -68,7 +68,7 @@ public class EventFinder {
|
|||
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
for (final OpenaireBrokerResult target : results.getData()) {
|
||||
for (final OaBrokerMainEntity target : results.getData()) {
|
||||
for (final UpdateMatcher<?> matcher : matchers) {
|
||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
}
|
||||
|
|
|
@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.broker.objects.Instance;
|
||||
import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.Provenance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
||||
import eu.dnetlib.broker.objects.OaBrokerInstance;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProvenance;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import eu.dnetlib.pace.model.MapDocument;
|
||||
|
@ -25,11 +25,11 @@ public final class UpdateInfo<T> {
|
|||
|
||||
private final T highlightValue;
|
||||
|
||||
private final OpenaireBrokerResult source;
|
||||
private final OaBrokerMainEntity source;
|
||||
|
||||
private final OpenaireBrokerResult target;
|
||||
private final OaBrokerMainEntity target;
|
||||
|
||||
private final BiConsumer<OpenaireBrokerResult, T> compileHighlight;
|
||||
private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
|
||||
|
||||
private final Function<T, String> highlightToString;
|
||||
|
||||
|
@ -37,9 +37,9 @@ public final class UpdateInfo<T> {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
|
||||
|
||||
public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source,
|
||||
final OpenaireBrokerResult target,
|
||||
final BiConsumer<OpenaireBrokerResult, T> compileHighlight,
|
||||
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
|
||||
final OaBrokerMainEntity target,
|
||||
final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
|
||||
final Function<T, String> highlightToString,
|
||||
final DedupConfig dedupConfig) {
|
||||
this.topic = topic;
|
||||
|
@ -55,17 +55,17 @@ public final class UpdateInfo<T> {
|
|||
return highlightValue;
|
||||
}
|
||||
|
||||
public OpenaireBrokerResult getSource() {
|
||||
public OaBrokerMainEntity getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public OpenaireBrokerResult getTarget() {
|
||||
public OaBrokerMainEntity getTarget() {
|
||||
return target;
|
||||
}
|
||||
|
||||
private float calculateTrust(final DedupConfig dedupConfig,
|
||||
final OpenaireBrokerResult r1,
|
||||
final OpenaireBrokerResult r2) {
|
||||
final OaBrokerMainEntity r1,
|
||||
final OaBrokerMainEntity r2) {
|
||||
|
||||
if (dedupConfig == null) {
|
||||
return BrokerConstants.MIN_TRUST;
|
||||
|
@ -104,11 +104,11 @@ public final class UpdateInfo<T> {
|
|||
return highlightToString.apply(getHighlightValue());
|
||||
}
|
||||
|
||||
public OpenAireEventPayload asBrokerPayload() {
|
||||
public OaBrokerEventPayload asBrokerPayload() {
|
||||
|
||||
compileHighlight.accept(target, getHighlightValue());
|
||||
|
||||
final OpenaireBrokerResult hl = new OpenaireBrokerResult();
|
||||
final OaBrokerMainEntity hl = new OaBrokerMainEntity();
|
||||
compileHighlight.accept(hl, getHighlightValue());
|
||||
|
||||
final String provId = getSource().getOriginalId();
|
||||
|
@ -117,14 +117,14 @@ public final class UpdateInfo<T> {
|
|||
final String provUrl = getSource()
|
||||
.getInstances()
|
||||
.stream()
|
||||
.map(Instance::getUrl)
|
||||
.map(OaBrokerInstance::getUrl)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
;
|
||||
|
||||
final Provenance provenance = new Provenance(provId, provRepo, provUrl);
|
||||
final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
|
||||
|
||||
final OpenAireEventPayload res = new OpenAireEventPayload();
|
||||
final OaBrokerEventPayload res = new OaBrokerEventPayload();
|
||||
res.setResult(target);
|
||||
res.setHighlight(hl);
|
||||
res.setTrust(trust);
|
||||
|
|
|
@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup, ResultGroup> {
|
||||
public class ResultAggregator extends Aggregator<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup, ResultGroup> {
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Re
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResultGroup reduce(final ResultGroup group, final Tuple2<OpenaireBrokerResult, Relation> t) {
|
||||
public ResultGroup reduce(final ResultGroup group, final Tuple2<OaBrokerMainEntity, Relation> t) {
|
||||
group.getData().add(t._1);
|
||||
return group;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import java.io.Serializable;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
|
||||
public class ResultGroup implements Serializable {
|
||||
|
||||
|
@ -14,13 +14,13 @@ public class ResultGroup implements Serializable {
|
|||
*/
|
||||
private static final long serialVersionUID = -3360828477088669296L;
|
||||
|
||||
private List<OpenaireBrokerResult> data = new ArrayList<>();
|
||||
private List<OaBrokerMainEntity> data = new ArrayList<>();
|
||||
|
||||
public List<OpenaireBrokerResult> getData() {
|
||||
public List<OaBrokerMainEntity> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(final List<OpenaireBrokerResult> data) {
|
||||
public void setData(final List<OaBrokerMainEntity> data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
|
||||
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class OpenaireBrokerResultAggregator<T>
|
||||
extends Aggregator<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult, OpenaireBrokerResult> {
|
||||
public class OaBrokerMainEntityAggregator<T>
|
||||
extends Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> {
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -17,17 +17,17 @@ public class OpenaireBrokerResultAggregator<T>
|
|||
private static final long serialVersionUID = -3687878788861013488L;
|
||||
|
||||
@Override
|
||||
public OpenaireBrokerResult zero() {
|
||||
return new OpenaireBrokerResult();
|
||||
public OaBrokerMainEntity zero() {
|
||||
return new OaBrokerMainEntity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenaireBrokerResult finish(final OpenaireBrokerResult g) {
|
||||
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
|
||||
return g;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2<OpenaireBrokerResult, T> t) {
|
||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, T> t) {
|
||||
if (g.getOriginalId() == null) {
|
||||
return t._1;
|
||||
} else if (t._2 instanceof RelatedSoftware) {
|
||||
|
@ -38,13 +38,15 @@ public class OpenaireBrokerResultAggregator<T>
|
|||
g.getPublications().add(((RelatedPublication) t._2).getRelPublication());
|
||||
} else if (t._2 instanceof RelatedProject) {
|
||||
g.getProjects().add(((RelatedProject) t._2).getRelProject());
|
||||
} else {
|
||||
throw new RuntimeException("Invalid Object: " + t._2.getClass());
|
||||
}
|
||||
return g;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) {
|
||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||
if (g1.getOriginalId() != null) {
|
||||
g1.getSoftwares().addAll(g2.getSoftwares());
|
||||
g1.getDatasets().addAll(g2.getDatasets());
|
||||
|
@ -57,13 +59,13 @@ public class OpenaireBrokerResultAggregator<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Encoder<OpenaireBrokerResult> bufferEncoder() {
|
||||
return Encoders.bean(OpenaireBrokerResult.class);
|
||||
public Encoder<OaBrokerMainEntity> bufferEncoder() {
|
||||
return Encoders.bean(OaBrokerMainEntity.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<OpenaireBrokerResult> outputEncoder() {
|
||||
return Encoders.bean(OpenaireBrokerResult.class);
|
||||
public Encoder<OaBrokerMainEntity> outputEncoder() {
|
||||
return Encoders.bean(OaBrokerMainEntity.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import eu.dnetlib.broker.objects.Dataset;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
|
||||
|
||||
public class RelatedDataset implements Serializable {
|
||||
|
||||
|
@ -13,12 +13,12 @@ public class RelatedDataset implements Serializable {
|
|||
private static final long serialVersionUID = 774487705184038324L;
|
||||
private String source;
|
||||
private String relType;
|
||||
private Dataset relDataset;
|
||||
private OaBrokerRelatedDataset relDataset;
|
||||
|
||||
public RelatedDataset() {
|
||||
}
|
||||
|
||||
public RelatedDataset(final String source, final String relType, final Dataset relDataset) {
|
||||
public RelatedDataset(final String source, final String relType, final OaBrokerRelatedDataset relDataset) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relDataset = relDataset;
|
||||
|
@ -40,11 +40,11 @@ public class RelatedDataset implements Serializable {
|
|||
this.relType = relType;
|
||||
}
|
||||
|
||||
public Dataset getRelDataset() {
|
||||
public OaBrokerRelatedDataset getRelDataset() {
|
||||
return relDataset;
|
||||
}
|
||||
|
||||
public void setRelDataset(final Dataset relDataset) {
|
||||
public void setRelDataset(final OaBrokerRelatedDataset relDataset) {
|
||||
this.relDataset = relDataset;
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import eu.dnetlib.broker.objects.Project;
|
||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||
|
||||
public class RelatedProject implements Serializable {
|
||||
|
||||
|
@ -14,12 +14,12 @@ public class RelatedProject implements Serializable {
|
|||
|
||||
private String source;
|
||||
private String relType;
|
||||
private Project relProject;
|
||||
private OaBrokerProject relProject;
|
||||
|
||||
public RelatedProject() {
|
||||
}
|
||||
|
||||
public RelatedProject(final String source, final String relType, final Project relProject) {
|
||||
public RelatedProject(final String source, final String relType, final OaBrokerProject relProject) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relProject = relProject;
|
||||
|
@ -41,11 +41,11 @@ public class RelatedProject implements Serializable {
|
|||
this.relType = relType;
|
||||
}
|
||||
|
||||
public Project getRelProject() {
|
||||
public OaBrokerProject getRelProject() {
|
||||
return relProject;
|
||||
}
|
||||
|
||||
public void setRelProject(final Project relProject) {
|
||||
public void setRelProject(final OaBrokerProject relProject) {
|
||||
this.relProject = relProject;
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import eu.dnetlib.broker.objects.Publication;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||
|
||||
public class RelatedPublication implements Serializable {
|
||||
|
||||
|
@ -14,12 +14,13 @@ public class RelatedPublication implements Serializable {
|
|||
|
||||
private String source;
|
||||
private String relType;
|
||||
private Publication relPublication;
|
||||
private OaBrokerRelatedPublication relPublication;
|
||||
|
||||
public RelatedPublication() {
|
||||
}
|
||||
|
||||
public RelatedPublication(final String source, final String relType, final Publication relPublication) {
|
||||
public RelatedPublication(final String source, final String relType,
|
||||
final OaBrokerRelatedPublication relPublication) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relPublication = relPublication;
|
||||
|
@ -41,11 +42,11 @@ public class RelatedPublication implements Serializable {
|
|||
this.relType = relType;
|
||||
}
|
||||
|
||||
public Publication getRelPublication() {
|
||||
public OaBrokerRelatedPublication getRelPublication() {
|
||||
return relPublication;
|
||||
}
|
||||
|
||||
public void setRelPublication(final Publication relPublication) {
|
||||
public void setRelPublication(final OaBrokerRelatedPublication relPublication) {
|
||||
this.relPublication = relPublication;
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import eu.dnetlib.broker.objects.Software;
|
||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||
|
||||
public class RelatedSoftware implements Serializable {
|
||||
|
||||
|
@ -13,12 +13,12 @@ public class RelatedSoftware implements Serializable {
|
|||
private static final long serialVersionUID = 7573383356943300157L;
|
||||
private String source;
|
||||
private String relType;
|
||||
private Software relSoftware;
|
||||
private OaBrokerRelatedSoftware relSoftware;
|
||||
|
||||
public RelatedSoftware() {
|
||||
}
|
||||
|
||||
public RelatedSoftware(final String source, final String relType, final Software relSoftware) {
|
||||
public RelatedSoftware(final String source, final String relType, final OaBrokerRelatedSoftware relSoftware) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relSoftware = relSoftware;
|
||||
|
@ -40,11 +40,11 @@ public class RelatedSoftware implements Serializable {
|
|||
this.relType = relType;
|
||||
}
|
||||
|
||||
public Software getRelSoftware() {
|
||||
public OaBrokerRelatedSoftware getRelSoftware() {
|
||||
return relSoftware;
|
||||
}
|
||||
|
||||
public void setRelSoftware(final Software relSoftware) {
|
||||
public void setRelSoftware(final OaBrokerRelatedSoftware relSoftware) {
|
||||
this.relSoftware = relSoftware;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
[
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphPath",
|
||||
"paramDescription": "the path where there the graph is stored",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "relsPath",
|
||||
"paramDescription": "the path where the generated relations will be stored",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,14 @@
|
|||
[
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphPath",
|
||||
"paramDescription": "the path where there the graph is stored",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "simpleEntitiesPath",
|
||||
"paramDescription": "the path where the generated simple entities (without relations) will be stored",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue