forked from D-Net/dnet-hadoop
removed kryo encoding
This commit is contained in:
parent
8d2b199dd2
commit
61634fbfe0
|
@ -28,8 +28,11 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
@ -74,8 +77,8 @@ public class GenerateEventsApplication {
|
||||||
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
|
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
conf.registerKryoClasses(BrokerConstants.getModelClasses());
|
// conf.registerKryoClasses(BrokerConstants.getModelClasses());
|
||||||
|
|
||||||
// TODO UNCOMMENT
|
// TODO UNCOMMENT
|
||||||
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
|
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
|
||||||
|
@ -85,17 +88,24 @@ public class GenerateEventsApplication {
|
||||||
|
|
||||||
removeOutputDir(spark, eventsPath);
|
removeOutputDir(spark, eventsPath);
|
||||||
|
|
||||||
// TODO UNCOMMENT
|
// TODO REMOVE THIS
|
||||||
spark
|
expandResultsWithRelations(spark, graphPath, Publication.class)
|
||||||
.emptyDataset(Encoders.kryo(Event.class))
|
|
||||||
.union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
|
|
||||||
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
|
|
||||||
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
|
|
||||||
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
|
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(eventsPath);
|
.json(eventsPath);
|
||||||
|
|
||||||
|
// TODO UNCOMMENT THIS
|
||||||
|
// spark
|
||||||
|
// .emptyDataset(Encoders.bean(Event.class))
|
||||||
|
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
|
||||||
|
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
|
||||||
|
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
|
||||||
|
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
|
||||||
|
// .write()
|
||||||
|
// .mode(SaveMode.Overwrite)
|
||||||
|
// .option("compression", "gzip")
|
||||||
|
// .json(eventsPath);
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -123,12 +133,12 @@ public class GenerateEventsApplication {
|
||||||
.groupByKey(
|
.groupByKey(
|
||||||
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||||
.agg(aggr)
|
.agg(aggr)
|
||||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
|
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
|
||||||
.filter(ResultGroup::isValid)
|
.filter(ResultGroup::isValid)
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
||||||
Encoders.kryo(EventGroup.class))
|
Encoders.bean(EventGroup.class))
|
||||||
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
|
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
|
private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
|
||||||
|
@ -147,15 +157,16 @@ public class GenerateEventsApplication {
|
||||||
.cache();
|
.cache();
|
||||||
|
|
||||||
final Dataset<OpenaireBrokerResult> r0 = readPath(
|
final Dataset<OpenaireBrokerResult> r0 = readPath(
|
||||||
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
|
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
|
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
|
||||||
|
|
||||||
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
|
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
|
||||||
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
|
final Dataset<OpenaireBrokerResult> r2 = join(
|
||||||
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
|
r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class));
|
||||||
|
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class));
|
||||||
final Dataset<OpenaireBrokerResult> r4 = join(
|
final Dataset<OpenaireBrokerResult> r4 = join(
|
||||||
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
|
r3, rels, relatedEntities(publications, rels, RelatedPublication.class));
|
||||||
;
|
;
|
||||||
|
|
||||||
return r4;
|
return r4;
|
||||||
|
@ -168,7 +179,7 @@ public class GenerateEventsApplication {
|
||||||
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
|
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
|
||||||
.map(
|
.map(
|
||||||
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
|
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
|
||||||
Encoders.kryo(clazz));
|
Encoders.bean(clazz));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
|
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
|
||||||
|
@ -184,7 +195,7 @@ public class GenerateEventsApplication {
|
||||||
.groupByKey(
|
.groupByKey(
|
||||||
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||||
.agg(aggr)
|
.agg(aggr)
|
||||||
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
|
.map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,13 +38,13 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Re
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Encoder<ResultGroup> bufferEncoder() {
|
public Encoder<ResultGroup> bufferEncoder() {
|
||||||
return Encoders.kryo(ResultGroup.class);
|
return Encoders.bean(ResultGroup.class);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Encoder<ResultGroup> outputEncoder() {
|
public Encoder<ResultGroup> outputEncoder() {
|
||||||
return Encoders.kryo(ResultGroup.class);
|
return Encoders.bean(ResultGroup.class);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,12 +58,12 @@ public class OpenaireBrokerResultAggregator<T>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Encoder<OpenaireBrokerResult> bufferEncoder() {
|
public Encoder<OpenaireBrokerResult> bufferEncoder() {
|
||||||
return Encoders.kryo(OpenaireBrokerResult.class);
|
return Encoders.bean(OpenaireBrokerResult.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Encoder<OpenaireBrokerResult> outputEncoder() {
|
public Encoder<OpenaireBrokerResult> outputEncoder() {
|
||||||
return Encoders.kryo(OpenaireBrokerResult.class);
|
return Encoders.bean(OpenaireBrokerResult.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue