openCitations #194

Merged
miriam.baglioni merged 6 commits from openCitations into beta 2022-02-14 14:58:28 +01:00
21 changed files with 392 additions and 32 deletions

View File

@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -83,10 +85,13 @@ public class CreateActionSetSparkJob implements Serializable {
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
spark spark
.sqlContext() .read()
.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .textFile(inputPath + "/*")
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class))
.flatMap( .flatMap(
(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(), (FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) value -> value != null)
.toJavaRDD() .toJavaRDD()
@ -98,27 +103,30 @@ public class CreateActionSetSparkJob implements Serializable {
} }
private static List<Relation> createRelation(String value, boolean duplicate) { private static List<Relation> createRelation(COCI value, boolean duplicate) {
String[] line = value.split(",");
if (!line[1].startsWith("10.")) {
return new ArrayList<>();
}
List<Relation> relationList = new ArrayList<>(); List<Relation> relationList = new ArrayList<>();
String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); String citing = ID_PREFIX
final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
final String cited = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
if(!citing.equals(cited)){
relationList relationList
.addAll( .addAll(
getRelations( getRelations(
citing, citing,
cited)); cited));
if (duplicate && line[1].endsWith(".refs")) { if (duplicate && value.getCiting().endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory citing = ID_PREFIX + IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); .md5(
CleaningFunctions
.normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
relationList.addAll(getRelations(citing, cited)); relationList.addAll(getRelations(citing, cited));
} }
}
return relationList; return relationList;
} }

View File

@ -0,0 +1,103 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ReadCOCI implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ReadCOCI.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
SparkConf sconf = new SparkConf();
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
doRead(
spark,
workingPath,
inputFile,
outputPath,
delimiter);
});
}
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath,
String delimiter) throws IOException {
for(String inputFile : inputFiles){
String p_string = workingPath + "/" + inputFile ;
Dataset<Row> cociData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(p_string)
.repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
coci.setOci(row.getString(0));
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
return coci;
}, Encoders.bean(COCI.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + inputFile);
}
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.actionmanager.opencitations.model;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByPosition;
public class COCI implements Serializable {
private String oci;
Review

This commented line is confusing. The first column of the csv is the coci id, not the doi.
The same applies to the commented lines below

This commented line is confusing. The first column of the csv is the coci id, not the doi. The same applies to the commented lines below
Review

done

done
private String citing;
private String cited;
public String getOci() {
return oci;
}
public void setOci(String oci) {
this.oci = oci;
}
public String getCiting() {
return citing;
}
public void setCiting(String citing) {
this.citing = citing;
}
public String getCited() {
return cited;
}
public void setCited(String cited) {
this.cited = cited;
}
}

View File

@ -0,0 +1,37 @@
[
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "d",
"paramLongName": "delimiter",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "if",
"paramLongName": "inputFile",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -26,6 +26,7 @@
<switch> <switch>
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case> <case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case> <case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed --> <default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
</switch> </switch>
</decision> </decision>
@ -60,6 +61,32 @@
<arg>--inputFile</arg><arg>${inputFile}</arg> <arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</java> </java>
<ok to="read"/>
<error to="Kill"/>
</action>
<action name="read">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the AS for OC</name>
<class>eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
</spark>
<ok to="create_actionset"/> <ok to="create_actionset"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -81,7 +108,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/COCI</arg> <arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(60, tmp.count()); assertEquals(62, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(44, tmp.count()); assertEquals(46, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest {
assertEquals("citation", r.getSubRelType()); assertEquals("citation", r.getSubRelType());
assertEquals("resultResult", r.getRelType()); assertEquals("resultResult", r.getRelType());
}); });
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
} }
@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob

View File

@ -0,0 +1,140 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.schema.oaf.Dataset;
public class ReadCOCITest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(ReadCOCITest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(ReadCOCITest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(ReadCOCITest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(ReadCOCITest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testReadCOCI() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
.getPath();
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input5")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input5"));
ReadCOCI
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-workingPath",
workingDir.toString() + "/COCI",
"-outputPath",
workingDir.toString() + "/COCI_json/",
"-inputFile", "input1;input2;input3;input4;input5"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<COCI> tmp = sc
.textFile(workingDir.toString() + "/COCI_json/*/")
.map(item -> OBJECT_MAPPER.readValue(item, COCI.class));
Assertions.assertEquals(24, tmp.count());
Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count());
Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count());
}
}

View File

@ -0,0 +1,2 @@
oci,citing,cited,creation,timespan,journal_sc,author_sc
0200102000736280105030207060407191213036204630001-02001000107362800030005000000090000000006060903,"10.1207/s15327647jcd3,4-01",10.1017/s0305000900006693,2002-11-01,P17Y1M,no,no

View File

@ -0,0 +1,2 @@
oci,citing,cited,creation,timespan,journal_sc,author_sc
02001000007362801000805046300010563030608046333-02001000007362801000805046300010563030608046333,10.1007/s10854-015-3684-x,10.1007/s10854-015-3684-x,2015-09-01,P7Y2M,no,no

View File

@ -44,7 +44,7 @@
<pluginRepository> <pluginRepository>
<id>iis-releases</id> <id>iis-releases</id>
<name>iis releases plugin repository</name> <name>iis releases plugin repository</name>
<url>http://maven.ceon.pl/artifactory/iis-releases</url> <url>https://maven.ceon.pl/artifactory/iis-releases</url>
<layout>default</layout> <layout>default</layout>
</pluginRepository> </pluginRepository>
</pluginRepositories> </pluginRepositories>