openCitations #194
|
@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
@ -83,10 +85,13 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
|
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
|
||||||
boolean shouldDuplicateRels) {
|
boolean shouldDuplicateRels) {
|
||||||
spark
|
spark
|
||||||
.sqlContext()
|
.read()
|
||||||
.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING())
|
.textFile(inputPath + "/*")
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
||||||
|
Encoders.bean(COCI.class))
|
||||||
.flatMap(
|
.flatMap(
|
||||||
(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
|
(FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
|
||||||
Encoders.bean(Relation.class))
|
Encoders.bean(Relation.class))
|
||||||
.filter((FilterFunction<Relation>) value -> value != null)
|
.filter((FilterFunction<Relation>) value -> value != null)
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
|
@ -98,27 +103,30 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> createRelation(String value, boolean duplicate) {
|
private static List<Relation> createRelation(COCI value, boolean duplicate) {
|
||||||
String[] line = value.split(",");
|
|
||||||
if (!line[1].startsWith("10.")) {
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
List<Relation> relationList = new ArrayList<>();
|
List<Relation> relationList = new ArrayList<>();
|
||||||
|
|
||||||
String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1]));
|
String citing = ID_PREFIX
|
||||||
final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2]));
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
|
||||||
|
final String cited = ID_PREFIX
|
||||||
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
|
||||||
|
|
||||||
|
if(!citing.equals(cited)){
|
||||||
relationList
|
relationList
|
||||||
.addAll(
|
.addAll(
|
||||||
getRelations(
|
getRelations(
|
||||||
citing,
|
citing,
|
||||||
cited));
|
cited));
|
||||||
|
|
||||||
if (duplicate && line[1].endsWith(".refs")) {
|
if (duplicate && value.getCiting().endsWith(".refs")) {
|
||||||
citing = ID_PREFIX + IdentifierFactory
|
citing = ID_PREFIX + IdentifierFactory
|
||||||
.md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs"))));
|
.md5(
|
||||||
|
CleaningFunctions
|
||||||
|
.normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
|
||||||
relationList.addAll(getRelations(citing, cited));
|
relationList.addAll(getRelations(citing, cited));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return relationList;
|
return relationList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.opencitations;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
|
||||||
|
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
public class ReadCOCI implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class);
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
ReadCOCI.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
final String[] inputFile = parser.get("inputFile").split(";");
|
||||||
|
log.info("inputFile {}", inputFile.toString());
|
||||||
|
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath {}", workingPath);
|
||||||
|
|
||||||
|
SparkConf sconf = new SparkConf();
|
||||||
|
|
||||||
|
final String delimiter = Optional
|
||||||
|
.ofNullable(parser.get("delimiter"))
|
||||||
|
.orElse(DEFAULT_DELIMITER);
|
||||||
|
|
||||||
|
runWithSparkSession(
|
||||||
|
sconf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
doRead(
|
||||||
|
spark,
|
||||||
|
workingPath,
|
||||||
|
inputFile,
|
||||||
|
outputPath,
|
||||||
|
delimiter);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
|
||||||
|
String outputPath,
|
||||||
|
String delimiter) throws IOException {
|
||||||
|
|
||||||
|
for(String inputFile : inputFiles){
|
||||||
|
String p_string = workingPath + "/" + inputFile ;
|
||||||
|
|
||||||
|
Dataset<Row> cociData = spark
|
||||||
|
.read()
|
||||||
|
.format("csv")
|
||||||
|
.option("sep", delimiter)
|
||||||
|
.option("inferSchema", "true")
|
||||||
|
.option("header", "true")
|
||||||
|
.option("quotes", "\"")
|
||||||
|
.load(p_string)
|
||||||
|
.repartition(100);
|
||||||
|
|
||||||
|
cociData.map((MapFunction<Row, COCI>) row -> {
|
||||||
|
COCI coci = new COCI();
|
||||||
|
coci.setOci(row.getString(0));
|
||||||
|
coci.setCiting(row.getString(1));
|
||||||
|
coci.setCited(row.getString(2));
|
||||||
|
return coci;
|
||||||
|
}, Encoders.bean(COCI.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + inputFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.opencitations.model;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import com.opencsv.bean.CsvBindByPosition;
|
||||||
|
|
||||||
|
public class COCI implements Serializable {
|
||||||
|
private String oci;
|
||||||
|
|
||||||
|
|||||||
|
private String citing;
|
||||||
|
|
||||||
|
private String cited;
|
||||||
|
|
||||||
|
|
||||||
|
public String getOci() {
|
||||||
|
return oci;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOci(String oci) {
|
||||||
|
this.oci = oci;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCiting() {
|
||||||
|
return citing;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCiting(String citing) {
|
||||||
|
this.citing = citing;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCited() {
|
||||||
|
return cited;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCited(String cited) {
|
||||||
|
this.cited = cited;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "wp",
|
||||||
|
"paramLongName": "workingPath",
|
||||||
|
"paramDescription": "the zipped opencitations file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
|
||||||
|
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "d",
|
||||||
|
"paramLongName": "delimiter",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "op",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "if",
|
||||||
|
"paramLongName": "inputFile",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
<switch>
|
<switch>
|
||||||
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
|
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
|
||||||
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
|
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
|
||||||
|
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
|
||||||
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
|
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
|
||||||
</switch>
|
</switch>
|
||||||
</decision>
|
</decision>
|
||||||
|
@ -60,6 +61,32 @@
|
||||||
<arg>--inputFile</arg><arg>${inputFile}</arg>
|
<arg>--inputFile</arg><arg>${inputFile}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
</java>
|
</java>
|
||||||
|
<ok to="read"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="read">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Produces the AS for OC</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON</arg>
|
||||||
|
<arg>--delimiter</arg><arg>${delimiter}</arg>
|
||||||
|
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
|
||||||
|
</spark>
|
||||||
<ok to="create_actionset"/>
|
<ok to="create_actionset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
@ -81,7 +108,7 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${workingPath}/COCI</arg>
|
<arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest {
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
.map(aa -> ((Relation) aa.getPayload()));
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
assertEquals(60, tmp.count());
|
assertEquals(62, tmp.count());
|
||||||
|
|
||||||
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest {
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
.map(aa -> ((Relation) aa.getPayload()));
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
assertEquals(44, tmp.count());
|
assertEquals(46, tmp.count());
|
||||||
|
|
||||||
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest {
|
||||||
assertEquals("citation", r.getSubRelType());
|
assertEquals("citation", r.getSubRelType());
|
||||||
assertEquals("resultResult", r.getRelType());
|
assertEquals("resultResult", r.getRelType());
|
||||||
});
|
});
|
||||||
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
|
assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
|
||||||
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
|
assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.opencitations;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
|
||||||
|
public class ReadCOCITest {
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(ReadCOCITest.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(ReadCOCITest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(ReadCOCITest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(ReadCOCITest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReadCOCI() throws Exception {
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new org.apache.hadoop.fs.Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1")
|
||||||
|
.getPath()),
|
||||||
|
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1"));
|
||||||
|
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new org.apache.hadoop.fs.Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2")
|
||||||
|
.getPath()),
|
||||||
|
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2"));
|
||||||
|
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new org.apache.hadoop.fs.Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3")
|
||||||
|
.getPath()),
|
||||||
|
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3"));
|
||||||
|
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new org.apache.hadoop.fs.Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4")
|
||||||
|
.getPath()),
|
||||||
|
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4"));
|
||||||
|
|
||||||
|
fs
|
||||||
|
.copyFromLocalFile(
|
||||||
|
false, new org.apache.hadoop.fs.Path(getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input5")
|
||||||
|
.getPath()),
|
||||||
|
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input5"));
|
||||||
|
|
||||||
|
ReadCOCI
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-workingPath",
|
||||||
|
workingDir.toString() + "/COCI",
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/COCI_json/",
|
||||||
|
"-inputFile", "input1;input2;input3;input4;input5"
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<COCI> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/COCI_json/*/")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, COCI.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(24, tmp.count());
|
||||||
|
|
||||||
|
Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count());
|
||||||
|
|
||||||
|
Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
oci,citing,cited,creation,timespan,journal_sc,author_sc
|
||||||
|
0200102000736280105030207060407191213036204630001-02001000107362800030005000000090000000006060903,"10.1207/s15327647jcd3,4-01",10.1017/s0305000900006693,2002-11-01,P17Y1M,no,no
|
|
@ -0,0 +1,2 @@
|
||||||
|
oci,citing,cited,creation,timespan,journal_sc,author_sc
|
||||||
|
02001000007362801000805046300010563030608046333-02001000007362801000805046300010563030608046333,10.1007/s10854-015-3684-x,10.1007/s10854-015-3684-x,2015-09-01,P7Y2M,no,no
|
|
@ -44,7 +44,7 @@
|
||||||
<pluginRepository>
|
<pluginRepository>
|
||||||
<id>iis-releases</id>
|
<id>iis-releases</id>
|
||||||
<name>iis releases plugin repository</name>
|
<name>iis releases plugin repository</name>
|
||||||
<url>http://maven.ceon.pl/artifactory/iis-releases</url>
|
<url>https://maven.ceon.pl/artifactory/iis-releases</url>
|
||||||
<layout>default</layout>
|
<layout>default</layout>
|
||||||
</pluginRepository>
|
</pluginRepository>
|
||||||
</pluginRepositories>
|
</pluginRepositories>
|
||||||
|
|
Loading…
Reference in New Issue
This commented line is confusing. The first column of the csv is the coci id, not the doi.
The same applies to the commented lines below
done