1
0
Fork 0

[IE OAIPHM] added oozie workflow, minor changes, code formatting

This commit is contained in:
Claudio Atzori 2024-06-05 10:03:33 +02:00
parent 2b3b5fe9a1
commit 81090ad593
4 changed files with 155 additions and 36 deletions

View File

@ -46,15 +46,16 @@ public class IrishOaiExporterJob {
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(XmlConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json")));
IOUtils
.toString(
XmlConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
@ -62,9 +63,9 @@ public class IrishOaiExporterJob {
final String dbUser = parser.get("dbUser");
final String dbPwd = parser.get("dbPwd");
final int numConnections = Optional
.ofNullable(parser.get("numConnections"))
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
.ofNullable(parser.get("numConnections"))
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
log.info("inputPath: '{}'", inputPath);
log.info("dbUrl: '{}'", dbUrl);
@ -78,29 +79,31 @@ public class IrishOaiExporterJob {
final SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class[] {
SerializableSolrInputDocument.class
SerializableSolrInputDocument.class
});
final Encoder<TupleWrapper> encoderTuple = Encoders.bean(TupleWrapper.class);
final Encoder<OaiRecordWrapper> encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class);
final String date = LocalDateTime.now().toString();
log.info("Creating temporary table...");
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<OaiRecordWrapper> docs = spark
.read()
.schema(encoderTuple.schema())
.json(inputPath)
.as(encoderTuple)
.map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
.map((MapFunction<String, OaiRecordWrapper>) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord)
.filter((FilterFunction<OaiRecordWrapper>) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId()));
.read()
.schema(encoderTuple.schema())
.json(inputPath)
.as(encoderTuple)
.map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
.map((MapFunction<String, OaiRecordWrapper>) r -> asIrishOaiResult(r, date), encoderOaiRecord)
.filter((FilterFunction<OaiRecordWrapper>) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId()));
docs
.repartition(numConnections)
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties);
.repartition(numConnections)
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties);
});
log.info("Temporary table created.");
@ -108,14 +111,15 @@ public class IrishOaiExporterJob {
log.info("Updating OAI records...");
try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) {
try (final Statement st = con.createStatement()) {
final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql"));
final String query = IOUtils
.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql"));
st.execute(query);
}
}
log.info("DONE.");
}
protected static OaiRecordWrapper asIrishOaiResult(final String xml) {
protected static OaiRecordWrapper asIrishOaiResult(final String xml, final String date) {
try {
final Document doc = DocumentHelper.parseText(xml);
final OaiRecordWrapper r = new OaiRecordWrapper();
@ -123,7 +127,7 @@ public class IrishOaiExporterJob {
if (isValid(doc)) {
r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim());
r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML()));
r.setDate(LocalDateTime.now().toString());
r.setDate(date);
r.setSets(new ArrayList<>());
}
return r;
@ -140,19 +144,25 @@ public class IrishOaiExporterJob {
if (n != null) {
for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) {
if ("true".equals(((Node) o).getText().trim())) { return false; }
if ("true".equals(((Node) o).getText().trim())) {
return false;
}
}
// verify the main country of the result
for (final Object o : n.selectNodes("./*[local-name()='country']")) {
if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; }
if ("IE".equals(((Node) o).valueOf("@classid").trim())) {
return true;
}
}
// verify the countries of the related organizations
for (final Object o : n.selectNodes(".//*[local-name()='rel']")) {
final String relType = ((Node) o).valueOf("./*[local-name() = 'to']/@type").trim();
final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim();
if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; }
if ("organization".equals(relType) && "IE".equals(relCountry)) {
return true;
}
}
}
return false;
@ -160,7 +170,9 @@ public class IrishOaiExporterJob {
}
protected static byte[] gzip(final String str) {
if (StringUtils.isBlank(str)) { return null; }
if (StringUtils.isBlank(str)) {
return null;
}
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) {

View File

@ -0,0 +1,106 @@
<workflow-app name="irish-oaipmh-provision" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>The path of the input records on HDFS</description>
</property>
<property>
<name>numConnections</name>
<description>number of connections to the postgres db (for the write operation)</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the password for the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="oaiphm_provision"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="irish_oaiphm_provision">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Irish OAI-PHM provision</name>
<class>eu.dnetlib.dhp.oa.oaipmh.IrishOaiExporterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--numConnections</arg><arg>${numConnections}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -42,10 +42,10 @@ public class DbSerializationTest {
conf.set("spark.driver.host", "localhost");
spark = SparkSession
.builder()
.appName("TEST")
.config(conf)
.getOrCreate();
.builder()
.appName("TEST")
.config(conf)
.getOrCreate();
}
@AfterAll
@ -79,9 +79,9 @@ public class DbSerializationTest {
final Dataset<OaiRecordWrapper> docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class));
docs
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties);
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties);
});

View File

@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
@ -23,7 +24,7 @@ public class IrishOaiExporterJobTest {
@Test
void testAsIrishOaiResult() throws Exception {
final String xml = IOUtils.toString(getClass().getResourceAsStream("record_IE.xml"));
final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml);
final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml, LocalDateTime.now().toString());
assertNotNull(res.getId());
assertNotNull(res.getBody());
assertNotNull(res.getSets());