oai finalization and test

This commit is contained in:
Michele Artini 2024-05-15 14:13:16 +02:00
parent c9a327bc50
commit 2b3b5fe9a1
6 changed files with 146 additions and 20 deletions

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.oaipmh; package eu.dnetlib.dhp.oa.oaipmh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -5,6 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Optional; import java.util.Optional;
@ -37,6 +41,8 @@ public class IrishOaiExporterJob {
protected static final int NUM_CONNECTIONS = 20; protected static final int NUM_CONNECTIONS = 20;
public static final String TMP_OAI_TABLE = "temp_oai_data";
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -53,7 +59,6 @@ public class IrishOaiExporterJob {
final String inputPath = parser.get("inputPath"); final String inputPath = parser.get("inputPath");
final String dbUrl = parser.get("dbUrl"); final String dbUrl = parser.get("dbUrl");
final String dbTable = parser.get("dbTable");
final String dbUser = parser.get("dbUser"); final String dbUser = parser.get("dbUser");
final String dbPwd = parser.get("dbPwd"); final String dbPwd = parser.get("dbPwd");
final int numConnections = Optional final int numConnections = Optional
@ -64,7 +69,6 @@ public class IrishOaiExporterJob {
log.info("inputPath: '{}'", inputPath); log.info("inputPath: '{}'", inputPath);
log.info("dbUrl: '{}'", dbUrl); log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser); log.info("dbUser: '{}'", dbUser);
log.info("table: '{}'", dbTable);
log.info("dbPwd: '{}'", "xxx"); log.info("dbPwd: '{}'", "xxx");
log.info("numPartitions: '{}'", numConnections); log.info("numPartitions: '{}'", numConnections);
@ -80,6 +84,7 @@ public class IrishOaiExporterJob {
final Encoder<TupleWrapper> encoderTuple = Encoders.bean(TupleWrapper.class); final Encoder<TupleWrapper> encoderTuple = Encoders.bean(TupleWrapper.class);
final Encoder<OaiRecordWrapper> encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); final Encoder<OaiRecordWrapper> encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class);
log.info("Creating temporary table...");
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<OaiRecordWrapper> docs = spark final Dataset<OaiRecordWrapper> docs = spark
@ -91,12 +96,23 @@ public class IrishOaiExporterJob {
.map((MapFunction<String, OaiRecordWrapper>) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) .map((MapFunction<String, OaiRecordWrapper>) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord)
.filter((FilterFunction<OaiRecordWrapper>) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); .filter((FilterFunction<OaiRecordWrapper>) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId()));
docs.repartition(numConnections) docs
.repartition(numConnections)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.jdbc(dbUrl, dbTable, connectionProperties); .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties);
}); });
log.info("Temporary table created.");
log.info("Updating OAI records...");
try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) {
try (final Statement st = con.createStatement()) {
final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql"));
st.execute(query);
}
}
log.info("DONE.");
} }
protected static OaiRecordWrapper asIrishOaiResult(final String xml) { protected static OaiRecordWrapper asIrishOaiResult(final String xml) {
@ -107,7 +123,7 @@ public class IrishOaiExporterJob {
if (isValid(doc)) { if (isValid(doc)) {
r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim());
r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML()));
r.setDate(LocalDateTime.now()); r.setDate(LocalDateTime.now().toString());
r.setSets(new ArrayList<>()); r.setSets(new ArrayList<>());
} }
return r; return r;

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.oa.oaipmh; package eu.dnetlib.dhp.oa.oaipmh;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
public class OaiRecordWrapper implements Serializable { public class OaiRecordWrapper implements Serializable {
@ -10,10 +10,11 @@ public class OaiRecordWrapper implements Serializable {
private String id; private String id;
private byte[] body; private byte[] body;
private LocalDateTime date; private String date;
private List<String> sets; private List<String> sets;
public OaiRecordWrapper() {} public OaiRecordWrapper() {
}
public String getId() { public String getId() {
return this.id; return this.id;
@ -31,11 +32,11 @@ public class OaiRecordWrapper implements Serializable {
this.body = body; this.body = body;
} }
public LocalDateTime getDate() { public String getDate() {
return this.date; return this.date;
} }
public void setDate(final LocalDateTime date) { public void setDate(final String date) {
this.date = date; this.date = date;
} }

View File

@ -23,12 +23,6 @@
"paramDescription": "the user of the database", "paramDescription": "the user of the database",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "t",
"paramLongName": "dbTable",
"paramDescription": "the name of the table in the database",
"paramRequired": true
},
{ {
"paramName": "dpwd", "paramName": "dpwd",
"paramLongName": "dbPwd", "paramLongName": "dbPwd",

View File

@ -0,0 +1,12 @@
BEGIN;
DELETE FROM oai_data;
INSERT INTO oai_data(id, body, date, sets) SELECT
id,
body,
date::timestamp,
sets
FROM temp_oai_data;
COMMIT;

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.oa.oaipmh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@Disabled
public class DbSerializationTest {
private static SparkSession spark;
public static final String dbUrl = "jdbc:postgresql://localhost:5432/db_test";
public static final String dbUser = null;
public static final String dbPwd = null;
@BeforeAll
public static void beforeAll() throws IOException {
final SparkConf conf = new SparkConf();
conf.setAppName("TEST");
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
spark = SparkSession
.builder()
.appName("TEST")
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
}
@Test
public void testDatabaseSerialization() throws Exception {
final Properties connectionProperties = new Properties();
if (dbUser != null) {
connectionProperties.put("user", dbUser);
}
if (dbPwd != null) {
connectionProperties.put("password", dbPwd);
}
runWithSparkSession(new SparkConf(), false, spark -> {
final List<OaiRecordWrapper> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final OaiRecordWrapper r = new OaiRecordWrapper();
r.setId("record_" + i);
r.setBody("jsahdjkahdjahdajad".getBytes());
r.setDate(LocalDateTime.now().toString());
r.setSets(Arrays.asList());
list.add(r);
}
final Dataset<OaiRecordWrapper> docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class));
docs
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties);
});
try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) {
try (final Statement st = con.createStatement()) {
final String query = IOUtils.toString(getClass().getResourceAsStream("oai-finalize.sql"));
st.execute(query);
}
}
}
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.oaipmh; package eu.dnetlib.dhp.oa.oaipmh;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -17,7 +18,7 @@ import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
class IrishOaiExporterJobTest { public class IrishOaiExporterJobTest {
@Test @Test
void testAsIrishOaiResult() throws Exception { void testAsIrishOaiResult() throws Exception {
@ -66,8 +67,12 @@ class IrishOaiExporterJobTest {
} }
public static String gunzip(final byte[] compressed) { public static String gunzip(final byte[] compressed) {
if ((compressed == null) || (compressed.length == 0)) { return null; } if ((compressed == null) || (compressed.length == 0)) {
if (!isCompressed(compressed)) { return new String(compressed); } return null;
}
if (!isCompressed(compressed)) {
return new String(compressed);
}
try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
return IOUtils.toString(gis, Charset.defaultCharset()); return IOUtils.toString(gis, Charset.defaultCharset());
} catch (final IOException e) { } catch (final IOException e) {
@ -76,6 +81,7 @@ class IrishOaiExporterJobTest {
} }
private static boolean isCompressed(final byte[] compressed) { private static boolean isCompressed(final byte[] compressed) {
return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC)
&& (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
} }
} }