BrBETA_dnet-hadoop/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java

285 lines
9.4 KiB
Java

package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.GraphSupport.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.*;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class CleanGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanGraphSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputGraph = parser.get("inputGraph");
log.info("inputGraph: {}", inputGraph);
String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
GraphFormat inputGraphFormat = Optional
.ofNullable(parser.get("inputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("inputGraphFormat: {}", inputGraphFormat);
GraphFormat outputGraphFormat = Optional
.ofNullable(parser.get("outputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("outputGraphFormat: {}", outputGraphFormat);
String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> cleanGraphTable(spark, vocs, inputGraph, inputGraphFormat, outputGraph, outputGraphFormat, clazz));
}
private static <T extends Oaf> void cleanGraphTable(
SparkSession spark,
VocabularyGroup vocs,
String inputGraph,
GraphFormat inputGraphFormat,
String outputGraph,
GraphFormat outputGraphFormat,
Class<T> clazz) {
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
Dataset<T> cleaned = readGraph(spark, inputGraph, clazz, inputGraphFormat)
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz));
saveGraphTable(cleaned, clazz, outputGraph, outputGraphFormat);
}
protected static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.nonNull(o.getCountry())) {
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
if (Objects.nonNull(r.getSubject())) {
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor().forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid().forEach(p -> {
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
});
}
});
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
private static void fixVocabName(Qualifier q, String vocabularyName) {
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
q.setSchemeid(vocabularyName);
q.setSchemename(vocabularyName);
}
}
protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
r
.setLanguage(
qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
}
if (Objects.nonNull(r.getSubject())) {
r
.setSubject(
r
.getSubject()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.collect(Collectors.toList()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r
.setResourcetype(
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
}
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
}
}
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
}
}
if (Objects.nonNull(r.getAuthor())) {
boolean nullRank = r
.getAuthor()
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : r.getAuthor()) {
author.setRank(i++);
}
}
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
private static Qualifier qualifier(String classid, String classname, String scheme) {
return OafMapperUtils
.qualifier(
classid, classname, scheme, scheme);
}
private static <T extends Oaf> Dataset<T> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}