From 354f0162bed984c9a0ff5714c07f8bc1b6dcf4c3 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 30 Apr 2020 10:26:50 +0200 Subject: [PATCH] changes in the blacklist and workflow definition --- .../dhp/blacklist/ReadBlacklistFromDB.java | 187 +++++++++--------- .../dhp/blacklist/oozie_app/workflow.xml | 21 +- 2 files changed, 109 insertions(+), 99 deletions(-) diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java index a8d4277614..4f66d2f109 100644 --- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java +++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java @@ -1,13 +1,5 @@ -package eu.dnetlib.dhp.blacklist; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.raw.common.DbClient; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.common.RelationInverse; -import eu.dnetlib.dhp.schema.oaf.Relation; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +package eu.dnetlib.dhp.blacklist; import java.io.BufferedWriter; import java.io.Closeable; @@ -19,120 +11,125 @@ import java.util.Arrays; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.raw.common.DbClient; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.common.RelationInverse; +import eu.dnetlib.dhp.schema.oaf.Relation; public class ReadBlacklistFromDB implements Closeable { - private final DbClient dbClient; - private static final Log log = LogFactory.getLog(ReadBlacklistFromDB.class); - private final Configuration conf; - private final BufferedWriter writer; + private final DbClient dbClient; + private static final Log log = LogFactory.getLog(ReadBlacklistFromDB.class); + private final Configuration conf; + private final BufferedWriter writer; - private final static String query = "SELECT source_type, unnest(original_source_objects) as source, " + - "target_type, unnest(original_target_objects) as target, " + - "relationship FROM blacklist WHERE status = 'ACCEPTED'"; + private final static String query = "SELECT source_type, unnest(original_source_objects) as source, " + + "target_type, unnest(original_target_objects) as target, " + + "relationship FROM blacklist WHERE status = 'ACCEPTED'"; - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = - new ArgumentApplicationParser( - IOUtils.toString( - ReadBlacklistFromDB.class.getResourceAsStream( - "/eu/dnetlib/dhp/blacklist/blacklist_parameters.json"))); + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ReadBlacklistFromDB.class + .getResourceAsStream( + "/eu/dnetlib/dhp/blacklist/blacklist_parameters.json"))); - parser.parseArgument(args); + parser.parseArgument(args); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - final String hdfsPath = parser.get("hdfsPath"); - final String hdfsNameNode = parser.get("hdfsNameNode"); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("hdfsNameNode"); + try (final ReadBlacklistFromDB rbl = new ReadBlacklistFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, + dbPassword)) { + log.info("Processing blacklist..."); + rbl.execute(query, rbl::processBlacklistEntry); - try (final ReadBlacklistFromDB rbl = - new ReadBlacklistFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, dbPassword)) { + } + } - log.info("Processing blacklist..."); - rbl.execute(query, rbl::processBlacklistEntry); + public void execute(final String sql, final Function> producer) + throws Exception { - } - } + final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeRelation(r)); + dbClient.processResults(sql, consumer); + } - public void execute(final String sql, final Function> producer) - throws Exception { + public List processBlacklistEntry(ResultSet rs) { + try { + Relation direct = new Relation(); + Relation inverse = new Relation(); - final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeRelation(r)); + String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type")); + String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type")); - dbClient.processResults(sql, consumer); - } + String source_direct = source_prefix + "|" + rs.getString("source"); + direct.setSource(source_direct); + inverse.setTarget(source_direct); - public List processBlacklistEntry(ResultSet rs){ - try { - Relation direct = new Relation(); - Relation inverse = new Relation(); + String target_direct = target_prefix + "|" + rs.getString("target"); + direct.setTarget(target_direct); + inverse.setSource(target_direct); - String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type")); - String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type")); + String encoding = rs.getString("relationship"); + RelationInverse ri = ModelSupport.relationInverseMap.get(encoding); + direct.setRelClass(ri.getRelation()); + inverse.setRelClass(ri.getInverse()); + direct.setRelType(ri.getRelType()); + inverse.setRelType(ri.getRelType()); + direct.setSubRelType(ri.getSubReltype()); + inverse.setSubRelType(ri.getSubReltype()); - String source_direct = source_prefix + "|" + rs.getString("source"); - direct.setSource(source_direct); - inverse.setTarget(source_direct); + return Arrays.asList(direct, inverse); - String target_direct = target_prefix + "|" + rs.getString("target"); - direct.setTarget(target_direct); - inverse.setSource(target_direct); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - String encoding = rs.getString("relationship"); - RelationInverse ri = ModelSupport.relationInverseMap.get(encoding); - direct.setRelClass(ri.getRelation()); - inverse.setRelClass(ri.getInverse()); - direct.setRelType(ri.getRelType()); - inverse.setRelType(ri.getRelType()); - direct.setSubRelType(ri.getSubReltype()); - inverse.setSubRelType(ri.getSubReltype()); - - return Arrays.asList(direct, inverse); - - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - dbClient.close(); - writer.close(); - } - - public ReadBlacklistFromDB( - final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) - throws Exception { - - this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); - this.conf = new Configuration(); - this.conf.set("fs.defaultFS", hdfsNameNode); - FileSystem fileSystem = FileSystem.get(this.conf); - Path hdfsWritePath = new Path(hdfsPath); - FSDataOutputStream fsDataOutputStream = fileSystem.append(hdfsWritePath); - this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); - } - - protected void writeRelation(final Relation r) { - try { - writer.write(new ObjectMapper().writeValueAsString(r)); - writer.newLine(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + @Override + public void close() throws IOException { + dbClient.close(); + writer.close(); + } + public ReadBlacklistFromDB( + final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = fileSystem.append(hdfsWritePath); + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + } + protected void writeRelation(final Relation r) { + try { + writer.write(new ObjectMapper().writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml index 483c0378af..e38d721b96 100644 --- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml @@ -1,5 +1,18 @@ - - + + + + postgresURL + the url of the postgress server to query + + + postgresUser + the username to access the postgres db + + + postgresPassword + the postgres password + + @@ -8,7 +21,7 @@ - + @@ -21,7 +34,7 @@ eu.dnetlib.dhp.blacklist.ReadBlacklistFromDB --hdfsPath${workingDir}/blacklist --hdfsNameNode${nameNode} - --postgresUrl${postgresUrl} + --postgresUrl${postgresURL} --postgresUser${postgresUser} --postgresPassword${postgresPassword}