From 24ef301cc1d962cd2c613be6ad23e6634ebaa4f4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 28 Nov 2022 09:53:23 +0100 Subject: [PATCH 1/4] [graph cleaning] patch the result's collectedfrom and hostedby identifiers according to the datasource master-duplicate mapping --- .../ReadDatasourceMasterDuplicateFromDB.java | 76 +++++++ .../common/action/model/MasterDuplicate.java | 29 +++ .../oa/graph/clean/MasterDuplicateAction.java | 45 ++++ .../graph/clean/cfhb/CleanCfHbSparkJob.java | 207 ++++++++++++++++++ .../oa/graph/clean/cfhb/IdCfHbMapping.java | 44 ++++ .../dhp/oa/graph/clean/oozie_app/workflow.xml | 202 ++++++++++++++++- .../oa/graph/datasourcemaster_parameters.json | 32 +++ .../oa/graph/input_clean_cfhb_parameters.json | 32 +++ .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 3 +- .../dhp/oa/provision/EOSCFuture_Test.java | 105 ++++----- 10 files changed, 719 insertions(+), 56 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java new file mode 100644 index 000000000..d9e8ced85 
--- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java @@ -0,0 +1,76 @@ + +package eu.dnetlib.dhp.common.action; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.common.DbClient; +import eu.dnetlib.dhp.common.action.model.MasterDuplicate; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +public class ReadDatasourceMasterDuplicateFromDB { + + private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services;"; + + public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) + throws IOException { + int count = 0; + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(conf); + FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath)); + + log.info("running query: {}", QUERY); + log.info("storing results in: {}", hdfsPath); + + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { + dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer)); + count++; + } + } + return count; + } + + private static MasterDuplicate 
datasourceMasterMap(ResultSet rs) { + try { + MasterDuplicate md = new MasterDuplicate(); + final String master = rs.getString("master"); + final String duplicate = rs.getString("duplicate"); + md.setMaster(OafMapperUtils.createOpenaireId(10, master, true)); + md.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true)); + + return md; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(dm)); + writer.newLine(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java new file mode 100644 index 000000000..b3e0d2aaa --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java @@ -0,0 +1,29 @@ + +package eu.dnetlib.dhp.common.action.model; + +import java.io.Serializable; + +/** + * @author miriam.baglioni + * @Date 21/07/22 + */ +public class MasterDuplicate implements Serializable { + private String duplicate; + private String master; + + public String getDuplicate() { + return duplicate; + } + + public void setDuplicate(String duplicate) { + this.duplicate = duplicate; + } + + public String getMaster() { + return master; + } + + public void setMaster(String master) { + this.master = master; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java new file mode 100644 index 000000000..8bf36ff82 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java @@ -0,0 +1,45 @@ + +package eu.dnetlib.dhp.oa.graph.clean; + +import 
org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB; + +public class MasterDuplicateAction { + + private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MasterDuplicateAction.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json"))); + + parser.parseArgument(args); + + final String dbUrl = parser.get("postgresUrl"); + log.info("postgresUrl: {}", dbUrl); + + final String dbUser = parser.get("postgresUser"); + log.info("postgresUser: {}", dbUser); + + final String dbPassword = parser.get("postgresPassword"); + log.info("postgresPassword: {}", dbPassword); + + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode: {}", hdfsNameNode); + + int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode); + + log.info("written {} rows", rows); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java new file mode 100644 index 000000000..ad7e252f6 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -0,0 +1,207 @@ + +package eu.dnetlib.dhp.oa.graph.clean.cfhb; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.commons.io.IOUtils; 
+import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.action.model.MasterDuplicate; +import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple2; + +public class CleanCfHbSparkJob { + + private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CleanCountrySparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String masterDuplicatePath = parser.get("masterDuplicatePath"); + log.info("masterDuplicatePath: {}", masterDuplicatePath); + + String graphTableClassName = parser.get("graphTableClassName"); + 
log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + cleanCfHb( + spark, inputPath, entityClazz, workingPath, masterDuplicatePath, outputPath); + }); + } + + private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, + String workingPath, String masterDuplicatePath, String outputPath) { + // read the master-duplicate tuples + Dataset md = spark + .read() + .textFile(masterDuplicatePath) + .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); + + // read the result table + Dataset res = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)); + + // prepare the resolved CF|HB references with the corresponding EMPTY master ID + Dataset resolved = res + .flatMap( + (FlatMapFunction) r -> Stream + .concat( + r.getCollectedfrom().stream().map(KeyValue::getKey), + Stream + .concat( + r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), + r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) + .distinct() + .map(s -> asIdCfHbMapping(r.getId(), s)) + .iterator(), + Encoders.bean(IdCfHbMapping.class)); + + final String resolvedPath = workingPath + "/cfHbResolved"; + + // set the EMPTY master ID and save it aside + resolved + .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate"))) + .map((MapFunction, IdCfHbMapping>) t -> { + t._1().setMaster(t._2().getMaster()); + return t._1(); + }, Encoders.bean(IdCfHbMapping.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(resolvedPath); + + // read again the resolved CF|HB mapping + Dataset resolvedDS = spark + .read() + .load(resolvedPath) + .as(Encoders.bean(IdCfHbMapping.class)); + + // Join the results with the resolved CF|HB mapping, apply the mapping and save it + res + .joinWith(resolvedDS, 
res.col("id").equalTo(resolved.col("resultId")), "left") + .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) + .agg(new IdCfHbMappingAggregator(entityClazz).toColumn()) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + + public static class IdCfHbMappingAggregator extends Aggregator { + + private final Class entityClazz; + + public IdCfHbMappingAggregator(Class entityClazz) { + this.entityClazz = entityClazz; + } + + @Override + public T zero() { + try { + return entityClazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public T reduce(T r, IdCfHbMapping a) { + if (Objects.isNull(a) && StringUtils.isBlank(a.getMaster())) { + return r; + } + r.getCollectedfrom().forEach(kv -> updateKey(kv, a)); + r.getInstance().forEach(i -> { + updateKey(i.getHostedby(), a); + updateKey(i.getCollectedfrom(), a); + }); + return r; + } + + @Override + public T merge(T b1, T b2) { + if (Objects.isNull(b1.getId())) { + return b2; + } + return b1; + } + + @Override + public T finish(T r) { + return r; + } + + private void updateKey(final KeyValue kv, final IdCfHbMapping a) { + if (kv.getKey().equals(a.getCfhb())) { + kv.setKey(a.getMaster()); + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(entityClazz); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(entityClazz); + } + } + + private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { + IdCfHbMapping m = new IdCfHbMapping(resultId); + m.setCfhb(cfHb); + return m; + } + + private static MapFunction as(Class clazz) { + return s -> OBJECT_MAPPER.readValue(s, clazz); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java new file mode 100644 
index 000000000..16d1a2613 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -0,0 +1,44 @@ + +package eu.dnetlib.dhp.oa.graph.clean.cfhb; + +import java.io.Serializable; + +public class IdCfHbMapping implements Serializable { + + private String resultid; + + private String cfhb; + + private String master; + + public IdCfHbMapping() { + } + + public IdCfHbMapping(String id) { + this.resultid = id; + } + + public String getResultid() { + return resultid; + } + + public void setResultid(String resultid) { + this.resultid = resultid; + } + + public String getCfhb() { + return cfhb; + } + + public void setCfhb(String cfhb) { + this.cfhb = cfhb; + } + + public String getMaster() { + return master; + } + + public void setMaster(String master) { + this.master = master; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 6435d5131..e717fac0f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -317,13 +317,13 @@ - + yarn @@ -434,7 +434,6 @@ - yarn @@ -460,13 +459,13 @@ - + yarn @@ -583,7 +582,202 @@ - + + + + + ${wf:conf('shouldClean') eq true} + + + + + + + eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --hdfsPath${workingDir}/masterduplicate + --hdfsNameNode${nameNode} + + + + + + + + + + + + + + + yarn + cluster + patch publication cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + 
--driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/publication + --outputPath${workingPath}/cfHbPatched/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --workingDir${workingDir}/working/publication + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch dataset cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/dataset + --outputPath${workingPath}/cfHbPatched/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --workingDir${workingDir}/working/dataset + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch otherresearchproduct cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + 
--conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/otherresearchproduct + --outputPath${workingPath}/cfHbPatched/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --workingDir${workingDir}/working/otherresearchproduct + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + yarn + cluster + patch software cfhb + eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphOutputPath}/software + --outputPath${workingPath}/cfHbPatched/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --workingDir${workingDir}/working/software + --masterDuplicatePath${workingDir}/masterduplicate + + + + + + + + + + + + + + + + + + + + ${workingPath}/cfHbPatched/publication + ${graphOutputPath}/publication + + + + + + + + + + + ${workingPath}/cfHbPatched/dataset + ${graphOutputPath}/dataset + + + + + + + + + + + ${workingPath}/cfHbPatched/otherresearchproduct + ${graphOutputPath}/otherresearchproduct + + + + + + + + + + + ${workingPath}/cfHbPatched/software + ${graphOutputPath}/software + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json new file mode 100644 index 000000000..fbe2cca10 --- /dev/null +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "pu", + "paramLongName": "postgresUrl", + "paramDescription": "the jdbc url to the postgres", + "paramRequired": true + }, + { + "paramName": "uid", + "paramLongName": "postgresUser", + "paramDescription": "the postgres user", + "paramRequired": true + }, + { + "paramName": "pwd", + "paramLongName": "postgresPassword", + "paramDescription": "the postgres password=", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the target path on HDFS", + "paramRequired": true + }, + { + "paramName": "nn", + "paramLongName": "hdfsNameNode", + "paramDescription": "the HDFS nameNode", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json new file mode 100644 index 000000000..8b8a5f70e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path to the graph data dump to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "class", + "paramLongName": "graphTableClassName", + "paramDescription": "class name moelling the graph table", + "paramRequired": true + }, + { + "paramName": "md", + "paramLongName": "datasourceMasterDuplicate", + "paramDescription": 
"path to the file on HDFS holding the datasource id tuples [master, duplicate]", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 3e35021c8..ad6ceef54 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -1002,7 +1002,8 @@ class MappersTest { @Test void testEOSCFuture_ROHub() throws IOException { - final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml"))); + final String xml = IOUtils + .toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml"))); final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0); assertNotNull(rocrate.getEoscifguidelines()); diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java index 08bf19fe4..3e1a501d1 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java @@ -1,13 +1,14 @@ + package eu.dnetlib.dhp.oa.provision; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; -import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; -import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import 
eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.IOException; +import java.io.StringReader; + +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; + import org.apache.commons.io.IOUtils; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrInputDocument; @@ -18,71 +19,73 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import java.io.IOException; -import java.io.StringReader; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; +import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; public class EOSCFuture_Test { - public static ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + public static ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - public static final String VERSION = "2021-04-15T10:05:53Z"; - public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"; + public static final String VERSION = "2021-04-15T10:05:53Z"; + public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"; - private ContextMapper 
contextMapper; + private ContextMapper contextMapper; - @BeforeEach - public void setUp() { - contextMapper = new ContextMapper(); - } + @BeforeEach + public void setUp() { + contextMapper = new ContextMapper(); + } + @Test + public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException { - @Test - public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException { + final ContextMapper contextMapper = new ContextMapper(); - final ContextMapper contextMapper = new ContextMapper(); + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); - final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, - XmlConverterJob.schemaLocation); + final OtherResearchProduct p = OBJECT_MAPPER + .readValue( + IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")), + OtherResearchProduct.class); - final OtherResearchProduct p = OBJECT_MAPPER - .readValue(IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")), OtherResearchProduct.class); + final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); - final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); + assertNotNull(xml); - assertNotNull(xml); + final Document doc = new SAXReader().read(new StringReader(xml)); - final Document doc = new SAXReader().read(new StringReader(xml)); + assertNotNull(doc); + System.out.println(doc.asXML()); - assertNotNull(doc); - System.out.println(doc.asXML()); + testRecordTransformation(xml); + } + private void testRecordTransformation(final String record) throws IOException, TransformerException { + final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml")); + final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl")); - testRecordTransformation(xml); - } + final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", 
fields, xslt); + final Transformer tr = SaxonTransformerFactory.newInstance(transformer); - private void testRecordTransformation(final String record) throws IOException, TransformerException { - final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml")); - final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl")); + final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record); - final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt); + final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID) + .parseDocument(indexRecordXML); - final Transformer tr = SaxonTransformerFactory.newInstance(transformer); + final String xmlDoc = ClientUtils.toXML(solrDoc); - final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record); - - final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID) - .parseDocument(indexRecordXML); - - final String xmlDoc = ClientUtils.toXML(solrDoc); - - Assertions.assertNotNull(xmlDoc); - System.out.println(xmlDoc); - } + Assertions.assertNotNull(xmlDoc); + System.out.println(xmlDoc); + } } From 11695ba649f778646ea7811f8888993532230169 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 28 Nov 2022 10:18:43 +0100 Subject: [PATCH 2/4] [graph cleaning] patch also the result's collectedfrom and hostedby datasource name according to the datasource master-duplicate mapping --- .../ReadDatasourceMasterDuplicateFromDB.java | 19 +++++++----- .../common/action/model/MasterDuplicate.java | 29 ++++++++++++------- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 16 +++++----- .../oa/graph/clean/cfhb/IdCfHbMapping.java | 20 +++++++++---- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java index 
d9e8ced85..5d39216f1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java @@ -8,7 +8,6 @@ import java.nio.charset.StandardCharsets; import java.sql.ResultSet; import java.sql.SQLException; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -28,7 +27,9 @@ public class ReadDatasourceMasterDuplicateFromDB { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services;"; + private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId " + + + "FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);"; public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException { @@ -52,11 +53,15 @@ public class ReadDatasourceMasterDuplicateFromDB { private static MasterDuplicate datasourceMasterMap(ResultSet rs) { try { - MasterDuplicate md = new MasterDuplicate(); - final String master = rs.getString("master"); - final String duplicate = rs.getString("duplicate"); - md.setMaster(OafMapperUtils.createOpenaireId(10, master, true)); - md.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true)); + final MasterDuplicate md = new MasterDuplicate(); + + final String duplicateId = rs.getString("duplicateId"); + final String masterId = rs.getString("masterId"); + final String masterName = rs.getString("masterName"); + + md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true)); + md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true)); + md.setMasterName(masterName); return md; } catch (final SQLException e) { diff --git 
a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java index b3e0d2aaa..12a4407c4 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java @@ -8,22 +8,31 @@ import java.io.Serializable; * @Date 21/07/22 */ public class MasterDuplicate implements Serializable { - private String duplicate; - private String master; + private String duplicateId; + private String masterId; + private String masterName; - public String getDuplicate() { - return duplicate; + public String getDuplicateId() { + return duplicateId; } - public void setDuplicate(String duplicate) { - this.duplicate = duplicate; + public void setDuplicateId(String duplicateId) { + this.duplicateId = duplicateId; } - public String getMaster() { - return master; + public String getMasterId() { + return masterId; } - public void setMaster(String master) { - this.master = master; + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index ad7e252f6..d35dbc7c1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -78,6 +78,7 @@ public class CleanCfHbSparkJob { private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, String workingPath, String masterDuplicatePath, String outputPath) { + // read the 
master-duplicate tuples Dataset md = spark .read() @@ -111,7 +112,7 @@ public class CleanCfHbSparkJob { resolved .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate"))) .map((MapFunction, IdCfHbMapping>) t -> { - t._1().setMaster(t._2().getMaster()); + t._1().setMasterId(t._2().getMasterId()); return t._1(); }, Encoders.bean(IdCfHbMapping.class)) .write() @@ -154,13 +155,13 @@ public class CleanCfHbSparkJob { @Override public T reduce(T r, IdCfHbMapping a) { - if (Objects.isNull(a) && StringUtils.isBlank(a.getMaster())) { + if (Objects.isNull(a) && StringUtils.isBlank(a.getMasterId())) { return r; } - r.getCollectedfrom().forEach(kv -> updateKey(kv, a)); + r.getCollectedfrom().forEach(kv -> updateKeyValue(kv, a)); r.getInstance().forEach(i -> { - updateKey(i.getHostedby(), a); - updateKey(i.getCollectedfrom(), a); + updateKeyValue(i.getHostedby(), a); + updateKeyValue(i.getCollectedfrom(), a); }); return r; } @@ -178,9 +179,10 @@ public class CleanCfHbSparkJob { return r; } - private void updateKey(final KeyValue kv, final IdCfHbMapping a) { + private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { if (kv.getKey().equals(a.getCfhb())) { - kv.setKey(a.getMaster()); + kv.setKey(a.getMasterId()); + kv.setValue(a.getMasterName()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java index 16d1a2613..cb4e1b5e6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -9,7 +9,9 @@ public class IdCfHbMapping implements Serializable { private String cfhb; - private String master; + private String masterId; + + private String masterName; public IdCfHbMapping() { } @@ -34,11 +36,19 @@ public class IdCfHbMapping implements 
Serializable { this.cfhb = cfhb; } - public String getMaster() { - return master; + public String getMasterId() { + return masterId; } - public void setMaster(String master) { - this.master = master; + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; } } From 58c05731f9d3b3d472b828a092c73f6aa78703ff Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 29 Nov 2022 11:21:51 +0100 Subject: [PATCH 3/4] [graph cleaning] WIP: testing the collectedfrom and hostedby patch procedure --- .../oa/graph/clean/CleanContextSparkJob.java | 12 +- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 151 ++++++++---------- .../oa/graph/clean/cfhb/IdCfHbMapping.java | 12 +- .../clean/country/CleanCountrySparkJob.java | 12 +- .../country/GetDatasourceFromCountry.java | 8 +- .../dhp/oa/graph/clean/oozie_app/workflow.xml | 34 ++-- .../oa/graph/input_clean_cfhb_parameters.json | 6 + .../graph/input_clean_context_parameters.json | 4 +- .../graph/input_clean_country_parameters.json | 4 +- .../input_datasource_country_parameters.json | 4 +- .../clean/cfhb/CleanCfHbSparkJobTest.java | 118 ++++++++++++++ .../clean/cfhb/entities/dataset/dataset.json | 3 + .../oa/graph/clean/cfhb/masterduplicate.json | 4 + 13 files changed, 243 insertions(+), 129 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities/dataset/dataset.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java index 55fdbac59..10a3d4465 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java @@ -47,8 +47,8 @@ public class CleanContextSparkJob implements Serializable { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); String contextId = parser.get("contextId"); log.info("contextId: {}", contextId); @@ -67,12 +67,12 @@ public class CleanContextSparkJob implements Serializable { isSparkSessionManaged, spark -> { - cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); + cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir); }); } private static void cleanContext(SparkSession spark, String contextId, String verifyParam, - String inputPath, Class entityClazz, String workingPath) { + String inputPath, Class entityClazz, String workingDir) { Dataset res = spark .read() .textFile(inputPath) @@ -106,11 +106,11 @@ public class CleanContextSparkJob implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath); + .json(workingDir); spark .read() - .textFile(workingPath) + .textFile(workingDir) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index d35dbc7c1..b4678cc6c 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -3,8 +3,11 @@ package eu.dnetlib.dhp.oa.graph.clean.cfhb; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.io.IOUtils; @@ -12,8 +15,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.Aggregator; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +57,14 @@ public class CleanCfHbSparkJob { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String resolvedPath = parser.get("resolvedPath"); + log.info("resolvedPath: {}", resolvedPath); String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - String masterDuplicatePath = parser.get("masterDuplicatePath"); - log.info("masterDuplicatePath: {}", masterDuplicatePath); + String dsMasterDuplicatePath = parser.get("datasourceMasterDuplicate"); + log.info("datasourceMasterDuplicate: {}", dsMasterDuplicatePath); String graphTableClassName = parser.get("graphTableClassName"); log.info("graphTableClassName: {}", graphTableClassName); @@ -72,12 +77,12 @@ public class CleanCfHbSparkJob { isSparkSessionManaged, spark -> { cleanCfHb( - spark, inputPath, entityClazz, workingPath, masterDuplicatePath, 
outputPath); + spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); }); } private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, - String workingPath, String masterDuplicatePath, String outputPath) { + String resolvedPath, String masterDuplicatePath, String outputPath) { // read the master-duplicate tuples Dataset md = spark @@ -85,116 +90,94 @@ public class CleanCfHbSparkJob { .textFile(masterDuplicatePath) .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); - // read the result table - Dataset res = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)); - // prepare the resolved CF|HB references with the corresponding EMPTY master ID - Dataset resolved = res - .flatMap( - (FlatMapFunction) r -> Stream - .concat( - r.getCollectedfrom().stream().map(KeyValue::getKey), - Stream + Dataset resolved = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)) + .flatMap( + (FlatMapFunction) r -> { + final List list = Stream .concat( - r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), - r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) - .distinct() - .map(s -> asIdCfHbMapping(r.getId(), s)) - .iterator(), + r.getCollectedfrom().stream().map(KeyValue::getKey), + Stream + .concat( + r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), + r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) + .distinct() + .map(s -> asIdCfHbMapping(r.getId(), s)) + .collect(Collectors.toList()); + return list.iterator(); + }, Encoders.bean(IdCfHbMapping.class)); - final String resolvedPath = workingPath + "/cfHbResolved"; - - // set the EMPTY master ID and save it aside + // set the EMPTY master ID/NAME and save it resolved - .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate"))) + .joinWith(md, 
resolved.col("cfhb").equalTo(md.col("duplicateId"))) .map((MapFunction, IdCfHbMapping>) t -> { t._1().setMasterId(t._2().getMasterId()); + t._1().setMasterName(t._2().getMasterName()); return t._1(); }, Encoders.bean(IdCfHbMapping.class)) .write() .mode(SaveMode.Overwrite) - .parquet(resolvedPath); + .json(resolvedPath); // read again the resolved CF|HB mapping Dataset resolvedDS = spark .read() - .load(resolvedPath) - .as(Encoders.bean(IdCfHbMapping.class)); + .textFile(resolvedPath) + .map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class)); + + // read the result table + Dataset res = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)); // Join the results with the resolved CF|HB mapping, apply the mapping and save it res - .joinWith(resolvedDS, res.col("id").equalTo(resolved.col("resultId")), "left") + .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) - .agg(new IdCfHbMappingAggregator(entityClazz).toColumn()) + .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) + //.agg(new IdCfHbMappingAggregator(entityClazz).toColumn()) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } - public static class IdCfHbMappingAggregator extends Aggregator { + @NotNull + private static MapGroupsFunction, T> getMapGroupsFunction() { + return new MapGroupsFunction, T>() { + @Override + public T call(String key, Iterator> values) throws Exception { + final Tuple2 first = values.next(); + final T res = first._1(); - private final Class entityClazz; - - public IdCfHbMappingAggregator(Class entityClazz) { - this.entityClazz = entityClazz; - } - - @Override - public T zero() { - try { - return entityClazz.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException(e); + updateResult(res, first._2()); + values.forEachRemaining(t -> 
updateResult(res, t._2())); + return res; } - } - @Override - public T reduce(T r, IdCfHbMapping a) { - if (Objects.isNull(a) && StringUtils.isBlank(a.getMasterId())) { - return r; + private void updateResult(T res, IdCfHbMapping m) { + if (Objects.nonNull(m)) { + res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); + res.getInstance().forEach(i -> { + updateKeyValue(i.getHostedby(), m); + updateKeyValue(i.getCollectedfrom(), m); + }); + } } - r.getCollectedfrom().forEach(kv -> updateKeyValue(kv, a)); - r.getInstance().forEach(i -> { - updateKeyValue(i.getHostedby(), a); - updateKeyValue(i.getCollectedfrom(), a); - }); - return r; - } - @Override - public T merge(T b1, T b2) { - if (Objects.isNull(b1.getId())) { - return b2; + private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { + if (kv.getKey().equals(a.getCfhb())) { + kv.setKey(a.getMasterId()); + kv.setValue(a.getMasterName()); + } } - return b1; - } - @Override - public T finish(T r) { - return r; - } - - private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { - if (kv.getKey().equals(a.getCfhb())) { - kv.setKey(a.getMasterId()); - kv.setValue(a.getMasterName()); - } - } - - @Override - public Encoder bufferEncoder() { - return Encoders.bean(entityClazz); - } - - @Override - public Encoder outputEncoder() { - return Encoders.bean(entityClazz); - } + }; } private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java index cb4e1b5e6..fad1129c5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -5,7 +5,7 @@ import java.io.Serializable; public class IdCfHbMapping implements 
Serializable { - private String resultid; + private String resultId; private String cfhb; @@ -17,15 +17,15 @@ public class IdCfHbMapping implements Serializable { } public IdCfHbMapping(String id) { - this.resultid = id; + this.resultId = id; } - public String getResultid() { - return resultid; + public String getResultId() { + return resultId; } - public void setResultid(String resultid) { - this.resultid = resultid; + public void setResultId(String resultId) { + this.resultId = resultId; } public String getCfhb() { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java index 45590f789..d8d803458 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java @@ -58,8 +58,8 @@ public class CleanCountrySparkJob implements Serializable { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); String datasourcePath = parser.get("hostedBy"); log.info("datasourcePath: {}", datasourcePath); @@ -85,12 +85,12 @@ public class CleanCountrySparkJob implements Serializable { spark -> { cleanCountry( - spark, country, verifyParam, inputPath, entityClazz, workingPath, collectedfrom, datasourcePath); + spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath); }); } private static void cleanCountry(SparkSession spark, String country, String[] verifyParam, - String inputPath, Class entityClazz, String workingPath, String collectedfrom, String datasourcePath) { + String inputPath, Class 
entityClazz, String workingDir, String collectedfrom, String datasourcePath) { List hostedBy = spark .read() @@ -134,11 +134,11 @@ public class CleanCountrySparkJob implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath); + .json(workingDir); spark .read() - .textFile(workingPath) + .textFile(workingDir) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java index d3741d3e8..598fccdd7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java @@ -54,8 +54,8 @@ public class GetDatasourceFromCountry implements Serializable { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingPath = parser.get("workingDir"); + log.info("workingDir: {}", workingPath); String country = parser.get("country"); log.info("country: {}", country); @@ -70,7 +70,7 @@ public class GetDatasourceFromCountry implements Serializable { } private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath, - String workingPath) { + String workingDir) { Dataset organization = spark .read() @@ -100,7 +100,7 @@ public class GetDatasourceFromCountry implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath); + .json(workingDir); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml 
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index e717fac0f..e756840bd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -343,7 +343,7 @@ --inputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --workingPath${workingDir}/working/publication + --workingDir${workingDir}/working/publication --contextId${contextId} --verifyParam${verifyParam} @@ -370,7 +370,7 @@ --inputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --workingPath${workingDir}/working/dataset + --workingDir${workingDir}/working/dataset --contextId${contextId} --verifyParam${verifyParam} @@ -397,7 +397,7 @@ --inputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --workingPath${workingDir}/working/otherresearchproduct + --workingDir${workingDir}/working/otherresearchproduct --contextId${contextId} --verifyParam${verifyParam} @@ -424,7 +424,7 @@ --inputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --workingPath${workingDir}/working/software + --workingDir${workingDir}/working/software --contextId${contextId} --verifyParam${verifyParam} @@ -452,7 +452,7 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${graphOutputPath} - --workingPath${workingDir}/working/hostedby + --workingDir${workingDir}/working/hostedby --country${country} @@ -485,7 +485,7 @@ --inputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --workingPath${workingDir}/working/publication + --workingDir${workingDir}/working/publication --country${country} --verifyParam${verifyCountryParam} --hostedBy${workingDir}/working/hostedby @@ -514,7 +514,7 @@ 
--inputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --workingPath${workingDir}/working/dataset + --workingDir${workingDir}/working/dataset --country${country} --verifyParam${verifyCountryParam} --hostedBy${workingDir}/working/hostedby @@ -543,7 +543,7 @@ --inputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --workingPath${workingDir}/working/otherresearchproduct + --workingDir${workingDir}/working/otherresearchproduct --country${country} --verifyParam${verifyCountryParam} --hostedBy${workingDir}/working/hostedby @@ -572,7 +572,7 @@ --inputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --workingPath${workingDir}/working/software + --workingDir${workingDir}/working/software --country${country} --verifyParam${verifyCountryParam} --hostedBy${workingDir}/working/hostedby @@ -629,9 +629,9 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${graphOutputPath}/publication + --resolvedPath${workingDir}/cfHbResolved/publication --outputPath${workingPath}/cfHbPatched/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --workingDir${workingDir}/working/publication --masterDuplicatePath${workingDir}/masterduplicate @@ -656,9 +656,9 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${graphOutputPath}/dataset + --resolvedPath${workingDir}/cfHbResolved/dataset --outputPath${workingPath}/cfHbPatched/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --workingDir${workingDir}/working/dataset --masterDuplicatePath${workingDir}/masterduplicate @@ -683,9 +683,9 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${graphOutputPath}/otherresearchproduct + --resolvedPath${workingDir}/cfHbResolved/otherresearchproduct --outputPath${workingPath}/cfHbPatched/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - 
--workingDir${workingDir}/working/otherresearchproduct --masterDuplicatePath${workingDir}/masterduplicate @@ -710,9 +710,9 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${graphOutputPath}/software + --resolvedPath${workingDir}/cfHbResolved/software --outputPath${workingPath}/cfHbPatched/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --workingDir${workingDir}/working/software --masterDuplicatePath${workingDir}/masterduplicate @@ -733,7 +733,7 @@ - ${workingPath}/cfHbPatched/publication + ${workingDir}/cfHbPatched/publication ${graphOutputPath}/publication @@ -745,7 +745,7 @@ - ${workingPath}/cfHbPatched/dataset + ${workingDir}/cfHbPatched/dataset ${graphOutputPath}/dataset @@ -757,7 +757,7 @@ - ${workingPath}/cfHbPatched/otherresearchproduct + ${workingDir}/cfHbPatched/otherresearchproduct ${graphOutputPath}/otherresearchproduct @@ -769,7 +769,7 @@ - ${workingPath}/cfHbPatched/software + ${workingDir}/cfHbPatched/software ${graphOutputPath}/software diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json index 8b8a5f70e..934d173b5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json @@ -11,6 +11,12 @@ "paramDescription": "the path to the graph data dump to read", "paramRequired": true }, + { + "paramName": "rp", + "paramLongName": "resolvedPath", + "paramDescription": "the path to store the resolved records", + "paramRequired": true + }, { "paramName": "out", "paramLongName": "outputPath", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json 
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json index e3d31d69f..8be6496d8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json @@ -12,8 +12,8 @@ "paramRequired": true }, { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "wd", + "paramLongName": "workingDir", "paramDescription": "the path to store the output graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json index 318fb22f8..b38b5ac9f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json @@ -12,8 +12,8 @@ "paramRequired": true }, { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "wd", + "paramLongName": "workingDir", "paramDescription": "the path to store the output graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json index e0aa60328..01aa2e7b0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json @@ -12,8 +12,8 @@ "paramRequired": true }, { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "wd", + 
"paramLongName": "workingDir", "paramDescription": "the path to store the output graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java new file mode 100644 index 000000000..680d1ff64 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java @@ -0,0 +1,118 @@ +package eu.dnetlib.dhp.oa.graph.clean.cfhb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Publication; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class CleanCfHbSparkJobTest { + + private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path testBaseTmpPath; + + private static String resolvedPath; + + private static String graphInputPath; + + private static String graphOutputPath; + + private static String dsMasterDuplicatePath; + + @BeforeAll + public static void beforeAll() throws IOException, URISyntaxException { + + testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName()); + log.info("using test base path {}", testBaseTmpPath); + + final File entitiesSources = Paths 
+ .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI()) + .toFile(); + + FileUtils + .copyDirectory( + entitiesSources, + testBaseTmpPath.resolve("input").resolve("entities").toFile()); + + FileUtils + .copyFileToDirectory( + Paths + .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json").toURI()) + .toFile(), + testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); + + + graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); + resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString(); + graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString(); + dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); + + SparkConf conf = new SparkConf(); + conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("spark.ui.enabled", "false"); + + spark = SparkSession + .builder() + .appName(CleanCfHbSparkJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(testBaseTmpPath.toFile()); + spark.stop(); + } + + @Test + void testCleanCfHbSparkJob() throws Exception { + final String outputPath = graphOutputPath + "/dataset"; + CleanCfHbSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", graphInputPath + "/dataset", + "--outputPath", outputPath, + "--resolvedPath", resolvedPath + "/dataset", + "--graphTableClassName", Dataset.class.getCanonicalName(), + "--datasourceMasterDuplicate", dsMasterDuplicatePath + }); + + //final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + Assertions.assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset"))); + + 
final org.apache.spark.sql.Dataset d = spark + .read() + .textFile(outputPath) + .map(as(Dataset.class), Encoders.bean(Dataset.class)); + Assertions.assertEquals(3, d.count()); + + } + + private static MapFunction as(Class clazz) { + return s -> OBJECT_MAPPER.readValue(s, clazz); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities/dataset/dataset.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities/dataset/dataset.json new file mode 100644 index 000000000..bf2f2d963 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities/dataset/dataset.json @@ -0,0 +1,3 @@ +{"author":[{"affiliation":[],"fullname":"Greenough, B","name":"B","pid":[],"rank":1,"surname":"Greenough"}],"bestaccessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"collectedfrom":[{"key":"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc","value":"Bacterial Protein Interaction Database - DUP"}],"context":[],"contributor":[],"country":[],"coverage":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"dateofcollection":"2021-09-25T10:55:00.639Z","dateoftransformation":"2021-09-25T11:00:04.201Z","description":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"value":"Heritage 
Education"}],"externalReference":[],"extraInfo":[],"format":[],"fulltext":[],"geolocation":[],"id":"50|doi_________::09821844208a5cd6300b2bfb13bca1b9","instance":[{"accessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"pid":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"urn","classname":"urn","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"urn:nbn:nl:ui:13-59-cjhf"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"doi","classname":"Digital Object Identifier","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"10.17632/96bpgw5j9d.1"}],"collectedfrom":{"key":"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc","value":"Bacterial Protein Interaction Database - DUP"},"hostedby":{"key":"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f","value":"DANS (Data Archiving and Networked 
Services)"},"instancetype":{"classid":"0021","classname":"Dataset","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"alternateIdentifier":[],"refereed":{"classid":"0000","classname":"Unknown","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"url":["","http://dx.doi.org/10.17632/96bpgw5j9d.1"]}],"language":{"classid":"und","classname":"Undetermined","schemeid":"dnet:languages","schemename":"dnet:languages"},"lastupdatetimestamp":1635434801681,"oaiprovenance":{"originDescription":{"altered":true,"baseURL":"http%3A%2F%2Fservices.nod.dans.knaw.nl%2Foa-cerif","datestamp":"2021-08-16T15:29:45Z","harvestDate":"2021-09-25T10:55:00.639Z","identifier":"oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:211323","metadataNamespace":""}},"originalId":["50|DansKnawCris::09821844208a5cd6300b2bfb13bca1b9","oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:211323"],"pid":[],"relevantdate":[],"resourcetype":{"classid":"0021","classname":"0021","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"resulttype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"source":[],"subject":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Interdisciplinary 
sciences"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Interdisciplinary sciences"}],"title":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"value":"Heritage Education"}]} +{"author":[{"affiliation":[],"fullname":"Keijers, D.M.G.","name":"D.M.G.","pid":[],"rank":1,"surname":"Keijers"}],"bestaccessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"collectedfrom":[{"key":"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35","value":"FILUR DATA - 
DUP"}],"context":[],"contributor":[],"country":[],"coverage":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"dateofcollection":"2021-09-25T10:41:59.767Z","dateoftransformation":"2021-09-25T11:00:19.238Z","description":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"value":"onderzoeksrapport"}],"externalReference":[],"extraInfo":[],"format":[],"fulltext":[],"geolocation":[],"id":"50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a","instance":[{"accessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"pid":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"urn","classname":"urn","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"urn:nbn:nl:ui:13-das-fkq"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"doi","classname":"Digital Object 
Identifier","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"10.17026/dans-xsw-qtnx"}],"collectedfrom":{"key":"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35","value":"FILUR DATA - DUP"},"hostedby":{"key":"10|re3data_____::6ffd7bc058f762912dc494cd9c175341","value":"depositar - DUP"},"instancetype":{"classid":"0021","classname":"Dataset","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"alternateIdentifier":[],"refereed":{"classid":"0000","classname":"Unknown","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"url":["","http://dx.doi.org/10.17026/dans-xsw-qtnx"]}],"language":{"classid":"dut/nld","classname":"Dutch; Flemish","schemeid":"dnet:languages","schemename":"dnet:languages"},"lastupdatetimestamp":1635434847381,"oaiprovenance":{"originDescription":{"altered":true,"baseURL":"http%3A%2F%2Fservices.nod.dans.knaw.nl%2Foa-cerif","datestamp":"2021-08-16T13:53:29Z","harvestDate":"2021-09-25T10:41:59.767Z","identifier":"oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:20759","metadataNamespace":""}},"originalId":["oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:20759","50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a"],"pid":[],"relevantdate":[],"resourcetype":{"classid":"0021","classname":"0021","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"resulttype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"source":[],"subject":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classifica
tion_typologies"},"value":"PROSPECTIE"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Archaeology"}],"title":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"value":"Plangebied Lange Ekker te Vessem, gemeente Eersel"}]} +{"author":[],"bestaccessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"collectedfrom":[{"key":"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f","value":"DANS (Data Archiving and Networked 
Services)"}],"context":[],"contributor":[],"country":[],"coverage":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"dateofcollection":"2021-09-25T10:43:13.768Z","dateoftransformation":"2021-09-25T11:01:22.863Z","description":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"value":"This find is registered at Portable Antiquities of the Netherlands with number PAN-00054604"}],"externalReference":[],"extraInfo":[],"format":[],"fulltext":[],"geolocation":[],"id":"50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c","instance":[{"accessright":{"classid":"UNKNOWN","classname":"not available","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"pid":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"urn","classname":"urn","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"urn:nbn:nl:ui:13-a7-hwgy"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"doi","classname":"Digital Object 
Identifier","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"value":"10.17026/dans-x3z-fsq5"}],"collectedfrom":{"key":"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f","value":"DANS (Data Archiving and Networked Services)"},"hostedby":{"key":"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f","value":"DANS (Data Archiving and Networked Services)"},"instancetype":{"classid":"0021","classname":"Dataset","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"alternateIdentifier":[],"refereed":{"classid":"0000","classname":"Unknown","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"url":["","http://dx.doi.org/10.17026/dans-x3z-fsq5"]}],"language":{"classid":"eng","classname":"English","schemeid":"dnet:languages","schemename":"dnet:languages"},"lastupdatetimestamp":1635434508886,"oaiprovenance":{"originDescription":{"altered":true,"baseURL":"http%3A%2F%2Fservices.nod.dans.knaw.nl%2Foa-cerif","datestamp":"2021-08-16T14:01:37Z","harvestDate":"2021-09-25T10:43:13.768Z","identifier":"oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:129566","metadataNamespace":""}},"originalId":["oai:services.nod.dans.knaw.nl:Products/dans:oai:easy.dans.knaw.nl:easy-dataset:129566","50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c"],"pid":[],"relevantdate":[],"resourcetype":{"classid":"0021","classname":"0021","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"resulttype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"source":[],"subject":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_t
ypologies","schemename":"dnet:subject_classification_typologies"},"value":"early medieval enamelled disc brooch variant A9"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Broader Match: disc brooches"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Broader Match: schijffibula - 
geemailleerd"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"metal"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"copper alloy"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Temporal coverage: Early Middle Ages C"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Temporal coverage: Early Middle Ages 
D"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Temporal coverage: 800 until 1000"},{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Archaeology"}],"title":[{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:datasetarchive","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"value":"PAN-00054604 - early medieval enamelled disc brooch variant A9"}]} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json new file mode 100644 index 000000000..b63cfe6b3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json @@ -0,0 +1,4 @@ +{ "duplicateId" : "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", "masterId" : 
"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", "masterName" : "Bacterial Protein Interaction Database" } +{ "duplicateId" : "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", "masterId" : "10|re3data_____::fc1db64b3964826913b1e9eafe830490", "masterName" : "FULIR Data" } +{ "duplicateId" : "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", "masterId" : "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", "masterName" : "depositar" } +{ "duplicateId" : "10|scindeksserb::07022f78a8cc6d1171092454ecdbb47c", "masterId" : "10|doajarticles::07022f78a8cc6d1171092454ecdbb47c", "masterName" : "Artefact" } \ No newline at end of file From 8e3edba318b7573f24bcf9300a2c115bf2074e34 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 29 Nov 2022 16:07:09 +0100 Subject: [PATCH 4/4] [graph cleaning] testing the collectedfron and hostedby patch procedure --- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 63 +++-- .../clean/cfhb/CleanCfHbSparkJobTest.java | 253 ++++++++++++------ 2 files changed, 208 insertions(+), 108 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index b4678cc6c..122e27dec 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -25,11 +25,13 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.action.model.MasterDuplicate; import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Result; +import 
eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; public class CleanCfHbSparkJob { @@ -76,6 +78,8 @@ public class CleanCfHbSparkJob { conf, isSparkSessionManaged, spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration()); cleanCfHb( spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); }); @@ -92,33 +96,15 @@ public class CleanCfHbSparkJob { // prepare the resolved CF|HB references with the corresponding EMPTY master ID Dataset resolved = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)) - .flatMap( - (FlatMapFunction) r -> { - final List list = Stream - .concat( - r.getCollectedfrom().stream().map(KeyValue::getKey), - Stream - .concat( - r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), - r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) - .distinct() - .map(s -> asIdCfHbMapping(r.getId(), s)) - .collect(Collectors.toList()); - return list.iterator(); - }, - Encoders.bean(IdCfHbMapping.class)); + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)) + .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); // set the EMPTY master ID/NAME and save it resolved .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) - .map((MapFunction, IdCfHbMapping>) t -> { - t._1().setMasterId(t._2().getMasterId()); - t._1().setMasterName(t._2().getMasterName()); - return t._1(); - }, Encoders.bean(IdCfHbMapping.class)) + .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) .write() .mode(SaveMode.Overwrite) .json(resolvedPath); @@ -131,27 +117,46 @@ public class CleanCfHbSparkJob { // read the result table Dataset res = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)); + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)); 
// Join the results with the resolved CF|HB mapping, apply the mapping and save it res .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) - //.agg(new IdCfHbMappingAggregator(entityClazz).toColumn()) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } - @NotNull + private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { + return t -> { + t._1().setMasterId(t._2().getMasterId()); + t._1().setMasterName(t._2().getMasterName()); + return t._1(); + }; + } + + private static FlatMapFunction flattenCfHbFn() { + return r -> Stream + .concat( + r.getCollectedfrom().stream().map(KeyValue::getKey), + Stream + .concat( + r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey), + r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey))) + .distinct() + .map(s -> asIdCfHbMapping(r.getId(), s)) + .iterator(); + } + private static MapGroupsFunction, T> getMapGroupsFunction() { return new MapGroupsFunction, T>() { @Override - public T call(String key, Iterator> values) throws Exception { + public T call(String key, Iterator> values) { final Tuple2 first = values.next(); final T res = first._1(); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java index 680d1ff64..b0097ed6f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java @@ -1,8 +1,16 @@ + package eu.dnetlib.dhp.oa.graph.clean.cfhb; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.schema.oaf.Dataset; 
-import eu.dnetlib.dhp.schema.oaf.Publication; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -13,106 +21,193 @@ import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Publication; public class CleanCfHbSparkJobTest { - private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class); + private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path testBaseTmpPath; + private static Path testBaseTmpPath; - private static String resolvedPath; + private static String resolvedPath; - private static String graphInputPath; + private static String graphInputPath; - private static String graphOutputPath; + private static String graphOutputPath; - private static String dsMasterDuplicatePath; + private static String dsMasterDuplicatePath; - @BeforeAll - public static void beforeAll() throws IOException, URISyntaxException { + @BeforeAll + public static void beforeAll() throws IOException, URISyntaxException { - testBaseTmpPath = 
Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName()); - log.info("using test base path {}", testBaseTmpPath); + testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName()); + log.info("using test base path {}", testBaseTmpPath); - final File entitiesSources = Paths - .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI()) - .toFile(); + final File entitiesSources = Paths + .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI()) + .toFile(); - FileUtils - .copyDirectory( - entitiesSources, - testBaseTmpPath.resolve("input").resolve("entities").toFile()); + FileUtils + .copyDirectory( + entitiesSources, + testBaseTmpPath.resolve("input").resolve("entities").toFile()); - FileUtils - .copyFileToDirectory( - Paths - .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json").toURI()) - .toFile(), - testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); + FileUtils + .copyFileToDirectory( + Paths + .get( + CleanCfHbSparkJobTest.class + .getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json") + .toURI()) + .toFile(), + testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); + graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); + resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString(); + graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString(); + dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); - graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); - resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString(); - graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString(); - dsMasterDuplicatePath = 
testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString(); + SparkConf conf = new SparkConf(); + conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName()); - SparkConf conf = new SparkConf(); - conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("spark.ui.enabled", "false"); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("spark.ui.enabled", "false"); + spark = SparkSession + .builder() + .appName(CleanCfHbSparkJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - spark = SparkSession - .builder() - .appName(CleanCfHbSparkJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(testBaseTmpPath.toFile()); + spark.stop(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(testBaseTmpPath.toFile()); - spark.stop(); - } + @Test + void testCleanCfHbSparkJob() throws Exception { + final String outputPath = graphOutputPath + "/dataset"; + final String inputPath = graphInputPath + "/dataset"; - @Test - void testCleanCfHbSparkJob() throws Exception { - final String outputPath = graphOutputPath + "/dataset"; - CleanCfHbSparkJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", graphInputPath + "/dataset", - "--outputPath", outputPath, - "--resolvedPath", resolvedPath + "/dataset", - "--graphTableClassName", Dataset.class.getCanonicalName(), - "--datasourceMasterDuplicate", dsMasterDuplicatePath - }); + org.apache.spark.sql.Dataset records = read(spark, inputPath, Dataset.class); + Dataset d = records + .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'") + .first(); + assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey()); + 
assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue()); - //final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + d = records + .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'") + .first(); + assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey()); + assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey()); + assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue()); - Assertions.assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset"))); + d = records + .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") + .first(); + assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); - 
final org.apache.spark.sql.Dataset d = spark - .read() - .textFile(outputPath) - .map(as(Dataset.class), Encoders.bean(Dataset.class)); - Assertions.assertEquals(3, d.count()); - - } + CleanCfHbSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", inputPath, + "--outputPath", outputPath, + "--resolvedPath", resolvedPath + "/dataset", + "--graphTableClassName", Dataset.class.getCanonicalName(), + "--datasourceMasterDuplicate", dsMasterDuplicatePath + }); - private static MapFunction as(Class clazz) { - return s -> OBJECT_MAPPER.readValue(s, clazz); - } + assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset"))); + + records = read(spark, outputPath, Dataset.class); + + assertEquals(3, records.count()); + + d = records + .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'") + .first(); + assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey()); + assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue()); + + d = records + .filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'") + .first(); + assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey()); + assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey()); + assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue()); + + d = 
records + .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") + .first(); + assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); + + d = records + .filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'") + .first(); + assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey()); + assertEquals( + "DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue()); + assertEquals( + "10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey()); + assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue()); + } + + private org.apache.spark.sql.Dataset read(SparkSession spark, String path, Class clazz) { + return spark + .read() + .textFile(path) + .map(as(clazz), Encoders.bean(clazz)); + } + + private static MapFunction as(Class clazz) { + return s -> OBJECT_MAPPER.readValue(s, clazz); + } }