diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
index 3531e6fa2..05f083740 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
@@ -15,7 +15,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,7 +30,6 @@ import org.apache.spark.sql.Dataset;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.jetty.util.StringUtil;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -207,7 +206,7 @@ public class ExtractPerson implements Serializable {
 				null);
 			relation.setValidated(true);
 
-			if (StringUtil.isNotBlank(role)) {
+			if (StringUtils.isNotBlank(role)) {
 				KeyValue kv = new KeyValue();
 				kv.setKey("role");
 				kv.setValue(role);
@@ -453,13 +452,13 @@ public class ExtractPerson implements Serializable {
 				null);
 		relation.setValidated(true);
 
-		if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
+		if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtils.isNotBlank(row.getStartDate())) {
 			KeyValue kv = new KeyValue();
 			kv.setKey("startDate");
 			kv.setValue(row.getStartDate());
 			properties.add(kv);
 		}
-		if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
+		if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtils.isNotBlank(row.getEndDate())) {
 			KeyValue kv = new KeyValue();
 			kv.setKey("endDate");
 			kv.setValue(row.getEndDate());
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_from_dump_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_from_dump_params.json
new file mode 100644
index 000000000..ae5ec4d73
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_from_dump_params.json
@@ -0,0 +1,27 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the source mdstore path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the master name",
+    "paramRequired": false
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingDir",
+    "paramDescription": "the working Directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "c",
+    "paramLongName": "currentDump",
+    "paramDescription": "the current Dump Directory",
+    "paramRequired": true
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/SparkApplyDump.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/SparkApplyDump.scala
new file mode 100644
index 000000000..69849ed96
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/SparkApplyDump.scala
@@ -0,0 +1,49 @@
+package eu.dnetlib.dhp.datacite
+
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.functions.{col, from_json, to_timestamp, unix_timestamp}
+import org.apache.spark.sql.types._
+import org.apache.hadoop.io.Text
+
+class SparkApplyDump (propertyPath: String, args: Array[String], log: Logger)
+  extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+  override def run(): Unit = {
+    val sourcePath = parser.get("sourcePath")
+    log.info(s"SourcePath is '$sourcePath'")
+    val currentDump = parser.get("currentDump")
+    log.info(s"currentDump is '$currentDump'")
+    val workingDir = parser.get("workingDir")
+    log.info(s"workingDir is '$workingDir'")
+    generateDatasetFromSeqDFile(spark , sourcePath, workingDir )
+
+  }
+
+
+  def generateDatasetFromSeqDFile(spark:SparkSession, sourcePath:String, workingDir:String):Unit = {
+    val schema_ddl = "doi STRING, isActive boolean, updated STRING"
+    val schema = StructType.fromDDL(schema_ddl)
+    import spark.implicits._
+    val sc = spark.sparkContext
+    sc.sequenceFile(s"$sourcePath/metadata.seq", classOf[Text], classOf[Text])
+      .map(x =>x._2.toString)
+      .toDF()
+      .selectExpr("value as json")
+      .withColumn("metadata", from_json(col("json"), schema))
+      .selectExpr("lower(metadata.doi) as doi", "metadata.isActive as isActive", "metadata.updated as ts", "json")
+      .select(col("doi"), col("isActive"), unix_timestamp(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss'Z'")).alias("timestamp"), col("json"))
+      .write.mode(SaveMode.Overwrite)
+      .save(s"$workingDir/datacite_ds")
+  }
+
+}
+
+object SparkApplyDump {
+  def main(args: Array[String]): Unit = {
+    val log = LoggerFactory.getLogger(getClass)
+    val app = new SparkApplyDump("/eu/dnetlib/dhp/datacite/generate_dataset_from_dump_params.json", args, log).initialize()
+    app.run()
+  }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
index 01ef3ba54..7be7f2dc3 100644
--- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
@@ -111,6 +111,11 @@ class DataciteToOAFTest extends AbstractVocabularyTest {
 
   }
 
+  @Test
+  def testConvertDataciteToDataset(): Unit = {
+    SparkApplyDump.main(Array("--sourcePath", "/home/sandro/Downloads/datacite", "--currentDump", "/tmp/currentDump", "--workingDir", "/tmp/workingDir", "--master", "local[*]"))
+  }
+
   @Test
   def testFilter(): Unit = {
     val record = Source
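
Note (not part of the patch): the snippet below is a minimal, self-contained sketch of the parsing step that generateDatasetFromSeqDFile applies to each value of metadata.seq. The object name ApplyDumpSketch and the inline sample JSON record are made up for illustration; it only mirrors the from_json / to_timestamp / unix_timestamp chain introduced above and does not read a sequence file or write the datacite_ds output.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, to_timestamp, unix_timestamp}
import org.apache.spark.sql.types.StructType

object ApplyDumpSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("apply-dump-sketch").getOrCreate()
    import spark.implicits._

    // Same DDL schema used by the job: only doi, isActive and the ISO-8601 "updated" field are kept.
    val schema = StructType.fromDDL("doi STRING, isActive boolean, updated STRING")

    // Hypothetical record standing in for one value of metadata.seq.
    val sample = Seq("""{"doi":"10.1234/ABC","isActive":true,"updated":"2023-01-01T10:00:00Z"}""").toDF("json")

    sample
      // Parse the JSON payload into a struct column.
      .withColumn("metadata", from_json(col("json"), schema))
      // Lower-case the DOI and expose the raw "updated" string as ts.
      .selectExpr("lower(metadata.doi) as doi", "metadata.isActive as isActive", "metadata.updated as ts")
      // Convert the ISO-8601 string into a unix epoch, as the job does before saving.
      .select(
        col("doi"),
        col("isActive"),
        unix_timestamp(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss'Z'")).alias("timestamp")
      )
      .show(false)

    spark.stop()
  }
}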