From 11695ba649f778646ea7811f8888993532230169 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 28 Nov 2022 10:18:43 +0100 Subject: [PATCH] [graph cleaning] patch also the result's collectedfrom and hostedby datasource name according to the datasource master-duplicate mapping --- .../ReadDatasourceMasterDuplicateFromDB.java | 19 +++++++----- .../common/action/model/MasterDuplicate.java | 29 ++++++++++++------- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 16 +++++----- .../oa/graph/clean/cfhb/IdCfHbMapping.java | 20 +++++++++---- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java index d9e8ced85..5d39216f1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java @@ -8,7 +8,6 @@ import java.nio.charset.StandardCharsets; import java.sql.ResultSet; import java.sql.SQLException; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -28,7 +27,9 @@ public class ReadDatasourceMasterDuplicateFromDB { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services;"; + private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId " + + + "FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);"; public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException { @@ -52,11 +53,15 @@ public class ReadDatasourceMasterDuplicateFromDB { private static MasterDuplicate datasourceMasterMap(ResultSet rs) { try { - MasterDuplicate md = new MasterDuplicate(); - final String master = rs.getString("master"); - final String duplicate = rs.getString("duplicate"); - md.setMaster(OafMapperUtils.createOpenaireId(10, master, true)); - md.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true)); + final MasterDuplicate md = new MasterDuplicate(); + + final String duplicateId = rs.getString("duplicateId"); + final String masterId = rs.getString("masterId"); + final String masterName = rs.getString("masterName"); + + md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true)); + md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true)); + md.setMasterName(masterName); return md; } catch (final SQLException e) { diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java index b3e0d2aaa..12a4407c4 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java @@ -8,22 +8,31 @@ import java.io.Serializable; * @Date 21/07/22 */ public class MasterDuplicate implements Serializable { - private String duplicate; - private String master; + private String duplicateId; + private String masterId; + private String masterName; - public String getDuplicate() { - return duplicate; + public String getDuplicateId() { + return duplicateId; } - public void setDuplicate(String duplicate) { - this.duplicate = duplicate; + public void setDuplicateId(String duplicateId) { + this.duplicateId = duplicateId; } - public String getMaster() { - return master; + public String getMasterId() { + return masterId; } - public void setMaster(String master) { - this.master = master; + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index ad7e252f6..d35dbc7c1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -78,6 +78,7 @@ public class CleanCfHbSparkJob { private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, String workingPath, String masterDuplicatePath, String outputPath) { + // read the master-duplicate tuples Dataset md = spark .read() @@ -111,7 +112,7 @@ public class CleanCfHbSparkJob { resolved .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate"))) .map((MapFunction, IdCfHbMapping>) t -> { - t._1().setMaster(t._2().getMaster()); + t._1().setMasterId(t._2().getMasterId()); return t._1(); }, Encoders.bean(IdCfHbMapping.class)) .write() @@ -154,13 +155,13 @@ public class CleanCfHbSparkJob { @Override public T reduce(T r, IdCfHbMapping a) { - if (Objects.isNull(a) && StringUtils.isBlank(a.getMaster())) { + if (Objects.isNull(a) && StringUtils.isBlank(a.getMasterId())) { return r; } - r.getCollectedfrom().forEach(kv -> updateKey(kv, a)); + r.getCollectedfrom().forEach(kv -> updateKeyValue(kv, a)); r.getInstance().forEach(i -> { - updateKey(i.getHostedby(), a); - updateKey(i.getCollectedfrom(), a); + updateKeyValue(i.getHostedby(), a); + updateKeyValue(i.getCollectedfrom(), a); }); return r; } @@ -178,9 +179,10 @@ public class CleanCfHbSparkJob { return r; } - private void updateKey(final KeyValue kv, final IdCfHbMapping a) { + private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { if (kv.getKey().equals(a.getCfhb())) { - kv.setKey(a.getMaster()); + kv.setKey(a.getMasterId()); + kv.setValue(a.getMasterName()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java index 16d1a2613..cb4e1b5e6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -9,7 +9,9 @@ public class IdCfHbMapping implements Serializable { private String cfhb; - private String master; + private String masterId; + + private String masterName; public IdCfHbMapping() { } @@ -34,11 +36,19 @@ public class IdCfHbMapping implements Serializable { this.cfhb = cfhb; } - public String getMaster() { - return master; + public String getMasterId() { + return masterId; } - public void setMaster(String master) { - this.master = master; + public void setMasterId(String masterId) { + this.masterId = masterId; + } + + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; } }