diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index b59f0ae73..433baf272 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.oaipmh; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -5,6 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Optional; @@ -37,6 +41,8 @@ public class IrishOaiExporterJob { protected static final int NUM_CONNECTIONS = 20; + public static final String TMP_OAI_TABLE = "temp_oai_data"; + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -53,7 +59,6 @@ public class IrishOaiExporterJob { final String inputPath = parser.get("inputPath"); final String dbUrl = parser.get("dbUrl"); - final String dbTable = parser.get("dbTable"); final String dbUser = parser.get("dbUser"); final String dbPwd = parser.get("dbPwd"); final int numConnections = Optional @@ -64,7 +69,6 @@ public class IrishOaiExporterJob { log.info("inputPath: '{}'", inputPath); log.info("dbUrl: '{}'", dbUrl); log.info("dbUser: '{}'", dbUser); - log.info("table: '{}'", dbTable); log.info("dbPwd: '{}'", "xxx"); log.info("numPartitions: '{}'", numConnections); @@ -80,6 +84,7 @@ public class IrishOaiExporterJob { final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + log.info("Creating temporary table..."); runWithSparkSession(conf, isSparkSessionManaged, spark -> { final Dataset docs = spark @@ -91,12 +96,23 @@ public class IrishOaiExporterJob { .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); - docs.repartition(numConnections) + docs + .repartition(numConnections) .write() .mode(SaveMode.Overwrite) - .jdbc(dbUrl, dbTable, connectionProperties); + .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); }); + log.info("Temporary table created."); + + log.info("Updating OAI records..."); + try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { + try (final Statement st = con.createStatement()) { + final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); + st.execute(query); + } + } + log.info("DONE."); } protected static OaiRecordWrapper asIrishOaiResult(final String xml) { @@ -107,7 +123,7 @@ public class IrishOaiExporterJob { if (isValid(doc)) { r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); - r.setDate(LocalDateTime.now()); + r.setDate(LocalDateTime.now().toString()); r.setSets(new ArrayList<>()); } return r; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java index 4c2766754..2fdf32c96 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java @@ -1,7 +1,7 @@ + package eu.dnetlib.dhp.oa.oaipmh; import java.io.Serializable; -import java.time.LocalDateTime; import java.util.List; public class OaiRecordWrapper implements Serializable { @@ -10,10 +10,11 @@ public class OaiRecordWrapper implements Serializable { private String id; private byte[] body; - private LocalDateTime date; + private String date; private List sets; - public OaiRecordWrapper() {} + public OaiRecordWrapper() { + } public String getId() { return this.id; @@ -31,11 +32,11 @@ public class OaiRecordWrapper implements Serializable { this.body = body; } - public LocalDateTime getDate() { + public String getDate() { return this.date; } - public void setDate(final LocalDateTime date) { + public void setDate(final String date) { this.date = date; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json index 99a12927b..86b2bb0d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json @@ -23,12 +23,6 @@ "paramDescription": "the user of the database", "paramRequired": true }, - { - "paramName": "t", - "paramLongName": "dbTable", - "paramDescription": "the name of the table in the database", - "paramRequired": true - }, { "paramName": "dpwd", "paramLongName": "dbPwd", diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql new file mode 100644 index 000000000..1ec0dfee0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql @@ -0,0 +1,12 @@ +BEGIN; + +DELETE FROM oai_data; + +INSERT INTO oai_data(id, body, date, sets) SELECT + id, + body, + date::timestamp, + sets +FROM temp_oai_data; + +COMMIT; diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java new file mode 100644 index 000000000..f33708f86 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.oa.oaipmh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Disabled +public class DbSerializationTest { + + private static SparkSession spark; + + public static final String dbUrl = "jdbc:postgresql://localhost:5432/db_test"; + public static final String dbUser = null; + public static final String dbPwd = null; + + @BeforeAll + public static void beforeAll() throws IOException { + + final SparkConf conf = new SparkConf(); + conf.setAppName("TEST"); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + + spark = SparkSession + .builder() + .appName("TEST") + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + } + + @Test + public void testDatabaseSerialization() throws Exception { + final Properties connectionProperties = new Properties(); + if (dbUser != null) { + connectionProperties.put("user", dbUser); + } + if (dbPwd != null) { + connectionProperties.put("password", dbPwd); + } + + runWithSparkSession(new SparkConf(), false, spark -> { + + final List list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + final OaiRecordWrapper r = new OaiRecordWrapper(); + r.setId("record_" + i); + r.setBody("jsahdjkahdjahdajad".getBytes()); + r.setDate(LocalDateTime.now().toString()); + r.setSets(Arrays.asList()); + list.add(r); + } + + final Dataset docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class)); + + docs + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); + + }); + + try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { + try (final Statement st = con.createStatement()) { + final String query = IOUtils.toString(getClass().getResourceAsStream("oai-finalize.sql")); + st.execute(query); + } + } + + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java index e33c701c5..57a32e246 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.oaipmh; import static org.junit.Assert.assertNull; @@ -17,7 +18,7 @@ import org.dom4j.DocumentException; import org.dom4j.io.SAXReader; import org.junit.jupiter.api.Test; -class IrishOaiExporterJobTest { +public class IrishOaiExporterJobTest { @Test void testAsIrishOaiResult() throws Exception { @@ -66,8 +67,12 @@ class IrishOaiExporterJobTest { } public static String gunzip(final byte[] compressed) { - if ((compressed == null) || (compressed.length == 0)) { return null; } - if (!isCompressed(compressed)) { return new String(compressed); } + if ((compressed == null) || (compressed.length == 0)) { + return null; + } + if (!isCompressed(compressed)) { + return new String(compressed); + } try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { return IOUtils.toString(gis, Charset.defaultCharset()); } catch (final IOException e) { @@ -76,6 +81,7 @@ class IrishOaiExporterJobTest { } private static boolean isCompressed(final byte[] compressed) { - return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); + return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) + && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); } }