forked from D-Net/dnet-hadoop
extended existing codo to accomodate import of POCI from open citation
This commit is contained in:
parent
0935d7757c
commit
e84f5b5e64
|
@ -26,7 +26,6 @@ import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
|
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
@ -35,7 +34,9 @@ import scala.Tuple2;
|
||||||
public class CreateActionSetSparkJob implements Serializable {
|
public class CreateActionSetSparkJob implements Serializable {
|
||||||
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
|
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
|
||||||
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
|
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
|
||||||
private static final String ID_PREFIX = "50|doi_________::";
|
private static final String DOI_PREFIX = "50|doi_________::";
|
||||||
|
|
||||||
|
private static final String PMID_PREFIX = "50|pmid________::";
|
||||||
private static final String TRUST = "0.91";
|
private static final String TRUST = "0.91";
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
|
||||||
|
@ -67,6 +68,9 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath {}", outputPath);
|
log.info("outputPath {}", outputPath);
|
||||||
|
|
||||||
|
final String prefix = parser.get("prefix");
|
||||||
|
log.info("prefix {}", prefix);
|
||||||
|
|
||||||
final boolean shouldDuplicateRels = Optional
|
final boolean shouldDuplicateRels = Optional
|
||||||
.ofNullable(parser.get("shouldDuplicateRels"))
|
.ofNullable(parser.get("shouldDuplicateRels"))
|
||||||
.map(Boolean::valueOf)
|
.map(Boolean::valueOf)
|
||||||
|
@ -77,13 +81,13 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
extractContent(spark, inputPath, outputPath, shouldDuplicateRels);
|
extractContent(spark, inputPath, outputPath, shouldDuplicateRels, prefix);
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
|
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
|
||||||
boolean shouldDuplicateRels) {
|
boolean shouldDuplicateRels, String prefix) {
|
||||||
spark
|
spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(inputPath + "/*")
|
.textFile(inputPath + "/*")
|
||||||
|
@ -91,7 +95,8 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
||||||
Encoders.bean(COCI.class))
|
Encoders.bean(COCI.class))
|
||||||
.flatMap(
|
.flatMap(
|
||||||
(FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
|
(FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels, prefix)
|
||||||
|
.iterator(),
|
||||||
Encoders.bean(Relation.class))
|
Encoders.bean(Relation.class))
|
||||||
.filter((FilterFunction<Relation>) value -> value != null)
|
.filter((FilterFunction<Relation>) value -> value != null)
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
|
@ -103,13 +108,19 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> createRelation(COCI value, boolean duplicate) {
|
private static List<Relation> createRelation(COCI value, boolean duplicate, String p) {
|
||||||
|
|
||||||
List<Relation> relationList = new ArrayList<>();
|
List<Relation> relationList = new ArrayList<>();
|
||||||
|
String prefix;
|
||||||
|
if (p.equals("COCI")) {
|
||||||
|
prefix = DOI_PREFIX;
|
||||||
|
} else {
|
||||||
|
prefix = PMID_PREFIX;
|
||||||
|
}
|
||||||
|
|
||||||
String citing = ID_PREFIX
|
String citing = prefix
|
||||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
|
||||||
final String cited = ID_PREFIX
|
final String cited = prefix
|
||||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
|
||||||
|
|
||||||
if (!citing.equals(cited)) {
|
if (!citing.equals(cited)) {
|
||||||
|
@ -120,7 +131,7 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
cited, ModelConstants.CITES));
|
cited, ModelConstants.CITES));
|
||||||
|
|
||||||
if (duplicate && value.getCiting().endsWith(".refs")) {
|
if (duplicate && value.getCiting().endsWith(".refs")) {
|
||||||
citing = ID_PREFIX + IdentifierFactory
|
citing = prefix + IdentifierFactory
|
||||||
.md5(
|
.md5(
|
||||||
CleaningFunctions
|
CleaningFunctions
|
||||||
.normalizePidValue(
|
.normalizePidValue(
|
||||||
|
|
|
@ -45,6 +45,9 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
log.info("hdfsNameNode {}", hdfsNameNode);
|
log.info("hdfsNameNode {}", hdfsNameNode);
|
||||||
|
|
||||||
|
final String prefix = parser.get("prefix");
|
||||||
|
log.info("prefix {}", prefix);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
@ -53,30 +56,31 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
|
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
|
||||||
|
|
||||||
for (String file : inputFile) {
|
for (String file : inputFile) {
|
||||||
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem);
|
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
|
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
final Path path = new Path(inputFile);
|
final Path path = new Path(inputFile);
|
||||||
|
|
||||||
FSDataInputStream oc_zip = fileSystem.open(path);
|
FSDataInputStream oc_zip = fileSystem.open(path);
|
||||||
|
|
||||||
int count = 1;
|
// int count = 1;
|
||||||
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
|
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
|
||||||
ZipEntry entry = null;
|
ZipEntry entry = null;
|
||||||
while ((entry = zis.getNextEntry()) != null) {
|
while ((entry = zis.getNextEntry()) != null) {
|
||||||
|
|
||||||
if (!entry.isDirectory()) {
|
if (!entry.isDirectory()) {
|
||||||
String fileName = entry.getName();
|
String fileName = entry.getName();
|
||||||
fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
|
// fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
|
||||||
count++;
|
fileName = fileName.substring(0, fileName.lastIndexOf("."));
|
||||||
|
// count++;
|
||||||
try (
|
try (
|
||||||
FSDataOutputStream out = fileSystem
|
FSDataOutputStream out = fileSystem
|
||||||
.create(new Path(workingPath + "/COCI/" + fileName + ".gz"));
|
.create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz"));
|
||||||
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
||||||
|
|
||||||
IOUtils.copy(zis, gzipOs);
|
IOUtils.copy(zis, gzipOs);
|
||||||
|
|
|
@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath {}", workingPath);
|
log.info("workingPath {}", workingPath);
|
||||||
|
|
||||||
|
final String format = parser.get("format");
|
||||||
|
log.info("format {}", format);
|
||||||
|
|
||||||
SparkConf sconf = new SparkConf();
|
SparkConf sconf = new SparkConf();
|
||||||
|
|
||||||
final String delimiter = Optional
|
final String delimiter = Optional
|
||||||
|
@ -64,13 +67,14 @@ public class ReadCOCI implements Serializable {
|
||||||
workingPath,
|
workingPath,
|
||||||
inputFile,
|
inputFile,
|
||||||
outputPath,
|
outputPath,
|
||||||
delimiter);
|
delimiter,
|
||||||
|
format);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
|
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
|
||||||
String outputPath,
|
String outputPath,
|
||||||
String delimiter) throws IOException {
|
String delimiter, String format) throws IOException {
|
||||||
|
|
||||||
for (String inputFile : inputFiles) {
|
for (String inputFile : inputFiles) {
|
||||||
String p_string = workingPath + "/" + inputFile + ".gz";
|
String p_string = workingPath + "/" + inputFile + ".gz";
|
||||||
|
@ -87,9 +91,15 @@ public class ReadCOCI implements Serializable {
|
||||||
|
|
||||||
cociData.map((MapFunction<Row, COCI>) row -> {
|
cociData.map((MapFunction<Row, COCI>) row -> {
|
||||||
COCI coci = new COCI();
|
COCI coci = new COCI();
|
||||||
|
if (format.equals("COCI")) {
|
||||||
|
coci.setCiting(row.getString(1));
|
||||||
|
coci.setCited(row.getString(2));
|
||||||
|
} else {
|
||||||
|
coci.setCiting(String.valueOf(row.getInt(1)));
|
||||||
|
coci.setCited(String.valueOf(row.getInt(2)));
|
||||||
|
}
|
||||||
coci.setOci(row.getString(0));
|
coci.setOci(row.getString(0));
|
||||||
coci.setCiting(row.getString(1));
|
|
||||||
coci.setCited(row.getString(2));
|
|
||||||
return coci;
|
return coci;
|
||||||
}, Encoders.bean(COCI.class))
|
}, Encoders.bean(COCI.class))
|
||||||
.write()
|
.write()
|
||||||
|
|
|
@ -21,5 +21,10 @@
|
||||||
"paramLongName": "shouldDuplicateRels",
|
"paramLongName": "shouldDuplicateRels",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": false
|
"paramRequired": false
|
||||||
|
},{
|
||||||
|
"paramName": "p",
|
||||||
|
"paramLongName": "prefix",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -16,5 +16,11 @@
|
||||||
"paramLongName": "hdfsNameNode",
|
"paramLongName": "hdfsNameNode",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "p",
|
||||||
|
"paramLongName": "prefix",
|
||||||
|
"paramDescription": "COCI or POCI",
|
||||||
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -30,7 +30,12 @@
|
||||||
"paramLongName": "inputFile",
|
"paramLongName": "inputFile",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
}
|
}, {
|
||||||
|
"paramName": "f",
|
||||||
|
"paramLongName": "format",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@
|
||||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||||
<arg>--inputFile</arg><arg>${inputFile}</arg>
|
<arg>--inputFile</arg><arg>${inputFile}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--prefix</arg><arg>${prefix}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="read"/>
|
<ok to="read"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -82,10 +83,11 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
|
<arg>--workingPath</arg><arg>${workingPath}/${prefix}</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON/</arg>
|
<arg>--outputPath</arg><arg>${workingPath}/${prefix}_JSON/</arg>
|
||||||
<arg>--delimiter</arg><arg>${delimiter}</arg>
|
<arg>--delimiter</arg><arg>${delimiter}</arg>
|
||||||
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
|
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
|
||||||
|
<arg>--format</arg><arg>${prefix}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="create_actionset"/>
|
<ok to="create_actionset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -108,8 +110,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
|
<arg>--inputPath</arg><arg>${workingPath}/${prefix}_JSON</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
|
<arg>--prefix</arg><arg>${prefix}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
Loading…
Reference in New Issue