From b85ad7012a14584a8ce9e64fb07257c1d2a8a5e7 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 29 Apr 2020 17:29:49 +0200
Subject: [PATCH] reads the blacklist from the blacklist db and writes it as a set of relations on hdfs

---
 .../dhp/schema/common/RelationInverse.java         |  44 +++++
 dhp-workflows/dhp-blacklist/pom.xml                |  31 ++++
 .../dhp/blacklist/ReadBlacklistFromDB.java         | 151 +++++++++++++++++
 .../dhp/blacklist/blacklist_parameters.json        |  38 +++++
 4 files changed, 264 insertions(+)
 create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/RelationInverse.java
 create mode 100644 dhp-workflows/dhp-blacklist/pom.xml
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/blacklist_parameters.json

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/RelationInverse.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/RelationInverse.java
new file mode 100644
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/RelationInverse.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.dhp.schema.common;
+
+/**
+ * Describes a relation encoding together with its inverse: the direct and
+ * inverse relation class names, the relation type and the sub-relation type.
+ */
+public class RelationInverse {
+    private String relation;
+    private String inverse;
+    private String relType;
+    private String subReltype;
+
+    public String getRelation() {
+        return relation;
+    }
+
+    public void setRelation(final String relation) {
+        this.relation = relation;
+    }
+
+    public String getInverse() {
+        return inverse;
+    }
+
+    public void setInverse(final String inverse) {
+        this.inverse = inverse;
+    }
+
+    public String getRelType() {
+        return relType;
+    }
+
+    public void setRelType(final String relType) {
+        this.relType = relType;
+    }
+
+    public String getSubReltype() {
+        return subReltype;
+    }
+
+    public void setSubReltype(final String subReltype) {
+        this.subReltype = subReltype;
+    }
+}
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
new file mode 100644
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.1.7-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-blacklist</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-graph-mapper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java
new file mode 100644
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java
@@ -0,0 +1,151 @@
+package eu.dnetlib.dhp.blacklist;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.common.RelationInverse;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Reads the accepted blacklist entries from the blacklist database and writes
+ * each entry as a pair of Relations (direct and inverse), JSON-serialized, one
+ * per line, into a single file on HDFS.
+ */
+public class ReadBlacklistFromDB implements Closeable {
+
+    private static final Log log = LogFactory.getLog(ReadBlacklistFromDB.class);
+    // ObjectMapper is thread-safe and expensive to create: share one instance.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final DbClient dbClient;
+    private final Configuration conf;
+    private final BufferedWriter writer;
+
+    private static final String QUERY =
+            "SELECT source_type, unnest(original_source_objects) as source, "
+                    + "target_type, unnest(original_target_objects) as target, "
+                    + "relationship FROM blacklist WHERE status = 'ACCEPTED'";
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser =
+                new ArgumentApplicationParser(
+                        IOUtils.toString(
+                                ReadBlacklistFromDB.class.getResourceAsStream(
+                                        "/eu/dnetlib/dhp/blacklist/blacklist_parameters.json")));
+
+        parser.parseArgument(args);
+
+        final String dbUrl = parser.get("postgresUrl");
+        final String dbUser = parser.get("postgresUser");
+        final String dbPassword = parser.get("postgresPassword");
+        final String hdfsPath = parser.get("hdfsPath");
+        final String hdfsNameNode = parser.get("hdfsNameNode");
+
+        try (final ReadBlacklistFromDB rbl =
+                new ReadBlacklistFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser, dbPassword)) {
+            log.info("Processing blacklist...");
+            rbl.execute(QUERY, rbl::processBlacklistEntry);
+        }
+    }
+
+    /**
+     * Streams the result set of {@code sql} through {@code producer} and
+     * writes every produced Relation to the HDFS output file.
+     */
+    public void execute(final String sql, final Function<ResultSet, List<Relation>> producer)
+            throws Exception {
+        final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(this::writeRelation);
+        dbClient.processResults(sql, consumer);
+    }
+
+    /**
+     * Maps one blacklist row to the direct Relation and its inverse, resolving
+     * entity id prefixes and relation semantics via ModelSupport.
+     */
+    public List<Relation> processBlacklistEntry(final ResultSet rs) {
+        try {
+            final Relation direct = new Relation();
+            final Relation inverse = new Relation();
+
+            final String sourcePrefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type"));
+            final String targetPrefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type"));
+
+            final String sourceDirect = sourcePrefix + "|" + rs.getString("source");
+            direct.setSource(sourceDirect);
+            inverse.setTarget(sourceDirect);
+
+            final String targetDirect = targetPrefix + "|" + rs.getString("target");
+            direct.setTarget(targetDirect);
+            inverse.setSource(targetDirect);
+
+            final String encoding = rs.getString("relationship");
+            final RelationInverse ri = ModelSupport.relationInverseMap.get(encoding);
+            direct.setRelClass(ri.getRelation());
+            inverse.setRelClass(ri.getInverse());
+            direct.setRelType(ri.getRelType());
+            inverse.setRelType(ri.getRelType());
+            direct.setSubRelType(ri.getSubReltype());
+            inverse.setSubRelType(ri.getSubReltype());
+
+            return Arrays.asList(direct, inverse);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        dbClient.close();
+        writer.close();
+    }
+
+    public ReadBlacklistFromDB(
+            final String hdfsPath,
+            final String hdfsNameNode,
+            final String dbUrl,
+            final String dbUser,
+            final String dbPassword)
+            throws Exception {
+        this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
+        this.conf = new Configuration();
+        this.conf.set("fs.defaultFS", hdfsNameNode);
+
+        final FileSystem fileSystem = FileSystem.get(this.conf);
+        final Path hdfsWritePath = new Path(hdfsPath);
+        // append() throws FileNotFoundException when the file does not exist
+        // (and may be disabled on the cluster): overwrite any previous run.
+        if (fileSystem.exists(hdfsWritePath)) {
+            fileSystem.delete(hdfsWritePath, false);
+        }
+        final FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath);
+        this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+    }
+
+    protected void writeRelation(final Relation r) {
+        try {
+            writer.write(OBJECT_MAPPER.writeValueAsString(r));
+            writer.newLine();
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/blacklist_parameters.json b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/blacklist_parameters.json
new file mode 100644
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/blacklist_parameters.json
@@ -0,0 +1,38 @@
+[
+  {
+    "paramName": "p",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the path where storing the sequential file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "nn",
+    "paramLongName": "hdfsNameNode",
+    "paramDescription": "the name node on hdfs",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pgurl",
+    "paramLongName": "postgresUrl",
+    "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pguser",
+    "paramLongName": "postgresUser",
+    "paramDescription": "postgres user",
+    "paramRequired": false
+  },
+  {
+    "paramName": "pgpasswd",
+    "paramLongName": "postgresPassword",
+    "paramDescription": "postgres password",
+    "paramRequired": false
+  },
+  {
+    "paramName": "a",
+    "paramLongName": "action",
+    "paramDescription": "process claims",
+    "paramRequired": false
+  }
+]