diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index ae72e83ce..0673e7810 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -28,8 +28,11 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; @@ -74,8 +77,8 @@ public class GenerateEventsApplication { log.info("dedupConfigProfileId: {}", dedupConfigProfileId); final SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(BrokerConstants.getModelClasses()); + // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + // conf.registerKryoClasses(BrokerConstants.getModelClasses()); // TODO UNCOMMENT // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); @@ -85,17 +88,24 @@ public class GenerateEventsApplication { removeOutputDir(spark, eventsPath); - // TODO UNCOMMENT - spark - .emptyDataset(Encoders.kryo(Event.class)) - .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) - // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + // TODO REMOVE THIS + expandResultsWithRelations(spark, graphPath, Publication.class) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(eventsPath); + + // TODO UNCOMMENT THIS + // spark + // .emptyDataset(Encoders.bean(Event.class)) + // .union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, Software.class, dedupConfig)) + // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) + // .write() + // .mode(SaveMode.Overwrite) + // .option("compression", "gzip") + // .json(eventsPath); }); } @@ -123,12 +133,12 @@ public class GenerateEventsApplication { .groupByKey( (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) - .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) + .map((MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) .filter(ResultGroup::isValid) .map( (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), - Encoders.kryo(EventGroup.class)) - .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); + Encoders.bean(EventGroup.class)) + .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); } private static Dataset expandResultsWithRelations( @@ -147,15 +157,16 @@ public class GenerateEventsApplication { .cache(); final Dataset r0 = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); - final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); - final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); + final Dataset r2 = join( + r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class)); + final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class)); final Dataset r4 = join( - r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + r3, rels, relatedEntities(publications, rels, RelatedPublication.class)); ; return r4; @@ -168,7 +179,7 @@ public class GenerateEventsApplication { .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") .map( t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), - Encoders.kryo(clazz)); + Encoders.bean(clazz)); } private static Dataset join(final Dataset sources, @@ -184,7 +195,7 @@ public class GenerateEventsApplication { .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); + .map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class)); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java index dabe2bb4d..747482198 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java @@ -38,13 +38,13 @@ public class ResultAggregator extends Aggregator bufferEncoder() { - return Encoders.kryo(ResultGroup.class); + return Encoders.bean(ResultGroup.class); } @Override public Encoder outputEncoder() { - return Encoders.kryo(ResultGroup.class); + return Encoders.bean(ResultGroup.class); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java index b44fbe367..e72dcb988 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java @@ -58,12 +58,12 @@ public class OpenaireBrokerResultAggregator @Override public Encoder bufferEncoder() { - return Encoders.kryo(OpenaireBrokerResult.class); + return Encoders.bean(OpenaireBrokerResult.class); } @Override public Encoder outputEncoder() { - return Encoders.kryo(OpenaireBrokerResult.class); + return Encoders.bean(OpenaireBrokerResult.class); } }