From 32870339f53d9ba1cc44f36720355e198081c8e3 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 13 Feb 2023 13:06:48 +0100 Subject: [PATCH] refactoring after compile --- .../ReadDatasourceMasterDuplicateFromDB.java | 88 ++--- .../GetFOSSparkJob.java | 1 - .../createunresolvedentities/GetFosTest.java | 81 ++--- .../oa/graph/clean/MasterDuplicateAction.java | 44 +-- .../graph/clean/cfhb/CleanCfHbSparkJob.java | 318 +++++++++--------- .../oa/graph/clean/cfhb/IdCfHbMapping.java | 66 ++-- 6 files changed, 300 insertions(+), 298 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java index 33c8d572b6..5d39216f19 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java @@ -23,59 +23,59 @@ import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; public class ReadDatasourceMasterDuplicateFromDB { - private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class); + private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId " - + - "FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);"; + private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId " + + + "FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);"; - public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) - throws IOException { - int count = 0; - try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); - FileSystem fileSystem = FileSystem.get(conf); - FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath)); + public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) + throws IOException { + int count = 0; + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(conf); + FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath)); - log.info("running query: {}", QUERY); - log.info("storing results in: {}", hdfsPath); + log.info("running query: {}", QUERY); + log.info("storing results in: {}", hdfsPath); - try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { - dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer)); - count++; - } - } - return count; - } + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { + dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer)); + count++; + } + } + return count; + } - private static MasterDuplicate datasourceMasterMap(ResultSet rs) { - try { - final MasterDuplicate md = new MasterDuplicate(); + private static MasterDuplicate datasourceMasterMap(ResultSet rs) { + try { + final MasterDuplicate md = new MasterDuplicate(); - final String duplicateId = rs.getString("duplicateId"); - final String masterId = rs.getString("masterId"); - final String masterName = rs.getString("masterName"); + final String duplicateId = rs.getString("duplicateId"); + final String masterId = rs.getString("masterId"); + final String masterName = rs.getString("masterName"); - md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true)); - md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true)); - md.setMasterName(masterName); + md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true)); + md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true)); + md.setMasterName(masterName); - return md; - } catch (final SQLException e) { - throw new RuntimeException(e); - } - } + return md; + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } - private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) { - try { - writer.write(OBJECT_MAPPER.writeValueAsString(dm)); - writer.newLine(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } + private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(dm)); + writer.newLine(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java index c98f1b05a6..0cc2f93df4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java @@ -9,7 +9,6 @@ import java.io.Serializable; import java.util.Optional; import org.apache.commons.io.IOUtils; - import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFosTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFosTest.java index 9a74ae07e3..7e0acc2bbb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFosTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFosTest.java @@ -1,7 +1,10 @@ + package eu.dnetlib.dhp.actionmanager.createunresolvedentities; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -17,9 +20,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; /** * @author miriam.baglioni @@ -27,48 +30,48 @@ import java.nio.file.Path; */ public class GetFosTest { - private static final Logger log = LoggerFactory.getLogger(ProduceTest.class); + private static final Logger log = LoggerFactory.getLogger(ProduceTest.class); - private static Path workingDir; - private static SparkSession spark; - private static LocalFileSystem fs; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static Path workingDir; + private static SparkSession spark; + private static LocalFileSystem fs; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(PrepareTest.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareTest.class.getSimpleName()); - fs = FileSystem.getLocal(new Configuration()); - log.info("using work dir {}", workingDir); + fs = FileSystem.getLocal(new Configuration()); + log.info("using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(ProduceTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(ProduceTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(PrepareTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(PrepareTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } - @Test + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test void test3() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos_sbs.tsv") - .getPath(); - + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos_sbs.tsv") + .getPath(); final String outputPath = workingDir.toString() + "/fos.json"; GetFOSSparkJob diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java index 0fb9adf651..8bf36ff824 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java @@ -10,36 +10,36 @@ import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB; public class MasterDuplicateAction { - private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class); + private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class); - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MasterDuplicateAction.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json"))); + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MasterDuplicateAction.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json"))); - parser.parseArgument(args); + parser.parseArgument(args); - final String dbUrl = parser.get("postgresUrl"); - log.info("postgresUrl: {}", dbUrl); + final String dbUrl = parser.get("postgresUrl"); + log.info("postgresUrl: {}", dbUrl); - final String dbUser = parser.get("postgresUser"); - log.info("postgresUser: {}", dbUser); + final String dbUser = parser.get("postgresUser"); + log.info("postgresUser: {}", dbUser); - final String dbPassword = parser.get("postgresPassword"); - log.info("postgresPassword: {}", dbPassword); + final String dbPassword = parser.get("postgresPassword"); + log.info("postgresPassword: {}", dbPassword); - final String hdfsPath = parser.get("hdfsPath"); - log.info("hdfsPath: {}", hdfsPath); + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); - final String hdfsNameNode = parser.get("hdfsNameNode"); - log.info("hdfsNameNode: {}", hdfsNameNode); + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode: {}", hdfsNameNode); - int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode); + int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode); - log.info("written {} rows", rows); - } + log.info("written {} rows", rows); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java index 3d415f21d7..531b415ed6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java @@ -37,191 +37,191 @@ import scala.Tuple2; public class CleanCfHbSparkJob { - private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - CleanCountrySparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + String jsonConfiguration = IOUtils + .toString( + CleanCountrySparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); - String resolvedPath = parser.get("resolvedPath"); - log.info("resolvedPath: {}", resolvedPath); + String resolvedPath = parser.get("resolvedPath"); + log.info("resolvedPath: {}", resolvedPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - String dsMasterDuplicatePath = parser.get("masterDuplicatePath"); - log.info("masterDuplicatePath: {}", dsMasterDuplicatePath); + String dsMasterDuplicatePath = parser.get("masterDuplicatePath"); + log.info("masterDuplicatePath: {}", dsMasterDuplicatePath); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); - Class entityClazz = (Class) Class.forName(graphTableClassName); + Class entityClazz = (Class) Class.forName(graphTableClassName); - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration()); - cleanCfHb( - spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); - }); - } + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration()); + cleanCfHb( + spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); + }); + } - private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, - String resolvedPath, String masterDuplicatePath, String outputPath) { + private static void cleanCfHb(SparkSession spark, String inputPath, Class entityClazz, + String resolvedPath, String masterDuplicatePath, String outputPath) { - // read the master-duplicate tuples - Dataset md = spark - .read() - .textFile(masterDuplicatePath) - .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); + // read the master-duplicate tuples + Dataset md = spark + .read() + .textFile(masterDuplicatePath) + .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class)); - // prepare the resolved CF|HB references with the corresponding EMPTY master ID - Dataset resolved = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)) - .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); + // prepare the resolved CF|HB references with the corresponding EMPTY master ID + Dataset resolved = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)) + .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class)); - // set the EMPTY master ID/NAME and save it - resolved - .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) - .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) - .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())) - .write() - .mode(SaveMode.Overwrite) - .json(resolvedPath); + // set the EMPTY master ID/NAME and save it + resolved + .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) + .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)) + .filter((FilterFunction) m -> Objects.nonNull(m.getMasterId())) + .write() + .mode(SaveMode.Overwrite) + .json(resolvedPath); - // read again the resolved CF|HB mapping - Dataset resolvedDS = spark - .read() - .textFile(resolvedPath) - .map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class)); + // read again the resolved CF|HB mapping + Dataset resolvedDS = spark + .read() + .textFile(resolvedPath) + .map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class)); - // read the result table - Dataset res = spark - .read() - .textFile(inputPath) - .map(as(entityClazz), Encoders.bean(entityClazz)); + // read the result table + Dataset res = spark + .read() + .textFile(inputPath) + .map(as(entityClazz), Encoders.bean(entityClazz)); - // Join the results with the resolved CF|HB mapping, apply the mapping and save it - res - .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") - .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) - .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } + // Join the results with the resolved CF|HB mapping, apply the mapping and save it + res + .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") + .groupByKey((MapFunction, String>) t -> t._1().getId(), Encoders.STRING()) + .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { - return t -> { - final IdCfHbMapping mapping = t._1(); - Optional - .ofNullable(t._2()) - .ifPresent(t2 -> { - mapping.setMasterId(t2.getMasterId()); - mapping.setMasterName(t2.getMasterName()); + private static MapFunction, IdCfHbMapping> asIdCfHbMapping() { + return t -> { + final IdCfHbMapping mapping = t._1(); + Optional + .ofNullable(t._2()) + .ifPresent(t2 -> { + mapping.setMasterId(t2.getMasterId()); + mapping.setMasterName(t2.getMasterName()); - }); - return mapping; - }; - } + }); + return mapping; + }; + } - private static FlatMapFunction flattenCfHbFn() { - return r -> Stream - .concat( - Optional - .ofNullable(r.getCollectedfrom()) - .map(cf -> cf.stream().map(KeyValue::getKey)) - .orElse(Stream.empty()), - Stream - .concat( - Optional - .ofNullable(r.getInstance()) - .map( - instances -> instances - .stream() - .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))) - .orElse(Stream.empty()) - .filter(StringUtils::isNotBlank), - Optional - .ofNullable(r.getInstance()) - .map( - instances -> instances - .stream() - .map( - i -> Optional - .ofNullable(i.getCollectedfrom()) - .map(KeyValue::getKey) - .orElse(""))) - .orElse(Stream.empty()) - .filter(StringUtils::isNotBlank))) - .distinct() - .filter(StringUtils::isNotBlank) - .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb)) - .iterator(); - } + private static FlatMapFunction flattenCfHbFn() { + return r -> Stream + .concat( + Optional + .ofNullable(r.getCollectedfrom()) + .map(cf -> cf.stream().map(KeyValue::getKey)) + .orElse(Stream.empty()), + Stream + .concat( + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank), + Optional + .ofNullable(r.getInstance()) + .map( + instances -> instances + .stream() + .map( + i -> Optional + .ofNullable(i.getCollectedfrom()) + .map(KeyValue::getKey) + .orElse(""))) + .orElse(Stream.empty()) + .filter(StringUtils::isNotBlank))) + .distinct() + .filter(StringUtils::isNotBlank) + .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb)) + .iterator(); + } - private static MapGroupsFunction, T> getMapGroupsFunction() { - return new MapGroupsFunction, T>() { - @Override - public T call(String key, Iterator> values) { - final Tuple2 first = values.next(); - final T res = first._1(); + private static MapGroupsFunction, T> getMapGroupsFunction() { + return new MapGroupsFunction, T>() { + @Override + public T call(String key, Iterator> values) { + final Tuple2 first = values.next(); + final T res = first._1(); - updateResult(res, first._2()); - values.forEachRemaining(t -> updateResult(res, t._2())); - return res; - } + updateResult(res, first._2()); + values.forEachRemaining(t -> updateResult(res, t._2())); + return res; + } - private void updateResult(T res, IdCfHbMapping m) { - if (Objects.nonNull(m)) { - res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); - res.getInstance().forEach(i -> { - updateKeyValue(i.getHostedby(), m); - updateKeyValue(i.getCollectedfrom(), m); - }); - } - } + private void updateResult(T res, IdCfHbMapping m) { + if (Objects.nonNull(m)) { + res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); + res.getInstance().forEach(i -> { + updateKeyValue(i.getHostedby(), m); + updateKeyValue(i.getCollectedfrom(), m); + }); + } + } - private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { - if (kv.getKey().equals(a.getCfhb())) { - kv.setKey(a.getMasterId()); - kv.setValue(a.getMasterName()); - } - } + private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { + if (kv.getKey().equals(a.getCfhb())) { + kv.setKey(a.getMasterId()); + kv.setValue(a.getMasterName()); + } + } - }; - } + }; + } - private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { - IdCfHbMapping m = new IdCfHbMapping(resultId); - m.setCfhb(cfHb); - return m; - } + private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) { + IdCfHbMapping m = new IdCfHbMapping(resultId); + m.setCfhb(cfHb); + return m; + } - private static MapFunction as(Class clazz) { - return s -> OBJECT_MAPPER.readValue(s, clazz); - } + private static MapFunction as(Class clazz) { + return s -> OBJECT_MAPPER.readValue(s, clazz); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java index 0f527c4e74..fad1129c57 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java @@ -5,50 +5,50 @@ import java.io.Serializable; public class IdCfHbMapping implements Serializable { - private String resultId; + private String resultId; - private String cfhb; + private String cfhb; - private String masterId; + private String masterId; - private String masterName; + private String masterName; - public IdCfHbMapping() { - } + public IdCfHbMapping() { + } - public IdCfHbMapping(String id) { - this.resultId = id; - } + public IdCfHbMapping(String id) { + this.resultId = id; + } - public String getResultId() { - return resultId; - } + public String getResultId() { + return resultId; + } - public void setResultId(String resultId) { - this.resultId = resultId; - } + public void setResultId(String resultId) { + this.resultId = resultId; + } - public String getCfhb() { - return cfhb; - } + public String getCfhb() { + return cfhb; + } - public void setCfhb(String cfhb) { - this.cfhb = cfhb; - } + public void setCfhb(String cfhb) { + this.cfhb = cfhb; + } - public String getMasterId() { - return masterId; - } + public String getMasterId() { + return masterId; + } - public void setMasterId(String masterId) { - this.masterId = masterId; - } + public void setMasterId(String masterId) { + this.masterId = masterId; + } - public String getMasterName() { - return masterName; - } + public String getMasterName() { + return masterName; + } - public void setMasterName(String masterName) { - this.masterName = masterName; - } + public void setMasterName(String masterName) { + this.masterName = masterName; + } }