some fixes

This commit is contained in:
Michele Artini 2020-06-19 13:53:56 +02:00
parent 834f139e6e
commit 4822747313
8 changed files with 143 additions and 91 deletions

View File

@ -29,8 +29,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -83,9 +84,11 @@ public class GenerateEventsApplication {
removeOutputDir(spark, eventsPath); removeOutputDir(spark, eventsPath);
// TODO REMOVE THIS // TODO REMOVE THIS
readPath(spark, graphPath + "/publication", Publication.class) final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
.filter(r -> r.getDataInfo().getDeletedbyinference()) final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
relatedEntities(projects, rels, RelatedProject.class)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(eventsPath); .json(eventsPath);
@ -129,7 +132,7 @@ public class GenerateEventsApplication {
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING()) (MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(ResultGroup::isValid) .filter(rg -> rg.getData().size() > 1)
.map( .map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), (MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.bean(EventGroup.class)) Encoders.bean(EventGroup.class))
@ -141,15 +144,15 @@ public class GenerateEventsApplication {
final String graphPath, final String graphPath,
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {
// final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class); final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath( // final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class); // final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class); // final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
// final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
// .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
// .cache(); .cache();
final Dataset<OpenaireBrokerResult> r0 = readPath( final Dataset<OpenaireBrokerResult> r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
@ -157,8 +160,7 @@ public class GenerateEventsApplication {
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class)); .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
// TODO UNCOMMENT THIS // TODO UNCOMMENT THIS
// final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
// RelatedProject.class));
// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, // final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
// RelatedSoftware.class)); // RelatedSoftware.class));
// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, // final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,

View File

@ -7,7 +7,6 @@ import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
@ -59,10 +58,6 @@ public class ConversionUtils {
return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
} }
public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
return sp != null ? Pair.of(classId(sp.getQualifier()), sp.getValue()) : null;
}
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
if (d == null) { if (d == null) {
return null; return null;
@ -123,55 +118,6 @@ public class ConversionUtils {
return res; return res;
} }
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(ConversionUtils::oafStructPropToBrokerTypedValue)
.collect(Collectors.toList());
}
private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.flatMap(List::stream)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
if (list == null) {
return null;
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
if (author == null) { if (author == null) {
return null; return null;
@ -300,4 +246,55 @@ public class ConversionUtils {
.collect(Collectors.toList()) .collect(Collectors.toList())
: new ArrayList<>(); : new ArrayList<>();
} }
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(ConversionUtils::oafStructPropToBrokerTypedValue)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.flatMap(List::stream)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
if (list == null) {
return null;
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
} }

View File

@ -23,12 +23,14 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Re
@Override @Override
public ResultGroup reduce(final ResultGroup group, final Tuple2<OpenaireBrokerResult, Relation> t) { public ResultGroup reduce(final ResultGroup group, final Tuple2<OpenaireBrokerResult, Relation> t) {
return group.addElement(t._1); group.getData().add(t._1);
return group;
} }
@Override @Override
public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) { public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) {
return g1.addGroup(g2); g1.getData().addAll(g2.getData());
return g1;
} }
@Override @Override

View File

@ -14,23 +14,14 @@ public class ResultGroup implements Serializable {
*/ */
private static final long serialVersionUID = -3360828477088669296L; private static final long serialVersionUID = -3360828477088669296L;
private final List<OpenaireBrokerResult> data = new ArrayList<>(); private List<OpenaireBrokerResult> data = new ArrayList<>();
public List<OpenaireBrokerResult> getData() { public List<OpenaireBrokerResult> getData() {
return data; return data;
} }
public ResultGroup addElement(final OpenaireBrokerResult elem) { public void setData(final List<OpenaireBrokerResult> data) {
data.add(elem); this.data = data;
return this;
} }
public ResultGroup addGroup(final ResultGroup group) {
data.addAll(group.getData());
return this;
}
public boolean isValid() {
return data.size() > 1;
}
} }

View File

