join simrels

This commit is contained in:
Michele Artini 2020-06-08 16:26:16 +02:00
parent a73973a74b
commit 81e85465d8
4 changed files with 153 additions and 38 deletions

View File

@ -12,16 +12,13 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,6 +52,9 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
@ -63,6 +63,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import scala.Tuple2;
public class GenerateEventsApplication {
@ -130,20 +131,20 @@ public class GenerateEventsApplication {
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
removeOutputDir(spark, eventsPath);
final JavaRDD<Event> eventsRdd = sc.emptyRDD();
final Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));
for (final Class<? extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
eventsRdd.union(generateSimpleEvents(spark, graphPath, r1));
all.union(generateSimpleEvents(spark, graphPath, r1));
for (final Class<? extends Result> r2 : BrokerConstants.RESULT_CLASSES) {
eventsRdd.union(generateRelationEvents(spark, graphPath, r1, r2));
all.union(generateRelationEvents(spark, graphPath, r1, r2));
}
}
eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
all.write().mode(SaveMode.Overwrite).json(eventsPath);
});
}
@ -152,51 +153,48 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
private static <R extends Result> Dataset<Event> generateSimpleEvents(final SparkSession spark,
final String graphPath,
final Class<R> resultClazz) {
final Dataset<R> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
final Dataset<Result> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final Column c = null; // TODO
final Dataset<Row> aa = results
.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupBy(rels.col("target"))
.agg(c)
.filter(x -> x.size() > 1)
// generateSimpleEvents(...)
// flatMap()
// toRdd()
;
return null;
final TypedColumn<Tuple2<Result, Relation>, ResultGroup> aggr = new ResultAggregator().toColumn();
return results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid)
.map((MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication.generateSimpleEvents(g), Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
}
private List<Event> generateSimpleEvents(final Collection<Result> children) {
private static EventGroup generateSimpleEvents(final ResultGroup results) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Result target : children) {
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMorePid.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children));
for (final Result target : results.getData()) {
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData()));
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData()));
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
final EventGroup events = new EventGroup();
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
return events;
}
private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(final SparkSession spark,
private static <SRC extends Result, TRG extends OafEntity> Dataset<Event> generateRelationEvents(final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass,
final Class<TRG> targetClass) {

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Event;
public class EventGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = 765977943803533130L;
private final List<Event> data = new ArrayList<>();
public List<Event> getData() {
return data;
}
public EventGroup addElement(final Event elem) {
data.add(elem);
return this;
}
public EventGroup addGroup(final EventGroup group) {
data.addAll(group.getData());
return this;
}
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class ResultAggregator extends Aggregator<Tuple2<Result, Relation>, ResultGroup, ResultGroup> {
/**
*
*/
private static final long serialVersionUID = -1492327874705585538L;
@Override
public ResultGroup zero() {
return new ResultGroup();
}
@Override
public ResultGroup reduce(final ResultGroup group, final Tuple2<Result, Relation> t) {
return group.addElement(t._1);
}
@Override
public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) {
return g1.addGroup(g2);
}
@Override
public ResultGroup finish(final ResultGroup group) {
return group;
}
@Override
public Encoder<ResultGroup> bufferEncoder() {
return Encoders.kryo(ResultGroup.class);
}
@Override
public Encoder<ResultGroup> outputEncoder() {
return Encoders.kryo(ResultGroup.class);
}
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.Result;
public class ResultGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = -3360828477088669296L;
private final List<Result> data = new ArrayList<>();
public List<Result> getData() {
return data;
}
public ResultGroup addElement(final Result elem) {
data.add(elem);
return this;
}
public ResultGroup addGroup(final ResultGroup group) {
data.addAll(group.getData());
return this;
}
public boolean isValid() {
return data.size() > 1;
}
}