implemented first phase of import

This commit is contained in:
sandro.labruzzo 2024-12-02 10:49:13 +01:00
parent d02c8d64bf
commit 99940368e9
4 changed files with 85 additions and 5 deletions

View File

@ -15,7 +15,7 @@ import java.util.stream.Collectors;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -30,7 +30,6 @@ import org.apache.spark.sql.Dataset;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.spark_project.jetty.util.StringUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -207,7 +206,7 @@ public class ExtractPerson implements Serializable {
null); null);
relation.setValidated(true); relation.setValidated(true);
if (StringUtil.isNotBlank(role)) { if (StringUtils.isNotBlank(role)) {
KeyValue kv = new KeyValue(); KeyValue kv = new KeyValue();
kv.setKey("role"); kv.setKey("role");
kv.setValue(role); kv.setValue(role);
@ -453,13 +452,13 @@ public class ExtractPerson implements Serializable {
null); null);
relation.setValidated(true); relation.setValidated(true);
if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) { if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtils.isNotBlank(row.getStartDate())) {
KeyValue kv = new KeyValue(); KeyValue kv = new KeyValue();
kv.setKey("startDate"); kv.setKey("startDate");
kv.setValue(row.getStartDate()); kv.setValue(row.getStartDate());
properties.add(kv); properties.add(kv);
} }
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtils.isNotBlank(row.getEndDate())) {
KeyValue kv = new KeyValue(); KeyValue kv = new KeyValue();
kv.setKey("endDate"); kv.setKey("endDate");
kv.setValue(row.getEndDate()); kv.setValue(row.getEndDate());

View File

@ -0,0 +1,27 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source mdstore path",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": false
},
{
"paramName": "w",
"paramLongName": "workingDir",
"paramDescription": "the working Directory",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "currentDump",
"paramDescription": "the current Dump Directory",
"paramRequired": true
}
]

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.application.AbstractScalaApplication
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, to_timestamp, unix_timestamp}
import org.apache.spark.sql.types._
import org.apache.hadoop.io.Text
class SparkApplyDump (propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
override def run(): Unit = {
val sourcePath = parser.get("sourcePath")
log.info(s"SourcePath is '$sourcePath'")
val currentDump = parser.get("currentDump")
log.info(s"currentDump is '$currentDump'")
val workingDir = parser.get("workingDir")
log.info(s"workingDir is '$workingDir'")
generateDatasetFromSeqDFile(spark , sourcePath, workingDir )
}
def generateDatasetFromSeqDFile(spark:SparkSession, sourcePath:String, workingDir:String):Unit = {
val schema_ddl = "doi STRING, isActive boolean, updated STRING"
val schema = StructType.fromDDL(schema_ddl)
import spark.implicits._
val sc = spark.sparkContext
sc.sequenceFile(s"$sourcePath/metadata.seq", classOf[Text], classOf[Text])
.map(x =>x._2.toString)
.toDF()
.selectExpr("value as json")
.withColumn("metadata", from_json(col("json"), schema))
.selectExpr("lower(metadata.doi) as doi", "metadata.isActive as isActive", "metadata.updated as ts", "json")
.select(col("doi"), col("isActive"), unix_timestamp(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss'Z'")).alias("timestamp"), col("json"))
.write.mode(SaveMode.Overwrite)
.save(s"$workingDir/datacite_ds")
}
}
object SparkApplyDump {
def main(args: Array[String]): Unit = {
val log = LoggerFactory.getLogger(getClass)
val app = new SparkApplyDump("/eu/dnetlib/dhp/datacite/generate_dataset_from_dump_params.json", args, log).initialize()
app.run()
}
}

View File

@ -111,6 +111,11 @@ class DataciteToOAFTest extends AbstractVocabularyTest {
} }
@Test
def testConvertDataciteToDataset(): Unit = {
SparkApplyDump.main(Array("--sourcePath", "/home/sandro/Downloads/datacite", "--currentDump", "/tmp/currentDump", "--workingDir", "/tmp/workingDir", "--master", "local[*]"))
}
@Test @Test
def testFilter(): Unit = { def testFilter(): Unit = {
val record = Source val record = Source