diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index d5e577972d..045104720c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -57,7 +57,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -87,25 +86,32 @@ public class GenerateEventsApplication { private static final UpdateMatcher>, ?> enrichMoreSoftware = new EnrichMoreSoftware(); private static final UpdateMatcher>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = + new EnrichMissingPublicationIsReferencedBy(); private static final UpdateMatcher>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = + new EnrichMissingPublicationIsSupplementedTo(); + private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = + new EnrichMissingPublicationIsSupplementedBy(); - private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy(); - private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo(); - private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy(); + private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = + new EnrichMissingDatasetIsRelatedTo(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = + new EnrichMissingDatasetIsReferencedBy(); + private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = + new EnrichMissingDatasetReferences(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = + new EnrichMissingDatasetIsSupplementedTo(); + private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = + new EnrichMissingDatasetIsSupplementedBy(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); + .toString(GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -128,10 +134,13 @@ public class GenerateEventsApplication { final JavaRDD eventsRdd = sc.emptyRDD(); - eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class)); - eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)); - eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class)); - eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class)); + for (final Class r1 : BrokerConstants.RESULT_CLASSES) { + eventsRdd.union(generateSimpleEvents(spark, graphPath, r1)); + + for (final Class r2 : BrokerConstants.RESULT_CLASSES) { + eventsRdd.union(generateRelationEvents(spark, graphPath, r1, r2)); + } + } eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class); }); @@ -146,9 +155,8 @@ public class GenerateEventsApplication { final String graphPath, final Class resultClazz) { - final Dataset results = readPath( - spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) - .filter(r -> r.getDataInfo().getDeletedbyinference()); + final Dataset results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) + .filter(r -> r.getDataInfo().getDeletedbyinference()); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); @@ -169,6 +177,14 @@ public class GenerateEventsApplication { } + private static JavaRDD generateRelationEvents(final SparkSession spark, + final String graphPath, + final Class sourceClass, + final Class targetClass) { + // TODO Auto-generated method stub + return null; + } + private List generateSimpleEvents(final Collection children) { final List> list = new ArrayList<>(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java index 8e97192bfb..06a46b7dab 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java @@ -1,9 +1,21 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.util.Arrays; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; + public class BrokerConstants { - public final static String OPEN_ACCESS = "OPEN"; - public final static String IS_MERGED_IN_CLASS = "isMergedIn"; + public static final String OPEN_ACCESS = "OPEN"; + public static final String IS_MERGED_IN_CLASS = "isMergedIn"; + + public static final List> RESULT_CLASSES = + Arrays.asList(Publication.class, Dataset.class, Software.class, OtherResearchProduct.class); }