This commit is contained in:
Michele Artini 2020-06-18 14:49:13 +02:00
parent 61634fbfe0
commit 52f62d5d8c
1 changed files with 10 additions and 13 deletions

View File

@ -28,11 +28,7 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
@ -92,7 +88,6 @@ public class GenerateEventsApplication {
expandResultsWithRelations(spark, graphPath, Publication.class) expandResultsWithRelations(spark, graphPath, Publication.class)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(eventsPath); .json(eventsPath);
// TODO UNCOMMENT THIS // TODO UNCOMMENT THIS
@ -161,15 +156,17 @@ public class GenerateEventsApplication {
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); // TODO UNCOMMENT THIS
final Dataset<OpenaireBrokerResult> r2 = join( // final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels,
r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class)); // RelatedProject.class));
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class)); // final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
final Dataset<OpenaireBrokerResult> r4 = join( // RelatedSoftware.class));
r3, rels, relatedEntities(publications, rels, RelatedPublication.class)); // final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,
; // RelatedDataset.class));
// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels,
// RelatedPublication.class));;
return r4; return r0; // TODO it should be r4
} }
private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets, private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets,