graph cleaning refactoring #282

Merged
claudio.atzori merged 10 commits from graph_cleaning_refactoring into beta 2023-05-02 10:40:03 +02:00
3 changed files with 75 additions and 57 deletions
Showing only changes of commit 2a6ba29b64 - Show all commits

View File

@ -16,6 +16,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.sisyphsu.dateparser.DateParserUtils; import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -41,32 +43,35 @@ public class GraphCleaningFunctions extends CleaningFunctions {
public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) { public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) {
if (ModelSupport.isSubClass(value, Result.class)) { if (ModelSupport.isSubClass(value, Result.class)) {
final Result res = (Result) value; final Result res = (Result) value;
if (res if (shouldCleanContext(res, verifyParam)) {
.getTitle() res
.stream() .setContext(
.filter( res
t -> t .getContext()
.getQualifier() .stream()
.getClassid() .filter(c -> !StringUtils.startsWith(c.getId().toLowerCase(), contextId))
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) .collect(Collectors.toList()));
.noneMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
return (T) res;
} }
res
.setContext(
res
.getContext()
.stream()
.filter(
c -> !c.getId().split("::")[0]
.equalsIgnoreCase(contextId))
.collect(Collectors.toList()));
return (T) res; return (T) res;
} else { } else {
return value; return value;
} }
} }
private static boolean shouldCleanContext(Result res, String verifyParam) {
boolean titleMatch = res
.getTitle()
.stream()
.filter(
t -> t
.getQualifier()
.getClassid()
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()));
return titleMatch && Objects.nonNull(res.getContext());
}
public static <T extends Oaf> T cleanCountry(T value, String[] verifyParam, Set<String> hostedBy, public static <T extends Oaf> T cleanCountry(T value, String[] verifyParam, Set<String> hostedBy,
String collectedfrom, String country) { String collectedfrom, String country) {
if (ModelSupport.isSubClass(value, Result.class)) { if (ModelSupport.isSubClass(value, Result.class)) {

View File

@ -149,34 +149,29 @@ public class CleanGraphSparkJob {
.map((MapFunction<T, T>) value -> GraphCleaningFunctions.cleanup(value, vocs), Encoders.bean(clazz)) .map((MapFunction<T, T>) value -> GraphCleaningFunctions.cleanup(value, vocs), Encoders.bean(clazz))
.filter((FilterFunction<T>) GraphCleaningFunctions::filter); .filter((FilterFunction<T>) GraphCleaningFunctions::filter);
// read the master-duplicate tuples
Dataset<MasterDuplicate> md = spark
.read()
.textFile(dsMasterDuplicatePath)
.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
// prepare the resolved CF|HB references with the corresponding EMPTY master ID
Dataset<IdCfHbMapping> resolved = spark
.read()
.textFile(inputPath)
.map(as(clazz), Encoders.bean(clazz))
.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
if (Boolean.FALSE.equals(deepClean)) { if (Boolean.FALSE.equals(deepClean)) {
cleaned_basic
.write() if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
.mode(SaveMode.Overwrite) save(fixCFHB(clazz, cleaned_basic, md, resolved), outputPath);
.option("compression", "gzip") } else {
.json(outputPath); save(cleaned_basic, outputPath);
}
} else if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) { } else if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
// read the master-duplicate tuples
Dataset<MasterDuplicate> md = spark
.read()
.textFile(dsMasterDuplicatePath)
.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
// prepare the resolved CF|HB references with the corresponding EMPTY master ID
Dataset<IdCfHbMapping> resolved = spark
.read()
.textFile(inputPath)
.map(as(clazz), Encoders.bean(clazz))
.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
// set the EMPTY master ID/NAME
Dataset<IdCfHbMapping> resolvedDs = resolved
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()));
// load the hostedby mapping // load the hostedby mapping
Set<String> hostedBy = Sets Set<String> hostedBy = Sets
.newHashSet( .newHashSet(
@ -186,7 +181,7 @@ public class CleanGraphSparkJob {
.collectAsList()); .collectAsList());
// perform the deep cleaning steps // perform the deep cleaning steps
final Dataset<T> cleaned_deep = cleaned_basic final Dataset<T> cleaned_deep = fixCFHB(clazz, cleaned_basic, md, resolved)
.map( .map(
(MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam), (MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam),
Encoders.bean(clazz)) Encoders.bean(clazz))
@ -195,19 +190,34 @@ public class CleanGraphSparkJob {
.cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country), .cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country),
Encoders.bean(clazz)); Encoders.bean(clazz));
// Join the results with the resolved CF|HB mapping, apply the mapping and save it save(cleaned_deep, outputPath);
cleaned_deep
.joinWith(resolvedDs, cleaned_deep.col("id").equalTo(resolvedDs.col("resultId")), "left")
.groupByKey(
(MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> ((Result) t._1()).getId(), Encoders.STRING())
.mapGroups(getMapGroupsFunction(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
} }
} }
private static <T extends Oaf> void save(final Dataset<T> dataset, final String outputPath) {
dataset
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <T extends Oaf> Dataset<T> fixCFHB(Class<T> clazz, Dataset<T> results, Dataset<MasterDuplicate> md,
Dataset<IdCfHbMapping> resolved) {
// set the EMPTY master ID/NAME
Dataset<IdCfHbMapping> resolvedDs = resolved
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()));
return results
.joinWith(resolvedDs, results.col("id").equalTo(resolvedDs.col("resultId")), "left")
.groupByKey(
(MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> ((Result) t._1()).getId(), Encoders.STRING())
.mapGroups(getMapGroupsFunction(), Encoders.bean(clazz));
}
private static <T extends Oaf> Dataset<T> readTableFromPath( private static <T extends Oaf> Dataset<T> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) { SparkSession spark, String inputEntityPath, Class<T> clazz) {

View File

@ -165,7 +165,8 @@ public class CleanGraphSparkJobTest {
"--outputPath", graphOutputPath + "/relation", "--outputPath", graphOutputPath + "/relation",
"--isLookupUrl", "lookupurl", "--isLookupUrl", "lookupurl",
"--graphTableClassName", Relation.class.getCanonicalName(), "--graphTableClassName", Relation.class.getCanonicalName(),
"--deepClean", "false" "--deepClean", "false",
"--masterDuplicatePath", dsMasterDuplicatePath,
})).run(false, isLookUpService); })).run(false, isLookUpService);
spark spark
@ -262,7 +263,8 @@ public class CleanGraphSparkJobTest {
"--outputPath", graphOutputPath + "/publication", "--outputPath", graphOutputPath + "/publication",
"--isLookupUrl", "lookupurl", "--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(), "--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "false" "--deepClean", "false",
"--masterDuplicatePath", dsMasterDuplicatePath,
})).run(false, isLookUpService); })).run(false, isLookUpService);
Publication p = read(spark, graphOutputPath + "/publication", Publication.class) Publication p = read(spark, graphOutputPath + "/publication", Publication.class)
@ -413,7 +415,8 @@ public class CleanGraphSparkJobTest {
"--outputPath", graphOutputPath + "/publication", "--outputPath", graphOutputPath + "/publication",
"--isLookupUrl", "lookupurl", "--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(), "--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "false" "--deepClean", "false",
"--masterDuplicatePath", dsMasterDuplicatePath,
})).run(false, isLookUpService); })).run(false, isLookUpService);
Dataset<Publication> p = read(spark, graphOutputPath + "/publication", Publication.class) Dataset<Publication> p = read(spark, graphOutputPath + "/publication", Publication.class)