From b713132db7818d3abb9e03a80a63b0f7ae83f870 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 21 Dec 2022 12:49:08 +0100 Subject: [PATCH] [Cleaning] adding missing classes --- .../ReadDatasourceMasterDuplicateFromDB.java | 81 +++++++ .../common/action/model/MasterDuplicate.java | 38 +++ .../graph/clean/cfhb/CleanCfHbSparkJob.java | 227 ++++++++++++++++++ .../oa/graph/clean/cfhb/IdCfHbMapping.java | 54 +++++ 4 files changed, 400 insertions(+) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java new file mode 100644 index 000000000..33c8d572b --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java @@ -0,0 +1,81 @@ + +package eu.dnetlib.dhp.common.action; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.common.DbClient; +import eu.dnetlib.dhp.common.action.model.MasterDuplicate; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +public class ReadDatasourceMasterDuplicateFromDB { + + private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId " + + + "FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);"; + + public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) + throws IOException { + int count = 0; + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(conf); + FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath)); + + log.info("running query: {}", QUERY); + log.info("storing results in: {}", hdfsPath); + + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { + dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer)); + count++; + } + } + return count; + } + + private static MasterDuplicate datasourceMasterMap(ResultSet rs) { + try { + final MasterDuplicate md = new MasterDuplicate(); + + final String duplicateId = rs.getString("duplicateId"); + final String masterId = rs.getString("masterId"); + final String masterName = rs.getString("masterName"); + + md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true)); + md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true)); + md.setMasterName(masterName); + + return md; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(dm)); + writer.newLine(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java new file mode 100644 index 000000000..12a4407c4 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java @@ -0,0 +1,38 @@ + +package eu.dnetlib.dhp.common.action.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 21/07/22 + */ +public class MasterDuplicate implements Serializable { + private String duplicateId; + private String masterId; + private String masterName; + + public String getDuplicateId() { + return duplicateId; + } + + public void setDuplicateId(String duplicateId) { + this.duplicateId = duplicateId; + } + + public String getMasterId() { + return masterId; + } + + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java new file mode 100644 index 000000000..3d415f21d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -0,0 +1,227 @@ + +package eu.dnetlib.dhp.oa.graph.clean.cfhb; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.action.model.MasterDuplicate; +import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; +import scala.Tuple2; + +public class CleanCfHbSparkJob { + + private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CleanCountrySparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String resolvedPath = parser.get("resolvedPath"); + log.info("resolvedPath: {}", resolvedPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String dsMasterDuplicatePath = parser.get("masterDuplicatePath"); + log.info("masterDuplicatePath: {}", dsMasterDuplicatePath); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration()); + cleanCfHb( + spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); + }); + } + + private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, + String resolvedPath, String masterDuplicatePath, String outputPath) { + + // read the master-duplicate tuples + Dataset md = spark + .read() + .textFile(masterDuplicatePath) + .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); + + // prepare the resolved CF|HB references with the corresponding EMPTY master ID + Dataset resolved = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)) + .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); + + // set the EMPTY master ID/NAME and save it + resolved + .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) + .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) + .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())) + .write() + .mode(SaveMode.Overwrite) + .json(resolvedPath); + + // read again the resolved CF|HB mapping + Dataset resolvedDS = spark + .read() + .textFile(resolvedPath) + .map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class)); + + // read the result table + Dataset res = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)); + + // Join the results with the resolved CF|HB mapping, apply the mapping and save it + res + .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") + .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) + .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + + private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { + return t -> { + final IdCfHbMapping mapping = t._1(); + Optional + .ofNullable(t._2()) + .ifPresent(t2 -> { + mapping.setMasterId(t2.getMasterId()); + mapping.setMasterName(t2.getMasterName()); + + }); + return mapping; + }; + } + + private static FlatMapFunction flattenCfHbFn() { + return r -> Stream + .concat( + Optional + .ofNullable(r.getCollectedfrom()) + .map(cf -> cf.stream().map(KeyValue::getKey)) + .orElse(Stream.empty()), + Stream + .concat( + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank), + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map( + i -> Optional + .ofNullable(i.getCollectedfrom()) + .map(KeyValue::getKey) + .orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank))) + .distinct() + .filter(StringUtils::isNotBlank) + .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb)) + .iterator(); + } + + private static MapGroupsFunction, T> getMapGroupsFunction() { + return new MapGroupsFunction, T>() { + @Override + public T call(String key, Iterator> values) { + final Tuple2 first = values.next(); + final T res = first._1(); + + updateResult(res, first._2()); + values.forEachRemaining(t -> updateResult(res, t._2())); + return res; + } + + private void updateResult(T res, IdCfHbMapping m) { + if (Objects.nonNull(m)) { + res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); + res.getInstance().forEach(i -> { + updateKeyValue(i.getHostedby(), m); + updateKeyValue(i.getCollectedfrom(), m); + }); + } + } + + private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { + if (kv.getKey().equals(a.getCfhb())) { + kv.setKey(a.getMasterId()); + kv.setValue(a.getMasterName()); + } + } + + }; + } + + private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { + IdCfHbMapping m = new IdCfHbMapping(resultId); + m.setCfhb(cfHb); + return m; + } + + private static MapFunction as(Class clazz) { + return s -> OBJECT_MAPPER.readValue(s, clazz); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java new file mode 100644 index 000000000..0f527c4e7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -0,0 +1,54 @@ + +package eu.dnetlib.dhp.oa.graph.clean.cfhb; + +import java.io.Serializable; + +public class IdCfHbMapping implements Serializable { + + private String resultId; + + private String cfhb; + + private String masterId; + + private String masterName; + + public IdCfHbMapping() { + } + + public IdCfHbMapping(String id) { + this.resultId = id; + } + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public String getCfhb() { + return cfhb; + } + + public void setCfhb(String cfhb) { + this.cfhb = cfhb; + } + + public String getMasterId() { + return masterId; + } + + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; + } +}