forked from D-Net/dnet-hadoop
Merged Datacite transfrom into this branch
This commit is contained in:
parent
99cf3a8ea4
commit
0f8e2ecce6
|
@ -58,12 +58,6 @@
|
||||||
<groupId>eu.dnetlib.dhp</groupId>
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
<artifactId>dhp-common</artifactId>
|
<artifactId>dhp-common</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>com.sun.xml.bind</groupId>
|
|
||||||
<artifactId>jaxb-core</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -11,9 +11,10 @@ import org.json4s.JsonAST.{JField, JObject, JString}
|
||||||
import org.json4s.jackson.JsonMethods.parse
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
|
||||||
import java.nio.charset.CodingErrorAction
|
import java.nio.charset.CodingErrorAction
|
||||||
|
import java.text.SimpleDateFormat
|
||||||
import java.time.LocalDate
|
import java.time.LocalDate
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
import java.util.Locale
|
import java.util.{Date, Locale}
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.io.{Codec, Source}
|
import scala.io.{Codec, Source}
|
||||||
|
@ -44,6 +45,8 @@ object DataciteToOAFTransformation {
|
||||||
codec.onMalformedInput(CodingErrorAction.REPLACE)
|
codec.onMalformedInput(CodingErrorAction.REPLACE)
|
||||||
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
|
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private val PID_VOCABULARY = "dnet:pid_types"
|
private val PID_VOCABULARY = "dnet:pid_types"
|
||||||
val COBJ_VOCABULARY = "dnet:publication_resource"
|
val COBJ_VOCABULARY = "dnet:publication_resource"
|
||||||
val RESULT_VOCABULARY = "dnet:result_typologies"
|
val RESULT_VOCABULARY = "dnet:result_typologies"
|
||||||
|
@ -298,8 +301,13 @@ object DataciteToOAFTransformation {
|
||||||
result.setPid(List(pid).asJava)
|
result.setPid(List(pid).asJava)
|
||||||
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
|
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
|
||||||
result.setOriginalId(List(doi).asJava)
|
result.setOriginalId(List(doi).asJava)
|
||||||
result.setDateofcollection(s"${dateOfCollection}")
|
|
||||||
result.setDateoftransformation(s"$ts")
|
val d = new Date(dateOfCollection*1000)
|
||||||
|
val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
|
||||||
|
|
||||||
|
|
||||||
|
result.setDateofcollection(ISO8601FORMAT.format(d))
|
||||||
|
result.setDateoftransformation(ISO8601FORMAT.format(ts))
|
||||||
result.setDataInfo(dataInfo)
|
result.setDataInfo(dataInfo)
|
||||||
|
|
||||||
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
|
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
|
||||||
|
|
|
@ -108,6 +108,7 @@ object ImportDatacite {
|
||||||
|
|
||||||
val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
|
val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
|
||||||
|
|
||||||
|
|
||||||
log.info(s"Imported from Datacite API $cnt documents")
|
log.info(s"Imported from Datacite API $cnt documents")
|
||||||
|
|
||||||
if (cnt > 0) {
|
if (cnt > 0) {
|
||||||
|
|
|
@ -13,13 +13,25 @@
|
||||||
<name>nativeInputPath</name>
|
<name>nativeInputPath</name>
|
||||||
<description>the path of the input MDStore</description>
|
<description>the path of the input MDStore</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="ImportDatacite"/>
|
<start to="resume_from"/>
|
||||||
|
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
<decision name="resume_from">
|
||||||
|
<switch>
|
||||||
|
<case to="TransformJob">${wf:conf('resumeFrom') eq 'TransformJob'}</case>
|
||||||
|
<case to="ExportDataset">${wf:conf('resumeFrom') eq 'ExportDataset'}</case>
|
||||||
|
<default to="ImportDatacite"/>
|
||||||
|
</switch>
|
||||||
|
</decision>
|
||||||
|
|
||||||
<action name="ImportDatacite">
|
<action name="ImportDatacite">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
|
@ -69,6 +81,14 @@
|
||||||
<arg>-tr</arg><arg>${isLookupUrl}</arg>
|
<arg>-tr</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="DeletePathIfExists"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="DeletePathIfExists">
|
||||||
|
<fs>
|
||||||
|
<delete path='${mdstoreOutputPath}_raw_AS'/>
|
||||||
|
</fs>
|
||||||
<ok to="ExportDataset"/>
|
<ok to="ExportDataset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
Loading…
Reference in New Issue