Use JSON loading in place of text loading + Jackson mapper

Giambattista Bloisi 2024-12-20 15:24:35 +01:00
parent 1a3fc87599
commit 79e4728afd
7 changed files with 28 additions and 23 deletions
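In essence, every relation-loading call site in the dedup module moves from line-oriented text reading plus per-record Jackson deserialization to Spark's native JSON data source with an explicit bean schema. A minimal sketch of the two patterns, assuming the usual org.apache.spark.sql imports, a SparkSession `spark`, the `Relation` bean used throughout the module, and the `OBJECT_MAPPER` defined in `AbstractSparkAction`:

	// Before: read JSON lines as plain text, deserialize each record with Jackson
	Dataset<Relation> before = spark
		.read()
		.textFile(relationPath)
		.map(
			(MapFunction<String, Relation>) s -> OBJECT_MAPPER.readValue(s, Relation.class),
			Encoders.bean(Relation.class));

	// After: let the JSON data source parse records directly; supplying the bean
	// schema up front also skips Spark's schema-inference pass over the input
	Dataset<Relation> after = spark
		.read()
		.schema(Encoders.bean(Relation.class).schema())
		.json(relationPath)
		.as(Encoders.bean(Relation.class));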


@@ -45,11 +45,11 @@
 <!-- <artifactId>dhp-broker-events</artifactId>-->
 <!-- <version>${project.version}</version>-->
 <!-- </dependency>-->
-<!-- <dependency>-->
-<!-- <groupId>eu.dnetlib.dhp</groupId>-->
-<!-- <artifactId>dhp-dedup-openaire</artifactId>-->
-<!-- <version>${project.version}</version>-->
-<!-- </dependency>-->
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-dedup-openaire</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 <!-- <dependency>-->
 <!-- <groupId>eu.dnetlib.dhp</groupId>-->
 <!-- <artifactId>dhp-enrichment</artifactId>-->


@@ -128,13 +128,12 @@ abstract class AbstractSparkAction implements Serializable {
 			.collect(Collectors.joining(SP_SEPARATOR));
 	}
 
-	protected static MapFunction<String, Relation> patchRelFn() {
+	protected static MapFunction<Relation, Relation> patchRelFn() {
 		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
+			if (value.getDataInfo() == null) {
+				value.setDataInfo(new DataInfo());
 			}
-			return rel;
+			return value;
 		};
 	}
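With deserialization delegated to the JSON reader, `patchRelFn()` no longer parses anything: its signature changes from `MapFunction<String, Relation>` to `MapFunction<Relation, Relation>`, and it only back-fills a missing `DataInfo` on the already-decoded bean. The null check still matters because, presumably, a JSON record with no `dataInfo` attribute decodes to a bean whose `getDataInfo()` returns null.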


@@ -68,22 +68,20 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
 
 		// collect organization merge relations from openorgs database
-		JavaRDD<Relation> mergeRelsRDD = spark
+		Dataset<Relation> relations = spark
 			.read()
-			.textFile(relationPath)
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.as(Encoders.bean(Relation.class))
 			.map(patchRelFn(), Encoders.bean(Relation.class))
-			.toJavaRDD()
 			.filter(this::isOpenorgs) // take only openorgs relations
 			.filter(this::isMergeRel); // take merges and isMergedIn relations
 
-		log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
-
-		final Dataset<Relation> relations = spark
-			.createDataset(
-				mergeRelsRDD.rdd(),
-				Encoders.bean(Relation.class));
+		relations.cache();
+		log.info("Number of Openorgs Merge Relations collected: {}", relations.count());
 
 		saveParquet(relations, outputPath, SaveMode.Append);
+		relations.unpersist();
 	}
 
 	private boolean isMergeRel(Relation rel) {
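This is the one call site that restructures beyond the reader swap: instead of detouring through a `JavaRDD` and rebuilding a `Dataset` via `createDataset`, the typed pipeline is kept end to end. Calling `cache()` before the logged `count()` is what keeps the subsequent `saveParquet` from re-reading and re-filtering the input, and `unpersist()` releases the cached blocks afterwards.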


@@ -69,7 +69,9 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
 		Dataset<Relation> rawRels = spark
 			.read()
-			.textFile(relationPath)
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.as(Encoders.bean(Relation.class))
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.filter(this::filterOpenorgsRels);


@@ -58,7 +58,9 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 		JavaRDD<Relation> simRels = spark
 			.read()
-			.textFile(relationPath)
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.as(Encoders.bean(Relation.class))
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
 			.filter(x -> !isOpenorgsDedupRel(x));


@@ -111,7 +111,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 		// collect diffrels from the raw graph relations: <other id, "diffRel">
 		JavaPairRDD<String, String> diffRels = spark
 			.read()
-			.textFile(relationPath)
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.as(Encoders.bean(Relation.class))
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
 			.filter(r -> filterRels(r, ModelSupport.getMainType(EntityType.organization)))


@@ -133,7 +133,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 		// collect diffrels from the raw graph relations: <<best id, other id>, "diffRel">
 		JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRels = spark
 			.read()
-			.textFile(relationPath)
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.as(Encoders.bean(Relation.class))
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
 			.filter(r -> filterRels(r, "organization"))
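Note that this job, like `SparkCopyRelationsNoOpenorgs` and `SparkPrepareNewOrgs` above, keeps its trailing `.toJavaRDD()` because the downstream logic still runs on the RDD API; only the loading and deserialization step changes.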