Add actionset creation for pubmed affiliations
This commit is contained in:
parent
94089878fd
commit
6ce9b600c1
|
@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -58,10 +59,13 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
final String inputPath = parser.get("inputPath");
|
final String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath {}: ", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
final String pubmedInputPath = parser.get("pubmedInputPath");
|
||||||
|
log.info("pubmedInputPath: {}", pubmedInputPath);
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath {}: ", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
@ -70,12 +74,22 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Constants.removeOutputDir(spark, outputPath);
|
Constants.removeOutputDir(spark, outputPath);
|
||||||
prepareAffiliationRelations(spark, inputPath, outputPath);
|
|
||||||
|
List<KeyValue> collectedFromCrossref = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(spark, inputPath, collectedFromCrossref);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromPubmed = OafMapperUtils.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
||||||
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(spark, inputPath, collectedFromPubmed);
|
||||||
|
|
||||||
|
crossrefRelations
|
||||||
|
.union(pubmedRelations)
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, String inputPath,
|
||||||
String outputPath) {
|
List<KeyValue> collectedfrom) {
|
||||||
|
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
|
@ -92,7 +106,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
new Column("matching.Confidence").as("confidence"));
|
new Column("matching.Confidence").as("confidence"));
|
||||||
|
|
||||||
// prepare action sets for affiliation relations
|
// prepare action sets for affiliation relations
|
||||||
df
|
return df
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||||
|
|
||||||
|
@ -120,8 +134,6 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
qualifier,
|
qualifier,
|
||||||
Double.toString(row.getAs("confidence")));
|
Double.toString(row.getAs("confidence")));
|
||||||
|
|
||||||
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
|
||||||
|
|
||||||
// return bi-directional relations
|
// return bi-directional relations
|
||||||
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
||||||
|
|
||||||
|
@ -129,9 +141,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.map(p -> new AtomicAction(Relation.class, p))
|
.map(p -> new AtomicAction(Relation.class, p))
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
||||||
|
|
|
@ -8,7 +8,13 @@
|
||||||
{
|
{
|
||||||
"paramName": "ip",
|
"paramName": "ip",
|
||||||
"paramLongName": "inputPath",
|
"paramLongName": "inputPath",
|
||||||
"paramDescription": "the URL from where to get the programme file",
|
"paramDescription": "the path to get the input data from Crossref",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pip",
|
||||||
|
"paramLongName": "pubmedInputPath",
|
||||||
|
"paramDescription": "the path to get the input data from Pubmed",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,4 +32,5 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
||||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||||
|
|
||||||
inputPath=/data/bip-affiliations/data.json
|
inputPath=/data/bip-affiliations/data.json
|
||||||
|
pubmedInputPath=/data/bip-affiiations/pubmed-data.json
|
||||||
outputPath=/tmp/crossref-affiliations-output-v5
|
outputPath=/tmp/crossref-affiliations-output-v5
|
||||||
|
|
|
@ -3,7 +3,11 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>inputPath</name>
|
<name>inputPath</name>
|
||||||
<description>the path where to find the inferred affiliation relations</description>
|
<description>the path where to find the inferred affiliation relations from Crossref</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>pubmedInputPath</name>
|
||||||
|
<description>the path where to find the inferred affiliation relations from Pubmed</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
|
@ -97,6 +101,7 @@
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
||||||
|
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
Loading…
Reference in New Issue