diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index e40de935e..1aee72f09 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -16,6 +16,8 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.sisyphsu.dateparser.DateParserUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -41,32 +43,35 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 	public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) {
 		if (ModelSupport.isSubClass(value, Result.class)) {
 			final Result res = (Result) value;
-			if (res
-				.getTitle()
-				.stream()
-				.filter(
-					t -> t
-						.getQualifier()
-						.getClassid()
-						.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
-				.noneMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
-				return (T) res;
+			if (shouldCleanContext(res, verifyParam)) {
+				res
+					.setContext(
+						res
+							.getContext()
+							.stream()
+							.filter(c -> !StringUtils.startsWith(c.getId().toLowerCase(), contextId))
+							.collect(Collectors.toList()));
 			}
-			res
-				.setContext(
-					res
-						.getContext()
-						.stream()
-						.filter(
-							c -> !c.getId().split("::")[0]
-								.equalsIgnoreCase(contextId))
-						.collect(Collectors.toList()));
 			return (T) res;
 		} else {
 			return value;
 		}
 	}
 
+	private static boolean shouldCleanContext(Result res, String verifyParam) {
+		boolean titleMatch = res
+			.getTitle()
+			.stream()
+			.filter(
+				t -> t
+					.getQualifier()
+					.getClassid()
+					.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
+			.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()));
+
+		return titleMatch && Objects.nonNull(res.getContext());
+	}
+
 	public static <T extends Oaf> T cleanCountry(T value, String[] verifyParam, Set<String> hostedBy,
 		String collectedfrom, String country) {
 		if (ModelSupport.isSubClass(value, Result.class)) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index e97ff3cb2..01baca226 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -149,34 +149,29 @@ public class CleanGraphSparkJob {
 				.map((MapFunction<T, T>) value -> GraphCleaningFunctions.cleanup(value, vocs), Encoders.bean(clazz))
 				.filter((FilterFunction<T>) GraphCleaningFunctions::filter);
 
+			// read the master-duplicate tuples
+			Dataset<MasterDuplicate> md = spark
+				.read()
+				.textFile(dsMasterDuplicatePath)
+				.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
+
+			// prepare the resolved CF|HB references with the corresponding EMPTY master ID
+			Dataset<IdCfHbMapping> resolved = spark
+				.read()
+				.textFile(inputPath)
+				.map(as(clazz), Encoders.bean(clazz))
+				.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
+
 			if (Boolean.FALSE.equals(deepClean)) {
-				cleaned_basic
-					.write()
-					.mode(SaveMode.Overwrite)
-					.option("compression", "gzip")
-					.json(outputPath);
+
+				if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
+					save(fixCFHB(clazz, cleaned_basic, md, resolved), outputPath);
+				} else {
+					save(cleaned_basic, outputPath);
+				}
 			} else if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
-				// read the master-duplicate tuples
-				Dataset<MasterDuplicate> md = spark
-					.read()
-					.textFile(dsMasterDuplicatePath)
-					.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
-
-				// prepare the resolved CF|HB references with the corresponding EMPTY master ID
-				Dataset<IdCfHbMapping> resolved = spark
-					.read()
-					.textFile(inputPath)
-					.map(as(clazz), Encoders.bean(clazz))
-					.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
-
-				// set the EMPTY master ID/NAME
-				Dataset<IdCfHbMapping> resolvedDs = resolved
-					.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
-					.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
-					.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()));
-
 				// load the hostedby mapping
 				Set<String> hostedBy = Sets
 					.newHashSet(
@@ -186,7 +181,7 @@ public class CleanGraphSparkJob {
 						.collectAsList());
 
 				// perform the deep cleaning steps
-				final Dataset<T> cleaned_deep = cleaned_basic
+				final Dataset<T> cleaned_deep = fixCFHB(clazz, cleaned_basic, md, resolved)
 					.map(
 						(MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam),
 						Encoders.bean(clazz))
@@ -195,19 +190,34 @@
 							.cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country),
 						Encoders.bean(clazz));
 
-				// Join the results with the resolved CF|HB mapping, apply the mapping and save it
-				cleaned_deep
-					.joinWith(resolvedDs, cleaned_deep.col("id").equalTo(resolvedDs.col("resultId")), "left")
-					.groupByKey(
-						(MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> ((Result) t._1()).getId(), Encoders.STRING())
-					.mapGroups(getMapGroupsFunction(), Encoders.bean(clazz))
-					.write()
-					.mode(SaveMode.Overwrite)
-					.option("compression", "gzip")
-					.json(outputPath);
+				save(cleaned_deep, outputPath);
 			}
 		}
 
+	private static <T extends Oaf> void save(final Dataset<T> dataset, final String outputPath) {
+		dataset
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+
+	private static <T extends Oaf> Dataset<T> fixCFHB(Class<T> clazz, Dataset<T> results, Dataset<MasterDuplicate> md,
+		Dataset<IdCfHbMapping> resolved) {
+
+		// set the EMPTY master ID/NAME
+		Dataset<IdCfHbMapping> resolvedDs = resolved
+			.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
+			.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
+			.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()));
+
+		return results
+			.joinWith(resolvedDs, results.col("id").equalTo(resolvedDs.col("resultId")), "left")
+			.groupByKey(
+				(MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> ((Result) t._1()).getId(), Encoders.STRING())
+			.mapGroups(getMapGroupsFunction(), Encoders.bean(clazz));
+	}
+
 	private static <T extends Oaf> Dataset<T> readTableFromPath(
 		SparkSession spark, String inputEntityPath, Class<T> clazz) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java
index 5b021af01..65182108e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJobTest.java
@@ -165,7 +165,8 @@
 					"--outputPath", graphOutputPath + "/relation",
 					"--isLookupUrl", "lookupurl",
 					"--graphTableClassName", Relation.class.getCanonicalName(),
-					"--deepClean", "false"
+					"--deepClean", "false",
+					"--masterDuplicatePath", dsMasterDuplicatePath,
 				})).run(false, isLookUpService);
 
 		spark
@@ -262,7 +263,8 @@
 					"--outputPath", graphOutputPath + "/publication",
 					"--isLookupUrl", "lookupurl",
 					"--graphTableClassName", Publication.class.getCanonicalName(),
-					"--deepClean", "false"
+					"--deepClean", "false",
+					"--masterDuplicatePath", dsMasterDuplicatePath,
 				})).run(false, isLookUpService);
 
 		Publication p = read(spark, graphOutputPath + "/publication", Publication.class)
@@ -413,7 +415,8 @@
 					"--outputPath", graphOutputPath + "/publication",
 					"--isLookupUrl", "lookupurl",
 					"--graphTableClassName", Publication.class.getCanonicalName(),
-					"--deepClean", "false"
+					"--deepClean", "false",
+					"--masterDuplicatePath", dsMasterDuplicatePath,
 				})).run(false, isLookUpService);
 
 		Dataset<Publication> p = read(spark, graphOutputPath + "/publication", Publication.class)
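
A minimal sketch (not part of the patch) of what the refactored cleanContext/shouldCleanContext pair does after this change. The record values and the "sobigdata"/"gCube" contextId/verifyParam pair are invented for illustration; the model classes are assumed to be the dhp-schemas ones referenced in the diff:

	import com.google.common.collect.Lists;
	import eu.dnetlib.dhp.schema.common.ModelConstants;
	import eu.dnetlib.dhp.schema.oaf.Context;
	import eu.dnetlib.dhp.schema.oaf.Result;
	import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
	import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;

	public class CleanContextSketch {
		public static void main(String[] args) {
			Result res = new Result();

			// main title whose value starts with the verifyParam prefix
			StructuredProperty title = new StructuredProperty();
			title.setValue("gCube sample record title");
			title.setQualifier(ModelConstants.MAIN_TITLE_QUALIFIER);
			res.setTitle(Lists.newArrayList(title));

			// context whose id starts with the contextId being cleaned
			Context ctx = new Context();
			ctx.setId("sobigdata::projects::sample");
			res.setContext(Lists.newArrayList(ctx));

			// shouldCleanContext holds (main title matches the prefix, context list
			// is non-null), so every context whose id starts with "sobigdata" is dropped
			Result cleaned = GraphCleaningFunctions.cleanContext(res, "sobigdata", "gCube");
			System.out.println(cleaned.getContext().isEmpty()); // true
		}
	}

Note the behavioural change carried by fixCFHB: the CF|HB master/duplicate resolution now runs for Result tables even when --deepClean is false, which is why the updated tests pass --masterDuplicatePath unconditionally.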