|
|
|
@ -26,6 +26,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf;
|
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
|
|
|
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
|
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
|
|
|
import static eu.dnetlib.dhp.oa.graph.clean.CleaningFunctions.*;
|
|
|
|
|
|
|
|
|
|
public class CleanGraphSparkJob {
|
|
|
|
|
|
|
|
|
@ -86,9 +87,9 @@ public class CleanGraphSparkJob {
|
|
|
|
|
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
|
|
|
|
|
|
|
|
|
|
readTableFromPath(spark, inputPath, clazz)
|
|
|
|
|
.map((MapFunction<T, T>) value -> CleaningFunctions.fixVocabularyNames(value), Encoders.bean(clazz))
|
|
|
|
|
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
|
|
|
|
|
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
|
|
|
|
.map((MapFunction<T, T>) value -> CleaningFunctions.fixDefaults(value), Encoders.bean(clazz))
|
|
|
|
|
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
|
|
|
|
|
.write()
|
|
|
|
|
.mode(SaveMode.Overwrite)
|
|
|
|
|
.option("compression", "gzip")
|
|
|
|
|