@@ -1,18 +1,15 @@
package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind. ObjectMapper
import com.fasterxml.jackson.databind. { DeserializationFeature , ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.spark.sql. { Encoder , Encoders , SaveMode , SparkSession }
import org.apache.spark. { SparkConf , SparkContext }
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
@@ -51,18 +48,21 @@ object CopyHdfsOafSparkApplication {
log . info ( "hdfsPath: {}" , hdfsPath )
implicit val oafEncoder : Encoder [ Oaf ] = Encoders . kryo [ Oaf ]
import spark.implicits._
val paths = DHPUtils . mdstorePaths ( mdstoreManagerUrl , mdFormat , mdLayout , mdInterpretation , true ) . asScala
val validPaths : List [ String ] = paths . filter ( p => HdfsSupport . exists ( p , sc . hadoopConfiguration ) ) . toList
if ( validPaths . nonEmpty ) {
val oaf = spark . read . load ( validPaths : _ * ) . as [ Oaf ]
val mapper = new ObjectMapper ( )
val l = ModelSupport . oafTypes . entrySet . asScala . map ( e => e . getKey ) . toList
val oaf = spark . read . load ( validPaths : _ * ) . as [ String ]
val mapper = new ObjectMapper ( ) . configure ( DeserializationFeature . FAIL_ON_UNKNOWN_PROPERTIES , false )
val l = ModelSupport . oafTypes . entrySet . asScala . toList
l . foreach (
e =>
oaf . filter ( o => o . getClass . getSimpleName . equalsIgnoreCase ( e ) )
oaf
. filter ( o => isOafType ( o , e . getKey ) )
. map ( j => mapper . readValue ( j , e . getValue ) . asInstanceOf [ Oaf ] )
. map ( s => mapper . writeValueAsString ( s ) ) ( Encoders . STRING )
. write
. option ( "compression" , "gzip" )
@@ -71,4 +71,20 @@ object CopyHdfsOafSparkApplication {
)
}
}
/** Checks whether a raw JSON record belongs to the given Oaf type.
 *
 * Relations have no "id" field, so they are recognised by the presence of both
 * endpoint identifiers. Every other entity is recognised by the presence of an
 * "id"; result subtypes (publication, dataset, software, otherresearchproduct)
 * are further discriminated on `resulttype.classid`.
 *
 * @param input   the record as a JSON string
 * @param oafType the lowercase Oaf type name from ModelSupport.oafTypes
 *                (e.g. "relation", "publication", "datasource")
 * @return true when the record matches the requested type
 */
def isOafType(input: String, oafType: String): Boolean = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  // lazy: parse only happens on first field access, and only once per record
  lazy val json: org.json4s.JValue = parse(input)

  if (oafType == "relation") {
    // A relation is identified by having both endpoints.
    val hasSource = (json \ "source").extractOrElse[String](null)
    val hasTarget = (json \ "target").extractOrElse[String](null)
    hasSource != null && hasTarget != null
  } else {
    val hasId = (json \ "id").extractOrElse[String](null)
    // Default to "" (not null): datasource/organization/project records carry no
    // resulttype, and with a null default `oafType.equalsIgnoreCase(null)` is
    // always false, so those entities were silently dropped. An empty classid
    // combined with startsWith lets non-result entities match their own type,
    // while result records still require a matching resulttype.classid prefix.
    val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
    hasId != null && oafType.startsWith(resultType)
  }
}
}