merge upstream
commit b34177d8ef

@@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
 
 public class ModelConstants {
 
+	public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies";
 	public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
 	public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource";
 	public static final String DNET_ACCESS_MODES = "dnet:access_modes";

@@ -289,4 +289,12 @@ public class JsonPathTest {
 
 		System.out.println("d = " + d);
 	}
+
+	@Test
+	public void testNull() throws Exception {
+		final Object p = null;
+
+		System.out.println((String) p);
+
+	}
 }

@@ -6,6 +6,7 @@ import java.util.Collection;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import scala.Tuple2;
@@ -30,10 +33,16 @@ public class DedupRecordFactory {
 		final DedupConfig dedupConf) {
 		long ts = System.currentTimeMillis();
 		// <id, json_entity>
-		final JavaPairRDD<String, String> inputJsonEntities = sc
-			.textFile(entitiesInputPath)
+		final JavaPairRDD<String, String> inputJsonEntities = spark
+			.read()
+			.load(entitiesInputPath)
+			.as(Encoders.kryo(Oaf.class))
+			.map(
+				(MapFunction<Oaf, String>) p -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(p),
+				Encoders.STRING())
+			.javaRDD()
 			.mapToPair(
-				(PairFunction<String, String, String>) it -> new Tuple2<String, String>(
+				(PairFunction<String, String, String>) it -> new Tuple2<>(
 					MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it));
 
 		// <source, target>: source is the dedup_id, target is the id of the mergedIn
@@ -74,9 +83,9 @@ public class DedupRecordFactory {
 		}
 	}
 
-	private static Publication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
+	private static DLIPublication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
 
-		Publication p = new Publication(); // the result of the merge, to be returned at the end
+		DLIPublication p = new DLIPublication(); // the result of the merge, to be returned at the end
 
 		p.setId(e._1());
 
@@ -110,9 +119,9 @@ public class DedupRecordFactory {
 		return p;
 	}
 
-	private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
+	private static DLIDataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
 
-		Dataset d = new Dataset(); // the result of the merge, to be returned at the end
+		DLIDataset d = new DLIDataset(); // the result of the merge, to be returned at the end
 
 		d.setId(e._1());
 

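Illustrative sketch (not taken from the commit): the dedup jobs in this change all switch from reading JSON text files to reading Kryo-serialized Oaf datasets and re-serializing them to JSON, so the existing pair-RDD logic can stay unchanged. Assuming an existing SparkSession named spark and an entitiesInputPath variable, the recurring pattern looks roughly like this:

	// read Kryo-encoded Oaf objects, turn each back into a JSON string, then drop to the RDD API
	JavaRDD<String> jsonEntities = spark
		.read()
		.load(entitiesInputPath)
		.as(Encoders.kryo(Oaf.class))
		.map(
			(MapFunction<Oaf, String>) o -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(o),
			Encoders.STRING())
		.javaRDD();
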
@@ -9,18 +9,21 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.hash.Hashing;
 
 import eu.dnetlib.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dedup.graph.GraphProcessor;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -42,7 +45,6 @@ public class SparkCreateConnectedComponent {
 			.master(parser.get("master"))
 			.getOrCreate();
 
-		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 		final String inputPath = parser.get("sourcePath");
 		final String entity = parser.get("entity");
 		final String targetPath = parser.get("targetPath");
@@ -50,8 +52,12 @@ public class SparkCreateConnectedComponent {
 		// DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
 		final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
 
-		final JavaPairRDD<Object, String> vertexes = sc
-			.textFile(inputPath + "/" + entity)
+		final JavaPairRDD<Object, String> vertexes = spark
+			.read()
+			.load(inputPath + "/" + entity)
+			.as(Encoders.kryo(Oaf.class))
+			.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
+			.javaRDD()
 			.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
 			.mapToPair(
 				(PairFunction<String, Object, String>) s -> new Tuple2<Object, String>(getHashcode(s), s));

@@ -4,10 +4,10 @@ package eu.dnetlib.dedup;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.pace.config.DedupConfig;
@@ -41,12 +41,19 @@ public class SparkCreateDedupRecord {
 			DedupUtility.createEntityPath(sourcePath, entity),
 			OafEntityType.valueOf(entity),
 			dedupConf);
-		dedupRecord
-			.map(
-				r -> {
-					ObjectMapper mapper = new ObjectMapper();
-					return mapper.writeValueAsString(r);
-				})
-			.saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
+		spark
+			.createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.save(dedupPath + "/" + entity + "/dedup_records");
+		//
+		//
+		// dedupRecord
+		// .map(
+		// r -> {
+		// ObjectMapper mapper = new ObjectMapper();
+		// return mapper.writeValueAsString(r);
+		// })
+		// .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
 	}
 }

@@ -7,10 +7,13 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
@@ -46,8 +49,12 @@ public class SparkCreateSimRels {
 		// DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
 		final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
 
-		JavaPairRDD<String, MapDocument> mapDocument = sc
-			.textFile(inputPath + "/" + entity)
+		JavaPairRDD<String, MapDocument> mapDocument = spark
+			.read()
+			.load(inputPath + "/" + entity)
+			.as(Encoders.kryo(Oaf.class))
+			.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
+			.javaRDD()
 			.mapToPair(
 				s -> {
 					MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);

@@ -14,16 +14,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
 
 public class SparkPropagateRelationsJob {
-	enum FieldType {
-		SOURCE, TARGET
-	}
-
-	static final String SOURCEJSONPATH = "$.source";
-	static final String TARGETJSONPATH = "$.target";
 
 	public static void main(String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -39,7 +34,6 @@ public class SparkPropagateRelationsJob {
 			.master(parser.get("master"))
 			.getOrCreate();
 
-		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 		final String relationPath = parser.get("relationPath");
 		final String mergeRelPath = parser.get("mergeRelPath");
 		final String targetRelPath = parser.get("targetRelPath");
@@ -50,63 +44,38 @@ public class SparkPropagateRelationsJob {
 			.as(Encoders.bean(Relation.class))
 			.where("relClass == 'merges'");
 
-		final Dataset<Relation> rels = spark.read().load(relationPath).as(Encoders.bean(Relation.class));
+		final Dataset<DLIRelation> rels = spark
+			.read()
+			.load(relationPath)
+			.as(Encoders.kryo(DLIRelation.class))
+			.map(
+				(MapFunction<DLIRelation, DLIRelation>) r -> r,
+				Encoders.bean(DLIRelation.class));
 
-		final Dataset<Relation> firstJoin = rels
+		final Dataset<DLIRelation> firstJoin = rels
 			.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
 			.map(
-				(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
+				(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
 					final Relation mergeRelation = r._2();
-					final Relation relation = r._1();
+					final DLIRelation relation = r._1();
 
 					if (mergeRelation != null)
 						relation.setSource(mergeRelation.getSource());
 					return relation;
 				},
-				Encoders.bean(Relation.class));
+				Encoders.bean(DLIRelation.class));
 
-		final Dataset<Relation> secondJoin = firstJoin
+		final Dataset<DLIRelation> secondJoin = firstJoin
 			.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
 			.map(
-				(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
+				(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
 					final Relation mergeRelation = r._2();
-					final Relation relation = r._1();
+					final DLIRelation relation = r._1();
 					if (mergeRelation != null)
 						relation.setTarget(mergeRelation.getSource());
 					return relation;
 				},
-				Encoders.bean(Relation.class));
+				Encoders.kryo(DLIRelation.class));
 
 		secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
 	}
-
-	private static boolean containsDedup(final String json) {
-		final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json);
-		final String target = DHPUtils.getJPathString(TARGETJSONPATH, json);
-
-		return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
-	}
-
-	private static String replaceField(final String json, final String id, final FieldType type) {
-		ObjectMapper mapper = new ObjectMapper();
-		mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-		try {
-			Relation relation = mapper.readValue(json, Relation.class);
-			if (relation.getDataInfo() == null)
-				relation.setDataInfo(new DataInfo());
-			relation.getDataInfo().setDeletedbyinference(false);
-			switch (type) {
-				case SOURCE:
-					relation.setSource(id);
-					return mapper.writeValueAsString(relation);
-				case TARGET:
-					relation.setTarget(id);
-					return mapper.writeValueAsString(relation);
-				default:
-					throw new IllegalArgumentException("");
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("unable to deserialize json relation: " + json, e);
-		}
-	}
 }

@@ -0,0 +1,75 @@
+package eu.dnetlib.dedup.sx
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.LoggerFactory
+import org.apache.spark.sql.functions.col
+
+object SparkUpdateEntityWithDedupInfo {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
+    val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
+    parser.parseArgument(args)
+
+    val workingPath: String = parser.get("workingPath")
+    logger.info(s"Working dir path = $workingPath")
+
+    implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity]
+    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
+
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
+    implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
+    implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
+    implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+
+    val spark: SparkSession = SparkSession
+      .builder()
+      .appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName)
+      .master(parser.get("master"))
+      .getOrCreate()
+
+    val entityPath = parser.get("entityPath")
+    val mergeRelPath = parser.get("mergeRelPath")
+    val dedupRecordPath = parser.get("dedupRecordPath")
+    val entity = parser.get("entity")
+    val destination = parser.get("targetPath")
+
+    val mergedIds = spark.read.load(mergeRelPath).as[Relation]
+      .where("relClass == 'merges'")
+      .select(col("target"))
+
+    val entities: Dataset[(String, OafEntity)] = spark
+      .read
+      .load(entityPath).as[OafEntity]
+      .map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder))
+
+    val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left")
+      .map(k => {
+        val e: OafEntity = k._1._2
+        val t = k._2
+        if (t != null && t.getString(0).nonEmpty) {
+          if (e.getDataInfo == null) {
+            e.setDataInfo(OafUtils.generateDataInfo())
+          }
+          e.getDataInfo.setDeletedbyinference(true)
+        }
+        e
+      })
+
+    val dedupRecords :Dataset[OafEntity] = spark.read.load(dedupRecordPath).as[OafEntity]
+
+    finalDataset.union(dedupRecords)
+      .repartition(1200).write
+      .mode(SaveMode.Overwrite).save(destination)
+
+  }
+
+}

@@ -144,7 +144,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Update ${entity} and add DedupRecord</name>
-            <class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
+            <class>eu.dnetlib.dedup.sx.SparkUpdateEntityWithDedupInfo</class>
            <jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory ${sparkExecutorMemory}

@@ -90,6 +90,7 @@ public class CleanGraphSparkJob {
 		final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
 
 		readTableFromPath(spark, inputPath, clazz)
+			.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
 			.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
 			.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
 			.write()
@@ -98,6 +99,65 @@ public class CleanGraphSparkJob {
 			.json(outputPath);
 	}
 
+	protected static <T extends Oaf> T fixVocabularyNames(T value) {
+		if (value instanceof Datasource) {
+			// nothing to clean here
+		} else if (value instanceof Project) {
+			// nothing to clean here
+		} else if (value instanceof Organization) {
+			Organization o = (Organization) value;
+			if (Objects.nonNull(o.getCountry())) {
+				fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
+			}
+		} else if (value instanceof Relation) {
+			// nothing to clean here
+		} else if (value instanceof Result) {
+
+			Result r = (Result) value;
+
+			fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
+			fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
+			fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
+
+			if (Objects.nonNull(r.getSubject())) {
+				r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
+			}
+			if (Objects.nonNull(r.getInstance())) {
+				for (Instance i : r.getInstance()) {
+					fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
+					fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
+				}
+			}
+			if (Objects.nonNull(r.getAuthor())) {
+				r.getAuthor().forEach(a -> {
+					if (Objects.nonNull(a.getPid())) {
+						a.getPid().forEach(p -> {
+							fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
+						});
+					}
+				});
+			}
+			if (value instanceof Publication) {
+
+			} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
+
+			} else if (value instanceof OtherResearchProduct) {
+
+			} else if (value instanceof Software) {
+
+			}
+		}
+
+		return value;
+	}
+
+	private static void fixVocabName(Qualifier q, String vocabularyName) {
+		if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
+			q.setSchemeid(vocabularyName);
+			q.setSchemename(vocabularyName);
+		}
+	}
+
 	protected static <T extends Oaf> T fixDefaults(T value) {
 		if (value instanceof Datasource) {
 			// nothing to clean here
@@ -113,6 +173,9 @@ public class CleanGraphSparkJob {
 		} else if (value instanceof Result) {
 
 			Result r = (Result) value;
+			if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
+				r.setPublisher(null);
+			}
 			if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
 				r
 					.setLanguage(

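Illustrative sketch (not taken from the commit): inside CleanGraphSparkJob, fixVocabName only fills in scheme information when it is missing; a qualifier that already carries a schemeid is left untouched. With a made-up qualifier:

	Qualifier q = new Qualifier();
	q.setClassid("eng");
	q.setClassname("English");
	// schemeid is blank, so both schemeid and schemename are set to the DNET_LANGUAGES vocabulary name
	fixVocabName(q, ModelConstants.DNET_LANGUAGES);
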
@@ -44,6 +44,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.DbClient;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
+import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
@@ -113,6 +115,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 		final String hdfsPath = parser.get("hdfsPath");
 		log.info("hdfsPath: {}", hdfsPath);
 
+		final String nsPrefixBlacklist = parser.get("nsPrefixBlacklist");
+		log.info("nsPrefixBlacklist: {}", nsPrefixBlacklist);
+
+		final Predicate<Oaf> verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist);
+
 		final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
 		log.info("processClaims: {}", processClaims);
 
@@ -123,23 +130,25 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 				smdbe.execute("queryClaims.sql", smdbe::processClaims);
 			} else {
 				log.info("Processing datasources...");
-				smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
+				smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
 
 				log.info("Processing projects...");
 				if (dbSchema.equalsIgnoreCase("beta")) {
-					smdbe.execute("queryProjects.sql", smdbe::processProject);
+					smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
 				} else {
-					smdbe.execute("queryProjects_production.sql", smdbe::processProject);
+					smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
 				}
 
 				log.info("Processing orgs...");
-				smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
+				smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
 
 				log.info("Processing relationsNoRemoval ds <-> orgs ...");
-				smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
+				smdbe
+					.execute(
+						"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix);
 
 				log.info("Processing projects <-> orgs ...");
-				smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
+				smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
 			}
 			log.info("All done.");
 		}
@@ -163,10 +172,20 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 	}
 
 	public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
+		throws Exception {
+		execute(sqlFile, producer, oaf -> true);
+	}
+
+	public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer,
+		final Predicate<Oaf> predicate)
 		throws Exception {
 		final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
 
-		final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
+		final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> {
+			if (predicate.test(oaf)) {
+				emitOaf(oaf);
+			}
+		});
 
 		dbClient.processResults(sql, consumer);
 	}

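Illustrative sketch (not taken from the commit): the new two-argument execute keeps existing callers working by delegating to the three-argument form with an always-true predicate, while the three-argument form filters what gets emitted. A call-site sketch, with "querySomething.sql" as a made-up file name:

	// every Oaf produced from the result set is emitted (old behaviour)
	smdbe.execute("querySomething.sql", smdbe::processDatasource);

	// only Oaf objects accepted by the predicate are emitted
	smdbe.execute("querySomething.sql", smdbe::processDatasource, verifyNamespacePrefix);
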
@@ -0,0 +1,62 @@
+
+package eu.dnetlib.dhp.oa.graph.raw.common;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Splitter;
+
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+/**
+ * This predicate should be used to skip oaf objects using a blacklist of nsprefixes.
+ *
+ * @author michele
+ */
+public class VerifyNsPrefixPredicate implements Predicate<Oaf> {
+
+	final Set<String> invalids = new HashSet<>();
+
+	public VerifyNsPrefixPredicate(final String blacklist) {
+		if (StringUtils.isNotBlank(blacklist)) {
+			Splitter
+				.on(",")
+				.trimResults()
+				.omitEmptyStrings()
+				.split(blacklist)
+				.forEach(invalids::add);
+		}
+	}
+
+	@Override
+	public boolean test(final Oaf oaf) {
+		if (oaf instanceof Datasource) {
+			return testValue(((Datasource) oaf).getNamespaceprefix().getValue());
+		} else if (oaf instanceof OafEntity) {
+			return testValue(((OafEntity) oaf).getId());
+		} else if (oaf instanceof Relation) {
+			return testValue(((Relation) oaf).getSource()) && testValue(((Relation) oaf).getTarget());
+		} else {
+			return true;
+		}
+	}
+
+	protected boolean testValue(final String s) {
+		if (StringUtils.isNotBlank(s)) {
+			for (final String invalid : invalids) {
+				if (Pattern.matches("^(\\d\\d\\|)?" + invalid + ".*$", s)) {
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
+}

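Illustrative sketch (not taken from the commit): the predicate rejects entities and relations whose identifiers start with a blacklisted namespace prefix, optionally preceded by the two-digit entity-type prefix (such as "10|"). A usage sketch with hypothetical prefixes:

	Predicate<Oaf> verify = new VerifyNsPrefixPredicate("fake_____,test______");

	Relation rel = new Relation();
	rel.setSource("10|fake_____::0000");
	rel.setTarget("20|openorgs____::0001");

	// false: the source id matches a blacklisted prefix, so the relation is skipped
	boolean keep = verify.test(rel);
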
@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
 import org.apache.spark.sql.{Encoder, Encoders}
 import org.apache.spark.sql.expressions.Aggregator
 
@@ -35,6 +36,88 @@ object EBIAggregator {
   }
 
 
+
+  def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{
+
+    override def zero: DLIUnknown = new DLIUnknown()
+
+    override def reduce(b: DLIUnknown, a: (String, DLIUnknown)): DLIUnknown = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+    override def merge(wx: DLIUnknown, wy: DLIUnknown): DLIUnknown = {
+      wx.mergeFrom(wy)
+      if(wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIUnknown): DLIUnknown = reduction
+
+    override def bufferEncoder: Encoder[DLIUnknown] =
+      Encoders.kryo(classOf[DLIUnknown])
+
+    override def outputEncoder: Encoder[DLIUnknown] =
+      Encoders.kryo(classOf[DLIUnknown])
+  }
+
+  def getDLIDatasetAggregator(): Aggregator[(String, DLIDataset), DLIDataset, DLIDataset] = new Aggregator[(String, DLIDataset), DLIDataset, DLIDataset]{
+
+    override def zero: DLIDataset = new DLIDataset()
+
+    override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+    override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = {
+      wx.mergeFrom(wy)
+      if(wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIDataset): DLIDataset = reduction
+
+    override def bufferEncoder: Encoder[DLIDataset] =
+      Encoders.kryo(classOf[DLIDataset])
+
+    override def outputEncoder: Encoder[DLIDataset] =
+      Encoders.kryo(classOf[DLIDataset])
+  }
+
+
+  def getDLIPublicationAggregator(): Aggregator[(String, DLIPublication), DLIPublication, DLIPublication] = new Aggregator[(String, DLIPublication), DLIPublication, DLIPublication]{
+
+    override def zero: DLIPublication = new DLIPublication()
+
+    override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = {
+      b.mergeFrom(a._2)
+      if (b.getId == null)
+        b.setId(a._2.getId)
+      b
+    }
+
+
+    override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = {
+      wx.mergeFrom(wy)
+      if(wx.getId == null && wy.getId.nonEmpty)
+        wx.setId(wy.getId)
+      wx
+    }
+    override def finish(reduction: DLIPublication): DLIPublication = reduction
+
+    override def bufferEncoder: Encoder[DLIPublication] =
+      Encoders.kryo(classOf[DLIPublication])
+
+    override def outputEncoder: Encoder[DLIPublication] =
+      Encoders.kryo(classOf[DLIPublication])
+  }
+
+
   def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
 
     override def zero: Publication = new Publication()
@@ -85,5 +168,27 @@ object EBIAggregator {
   }
 
+
+  def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
+
+    override def zero: DLIRelation = new DLIRelation()
+
+    override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
+      a._2
+    }
+
+
+    override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
+      if(b!= null) b else a
+    }
+    override def finish(reduction: DLIRelation): DLIRelation = reduction
+
+    override def bufferEncoder: Encoder[DLIRelation] =
+      Encoders.kryo(classOf[DLIRelation])
+
+    override def outputEncoder: Encoder[DLIRelation] =
+      Encoders.kryo(classOf[DLIRelation])
+  }
+
+
 }

@@ -1,8 +1,9 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
+import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
 import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
 import eu.dnetlib.dhp.utils.DHPUtils
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -12,6 +13,7 @@ import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.{JField, JObject, JString}
 import org.json4s.jackson.JsonMethods.parse
+import org.apache.spark.sql.functions._
 
 import scala.collection.JavaConverters._
 
@@ -28,6 +30,64 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
 }
 
 
+
+  def journalToOAF(pj:PMJournal): Journal = {
+    val j = new Journal
+    j.setIssnPrinted(pj.getIssn)
+    j.setVol(pj.getVolume)
+    j.setName(pj.getTitle)
+    j.setIss(pj.getIssue)
+    j.setDataInfo(OafUtils.generateDataInfo())
+    j
+  }
+
+
+  def pubmedTOPublication(input:PMArticle):DLIPublication = {
+
+    val dnetPublicationId = s"50|${DHPUtils.md5(s"${input.getPmid}::pmid")}"
+
+    val p = new DLIPublication
+    p.setId(dnetPublicationId)
+    p.setDataInfo(OafUtils.generateDataInfo())
+    p.setPid(List(OafUtils.createSP(input.getPmid.toLowerCase.trim, "pmid", "dnet:pid_types")).asJava)
+    p.setCompletionStatus("complete")
+    val pi = new ProvenaceInfo
+    pi.setId("dli_________::europe_pmc__")
+    pi.setName( "Europe PMC")
+    pi.setCompletionStatus("complete")
+    pi.setCollectionMode("collected")
+    p.setDlicollectedfrom(List(pi).asJava)
+    p.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
+
+    if (input.getAuthors != null && input.getAuthors.size() >0) {
+      var aths: List[Author] = List()
+      input.getAuthors.asScala.filter(a=> a!= null).foreach(a => {
+        val c = new Author
+        c.setFullname(a.getFullName)
+        c.setName(a.getForeName)
+        c.setSurname(a.getLastName)
+        aths = aths ::: List(c)
+      })
+      if (aths.nonEmpty)
+        p.setAuthor(aths.asJava)
+    }
+
+
+    if (input.getJournal != null)
+      p.setJournal(journalToOAF(input.getJournal))
+    p.setTitle(List(OafUtils.createSP(input.getTitle, "main title", "dnet:dataCite_title")).asJava)
+    p.setDateofacceptance(OafUtils.asField(input.getDate))
+    val i = new Instance
+    i.setCollectedfrom(generatePubmedDLICollectedFrom())
+    i.setDateofacceptance(p.getDateofacceptance)
+    i.setUrl(List(s"https://pubmed.ncbi.nlm.nih.gov/${input.getPmid}").asJava)
+    i.setInstancetype(createQualifier("0001", "Article", "dnet:publication_resource", "dnet:publication_resource"))
+    p.setInstance(List(i).asJava)
+    p
+  }
+
+
 def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
   val pmid :String = input._1
   val input_json :String = input._2
@@ -116,8 +176,16 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
 
     val workingPath = parser.get("workingPath")
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+    implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
     implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
    implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
+    implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
+    implicit val strEncoder:Encoder[String] = Encoders.STRING
+    implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
+    implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
+    implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
+
 
     val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
@@ -133,6 +201,46 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
     oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
 
 
+    val idPublicationSolved:Dataset[String] = spark.read.load(s"$workingPath/baseline_links_updates").where(col("links").isNotNull).select("pmid").as[String]
+    val baseline:Dataset[(String, PMArticle)]= spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle].map(p=> (p.getPmid, p))(Encoders.tuple(strEncoder,PMEncoder))
+    idPublicationSolved.joinWith(baseline, idPublicationSolved("pmid").equalTo(baseline("_1"))).map(k => pubmedTOPublication(k._2._2)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_publication")
+
+
+    val pmaDatasets = spark.read.load("/user/sandro.labruzzo/scholix/EBI/ebi_garr/baseline_dataset").as[PMArticle]
+
+    pmaDatasets.map(p => pubmedTOPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_all")
+
+    val pubs: Dataset[(String,Publication)] = spark.read.load("/user/sandro.labruzzo/scholix/EBI/publication").as[Publication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,oafpubEncoder))
+    val pubdate:Dataset[(String,DLIPublication)] = spark.read.load(s"$workingPath/baseline_publication_all").as[DLIPublication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,pubEncoder))
+
+
+    pubs.joinWith(pubdate, pubs("_1").equalTo(pubdate("_1"))).map(k => k._2._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_ebi")
+
+
+    val dt : Dataset[DLIDataset] = spark.read.load(s"$workingPath/dataset").as[DLIDataset]
+    val update : Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_dataset").as[DLIDataset]
+
+
+    dt.union(update).map(d => (d.getId,d))(Encoders.tuple(Encoders.STRING, datEncoder))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
+      .map(p => p._2)
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
+
+
+    val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
+    val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
+
+
+    rel.union(relupdate)
+      .map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+      .map(p => p._2)
+      .write.mode(SaveMode.Overwrite)
+      .save(s"$workingPath/baseline_relation_ebi")
+
   }
 }

@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
 import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -10,6 +11,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
+
 object SparkCreateEBIDataFrame {
@@ -34,54 +36,51 @@ object SparkCreateEBIDataFrame {
     val relationMapper = RelationMapper.load
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
-    implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
-    implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
-    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+    implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
+    implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
 
-    logger.info("Extract Publication and relation from publication_xml")
-    val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
-      {
-        new ObjectMapper().readValue(s, classOf[String])
-      }).flatMap(s => {
-      val d = new PublicationScholexplorerParser
-      d.parseObject(s, relationMapper).asScala.iterator})
+    // logger.info("Extract Publication and relation from publication_xml")
+    // val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
+    // {
+    // new ObjectMapper().readValue(s, classOf[String])
+    // }).flatMap(s => {
+    // val d = new PublicationScholexplorerParser
+    // d.parseObject(s, relationMapper).asScala.iterator})
+    //
+    // val mapper = new ObjectMapper()
+    // mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+    // spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
+    //
+    // logger.info("Extract Publication and relation from dataset_xml")
+    // val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
+    // {
+    // new ObjectMapper().readValue(s, classOf[String])
+    // }).flatMap(s => {
+    // val d = new DatasetScholexplorerParser
+    // d.parseObject(s, relationMapper).asScala.iterator})
 
-    val mapper = new ObjectMapper()
-    mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
-    spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
-
-    logger.info("Extract Publication and relation from dataset_xml")
-    val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
-      {
-        new ObjectMapper().readValue(s, classOf[String])
-      }).flatMap(s => {
-      val d = new DatasetScholexplorerParser
-      d.parseObject(s, relationMapper).asScala.iterator})
-
-    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
-    val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
-    val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
-    val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
+    // spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
+    val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
+    val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
+    val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
     publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getPublicationAggregator().toColumn)
+      .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
 
     dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDatasetAggregator().toColumn)
+      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
 
     relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getRelationAggregator().toColumn)
+      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
-
-
-    relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
   }
 }

@ -25,7 +25,8 @@ public class PMAuthor implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getFullName() {
|
public String getFullName() {
|
||||||
return String.format("%s, %s", this.foreName, this.lastName);
|
return String
|
||||||
|
.format("%s, %s", this.foreName != null ? this.foreName : "", this.lastName != null ? this.lastName : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
package eu.dnetlib.dhp.sx.graph
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||||
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
|
||||||
|
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
|
||||||
|
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
|
||||||
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
object SparkSplitOafTODLIEntities {
|
||||||
|
|
||||||
|
|
||||||
|
def getKeyRelation(rel:DLIRelation):String = {
|
||||||
|
s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
|
||||||
|
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
|
||||||
|
parser.parseArgument(args)
|
||||||
|
|
||||||
|
val workingPath: String = parser.get("workingPath")
|
||||||
|
logger.info(s"Working dir path = $workingPath")
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||||
|
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
||||||
|
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
|
||||||
|
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
val spark:SparkSession = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
|
||||||
|
.master(parser.get("master"))
|
||||||
|
.getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
|
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
|
||||||
|
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
|
||||||
|
val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIPublication])
|
||||||
|
.map(s =>s.asInstanceOf[DLIPublication])
|
||||||
|
.union(ebi_publication)
|
||||||
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(1000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
||||||
|
.map(s =>s.asInstanceOf[DLIDataset])
|
||||||
|
.union(ebi_dataset)
|
||||||
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(1000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
||||||
|
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
|
||||||
|
.map(s =>s.asInstanceOf[DLIUnknown])
|
||||||
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(1000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
||||||
|
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIRelation])
|
||||||
|
.map(s =>s.asInstanceOf[DLIRelation])
|
||||||
|
.union(ebi_relation)
|
||||||
|
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(1000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
package eu.dnetlib.dhp.sx.graph
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||||
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
|
||||||
|
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
|
||||||
|
import eu.dnetlib.scholexplorer.relation.RelationMapper
|
||||||
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.hadoop.io.{IntWritable, Text}
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous
|
||||||
|
* entities like Dataset, Relation, Publication and Unknown
|
||||||
|
*/
|
||||||
|
|
||||||
|
object SparkXMLToOAFDataset {
|
||||||
|
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
|
||||||
|
val conf = new SparkConf()
|
||||||
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
|
||||||
|
parser.parseArgument(args)
|
||||||
|
val spark =
|
||||||
|
SparkSession
|
||||||
|
.builder()
|
||||||
|
.config(conf)
|
||||||
|
.appName(SparkXMLToOAFDataset.getClass.getSimpleName)
|
||||||
|
.master(parser.get("master")).getOrCreate()
|
||||||
|
|
||||||
|
val sc = spark.sparkContext
|
||||||
|
|
||||||
|
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
||||||
|
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||||
|
implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
|
||||||
|
|
||||||
|
val relationMapper = RelationMapper.load
|
||||||
|
|
||||||
|
val inputPath: String = parser.get("sourcePath")
|
||||||
|
val entity: String = parser.get("entity")
|
||||||
|
val targetPath = parser.get("targetPath")
|
||||||
|
|
||||||
|
logger.info(s"Input path is $inputPath")
|
||||||
|
logger.info(s"Entity path is $entity")
|
||||||
|
logger.info(s"Target Path is $targetPath")
|
||||||
|
|
||||||
|
val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
|
||||||
|
.map(s => s._2.toString)
|
||||||
|
.flatMap(s => {
|
||||||
|
entity match {
|
||||||
|
case "publication" =>
|
||||||
|
val p = new PublicationScholexplorerParser
|
||||||
|
val l =p.parseObject(s, relationMapper)
|
||||||
|
if (l != null) l.asScala else List()
|
||||||
|
case "dataset" =>
|
||||||
|
val d = new DatasetScholexplorerParser
|
||||||
|
val l =d.parseObject(s, relationMapper)
|
||||||
|
if (l != null) l.asScala else List()
|
||||||
|
}
|
||||||
|
}).filter(s => s!= null)
|
||||||
|
spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -317,6 +317,15 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TERRIBLE HACK TO AVOID EMPTY COLLECTED FROM
|
||||||
|
if (parsedObject.getDlicollectedfrom() == null) {
|
||||||
|
|
||||||
|
final KeyValue cf = new KeyValue();
|
||||||
|
cf.setKey("dli_________::europe_pmc__");
|
||||||
|
cf.setValue("Europe PMC");
|
||||||
|
parsedObject.setCollectedfrom(Collections.singletonList(cf));
|
||||||
|
}
|
||||||
|
|
||||||
if (StringUtils.isNotBlank(resolvedURL)) {
|
if (StringUtils.isNotBlank(resolvedURL)) {
|
||||||
Instance i = new Instance();
|
Instance i = new Instance();
|
||||||
i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
|
i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
|
||||||
|
|
|
@ -40,5 +40,11 @@
|
||||||
"paramLongName": "dbschema",
|
"paramLongName": "dbschema",
|
||||||
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
|
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "nsbl",
|
||||||
|
"paramLongName": "nsPrefixBlacklist",
|
||||||
|
"paramDescription": "a blacklist of nsprefixes (comma separeted)",
|
||||||
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -43,7 +43,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -131,6 +135,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF_claims"/>
|
<ok to="ImportODF_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -182,6 +187,7 @@
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF"/>
|
<ok to="ImportODF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -38,7 +38,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -113,6 +117,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF_claims"/>
|
<ok to="ImportODF_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -25,7 +25,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -99,6 +103,7 @@
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportDB_claims"/>
|
<ok to="ImportDB_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -117,6 +122,7 @@
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -28,6 +28,11 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>the address of the lookUp service</description>
|
<description>the address of the lookUp service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nsPrefixBlacklist</name>
|
||||||
|
<value></value>
|
||||||
|
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
<description>memory for driver process</description>
|
<description>memory for driver process</description>
|
||||||
|
@ -67,6 +72,7 @@
|
||||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
|
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
|
||||||
|
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF"/>
|
<ok to="ImportODF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
<configuration>
|
<configuration>
|
||||||
|
|
||||||
<!-- OCEAN -->
|
<!-- OCEAN -->
|
||||||
<!--
|
|
||||||
<property>
|
<property>
|
||||||
<name>jobTracker</name>
|
<name>jobTracker</name>
|
||||||
<value>yarnRM</value>
|
<value>yarnRM</value>
|
||||||
|
@ -18,26 +18,26 @@
|
||||||
<name>spark2YarnHistoryServerAddress</name>
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
</property>
|
</property>
|
||||||
-->
|
|
||||||
|
|
||||||
<!-- GARR -->
|
<!-- GARR -->
|
||||||
|
|
||||||
<property>
|
<!-- <property>-->
|
||||||
<name>jobTracker</name>
|
<!-- <name>jobTracker</name>-->
|
||||||
<value>yarn</value>
|
<!-- <value>yarn</value>-->
|
||||||
</property>
|
<!-- </property>-->
|
||||||
<property>
|
<!-- <property>-->
|
||||||
<name>nameNode</name>
|
<!-- <name>nameNode</name>-->
|
||||||
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
|
<!-- <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>-->
|
||||||
</property>
|
<!-- </property>-->
|
||||||
<property>
|
<!-- <property>-->
|
||||||
<name>hive_metastore_uris</name>
|
<!-- <name>hive_metastore_uris</name>-->
|
||||||
<value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
|
<!-- <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>-->
|
||||||
</property>
|
<!-- </property>-->
|
||||||
<property>
|
<!-- <property>-->
|
||||||
<name>spark2YarnHistoryServerAddress</name>
|
<!-- <name>spark2YarnHistoryServerAddress</name>-->
|
||||||
<value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
|
<!-- <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>-->
|
||||||
</property>
|
<!-- </property>-->
|
||||||
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="GenerateUpdates"/>
|
<start to="CreateEBIDataSet"/>
|
||||||
|
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
|
@ -48,6 +48,28 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
<action name="CreateEBIDataSet">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Create EBI DataSet</name>
|
||||||
|
|
||||||
|
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=1000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="GenerateUpdates"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
<action name="GenerateUpdates">
|
<action name="GenerateUpdates">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
|
@ -71,27 +93,7 @@
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<action name="CreateEBIDataSet">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn-cluster</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Create EBI DataSet</name>
|
|
||||||
|
|
||||||
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.sql.shuffle.partitions=1000
|
|
||||||
${sparkExtraOPT}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="End"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
</workflow-app>
|
</workflow-app>
|
|
@ -1,7 +1,4 @@
|
||||||
[
|
[
|
||||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
|
||||||
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true},
|
|
||||||
{"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true},
|
|
||||||
{"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true}
|
|
||||||
]
|
]
|
|
@ -101,12 +101,17 @@
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Import ${entity} and related entities</name>
|
<name>Import ${entity} and related entities</name>
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerGraphImporter</class>
|
<class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
|
<spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn</arg>
|
||||||
<arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
|
<arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
|
||||||
<arg>--targetPath</arg><arg>${targetEntityPath}</arg>
|
<arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
|
||||||
<arg>--entity</arg><arg>${entity}</arg>
|
<arg>--entity</arg><arg>${entity}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -1,16 +1,8 @@
|
||||||
<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
|
<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
|
||||||
<parameters>
|
<parameters>
|
||||||
<property>
|
<property>
|
||||||
<name>sourcePath</name>
|
<name>workingPath</name>
|
||||||
<description>the source path</description>
|
<description>the working path</description>
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>targetPath</name>
|
|
||||||
<description>the source path</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>targetDir</name>
|
|
||||||
<description>the name of the path</description>
|
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>sparkDriverMemory</name>
|
||||||
|
@ -20,32 +12,13 @@
|
||||||
<name>sparkExecutorMemory</name>
|
<name>sparkExecutorMemory</name>
|
||||||
<description>memory for individual executor</description>
|
<description>memory for individual executor</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>entities</name>
|
|
||||||
<description>the entities to be extracted</description>
|
|
||||||
</property>
|
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="DeleteTargetPath"/>
|
<start to="ExtractDLIEntities"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
<action name="DeleteTargetPath">
|
|
||||||
<fs>
|
|
||||||
<mkdir path="${targetPath}"/>
|
|
||||||
<mkdir path="${targetPath}/dataset"/>
|
|
||||||
<mkdir path="${targetPath}/publication"/>
|
|
||||||
<mkdir path="${targetPath}/unknown"/>
|
|
||||||
<mkdir path="${targetPath}/relation"/>
|
|
||||||
<delete path='${targetPath}/dataset/${targetDir}'/>
|
|
||||||
<delete path='${targetPath}/publication/${targetDir}'/>
|
|
||||||
<delete path='${targetPath}/unknown/${targetDir}'/>
|
|
||||||
<delete path='${targetPath}/relation/${targetDir}'/>
|
|
||||||
</fs>
|
|
||||||
<ok to="ExtractDLIEntities"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="ExtractDLIEntities">
|
<action name="ExtractDLIEntities">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
@ -53,19 +26,18 @@
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Extract ${entities}</name>
|
<name>Extract DLI Entities</name>
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob</class>
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory ${sparkExecutorMemory}
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
${sparkExtraOPT}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--targetPath</arg><arg>${targetPath}</arg>
|
|
||||||
<arg>--targetDir</arg><arg>${targetDir}</arg>
|
|
||||||
<arg>--entities</arg><arg>${entities}</arg>
|
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -7,6 +7,8 @@ import static org.mockito.Mockito.lenient;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -19,9 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
|
@ -62,10 +62,12 @@ public class CleaningFunctionTest {
|
||||||
assertTrue(p_in instanceof Result);
|
assertTrue(p_in instanceof Result);
|
||||||
assertTrue(p_in instanceof Publication);
|
assertTrue(p_in instanceof Publication);
|
||||||
|
|
||||||
Publication p_out = OafCleaner.apply(p_in, mapping);
|
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
|
||||||
|
|
||||||
assertNotNull(p_out);
|
assertNotNull(p_out);
|
||||||
|
|
||||||
|
assertNotNull(p_out.getPublisher());
|
||||||
|
assertNull(p_out.getPublisher().getValue());
|
||||||
assertEquals("und", p_out.getLanguage().getClassid());
|
assertEquals("und", p_out.getLanguage().getClassid());
|
||||||
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
||||||
|
|
||||||
|
@ -88,6 +90,16 @@ public class CleaningFunctionTest {
|
||||||
|
|
||||||
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
||||||
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
||||||
|
assertNull(p_out.getPublisher());
|
||||||
|
|
||||||
|
getAuthorPids(p_defaults).forEach(pid -> {
|
||||||
|
System.out
|
||||||
|
.println(
|
||||||
|
String
|
||||||
|
.format(
|
||||||
|
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
|
||||||
|
pid.getQualifier().getClassname()));
|
||||||
|
});
|
||||||
|
|
||||||
// TODO add more assertions to verity the cleaned values
|
// TODO add more assertions to verity the cleaned values
|
||||||
System.out.println(MAPPER.writeValueAsString(p_out));
|
System.out.println(MAPPER.writeValueAsString(p_out));
|
||||||
|
@ -97,7 +109,7 @@ public class CleaningFunctionTest {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
|
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
|
||||||
return pub
|
return pub
|
||||||
.getAuthor()
|
.getAuthor()
|
||||||
.stream()
|
.stream()
|
||||||
|
@ -106,6 +118,14 @@ public class CleaningFunctionTest {
|
||||||
.map(s -> s.getQualifier());
|
.map(s -> s.getQualifier());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Stream<StructuredProperty> getAuthorPids(Result pub) {
|
||||||
|
return pub
|
||||||
|
.getAuthor()
|
||||||
|
.stream()
|
||||||
|
.map(a -> a.getPid())
|
||||||
|
.flatMap(p -> p.stream());
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> vocs() throws IOException {
|
private List<String> vocs() throws IOException {
|
||||||
return IOUtils
|
return IOUtils
|
||||||
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||||
|
|
|
@ -276,6 +276,17 @@ public class MappersTest {
|
||||||
System.out.println("***************");
|
System.out.println("***************");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testClaimDedup() throws IOException {
|
||||||
|
final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_dedup.xml"));
|
||||||
|
final List<Oaf> list = new OafToOafMapper(vocs, false).processMdRecord(xml);
|
||||||
|
|
||||||
|
System.out.println("***************");
|
||||||
|
System.out.println(new ObjectMapper().writeValueAsString(list));
|
||||||
|
System.out.println("***************");
|
||||||
|
}
|
||||||
|
|
||||||
private void assertValidId(final String id) {
|
private void assertValidId(final String id) {
|
||||||
assertEquals(49, id.length());
|
assertEquals(49, id.length());
|
||||||
assertEquals('|', id.charAt(2));
|
assertEquals('|', id.charAt(2));
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
class VerifyNsPrefixPredicateTest {
|
||||||
|
|
||||||
|
private VerifyNsPrefixPredicate predicate;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
predicate = new VerifyNsPrefixPredicate("corda,nsf,wt");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTestValue() {
|
||||||
|
assertFalse(predicate.testValue("corda__2020"));
|
||||||
|
assertFalse(predicate.testValue("nsf________"));
|
||||||
|
assertFalse(predicate.testValue("nsf"));
|
||||||
|
assertFalse(predicate.testValue("corda"));
|
||||||
|
assertFalse(predicate.testValue("10|corda_______::fjkdsfjksdhfksj"));
|
||||||
|
assertFalse(predicate.testValue("20|corda_______::fjkdsfjksdhfksj"));
|
||||||
|
|
||||||
|
assertTrue(predicate.testValue("xxxxxx_____"));
|
||||||
|
assertTrue(predicate.testValue("10|xxxxxx_____::sdasdasaddasad"));
|
||||||
|
|
||||||
|
assertTrue(predicate.testValue(null));
|
||||||
|
assertTrue(predicate.testValue(""));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_ds_true() {
|
||||||
|
final Field<String> prefix = new Field<>();
|
||||||
|
prefix.setValue("xxxxxx______");
|
||||||
|
|
||||||
|
final Datasource ds = new Datasource();
|
||||||
|
ds.setNamespaceprefix(prefix);
|
||||||
|
|
||||||
|
assertTrue(predicate.test(ds));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_ds_false() {
|
||||||
|
final Field<String> prefix = new Field<>();
|
||||||
|
prefix.setValue("corda__2020");
|
||||||
|
|
||||||
|
final Datasource ds = new Datasource();
|
||||||
|
ds.setNamespaceprefix(prefix);
|
||||||
|
|
||||||
|
assertFalse(predicate.test(ds));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_rel_true() {
|
||||||
|
final Relation rel = new Relation();
|
||||||
|
rel.setSource("10|yyyyyy______:sdfsfsffsdfs");
|
||||||
|
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertTrue(predicate.test(rel));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_rel_false() {
|
||||||
|
final Relation rel = new Relation();
|
||||||
|
rel.setSource("10|corda_______:sdfsfsffsdfs");
|
||||||
|
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertFalse(predicate.test(rel));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_proj_true() {
|
||||||
|
final Project p = new Project();
|
||||||
|
p.setId("10|xxxxxx______:sdfsfsffsdfs");
|
||||||
|
assertTrue(predicate.test(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTest_proj_false() {
|
||||||
|
final Project p = new Project();
|
||||||
|
p.setId("10|corda_____:sdfsfsffsdfs");
|
||||||
|
assertFalse(predicate.test(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -59,6 +59,28 @@
|
||||||
"schemename": "dnet:pid_types"
|
"schemename": "dnet:pid_types"
|
||||||
},
|
},
|
||||||
"value": "qwerty"
|
"value": "qwerty"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferenceprovenance": "",
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"classname": "sysimport:crosswalk:datasetarchive",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"qualifier": {
|
||||||
|
"classid": "ORCID",
|
||||||
|
"classname": "ORCID",
|
||||||
|
"schemeid": "",
|
||||||
|
"schemename": ""
|
||||||
|
},
|
||||||
|
"value": "asdasd"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"rank": 2,
|
"rank": 2,
|
||||||
|
@ -186,6 +208,9 @@
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"bestaccessright": null,
|
"bestaccessright": null,
|
||||||
|
"publisher": {
|
||||||
|
"value": null
|
||||||
|
},
|
||||||
"collectedfrom": [
|
"collectedfrom": [
|
||||||
{
|
{
|
||||||
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
|
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
<oai:record xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||||
|
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
|
||||||
|
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
|
||||||
|
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||||
|
xmlns:dr="http://www.driver-repository.eu/namespace/dr">
|
||||||
|
<oai:header>
|
||||||
|
<dri:objIdentifier>dedup_wf_001::534276867e917fe9efe0cca10e363457</dri:objIdentifier>
|
||||||
|
<dri:dateOfCollection>2020-08-02T22:55:40.866Z</dri:dateOfCollection>
|
||||||
|
<oaf:datasourceprefix>openaire____</oaf:datasourceprefix>
|
||||||
|
<dr:dateOfTransformation>2020-08-02T23:53:04.582Z</dr:dateOfTransformation>
|
||||||
|
</oai:header>
|
||||||
|
<oai:metadata>
|
||||||
|
<dc:contributor>ATLAS (IHEF, IoP, FNWI)</dc:contributor>
|
||||||
|
<dc:contributor>Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü</dc:contributor>
|
||||||
|
<dc:contributor>TR3959</dc:contributor>
|
||||||
|
<dc:contributor>Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü</dc:contributor>
|
||||||
|
<dc:contributor>TR3959</dc:contributor>
|
||||||
|
<dc:source>urn:issn:1748-0221</dc:source>
|
||||||
|
<dc:source>VOLUME=7;ISSUE=1;ISSN=1748-0221;TITLE=Journal of Instrumentation</dc:source>
|
||||||
|
<dc:source>ATLAS Collaboration Mitsou, Vasiliki Fiorini, Luca Ros Martínez, Eduardo Castillo
|
||||||
|
Giménez, María Victoria Fuster Verdú, Juan A. García García, Carmen Cabrera Urbán,
|
||||||
|
Susana Martí García, Salvador Salt Cairols, José Lacasta Llácer, Carlos Valls Ferrer,
|
||||||
|
Juan Antonio Higón Rodríguez, Emilio Ferrer Soria, Antonio González de la Hoz, Santiago
|
||||||
|
Kaci, Mohammed Hernández Jiménez, Yesenia Villaplana Pérez, Miguel 2012 A study of the
|
||||||
|
material in the ATLAS inner detector using secondary hadronic interactions Journal Of
|
||||||
|
Instrumentation 7 P01013 1 41</dc:source>
|
||||||
|
<dc:source>Journal of Instrumentation, 7(1)</dc:source>
|
||||||
|
<dc:source>Aad, G; Abbott, B; Abdallah, J; Abdelalim, AA; Abdesselam, A; Abdinov, O; et
|
||||||
|
al.(2012). A study of the material in the ATLAS inner detector using secondary hadronic
|
||||||
|
interactions. Journal of Instrumentation, 7(1). doi: 10.1088/1748-0221/7/01/P01013. UC
|
||||||
|
Santa Cruz: Retrieved from: http://www.escholarship.org/uc/item/05j2j2br</dc:source>
|
||||||
|
<dc:source>Journal of Instrumentation, 7</dc:source>
|
||||||
|
<dc:source>VOLUME=7;ISSN=1748-0221;TITLE=Journal of Instrumentation</dc:source>
|
||||||
|
<dc:source>1748-0221</dc:source>
|
||||||
|
<dc:source>Journal of Instrumentation 7, P01013 (2012).
|
||||||
|
doi:10.1088/1748-0221/7/01/P01013</dc:source>
|
||||||
|
<dc:title>A measurement of the material in the ATLAS inner detector using secondary hadronic
|
||||||
|
interactions</dc:title>
|
||||||
|
<dc:language>und</dc:language>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
|
||||||
|
(interaction of radiation with matter, interaction of photons with matter, interaction
|
||||||
|
of hadrons with matter, etc); Particle tracking detectors (Solid-state detectors); Si
|
||||||
|
microstrip and pad detectors; Large detector systems for particle and astroparticle
|
||||||
|
physics</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">of photons with matter, interaction of
|
||||||
|
hadrons with matter, etc)</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Particle Physics - Experiment</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Detector Modelling and
|
||||||
|
Simulations</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
|
||||||
|
(interaction of radiation with matter, interaction of photons with matter, interaction
|
||||||
|
of hadrons with matter, etc)</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Large detector systems for particle and
|
||||||
|
astroparticle physics</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
|
||||||
|
(interaction of radiation with matter, interaction</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Large Detector Systems</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">530</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Science & Technology</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">:Ciências Físicas [Ciências
|
||||||
|
Naturais]</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">High Energy Physics -
|
||||||
|
Experiment</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Detectors de radiació</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Física nuclear</dc:subject>
|
||||||
|
<dc:subject classid="ddc" classname="ddc">ddc:610</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Si microstrip and pad
|
||||||
|
detectors</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Particle tracking detectors (Solid-state
|
||||||
|
detectors)</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Col·lisions (Física nuclear)</dc:subject>
|
||||||
|
<dc:subject classid="keyword" classname="keyword">Particle Tracking Detectors</dc:subject>
|
||||||
|
<dc:publisher>IOP Publishing</dc:publisher>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:date>2016-05-02</dc:date>
|
||||||
|
<dc:description>The ATLAS inner detector is used to reconstruct secondary vertices due to
|
||||||
|
hadronic interactions of primary collision products, so probing the location and amount
|
||||||
|
of material in the inner region of ATLAS. Data collected in 7 TeV pp collisions at the
|
||||||
|
LHC, with a minimum bias trigger, are used for comparisons with simulated events. The
|
||||||
|
reconstructed secondary vertices have spatial resolutions ranging from ~ 200μm to 1 mm.
|
||||||
|
The overall material description in the simulation is validated to within an
|
||||||
|
experimental uncertainty of about 7%. This will lead to a better understanding of the
|
||||||
|
reconstruction of various objects such as tracks, leptons, jets, and missing transverse
|
||||||
|
momentum. We acknowledge the support of ANPCyT, Argentina; YerPhI, Armenia; ARC,
|
||||||
|
Australia; BMWF, Austria; ANAS, Azerbaijan; SSTC, Belarus; CNPq and FAPESP, Brazil;
|
||||||
|
NSERC, NRC and CFI, Canada; CERN; CONICYT, Chile; CAS, MOST and NSFC, China;
|
||||||
|
COLCIENCIAS, Colombia; MSMT CR, MPO CR and VSC CR, Czech Republic; DNRF, DNSRC and
|
||||||
|
Lundbeck Foundation, Denmark; ARTEMIS, European Union; IN2P3-CNRS, CEA-DSM/IRFU, France;
|
||||||
|
GNAS, Georgia; BMBF, DFG, HGF, MPG and AvH Foundation, Germany; GSRT, Greece; ISF,
|
||||||
|
MINERVA, GIF, DIP and Benoziyo Center, Israel; INFN, Italy; MEXT and JSPS, Japan; CNRST,
|
||||||
|
Morocco; FOM and NWO, Netherlands; RCN, Norway; MNiSW, Poland; GRICES and FCT, Portugal;
|
||||||
|
MERYS (MECTS), Romania; MES of Russia and ROSATOM, Russian Federation; JINR; MSTD,
|
||||||
|
Serbia; MSSR, Slovakia; ARRS and MVZT, Slovenia; DST/NRF, South Africa; MICINN, Spain;
|
||||||
|
SRC and Wallenberg Foundation, Sweden; SER, SNSF and Cantons of Bern and Geneva,
|
||||||
|
Switzerland; NSC, Taiwan; TAEK, Turkey; STFC, the Royal Society and Leverhulme Trust,
|
||||||
|
United Kingdom; DOE and NSF, United States of America.
|
||||||
|
info:eu-repo/semantics/publishedVersion</dc:description>
|
||||||
|
<dc:source>NARCIS</dc:source>
|
||||||
|
<dc:source>DSpace@Dogus</dc:source>
|
||||||
|
<dc:source>Lancaster EPrints</dc:source>
|
||||||
|
<dc:source>CERN Document Server</dc:source>
|
||||||
|
<dc:source>DESY Publication Database</dc:source>
|
||||||
|
<dc:source>OpenAIRE</dc:source>
|
||||||
|
<dc:source>Publikationenserver der Georg-August-Universität Göttingen</dc:source>
|
||||||
|
<dc:source>arXiv.org e-Print Archive</dc:source>
|
||||||
|
<dc:source>CORE (RIOXX-UK Aggregator)</dc:source>
|
||||||
|
<dc:source>eScholarship - University of California</dc:source>
|
||||||
|
<dc:source>Universidade do Minho: RepositoriUM</dc:source>
|
||||||
|
<dc:source>Dokuz Eylul University Open Archive System</dc:source>
|
||||||
|
<dc:source>Repositori d'Objectes Digitals per a l'Ensenyament la Recerca i la
|
||||||
|
Cultura</dc:source>
|
||||||
|
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013</dc:relation>
|
||||||
|
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013.</dc:relation>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://hdl.handle.net/11376/1605</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://www.escholarship.org/uc/item/05j2j2br</dc:identifier>
|
||||||
|
<dc:type>Unknown</dc:type>
|
||||||
|
<dc:identifier>http://cds.cern.ch/record/1394292</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://eprints.lancs.ac.uk/68235/</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://hdl.handle.net/10550/36188</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://eprints.gla.ac.uk/65933/1/65933.pdf</dc:identifier>
|
||||||
|
<dc:type>Preprint</dc:type>
|
||||||
|
<dc:identifier>http://arxiv.org/abs/1110.6191</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://dare.uva.nl/personal/pure/en/publications/a-study-of-the-material-in-the-atlas-inner-detector-using-secondary-hadronic-interactions(6b7667e2-04e2-4a66-92a8-ff4edbf61a17).html</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://hdl.handle.net/1822/48768</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://resolver.sub.uni-goettingen.de/purl?gs-1/12231</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://bib-pubdb1.desy.de/search?p=id:%22PHPPUBDB-21212%22</dc:identifier>
|
||||||
|
<dc:identifier>http://bib-pubdb1.desy.de/record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf</dc:identifier>
|
||||||
|
<dc:identifier>http://bib-pubdb1.desy.de//record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf</dc:identifier>
|
||||||
|
<dc:identifier>http://bib-pubdb1.desy.de/record/96807</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://arxiv.org/abs/1110.6191</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://hdl.handle.net/11376/1605</dc:identifier>
|
||||||
|
<dc:identifier>http://dx.doi.org/10.1088/1748-0221/7/01/P01013</dc:identifier>
|
||||||
|
<dc:type>Article</dc:type>
|
||||||
|
<dc:identifier>http://hdl.handle.net/2066/93736</dc:identifier>
|
||||||
|
<dc:creator>ATLAS Collaboration</dc:creator>
|
||||||
|
<dr:CobjCategory type="publication">0001</dr:CobjCategory>
|
||||||
|
<oaf:refereed>0002</oaf:refereed>
|
||||||
|
<oaf:dateAccepted>2016-05-02</oaf:dateAccepted>
|
||||||
|
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||||
|
<oaf:language>und</oaf:language>
|
||||||
|
<oaf:embargoenddate/>
|
||||||
|
<oaf:hostedBy id="infrastruct_::openaire" name="OpenAIRE"/>
|
||||||
|
<oaf:collectedFrom id="infrastruct_::openaire" name="OpenAIRE"/>
|
||||||
|
<oaf:journal eissn="" ep="" iss="1748-0221" issn="1748-0221" lissn="" sp="" vol=""/>
|
||||||
|
</oai:metadata>
|
||||||
|
<about>
|
||||||
|
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||||
|
<originDescription harvestDate="2020-08-02T22:55:40.866Z" altered="true">
|
||||||
|
<baseURL>file%3A%2F%2F%2Fsrv%2Fclaims%2Frecords%2Fpublication%2Fopenaire</baseURL>
|
||||||
|
<identifier/>
|
||||||
|
<datestamp/>
|
||||||
|
<metadataNamespace/>
|
||||||
|
</originDescription>
|
||||||
|
</provenance>
|
||||||
|
<oaf:datainfo>
|
||||||
|
<oaf:inferred>false</oaf:inferred>
|
||||||
|
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||||
|
<oaf:trust>0.9</oaf:trust>
|
||||||
|
<oaf:inferenceprovenance/>
|
||||||
|
<oaf:provenanceaction schemename="dnet:provenanceActions"
|
||||||
|
schemeid="dnet:provenanceActions" classname="user:claim" classid="user:claim"/>
|
||||||
|
</oaf:datainfo>
|
||||||
|
</about>
|
||||||
|
</oai:record>
|
Loading…
Reference in New Issue