diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java index d379730..d606800 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.graph.dump.subset; import com.fasterxml.jackson.databind.ObjectMapper; +import com.mongodb.DBCursor; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -23,13 +24,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.function.Function; -public class ReadMasterDuplicateFromDB implements Closeable { +public class ReadMasterDuplicateFromDB { - private final DbClient dbClient; - private static final Log log = LogFactory.getLog(ReadMasterDuplicateFromDB.class); - - private final BufferedWriter writer; - private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services; "; @@ -49,31 +46,37 @@ public class ReadMasterDuplicateFromDB implements Closeable { final String hdfsPath = parser.get("hdfsPath"); final String hdfsNameNode = parser.get("hdfsNameNode"); - try ( - final ReadMasterDuplicateFromDB rmd = new ReadMasterDuplicateFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, - dbPassword)) { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); - log.info("Processing datasources..."); - rmd.execute(QUERY, rmd::datasourceMasterMap); + FileSystem fileSystem = FileSystem.get(conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath); + execute(dbUrl, dbUser, dbPassword, fsDataOutputStream); + + } + + private static void execute(String dbUrl, String dbUser, String dbPassword, FSDataOutputStream fos) { + try(DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)){ + try(BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))){ + dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer)); + } + + } catch (IOException e) { + e.printStackTrace(); } } - public void execute(final String sql, final Function producer) { - dbClient.processResults(sql, rs -> writeMap(producer.apply(rs))); - } - - public MasterDuplicate datasourceMasterMap(ResultSet rs) { + public static MasterDuplicate datasourceMasterMap(ResultSet rs) { try { MasterDuplicate dm = new MasterDuplicate(); String duplicate = rs.getString("duplicate"); - dm.setDuplicate(duplicate); + dm.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true)); String master = rs.getString("master"); - if (StringUtils.isNotBlank(master)) - dm.setMaster(OafMapperUtils.createOpenaireId(10, master, true)); - else - dm.setMaster(OafMapperUtils.createOpenaireId(10, duplicate, true)); + dm.setMaster(OafMapperUtils.createOpenaireId(10, master, true)); + return dm; } catch (final SQLException e) { @@ -81,34 +84,8 @@ public class ReadMasterDuplicateFromDB implements Closeable { } } - @Override - public void close() throws IOException { - dbClient.close(); - writer.close(); - } - public ReadMasterDuplicateFromDB( - final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) - throws IOException { - - this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); - - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); - - FileSystem fileSystem = FileSystem.get(conf); - Path hdfsWritePath = new Path(hdfsPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fsDataOutputStream = fileSystem.append(hdfsWritePath); - } else { - fsDataOutputStream = fileSystem.create(hdfsWritePath); - } - - this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); - } - - protected void writeMap(final MasterDuplicate dm) { + protected static void writeMap(final MasterDuplicate dm, BufferedWriter writer) { try { writer.write(OBJECT_MAPPER.writeValueAsString(dm)); writer.newLine(); diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/subset/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/subset/oozie_app/workflow.xml index 0244be9..2fe7c96 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/subset/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/subset/oozie_app/workflow.xml @@ -8,11 +8,18 @@ outputPath the output path - organizationCommunityMap the organization community map + + pathMap + the path where to find the elements involved in the constraints within the json + + + selectionCriteria + the selection criteria used to select the results + hiveDbName