diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java new file mode 100644 index 000000000..cfbb62f21 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java @@ -0,0 +1,16 @@ +package eu.dnetlib.dhp.actionmanager.project; + +import java.io.Serializable; + +public class ProjectSubset implements Serializable { + + private String code; + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java new file mode 100644 index 000000000..0015dc60f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java @@ -0,0 +1,113 @@ +package eu.dnetlib.dhp.actionmanager.project; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.DbClient; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +public class ReadProjectsFromDB implements Closeable { + + private final DbClient dbClient; + private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class); + private final Configuration conf; + private final BufferedWriter writer; + private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final static String query = "SELECT code " + + "from projects where id like 'corda__h2020%' " ; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ReadProjectsFromDB.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json"))); + + parser.parseArgument(args); + + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + final String hdfsPath = parser.get("hdfsPath") ; + final String hdfsNameNode = parser.get("hdfsNameNode"); + + try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, + dbPassword)) { + + log.info("Processing blacklist..."); + rbl.execute(query, rbl::processProjectsEntry); + + } + } + public void execute(final String sql, final Function> producer) throws Exception { + + final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r)); + + dbClient.processResults(sql, consumer); + } + + public List processProjectsEntry(ResultSet rs) { + try { + ProjectSubset p = new ProjectSubset(); + p.setCode(rs.getString("code")); + + return Arrays.asList(p); + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + protected void writeProject(final ProjectSubset r) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public ReadProjectsFromDB( + final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { + + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, false); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); + + + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + } + + @Override + public void close() throws IOException { + dbClient.close(); + writer.close(); + } +} + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json new file mode 100644 index 000000000..9a2eadaa7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where storing the sequential file", + "paramRequired": true + }, + { + "paramName": "nn", + "paramLongName": "hdfsNameNode", + "paramDescription": "the name node on hdfs", + "paramRequired": true + }, + { + "paramName": "pgurl", + "paramLongName": "postgresUrl", + "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", + "paramRequired": true + }, + { + "paramName": "pguser", + "paramLongName": "postgresUser", + "paramDescription": "postgres user", + "paramRequired": false + }, + { + "paramName": "pgpasswd", + "paramLongName": "postgresPassword", + "paramDescription": "postgres password", + "paramRequired": false + } +] \ No newline at end of file