diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index 9486a74ce..eeb86a8ff 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -65,7 +65,10 @@ public class CreateActionSetSparkJob implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); - final boolean shouldDuplicateRels = Boolean.valueOf(parser.get("shouldDuplicateRels")); + final boolean shouldDuplicateRels = + Optional.ofNullable(parser.get("shouldDuplicateRels")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); SparkConf conf = new SparkConf(); runWithSparkSession( diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java deleted file mode 100644 index 4996a3089..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java +++ /dev/null @@ -1,176 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.opencitations; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.io.Serializable; -import java.util.*; - -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import scala.Tuple2; - -public class CreateRelationsJson implements Serializable { - public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; - public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; - private static final String ID_PREFIX = "50|doi_________::"; - private static final String TRUST = "0.91"; - - private static final Logger log = LoggerFactory.getLogger(CreateRelationsJson.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws IOException, ParseException { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Objects - .requireNonNull( - CreateRelationsJson.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json")))); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath.toString()); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - extractContent(spark, inputPath, outputPath); - }); - - } - - private static void extractContent(SparkSession spark, String inputPath, String outputPath) { - spark - .sqlContext() - .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) - .flatMap( - (FlatMapFunction) value -> createRelation(value).iterator(), - Encoders.bean(Relation.class)) - .filter((FilterFunction) value -> value != null) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - - } - - private static List createRelation(String value) { - String[] line = value.split(","); - if (!line[1].startsWith("10.")) { - return new ArrayList<>(); - } - List relationList = new ArrayList<>(); - - String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); - final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); - - relationList - .addAll( - getRelations( - citing, - cited)); - - if (line[1].endsWith(".refs")) { - citing = ID_PREFIX + IdentifierFactory - .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); - relationList.addAll(getRelations(citing, cited)); - } - - return relationList; - } - - private static Collection getRelations(String citing, String cited) { - - return Arrays - .asList( - getRelation(citing, cited, ModelConstants.CITES), - getRelation(cited, citing, ModelConstants.IS_CITED_BY)); - } - - public static Relation getRelation( - String source, - String target, - String relclass) { - Relation r = new Relation(); - r.setCollectedfrom(getCollectedFrom()); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relclass); - r.setRelType(ModelConstants.RESULT_RESULT); - r.setSubRelType(ModelConstants.CITATION); - r - .setDataInfo( - getDataInfo()); - return r; - } - - public static List getCollectedFrom() { - KeyValue kv = new KeyValue(); - kv.setKey(ModelConstants.OPENOCITATIONS_ID); - kv.setValue(ModelConstants.OPENOCITATIONS_NAME); - - return Arrays.asList(kv); - } - - public static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setDeletedbyinference(false); - di.setTrust(TRUST); - - di - .setProvenanceaction( - getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS)); - return di; - } - - public static Qualifier getQualifier(String class_id, String class_name, - String qualifierSchema) { - Qualifier pa = new Qualifier(); - pa.setClassid(class_id); - pa.setClassname(class_name); - pa.setSchemeid(qualifierSchema); - pa.setSchemename(qualifierSchema); - return pa; - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java deleted file mode 100644 index 2da96084e..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java +++ /dev/null @@ -1,5 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.opencitations; - -public class OpenCitationModel { -} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java index f3ceaa1ec..7567f855b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -84,6 +84,8 @@ public class CreateOpenCitationsASTest { new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-shouldDuplicateRels", + Boolean.TRUE.toString(), "-inputPath", inputPath, "-outputPath", @@ -99,7 +101,39 @@ public class CreateOpenCitationsASTest { assertEquals(60, tmp.count()); - tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + + } + + @Test + void testNumberofRelations2() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + assertEquals(44, tmp.count()); + + // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); } @@ -206,8 +240,8 @@ public class CreateOpenCitationsASTest { assertEquals("citation", r.getSubRelType()); assertEquals("resultResult", r.getRelType()); }); - assertEquals(30, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); - assertEquals(30, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); + assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); + assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); }