@ -11,9 +11,12 @@ public class RelatedDataset implements Serializable {
* *
*/ */
private static final long serialVersionUID = 774487705184038324L; private static final long serialVersionUID = 774487705184038324L;
private final String source; private String source;
private final String relType; private String relType;
private final Dataset relDataset; private Dataset relDataset;
public RelatedDataset() {
}
public RelatedDataset(final String source, final String relType, final Dataset relDataset) { public RelatedDataset(final String source, final String relType, final Dataset relDataset) {
this.source = source; this.source = source;
@ -25,12 +28,24 @@ public class RelatedDataset implements Serializable {
return source; return source;
} }
public void setSource(final String source) {
this.source = source;
}
public String getRelType() { public String getRelType() {
return relType; return relType;
} }
public void setRelType(final String relType) {
this.relType = relType;
}
public Dataset getRelDataset() { public Dataset getRelDataset() {
return relDataset; return relDataset;
} }
public void setRelDataset(final Dataset relDataset) {
this.relDataset = relDataset;
}
} }

View File

@ -12,9 +12,12 @@ public class RelatedProject implements Serializable {
*/ */
private static final long serialVersionUID = 4941437626549329870L; private static final long serialVersionUID = 4941437626549329870L;
private final String source; private String source;
private final String relType; private String relType;
private final Project relProject; private Project relProject;
public RelatedProject() {
}
public RelatedProject(final String source, final String relType, final Project relProject) { public RelatedProject(final String source, final String relType, final Project relProject) {
this.source = source; this.source = source;
@ -26,12 +29,24 @@ public class RelatedProject implements Serializable {
return source; return source;
} }
public void setSource(final String source) {
this.source = source;
}
public String getRelType() { public String getRelType() {
return relType; return relType;
} }
public void setRelType(final String relType) {
this.relType = relType;
}
public Project getRelProject() { public Project getRelProject() {
return relProject; return relProject;
} }
public void setRelProject(final Project relProject) {
this.relProject = relProject;
}
} }

View File

@ -12,9 +12,12 @@ public class RelatedPublication implements Serializable {
*/ */
private static final long serialVersionUID = 9021609640411395128L; private static final long serialVersionUID = 9021609640411395128L;
private final String source; private String source;
private final String relType; private String relType;
private final Publication relPublication; private Publication relPublication;
public RelatedPublication() {
}
public RelatedPublication(final String source, final String relType, final Publication relPublication) { public RelatedPublication(final String source, final String relType, final Publication relPublication) {
this.source = source; this.source = source;
@ -26,12 +29,24 @@ public class RelatedPublication implements Serializable {
return source; return source;
} }
public void setSource(final String source) {
this.source = source;
}
public String getRelType() { public String getRelType() {
return relType; return relType;
} }
public void setRelType(final String relType) {
this.relType = relType;
}
public Publication getRelPublication() { public Publication getRelPublication() {
return relPublication; return relPublication;
} }
public void setRelPublication(final Publication relPublication) {
this.relPublication = relPublication;
}
} }

View File

@ -11,9 +11,12 @@ public class RelatedSoftware implements Serializable {
* *
*/ */
private static final long serialVersionUID = 7573383356943300157L; private static final long serialVersionUID = 7573383356943300157L;
private final String source; private String source;
private final String relType; private String relType;
private final Software relSoftware; private Software relSoftware;
public RelatedSoftware() {
}
public RelatedSoftware(final String source, final String relType, final Software relSoftware) { public RelatedSoftware(final String source, final String relType, final Software relSoftware) {
this.source = source; this.source = source;
@ -25,12 +28,24 @@ public class RelatedSoftware implements Serializable {
return source; return source;
} }
public void setSource(final String source) {
this.source = source;
}
public String getRelType() { public String getRelType() {
return relType; return relType;
} }
public void setRelType(final String relType) {
this.relType = relType;
}
public Software getRelSoftware() { public Software getRelSoftware() {
return relSoftware; return relSoftware;
} }
public void setRelSoftware(final Software relSoftware) {
this.relSoftware = relSoftware;
}
} }