diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java index 0015dc60f1..2d541d2f98 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java @@ -1,15 +1,5 @@ -package eu.dnetlib.dhp.actionmanager.project; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.DbClient; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +package eu.dnetlib.dhp.actionmanager.project; import java.io.BufferedWriter; import java.io.Closeable; @@ -22,92 +12,104 @@ import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.DbClient; + public class ReadProjectsFromDB implements Closeable { - private final DbClient dbClient; - private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class); - private final Configuration conf; - private final BufferedWriter writer; - private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final DbClient dbClient; + private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class); + private final Configuration conf; + private final BufferedWriter writer; + private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private final static String query = "SELECT code " + - "from projects where id like 'corda__h2020%' " ; + private final static String query = "SELECT code " + + "from projects where id like 'corda__h2020%' "; - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - ReadProjectsFromDB.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json"))); + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ReadProjectsFromDB.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json"))); - parser.parseArgument(args); + parser.parseArgument(args); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - final String hdfsPath = parser.get("hdfsPath") ; - final String hdfsNameNode = parser.get("hdfsNameNode"); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("hdfsNameNode"); - try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, - dbPassword)) { + try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, + dbPassword)) { - log.info("Processing blacklist..."); - rbl.execute(query, rbl::processProjectsEntry); + log.info("Processing projects..."); + rbl.execute(query, rbl::processProjectsEntry); - } - } - public void execute(final String sql, final Function> producer) throws Exception { + } + } - final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r)); + public void execute(final String sql, final Function> producer) throws Exception { - dbClient.processResults(sql, consumer); - } + final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r)); - public List processProjectsEntry(ResultSet rs) { - try { - ProjectSubset p = new ProjectSubset(); - p.setCode(rs.getString("code")); + dbClient.processResults(sql, consumer); + } - return Arrays.asList(p); + public List processProjectsEntry(ResultSet rs) { + try { + ProjectSubset p = new ProjectSubset(); + p.setCode(rs.getString("code")); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + return Arrays.asList(p); - protected void writeProject(final ProjectSubset r) { - try { - writer.write(OBJECT_MAPPER.writeValueAsString(r)); - writer.newLine(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - public ReadProjectsFromDB( - final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) - throws Exception { + protected void writeProject(final ProjectSubset r) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); - this.conf = new Configuration(); - this.conf.set("fs.defaultFS", hdfsNameNode); - FileSystem fileSystem = FileSystem.get(this.conf); - Path hdfsWritePath = new Path(hdfsPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, false); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + public ReadProjectsFromDB( + final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, false); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); - } + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + } - @Override - public void close() throws IOException { - dbClient.close(); - writer.close(); - } + @Override + public void close() throws IOException { + dbClient.close(); + writer.close(); + } } -