diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index f060a60bc..eddc042c6 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -61,7 +61,7 @@ eu.dnetlib dnet-openaire-broker-common - [2.0.1,3.0.0) + [3.0.1,4.0.0) diff --git a/dhp-workflows/dhp-broker-events/report.xml b/dhp-workflows/dhp-broker-events/report.xml new file mode 100644 index 000000000..6e706f723 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/report.xml @@ -0,0 +1,37 @@ + + + + Feature Extraction + + + TCPFLOW + 1.5.0 + + 4.2.1 (4.2.1 Compatible Apple LLVM 11.0.0 (clang-1100.0.33.8)) + -D_THREAD_SAFE -pthread -I/usr/local/include -I/usr/local/include -DUTC_OFFSET=+0000 + -g -D_THREAD_SAFE -pthread -g -O3 -MD -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes + -g -D_THREAD_SAFE -pthread -g -O3 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++ -std=c++11 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++ + -L/usr/local/lib -L/usr/local/lib + -lpython2.7 -lpython2.7 -lpcap -lbz2 -lexpat -lsqlite3 -lcrypto -lssl -lcrypto -ldl -lz + 2019-10-11T01:16:58 + + + + + Darwin + 19.5.0 + Darwin Kernel Version 19.5.0: Tue May 26 20:41:44 PDT 2020; root:xnu-6153.121.2~2/RELEASE_X86_64 + Micheles-MBP.local + x86_64 + tcpflow + 501 + michele + 2020-06-15T14:55:03Z + + + + + 0 diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java index 0512a3813..f94d286e4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java @@ -1,9 +1,15 @@ package eu.dnetlib.dhp.broker.model; +import java.io.Serializable; import java.util.Map; -public class Event { +public class Event implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -5936790326505781395L; private String eventId; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 9146cf422..bf4f62d24 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -6,17 +6,13 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.schema.oaf.Author; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class EventFactory { @@ -37,8 +33,7 @@ public class EventFactory { final Map map = createMapFromResult(updateInfo); final String eventId = calculateEventId( - updateInfo.getTopicPath(), updateInfo.getTarget().getResult().getOriginalId().get(0), - updateInfo.getHighlightValueAsString()); + updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString()); res.setEventId(eventId); res.setProducerId(PRODUCER_ID); @@ -54,53 +49,31 @@ public class EventFactory { private static Map createMapFromResult(final UpdateInfo updateInfo) { final Map map = new HashMap<>(); - final Result source = updateInfo.getSource().getResult(); - final Result target = updateInfo.getTarget().getResult(); + final OpenaireBrokerResult source = updateInfo.getSource(); + final OpenaireBrokerResult target = updateInfo.getTarget(); - final List collectedFrom = target.getCollectedfrom(); - if (collectedFrom.size() == 1) { - map.put("target_datasource_id", collectedFrom.get(0).getKey()); - map.put("target_datasource_name", collectedFrom.get(0).getValue()); - } + map.put("target_datasource_id", target.getCollectedFromId()); + map.put("target_datasource_name", target.getCollectedFromName()); - final List ids = target.getOriginalId(); - if (ids.size() > 0) { - map.put("target_publication_id", ids.get(0)); - } + map.put("target_publication_id", target.getOriginalId()); - final List titles = target.getTitle(); + final List titles = target.getTitles(); if (titles.size() > 0) { map.put("target_publication_title", titles.get(0)); } - final long date = parseDateTolong(target.getDateofacceptance().getValue()); + final long date = parseDateTolong(target.getPublicationdate()); if (date > 0) { map.put("target_dateofacceptance", date); } - final List subjects = target.getSubject(); - if (subjects.size() > 0) { - map - .put( - "target_publication_subject_list", - subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); - } - - final List authors = target.getAuthor(); - if (authors.size() > 0) { - map - .put( - "target_publication_author_list", - authors.stream().map(Author::getFullname).collect(Collectors.toList())); - } + map.put("target_publication_subject_list", target.getSubjects()); + map.put("target_publication_author_list", target.getCreators()); // PROVENANCE INFO map.put("trust", updateInfo.getTrust()); - final List sourceCollectedFrom = source.getCollectedfrom(); - if (sourceCollectedFrom.size() == 1) { - map.put("provenance_datasource_id", sourceCollectedFrom.get(0).getKey()); - map.put("provenance_datasource_name", sourceCollectedFrom.get(0).getValue()); - } + map.put("provenance_datasource_id", source.getCollectedFromId()); + map.put("provenance_datasource_name", source.getCollectedFromName()); map.put("provenance_publication_id_list", source.getOriginalId()); return map; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 4d40ba80d..940d7f9f3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -18,18 +18,20 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -73,6 +75,8 @@ public class GenerateEventsApplication { log.info("dedupConfigProfileId: {}", dedupConfigProfileId); final SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(BrokerConstants.getModelClasses()); final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); @@ -80,13 +84,16 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); - final Dataset all = spark.emptyDataset(Encoders.kryo(Event.class)); - - for (final Class r1 : BrokerConstants.RESULT_CLASSES) { - all.union(generateEvents(spark, graphPath, r1, dedupConfig)); - } - - all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath); + spark + .emptyDataset(Encoders.kryo(Event.class)) + .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) + .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) + .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) + .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(eventsPath); }); } @@ -101,18 +108,18 @@ public class GenerateEventsApplication { final Class sourceClass, final DedupConfig dedupConfig) { - final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); + final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass); final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final TypedColumn, ResultGroup> aggr = new ResultAggregator() + final TypedColumn, ResultGroup> aggr = new ResultAggregator() .toColumn(); return results .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") .groupByKey( - (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) @@ -122,7 +129,7 @@ public class GenerateEventsApplication { .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } - private static Dataset expandResultsWithRelations( + private static Dataset expandResultsWithRelations( final SparkSession spark, final String graphPath, final Class sourceClass) { @@ -135,14 +142,15 @@ public class GenerateEventsApplication { final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final Dataset r0 = readPath( + final Dataset r0 = readPath( spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(r -> new ResultWithRelations(r), Encoders.kryo(ResultWithRelations.class)); - final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); - final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); - final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join( + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + + final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); + final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); + final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); + final Dataset r4 = join( r3, rels, relatedEntities(publications, rels, RelatedProject.class)); ; @@ -159,20 +167,20 @@ public class GenerateEventsApplication { Encoders.kryo(clazz)); } - private static Dataset join(final Dataset sources, + private static Dataset join(final Dataset sources, final Dataset rels, final Dataset typedRels) { - final TypedColumn, ResultWithRelations> aggr = new ResultWithRelationsAggregator() + final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() .toColumn(); ; return sources .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") .groupByKey( - (MapFunction, String>) t -> t._1.getResult().getId(), Encoders.STRING()) + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.kryo(ResultWithRelations.class)); + .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); } public static Dataset readPath( diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 0e57f32f9..13aeefb2f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -12,22 +12,20 @@ import java.util.function.Function; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; -import eu.dnetlib.broker.objects.Publication; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.pace.config.DedupConfig; public abstract class UpdateMatcher { private final boolean multipleUpdate; private final Function topicFunction; - private final BiConsumer compileHighlightFunction; + private final BiConsumer compileHighlightFunction; private final Function highlightToStringFunction; public UpdateMatcher(final boolean multipleUpdate, final Function topicFunction, - final BiConsumer compileHighlightFunction, + final BiConsumer compileHighlightFunction, final Function highlightToStringFunction) { this.multipleUpdate = multipleUpdate; this.topicFunction = topicFunction; @@ -35,19 +33,18 @@ public abstract class UpdateMatcher { this.highlightToStringFunction = highlightToStringFunction; } - public Collection> searchUpdatesForRecord(final ResultWithRelations res, - final Collection others, + public Collection> searchUpdatesForRecord(final OpenaireBrokerResult res, + final Collection others, final DedupConfig dedupConfig) { final Map> infoMap = new HashMap<>(); - for (final ResultWithRelations source : others) { + for (final OpenaireBrokerResult source : others) { if (source != res) { for (final T hl : findDifferences(source, res)) { final Topic topic = getTopicFunction().apply(hl); final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(), - getHighlightToStringFunction(), - dedupConfig); + getHighlightToStringFunction(), dedupConfig); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { } else { @@ -71,14 +68,14 @@ public abstract class UpdateMatcher { } } - protected abstract List findDifferences(ResultWithRelations source, ResultWithRelations target); + protected abstract List findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target); - protected static boolean isMissing(final List> list) { - return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue()); + protected static boolean isMissing(final List list) { + return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0)); } - protected boolean isMissing(final Field field) { - return field == null || StringUtils.isBlank(field.getValue()); + protected boolean isMissing(final String field) { + return StringUtils.isBlank(field); } public boolean isMultipleUpdate() { @@ -89,7 +86,7 @@ public abstract class UpdateMatcher { return topicFunction; } - public BiConsumer getCompileHighlightFunction() { + public BiConsumer getCompileHighlightFunction() { return compileHighlightFunction; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index a8e0a2c42..7a58f986b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -5,45 +5,39 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.Dataset; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Dataset; public abstract class AbstractEnrichMissingDataset - extends UpdateMatcher { + extends UpdateMatcher { public AbstractEnrichMissingDataset(final Topic topic) { super(true, rel -> topic, (p, rel) -> p.getDatasets().add(rel), - rel -> rel.getInstances().get(0).getUrl()); + rel -> rel.getOriginalId()); } protected abstract boolean filterByType(String relType); @Override - protected final List findDifferences( - final ResultWithRelations source, - final ResultWithRelations target) { + protected final List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingDatasets = target .getDatasets() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(RelatedDataset::getRelDataset) - .map(Dataset::getId) + .map(Dataset::getOriginalId) .collect(Collectors.toSet()); return source .getDatasets() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(RelatedDataset::getRelDataset) - .filter(d -> !existingDatasets.contains(d.getId())) - .map(ConversionUtils::oafDatasetToBrokerDataset) + .filter(d -> !existingDatasets.contains(d.getOriginalId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index ab4325c1e..fa5fde725 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -1,19 +1,15 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.Project; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -public class EnrichMissingProject - extends UpdateMatcher { +public class EnrichMissingProject extends UpdateMatcher { public EnrichMissingProject() { super(true, @@ -23,16 +19,11 @@ public class EnrichMissingProject } @Override - protected List findDifferences(final ResultWithRelations source, final ResultWithRelations target) { - if (source.getProjects().isEmpty()) { - return Arrays.asList(); + protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { + if (target.getProjects().isEmpty()) { + return source.getProjects(); } else { - return target - .getProjects() - .stream() - .map(RelatedProject::getRelProject) - .map(ConversionUtils::oafProjectToBrokerProject) - .collect(Collectors.toList()); + return new ArrayList<>(); } } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 3bf23a36b..ca63aeb49 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -5,39 +5,38 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.Project; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Project; -public class EnrichMoreProject extends UpdateMatcher { +public class EnrichMoreProject extends UpdateMatcher { public EnrichMoreProject() { super(true, prj -> Topic.ENRICH_MORE_PROJECT, (p, prj) -> p.getProjects().add(prj), - prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode()); + prj -> projectAsString(prj)); + } + + private static String projectAsString(final Project prj) { + return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode(); } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { - final Set existingProjects = source + final Set existingProjects = target .getProjects() .stream() - .map(RelatedProject::getRelProject) - .map(Project::getId) + .map(EnrichMoreProject::projectAsString) .collect(Collectors.toSet()); - return target + return source .getProjects() .stream() - .map(RelatedProject::getRelProject) - .filter(p -> !existingProjects.contains(p.getId())) - .map(ConversionUtils::oafProjectToBrokerProject) + .filter(p -> !existingProjects.contains(projectAsString(p))) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index bba3e9648..300863949 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -5,21 +5,18 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Publication; -public abstract class AbstractEnrichMissingPublication - extends UpdateMatcher { +public abstract class AbstractEnrichMissingPublication extends UpdateMatcher { public AbstractEnrichMissingPublication(final Topic topic) { super(true, rel -> topic, (p, rel) -> p.getPublications().add(rel), - rel -> rel.getInstances().get(0).getUrl()); + rel -> rel.getOriginalId()); } @@ -27,24 +24,21 @@ public abstract class AbstractEnrichMissingPublication @Override protected final List findDifferences( - final ResultWithRelations source, - final ResultWithRelations target) { + final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingPublications = target .getPublications() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(RelatedPublication::getRelPublication) - .map(Publication::getId) + .map(Publication::getOriginalId) .collect(Collectors.toSet()); return source .getPublications() .stream() .filter(rel -> filterByType(rel.getRelType())) - .map(RelatedPublication::getRelPublication) - .filter(d -> !existingPublications.contains(d.getId())) - .map(ConversionUtils::oafResultToBrokerPublication) + .filter(p -> !existingPublications.contains(p.getOriginalId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 6090939dc..76ae061e6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -1,15 +1,12 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class EnrichMissingSoftware extends UpdateMatcher { @@ -23,18 +20,13 @@ public class EnrichMissingSoftware @Override protected List findDifferences( - final ResultWithRelations source, - final ResultWithRelations target) { + final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { - if (source.getSoftwares().isEmpty()) { - return Arrays.asList(); + if (target.getSoftwares().isEmpty()) { + return source.getSoftwares(); } else { - return target - .getSoftwares() - .stream() - .map(RelatedSoftware::getRelSoftware) - .map(ConversionUtils::oafSoftwareToBrokerSoftware) - .collect(Collectors.toList()); + return new ArrayList<>(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index ba422f436..ebd421b8e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -5,15 +5,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.Software; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Software; -public class EnrichMoreSoftware - extends UpdateMatcher { +public class EnrichMoreSoftware extends UpdateMatcher { public EnrichMoreSoftware() { super(true, @@ -24,22 +21,19 @@ public class EnrichMoreSoftware @Override protected List findDifferences( - final ResultWithRelations source, - final ResultWithRelations target) { + final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingSoftwares = source .getSoftwares() .stream() - .map(RelatedSoftware::getRelSoftware) - .map(Software::getId) + .map(Software::getName) .collect(Collectors.toSet()); return target .getSoftwares() .stream() - .map(RelatedSoftware::getRelSoftware) - .filter(p -> !existingSoftwares.contains(p.getId())) - .map(ConversionUtils::oafSoftwareToBrokerSoftware) + .filter(p -> !existingSoftwares.contains(p.getName())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java index 25d5f9d8a..b2cbbce2c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java @@ -5,9 +5,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class EnrichMissingAbstract extends UpdateMatcher { @@ -19,13 +19,12 @@ public class EnrichMissingAbstract extends UpdateMatcher { } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { - if (isMissing(target.getResult().getDescription()) && !isMissing(source.getResult().getDescription())) { - return Arrays - .asList(source.getResult().getDescription().get(0).getValue()); + protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { + if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) { + return Arrays.asList(source.getAbstracts().get(0)); + } else { + return new ArrayList<>(); } - return new ArrayList<>(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index eed2d0be8..c4b96e67b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -1,53 +1,43 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.broker.objects.Author; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Author; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class EnrichMissingAuthorOrcid extends UpdateMatcher { +public class EnrichMissingAuthorOrcid extends UpdateMatcher { public EnrichMissingAuthorOrcid() { super(true, aut -> Topic.ENRICH_MISSING_AUTHOR_ORCID, (p, aut) -> p.getCreators().add(aut), - aut -> aut); + aut -> aut.getOrcid()); } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingOrcids = target - .getResult() - .getAuthor() + .getCreators() .stream() - .map(Author::getPid) - .flatMap(List::stream) - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) + .map(Author::getOrcid) + .filter(StringUtils::isNotBlank) .collect(Collectors.toSet()); - final List list = new ArrayList<>(); + return source + .getCreators() + .stream() + .filter(a -> StringUtils.isNotBlank(a.getOrcid())) + .filter(a -> !existingOrcids.contains(a.getOrcid())) + .collect(Collectors.toList()); - for (final Author author : source.getResult().getAuthor()) { - final String name = author.getFullname(); - - for (final StructuredProperty pid : author.getPid()) { - if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid") - && !existingOrcids.contains(pid.getValue())) { - list.add(name + " [ORCID: " + pid.getValue() + "]"); - } - } - } - - return list; } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index e8ca18ae9..e870cf1fa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -6,11 +6,10 @@ import java.util.List; import java.util.stream.Collectors; import eu.dnetlib.broker.objects.Instance; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class EnrichMissingOpenAccess extends UpdateMatcher { @@ -22,13 +21,12 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final long count = target - .getResult() - .getInstance() + .getInstances() .stream() - .map(i -> i.getAccessright().getClassid()) + .map(Instance::getLicense) .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .count(); @@ -37,12 +35,9 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { } return source - .getResult() - .getInstance() + .getInstances() .stream() - .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) + .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS)) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java index 6c061a2e8..cc72d9fa9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java @@ -5,13 +5,12 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -public class EnrichMissingPid extends UpdateMatcher { +public class EnrichMissingPid extends UpdateMatcher { public EnrichMissingPid() { super(true, @@ -21,19 +20,17 @@ public class EnrichMissingPid extends UpdateMatcher { } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { - final long count = target.getResult().getPid().size(); + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { + final long count = target.getPids().size(); if (count > 0) { return Arrays.asList(); } return source - .getResult() - .getPid() + .getPids() .stream() - .map(ConversionUtils::oafPidToBrokerPid) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java index 27a71740c..ed8c26b5a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java @@ -5,9 +5,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class EnrichMissingPublicationDate extends UpdateMatcher { @@ -19,13 +19,14 @@ public class EnrichMissingPublicationDate extends UpdateMatcher { } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { - if (isMissing(target.getResult().getDateofacceptance()) - && !isMissing(source.getResult().getDateofacceptance())) { - return Arrays.asList(source.getResult().getDateofacceptance().getValue()); + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { + + if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) { + return Arrays.asList(source.getPublicationdate()); + } else { + return new ArrayList<>(); } - return new ArrayList<>(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java index 04de30089..07b1fa41a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java @@ -5,42 +5,38 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class EnrichMissingSubject extends UpdateMatcher> { +public class EnrichMissingSubject extends UpdateMatcher { public EnrichMissingSubject() { super(true, - pair -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + pair.getLeft()), - (p, pair) -> p.getSubjects().add(pair.getRight()), - pair -> pair.getLeft() + "::" + pair.getRight()); + s -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + s.getType()), + (p, s) -> p.getSubjects().add(s), + s -> subjectAsString(s)); } @Override - protected List> findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { - final Set existingTypes = target - .getResult() - .getSubject() + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { + final Set existingSubject = target + .getSubjects() .stream() - .map(StructuredProperty::getQualifier) - .map(Qualifier::getClassid) + .map(s -> subjectAsString(s)) .collect(Collectors.toSet()); return source - .getResult() - .getPid() + .getSubjects() .stream() - .filter(pid -> !existingTypes.contains(pid.getQualifier().getClassid())) - .map(ConversionUtils::oafSubjectToPair) + .filter(s -> !existingSubject.contains(subjectAsString(s))) .collect(Collectors.toList()); } + private static String subjectAsString(final TypedValue s) { + return s.getType() + "::" + s.getValue(); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index 3ee4c8bdd..bfef3ee4f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -6,11 +6,10 @@ import java.util.Set; import java.util.stream.Collectors; import eu.dnetlib.broker.objects.Instance; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; public class EnrichMoreOpenAccess extends UpdateMatcher { @@ -22,24 +21,19 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set urls = target - .getResult() - .getInstance() + .getInstances() .stream() - .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) + .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS)) .map(i -> i.getUrl()) - .flatMap(List::stream) .collect(Collectors.toSet()); return source - .getResult() - .getInstance() + .getInstances() .stream() - .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) + .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS)) .filter(i -> !urls.contains(i.getUrl())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java index 694c9460a..d1f2e6022 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java @@ -5,38 +5,37 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -public class EnrichMorePid extends UpdateMatcher { +public class EnrichMorePid extends UpdateMatcher { public EnrichMorePid() { super(true, pid -> Topic.ENRICH_MORE_PID, (p, pid) -> p.getPids().add(pid), - pid -> pid.getType() + "::" + pid.getValue()); + pid -> pidAsString(pid)); } @Override - protected List findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingPids = target - .getResult() - .getPid() + .getPids() .stream() - .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue()) + .map(pid -> pidAsString(pid)) .collect(Collectors.toSet()); return source - .getResult() - .getPid() + .getPids() .stream() - .filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue())) - .map(ConversionUtils::oafPidToBrokerPid) + .filter(pid -> !existingPids.contains(pidAsString(pid))) .collect(Collectors.toList()); } + private static String pidAsString(final TypedValue pid) { + return pid.getType() + "::" + pid.getValue(); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index 7d0b1a65e..39225e8ab 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -5,39 +5,37 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; - +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -public class EnrichMoreSubject extends UpdateMatcher> { +public class EnrichMoreSubject extends UpdateMatcher { public EnrichMoreSubject() { super(true, - pair -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + pair.getLeft()), - (p, pair) -> p.getSubjects().add(pair.getRight()), - pair -> pair.getLeft() + "::" + pair.getRight()); + s -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + s.getType()), + (p, s) -> p.getSubjects().add(s), + s -> subjectAsString(s)); } @Override - protected List> findDifferences(final ResultWithRelations source, - final ResultWithRelations target) { + protected List findDifferences(final OpenaireBrokerResult source, + final OpenaireBrokerResult target) { final Set existingSubjects = target - .getResult() - .getSubject() + .getSubjects() .stream() - .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue()) + .map(pid -> subjectAsString(pid)) .collect(Collectors.toSet()); return source - .getResult() - .getPid() + .getPids() .stream() - .filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue())) - .map(ConversionUtils::oafSubjectToPair) + .filter(s -> !existingSubjects.contains(subjectAsString(s))) .collect(Collectors.toList()); } + private static String subjectAsString(final TypedValue s) { + return s.getType() + "::" + s.getValue(); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java index 0665c69dd..49c46c7f0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java @@ -2,13 +2,12 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; -import java.util.List; +import java.util.HashSet; +import java.util.Set; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; +import eu.dnetlib.dhp.schema.common.ModelSupport; public class BrokerConstants { @@ -18,7 +17,11 @@ public class BrokerConstants { public static final float MIN_TRUST = 0.25f; public static final float MAX_TRUST = 1.00f; - public static final List> RESULT_CLASSES = Arrays - .asList(Publication.class, Dataset.class, Software.class, OtherResearchProduct.class); + public static Class[] getModelClasses() { + final Set> list = new HashSet<>(); + list.addAll(Arrays.asList(ModelSupport.getOafModelClasses())); + list.addAll(Arrays.asList(ResultGroup.class, Event.class)); + return list.toArray(new Class[] {}); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 2b39115b1..b80848682 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -13,7 +13,8 @@ import org.dom4j.DocumentHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.ExternalReference; @@ -41,8 +42,8 @@ public class ConversionUtils { }).collect(Collectors.toList()); } - public static Pid oafPidToBrokerPid(final StructuredProperty sp) { - return sp != null ? new Pid() + public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) { + return sp != null ? new TypedValue() .setValue(sp.getValue()) .setType(sp.getQualifier().getClassid()) : null; } @@ -54,7 +55,7 @@ public class ConversionUtils { public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { return d != null ? new eu.dnetlib.broker.objects.Dataset() .setOriginalId(d.getOriginalId().get(0)) - .setTitles(structPropList(d.getTitle())) + .setTitle(structPropValue(d.getTitle())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setInstances( d @@ -63,26 +64,30 @@ public class ConversionUtils { .map(ConversionUtils::oafInstanceToBrokerInstances) .flatMap(List::stream) .collect(Collectors.toList())) - .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) + .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } - public static final eu.dnetlib.broker.objects.Publication oafResultToBrokerPublication(final Result result) { + public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { - return result != null ? new eu.dnetlib.broker.objects.Publication() + return result != null ? new OpenaireBrokerResult() + .setOpenaireId(result.getId()) .setOriginalId(result.getOriginalId().get(0)) + .setTypology(result.getResulttype().getClassid()) .setTitles(structPropList(result.getTitle())) .setAbstracts(fieldList(result.getDescription())) .setLanguage(result.getLanguage().getClassid()) - .setSubjects(structPropList(result.getSubject())) - .setCreators(result.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList())) - .setPublicationdate(result.getDateofcollection()) + .setSubjects(structPropTypedList(result.getSubject())) + .setCreators( + result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) + .setPublicationdate(result.getDateofacceptance().getValue()) .setPublisher(fieldValue(result.getPublisher())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setContributor(fieldList(result.getContributor())) .setJournal( result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) - .setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) + .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) + .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setInstances( result @@ -100,6 +105,30 @@ public class ConversionUtils { : null; } + private static List structPropTypedList(final List list) { + return list + .stream() + .map( + p -> new TypedValue() + .setValue(p.getValue()) + .setType(p.getQualifier().getClassid())) + .collect(Collectors.toList()); + } + + private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { + return author != null ? new eu.dnetlib.broker.objects.Author() + .setFullname(author.getFullname()) + .setOrcid( + author + .getPid() + .stream() + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .findFirst() + .orElse(null)) + : null; + } + private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { return journal != null ? new eu.dnetlib.broker.objects.Journal() .setName(journal.getName()) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index b4de08db7..4c20ac5ca 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.ArrayList; import java.util.List; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; @@ -30,7 +31,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; import eu.dnetlib.pace.config.DedupConfig; public class EventFinder { @@ -68,7 +68,7 @@ public class EventFinder { public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { final List> list = new ArrayList<>(); - for (final ResultWithRelations target : results.getData()) { + for (final OpenaireBrokerResult target : results.getData()) { for (final UpdateMatcher matcher : matchers) { list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig)); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 9da636413..82d017864 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -1,7 +1,6 @@ package eu.dnetlib.dhp.broker.oa.util; -import java.util.List; import java.util.function.BiConsumer; import java.util.function.Function; @@ -10,14 +9,11 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.Provenance; -import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.dhp.broker.model.Topic; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Instance; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.tree.support.TreeProcessor; @@ -29,11 +25,11 @@ public final class UpdateInfo { private final T highlightValue; - private final ResultWithRelations source; + private final OpenaireBrokerResult source; - private final ResultWithRelations target; + private final OpenaireBrokerResult target; - private final BiConsumer compileHighlight; + private final BiConsumer compileHighlight; private final Function highlightToString; @@ -41,9 +37,9 @@ public final class UpdateInfo { private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); - public UpdateInfo(final Topic topic, final T highlightValue, final ResultWithRelations source, - final ResultWithRelations target, - final BiConsumer compileHighlight, + public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source, + final OpenaireBrokerResult target, + final BiConsumer compileHighlight, final Function highlightToString, final DedupConfig dedupConfig) { this.topic = topic; @@ -52,22 +48,23 @@ public final class UpdateInfo { this.target = target; this.compileHighlight = compileHighlight; this.highlightToString = highlightToString; - this.trust = calculateTrust(dedupConfig, source.getResult(), target.getResult()); + this.trust = calculateTrust(dedupConfig, source, target); } public T getHighlightValue() { return highlightValue; } - public ResultWithRelations getSource() { + public OpenaireBrokerResult getSource() { return source; } - public ResultWithRelations getTarget() { + public OpenaireBrokerResult getTarget() { return target; } - private float calculateTrust(final DedupConfig dedupConfig, final Result r1, final Result r2) { + private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1, + final OpenaireBrokerResult r2) { try { final ObjectMapper objectMapper = new ObjectMapper(); final MapDocument doc1 = MapDocumentUtil @@ -103,26 +100,18 @@ public final class UpdateInfo { public OpenAireEventPayload asBrokerPayload() { - final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource().getResult()); - compileHighlight.accept(p, getHighlightValue()); + compileHighlight.accept(target, getHighlightValue()); - final Publication hl = new Publication(); + final OpenaireBrokerResult hl = new OpenaireBrokerResult(); compileHighlight.accept(hl, getHighlightValue()); - final String provId = getSource().getResult().getOriginalId().stream().findFirst().orElse(null); - final String provRepo = getSource() - .getResult() - .getCollectedfrom() - .stream() - .map(KeyValue::getValue) - .findFirst() - .orElse(null); + final String provId = getSource().getOriginalId(); + final String provRepo = getSource().getCollectedFromName(); + final String provUrl = getSource() - .getResult() - .getInstance() + .getInstances() .stream() .map(Instance::getUrl) - .flatMap(List::stream) .findFirst() .orElse(null); ; @@ -130,7 +119,7 @@ public final class UpdateInfo { final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); return new OpenAireEventPayload() - .setPublication(p) + .setPublication(target) .setHighlight(hl) .setTrust(trust) .setProvenance(provenance); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index 397f30660..dabe2bb4d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; -public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { +public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { /** * @@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator t) { + public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { return group.addElement(t._1); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java index 81f189d62..4308224a5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java @@ -5,7 +5,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; +import eu.dnetlib.broker.objects.OpenaireBrokerResult; public class ResultGroup implements Serializable { @@ -14,13 +14,13 @@ public class ResultGroup implements Serializable { */ private static final long serialVersionUID = -3360828477088669296L; - private final List data = new ArrayList<>(); + private final List data = new ArrayList<>(); - public List getData() { + public List getData() { return data; } - public ResultGroup addElement(final ResultWithRelations elem) { + public ResultGroup addElement(final OpenaireBrokerResult elem) { data.add(elem); return this; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java new file mode 100644 index 000000000..b44fbe367 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java @@ -0,0 +1,69 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OpenaireBrokerResult; +import scala.Tuple2; + +public class OpenaireBrokerResultAggregator + extends Aggregator, OpenaireBrokerResult, OpenaireBrokerResult> { + + /** + * + */ + private static final long serialVersionUID = -3687878788861013488L; + + @Override + public OpenaireBrokerResult zero() { + return new OpenaireBrokerResult(); + } + + @Override + public OpenaireBrokerResult finish(final OpenaireBrokerResult g) { + return g; + } + + @Override + public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2 t) { + if (g.getOriginalId() == null) { + return t._1; + } else if (t._2 instanceof RelatedSoftware) { + g.getSoftwares().add(((RelatedSoftware) t._2).getRelSoftware()); + } else if (t._2 instanceof RelatedDataset) { + g.getDatasets().add(((RelatedDataset) t._2).getRelDataset()); + } else if (t._2 instanceof RelatedPublication) { + g.getPublications().add(((RelatedPublication) t._2).getRelPublication()); + } else if (t._2 instanceof RelatedProject) { + g.getProjects().add(((RelatedProject) t._2).getRelProject()); + } + return g; + + } + + @Override + public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) { + if (g1.getOriginalId() != null) { + g1.getSoftwares().addAll(g2.getSoftwares()); + g1.getDatasets().addAll(g2.getDatasets()); + g1.getPublications().addAll(g2.getPublications()); + g1.getProjects().addAll(g2.getProjects()); + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(OpenaireBrokerResult.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(OpenaireBrokerResult.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java index 84cf693ad..fcf1b89b1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -1,10 +1,16 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.dhp.schema.oaf.Dataset; +import java.io.Serializable; -public class RelatedDataset { +import eu.dnetlib.broker.objects.Dataset; +public class RelatedDataset implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 774487705184038324L; private final String source; private final String relType; private final Dataset relDataset; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java index 490724a44..08d57a1ea 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -1,15 +1,17 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.broker.objects.Dataset; +import eu.dnetlib.broker.objects.Project; +import eu.dnetlib.broker.objects.Publication; +import eu.dnetlib.broker.objects.Software; public class RelatedEntityFactory { @SuppressWarnings("unchecked") - public static RT newRelatedEntity(final String sourceId, final String relType, final T target, + public static RT newRelatedEntity(final String sourceId, + final String relType, + final T target, final Class clazz) { if (clazz == RelatedProject.class) { return (RT) new RelatedProject(sourceId, relType, (Project) target); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java index 2d16f5409..233041c09 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -1,9 +1,16 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.dhp.schema.oaf.Project; +import java.io.Serializable; -public class RelatedProject { +import eu.dnetlib.broker.objects.Project; + +public class RelatedProject implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 4941437626549329870L; private final String source; private final String relType; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java index f1545c004..80b92462d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -1,9 +1,16 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.dhp.schema.oaf.Publication; +import java.io.Serializable; -public class RelatedPublication { +import eu.dnetlib.broker.objects.Publication; + +public class RelatedPublication implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 9021609640411395128L; private final String source; private final String relType; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java index e5873d263..13f1f4290 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -1,10 +1,16 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.dhp.schema.oaf.Software; +import java.io.Serializable; -public class RelatedSoftware { +import eu.dnetlib.broker.objects.Software; +public class RelatedSoftware implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 7573383356943300157L; private final String source; private final String relType; private final Software relSoftware; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java deleted file mode 100644 index 2d762aded..000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelations.java +++ /dev/null @@ -1,55 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import eu.dnetlib.dhp.schema.oaf.Result; - -public class ResultWithRelations implements Serializable { - - /** - * - */ - private static final long serialVersionUID = -1368401915974311571L; - - private Result result; - - private final List datasets = new ArrayList<>(); - private final List publications = new ArrayList<>(); - private final List softwares = new ArrayList<>(); - private final List projects = new ArrayList<>(); - - public ResultWithRelations() { - } - - public ResultWithRelations(final Result result) { - this.result = result; - } - - public Result getResult() { - return result; - } - - public List getDatasets() { - return datasets; - } - - public List getPublications() { - return publications; - } - - public List getSoftwares() { - return softwares; - } - - public List getProjects() { - return projects; - } - - public void setResult(final Result result) { - this.result = result; - } - -} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java deleted file mode 100644 index b4922a64f..000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/ResultWithRelationsAggregator.java +++ /dev/null @@ -1,68 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; - -import org.apache.spark.sql.Encoder; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.expressions.Aggregator; - -import scala.Tuple2; - -public class ResultWithRelationsAggregator - extends Aggregator, ResultWithRelations, ResultWithRelations> { - - /** - * - */ - private static final long serialVersionUID = -3687878788861013488L; - - @Override - public ResultWithRelations zero() { - return new ResultWithRelations(); - } - - @Override - public ResultWithRelations finish(final ResultWithRelations g) { - return g; - } - - @Override - public ResultWithRelations reduce(final ResultWithRelations g, final Tuple2 t) { - if (g.getResult() == null) { - return t._1; - } else if (t._2 instanceof RelatedSoftware) { - g.getSoftwares().add((RelatedSoftware) t._2); - } else if (t._2 instanceof RelatedDataset) { - g.getDatasets().add((RelatedDataset) t._2); - } else if (t._2 instanceof RelatedPublication) { - g.getPublications().add((RelatedPublication) t._2); - } else if (t._2 instanceof RelatedProject) { - g.getProjects().add((RelatedProject) t._2); - } - return g; - - } - - @Override - public ResultWithRelations merge(final ResultWithRelations g1, final ResultWithRelations g2) { - if (g1.getResult() != null) { - g1.getSoftwares().addAll(g2.getSoftwares()); - g1.getDatasets().addAll(g2.getDatasets()); - g1.getPublications().addAll(g2.getPublications()); - g1.getProjects().addAll(g2.getProjects()); - return g1; - } else { - return g2; - } - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(ResultWithRelations.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(ResultWithRelations.class); - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/reflections/ReflectionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/reflections/ReflectionTest.java new file mode 100644 index 000000000..110fabf45 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/reflections/ReflectionTest.java @@ -0,0 +1,200 @@ + +package eu.dnetlib.dhp.oa.graph.reflections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +public class ReflectionTest { + + private final Cleaner cleaner = new Cleaner(); + + @Test + void testObject() throws Exception { + final Publication pub = new Publication(); + pub.setTitle("openaire guidelines"); + pub.getAuthors().add(new Author("Michele Artini", new Prop("aa-001", "orcid"))); + pub.getAuthors().add(new Author("Claudio Atzori", new Prop("aa-002", "orcid"))); + pub.getAuthors().add(new Author("Alessia Bardi", new Prop("aa-003", "orcid"))); + pub.getSubjects().add(new Prop("infrastructures", "keyword")); + pub.getSubjects().add(new Prop("digital libraries", "keyword")); + + cleaner.clean(pub); + + System.out.println(pub); + + assertEquals("OPENAIRE GUIDELINES", pub.getTitle()); + + assertEquals("MICHELE ARTINI", pub.getAuthors().get(0).getName()); + assertEquals("CLAUDIO ATZORI", pub.getAuthors().get(1).getName()); + assertEquals("ALESSIA BARDI", pub.getAuthors().get(2).getName()); + + assertEquals("dnet:aa-001", pub.getAuthors().get(0).getId().getId()); + assertEquals("dnet:aa-002", pub.getAuthors().get(1).getId().getId()); + assertEquals("dnet:aa-003", pub.getAuthors().get(2).getId().getId()); + assertEquals("dnet:orcid", pub.getAuthors().get(0).getId().getName()); + assertEquals("dnet:orcid", pub.getAuthors().get(1).getId().getName()); + assertEquals("dnet:orcid", pub.getAuthors().get(2).getId().getName()); + + assertEquals("dnet:infrastructures", pub.getSubjects().get(0).getId()); + assertEquals("dnet:keyword", pub.getSubjects().get(0).getName()); + assertEquals("dnet:digital libraries", pub.getSubjects().get(1).getId()); + assertEquals("dnet:keyword", pub.getSubjects().get(1).getName()); + } + +} + +class Cleaner { + + public void clean(final Object o) throws IllegalArgumentException, IllegalAccessException { + if (isPrimitive(o)) { + return; + } else if (isIterable(o.getClass())) { + for (final Object elem : (Iterable) o) { + clean(elem); + } + } else if (hasMapping(o)) { + mapObject(o); + } else { + for (final Field f : o.getClass().getDeclaredFields()) { + f.setAccessible(true); + final Object val = f.get(o); + if (isPrimitive(val)) { + f.set(o, cleanValue(f.get(o))); + } else if (hasMapping(val)) { + mapObject(val); + } else { + clean(f.get(o)); + } + } + } + } + + private boolean hasMapping(final Object o) { + return o.getClass() == Prop.class; + } + + private void mapObject(final Object o) { + if (o.getClass() == Prop.class) { + ((Prop) o).setId("dnet:" + ((Prop) o).getId()); + ((Prop) o).setName("dnet:" + ((Prop) o).getName()); + } + + } + + private Object cleanValue(final Object o) { + if (o.getClass() == String.class) { + return ((String) o).toUpperCase(); + } else { + return o; + } + + } + + private boolean isIterable(final Class cl) { + return Iterable.class.isAssignableFrom(cl); + } + + private boolean isPrimitive(final Object o) { + return o.getClass() == String.class; + } +} + +class Publication { + + private String title; + private final List authors = new ArrayList<>(); + private final List subjects = new ArrayList<>(); + + public String getTitle() { + return title; + } + + public void setTitle(final String title) { + this.title = title; + } + + public List getAuthors() { + return authors; + } + + public List getSubjects() { + return subjects; + } + + @Override + public String toString() { + return String.format("Publication [title=%s, authors=%s, subjects=%s]", title, authors, subjects); + } + +} + +class Prop { + + private String id; + private String name; + + public Prop(final String id, final String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + @Override + public String toString() { + return String.format("Prop [id=%s, name=%s]", id, name); + } + +} + +class Author { + + private String name; + private Prop id; + + public Author(final String name, final Prop id) { + this.name = name; + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public Prop getId() { + return id; + } + + public void setId(final Prop id) { + this.id = id; + } + + @Override + public String toString() { + return String.format("Author [name=%s, id=%s]", name, id); + } + +}