@ -4,8 +4,10 @@ package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException ;
import java.util.Optional ;
import eu.dnetlib.dhp.schema.oaf.Relation ;
import org.apache.commons.io.IOUtils ;
import org.apache.spark.SparkConf ;
import org.apache.spark.api.java.JavaPairRDD ;
import org.apache.spark.api.java.JavaSparkContext ;
import org.apache.spark.api.java.function.MapFunction ;
import org.apache.spark.sql.Dataset ;
@ -24,11 +26,12 @@ import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.utils.ISLookupClientFactory ;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException ;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService ;
import scala.Tuple2 ;
public class SparkC opyOpenorgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory . getLogger ( SparkC opyOpenorgs . class ) ;
public class SparkC reateOrgsDedupRecord extends AbstractSparkAction {
private static final Logger log = LoggerFactory . getLogger ( SparkC reateOrgsDedupRecord . class ) ;
public SparkC opyOpenorgs ( ArgumentApplicationParser parser , SparkSession spark ) {
public SparkC reateOrgsDedupRecord ( ArgumentApplicationParser parser , SparkSession spark ) {
super ( parser , spark ) ;
}
@ -36,13 +39,13 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
ArgumentApplicationParser parser = new ArgumentApplicationParser (
IOUtils
. toString (
SparkC opyOpenorgs . class
SparkC reateOrgsDedupRecord . class
. getResourceAsStream (
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json" ) ) ) ;
parser . parseArgument ( args ) ;
SparkConf conf = new SparkConf ( ) ;
new SparkC opyOpenorgs ( parser , getSparkSession ( conf ) )
new SparkC reateOrgsDedupRecord ( parser , getSparkSession ( conf ) )
. run ( ISLookupClientFactory . getLookUpService ( parser . get ( "isLookUpUrl" ) ) ) ;
}
@ -64,14 +67,15 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
log . info ( "actionSetId: '{}'" , actionSetId ) ;
log . info ( "workingPath: '{}'" , workingPath ) ;
String subEntity = "organization" ;
log . info ( "Copying openorgs to the working dir" ) ;
log . info ( "Copying organization dedup records to the working dir" ) ;
final String outputPath = DedupUtility . createDedupRecordPath ( workingPath , actionSetId , subEntity ) ;
final String outputPath = DedupUtility . createDedupRecordPath ( workingPath , actionSetId , "organization" ) ;
final String entityPath = DedupUtility . createEntityPath ( graphBasePath , subEntity ) ;
final String entityPath = DedupUtility . createEntityPath ( graphBasePath , "organization" ) ;
filterOpenorgs ( spark , entityPath )
final String mergeRelsPath = DedupUtility . createMergeRelPath ( workingPath , actionSetId , "organization" ) ;
rootOrganization ( spark , entityPath , mergeRelsPath )
. write ( )
. mode ( SaveMode . Overwrite )
. option ( "compression" , "gzip" )
@ -79,26 +83,43 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
}
public static Dataset < Organization > filterOpenorgs (
public static Dataset < Organization > rootOrganization (
final SparkSession spark ,
final String entitiesInputPath ) {
final String entitiesInputPath ,
final String mergeRelsPath ) {
JavaSparkContext sc = JavaSparkContext . fromSparkContext ( spark . sparkContext ( ) ) ;
Dataset < Organization > entities = spark
. createDataset (
sc
. textFile ( entitiesInputPath )
. map ( it - > OBJECT_MAPPER . readValue ( it , Organization . class ) )
. rdd ( ) ,
Encoders . bean ( Organization . class ) ) ;
JavaPairRDD < String , Organization > entities = sc . textFile ( entitiesInputPath )
. map ( it - > OBJECT_MAPPER . readValue ( it , Organization . class ) )
. mapToPair ( o - > new Tuple2 < > ( o . getId ( ) , o ) ) ;
log . info ( "Number of organization entities processed: {}" , entities . count ( ) ) ;
entities = entities . filter ( entities . col ( "id" ) . contains ( DedupUtility . OPENORGS_ID_PREFIX ) ) ;
//collect root ids (ids in the source of 'merges' relations
JavaPairRDD < String , String > roots = spark
. read ( )
. load ( mergeRelsPath )
. as ( Encoders . bean ( Relation . class ) )
. where ( "relClass == 'merges'" )
. map (
( MapFunction < Relation , Tuple2 < String , String > > ) r - > new Tuple2 < > ( r . getSource ( ) , "root" ) ,
Encoders . tuple ( Encoders . STRING ( ) , Encoders . STRING ( ) ) )
. toJavaRDD ( )
. mapToPair ( t - > t )
. distinct ( ) ;
Dataset < Organization > rootOrgs = spark . createDataset (
entities
. leftOuterJoin ( roots )
. filter ( e - > e . _2 ( ) . _2 ( ) . isPresent ( ) ) //if it has been joined with 'root' then it's a root record
. map ( e - > e . _2 ( ) . _1 ( ) )
. rdd ( ) ,
Encoders . bean ( Organization . class ) ) ;
log . info ( "Number of Openorgs organization entities: {}" , entities . count ( ) ) ;
log . info ( "Number of Root organization : {}", entities . count ( ) ) ;
return entities ;
return rootOrg s;
}
}