merge branch with master

This commit is contained in:
Miriam Baglioni 2020-08-04 10:14:07 +02:00
commit 5b651abf82
37 changed files with 1182 additions and 213 deletions

View File

@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies";
public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource";
public static final String DNET_ACCESS_MODES = "dnet:access_modes";

View File

@ -289,4 +289,12 @@ public class JsonPathTest {
System.out.println("d = " + d);
}
@Test
public void testNull() throws Exception {
final Object p = null;
System.out.println((String) p);
}
}

View File

@ -6,6 +6,7 @@ import java.util.Collection;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ -30,10 +33,16 @@ public class DedupRecordFactory {
final DedupConfig dedupConf) {
long ts = System.currentTimeMillis();
// <id, json_entity>
final JavaPairRDD<String, String> inputJsonEntities = sc
.textFile(entitiesInputPath)
final JavaPairRDD<String, String> inputJsonEntities = spark
.read()
.load(entitiesInputPath)
.as(Encoders.kryo(Oaf.class))
.map(
(MapFunction<Oaf, String>) p -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(p),
Encoders.STRING())
.javaRDD()
.mapToPair(
(PairFunction<String, String, String>) it -> new Tuple2<String, String>(
(PairFunction<String, String, String>) it -> new Tuple2<>(
MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it));
// <source, target>: source is the dedup_id, target is the id of the mergedIn
@ -74,9 +83,9 @@ public class DedupRecordFactory {
}
}
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
private static DLIPublication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Publication p = new Publication(); // the result of the merge, to be returned at the end
DLIPublication p = new DLIPublication(); // the result of the merge, to be returned at the end
p.setId(e._1());
@ -110,9 +119,9 @@ public class DedupRecordFactory {
return p;
}
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
private static DLIDataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Dataset d = new Dataset(); // the result of the merge, to be returned at the end
DLIDataset d = new DLIDataset(); // the result of the merge, to be returned at the end
d.setId(e._1());

View File

@ -9,18 +9,21 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ -42,7 +45,6 @@ public class SparkCreateConnectedComponent {
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String targetPath = parser.get("targetPath");
@ -50,8 +52,12 @@ public class SparkCreateConnectedComponent {
// DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final JavaPairRDD<Object, String> vertexes = sc
.textFile(inputPath + "/" + entity)
final JavaPairRDD<Object, String> vertexes = spark
.read()
.load(inputPath + "/" + entity)
.as(Encoders.kryo(Oaf.class))
.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
.javaRDD()
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair(
(PairFunction<String, Object, String>) s -> new Tuple2<Object, String>(getHashcode(s), s));

View File

@ -4,10 +4,10 @@ package eu.dnetlib.dedup;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.pace.config.DedupConfig;
@ -41,12 +41,19 @@ public class SparkCreateDedupRecord {
DedupUtility.createEntityPath(sourcePath, entity),
OafEntityType.valueOf(entity),
dedupConf);
dedupRecord
.map(
r -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);
})
.saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
spark
.createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class))
.write()
.mode(SaveMode.Overwrite)
.save(dedupPath + "/" + entity + "/dedup_records");
//
//
// dedupRecord
// .map(
// r -> {
// ObjectMapper mapper = new ObjectMapper();
// return mapper.writeValueAsString(r);
// })
// .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
}
}

View File

@ -7,10 +7,13 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
@ -46,8 +49,12 @@ public class SparkCreateSimRels {
// DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
JavaPairRDD<String, MapDocument> mapDocument = sc
.textFile(inputPath + "/" + entity)
JavaPairRDD<String, MapDocument> mapDocument = spark
.read()
.load(inputPath + "/" + entity)
.as(Encoders.kryo(Oaf.class))
.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
.javaRDD()
.mapToPair(
s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);

View File

@ -14,16 +14,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
public class SparkPropagateRelationsJob {
enum FieldType {
SOURCE, TARGET
}
static final String SOURCEJSONPATH = "$.source";
static final String TARGETJSONPATH = "$.target";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -39,7 +34,6 @@ public class SparkPropagateRelationsJob {
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String relationPath = parser.get("relationPath");
final String mergeRelPath = parser.get("mergeRelPath");
final String targetRelPath = parser.get("targetRelPath");
@ -50,63 +44,38 @@ public class SparkPropagateRelationsJob {
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'");
final Dataset<Relation> rels = spark.read().load(relationPath).as(Encoders.bean(Relation.class));
final Dataset<DLIRelation> rels = spark
.read()
.load(relationPath)
.as(Encoders.kryo(DLIRelation.class))
.map(
(MapFunction<DLIRelation, DLIRelation>) r -> r,
Encoders.bean(DLIRelation.class));
final Dataset<Relation> firstJoin = rels
final Dataset<DLIRelation> firstJoin = rels
.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
final DLIRelation relation = r._1();
if (mergeRelation != null)
relation.setSource(mergeRelation.getSource());
return relation;
},
Encoders.bean(Relation.class));
Encoders.bean(DLIRelation.class));
final Dataset<Relation> secondJoin = firstJoin
final Dataset<DLIRelation> secondJoin = firstJoin
.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
final DLIRelation relation = r._1();
if (mergeRelation != null)
relation.setTarget(mergeRelation.getSource());
return relation;
},
Encoders.bean(Relation.class));
Encoders.kryo(DLIRelation.class));
secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
}
private static boolean containsDedup(final String json) {
final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json);
final String target = DHPUtils.getJPathString(TARGETJSONPATH, json);
return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
}
private static String replaceField(final String json, final String id, final FieldType type) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
Relation relation = mapper.readValue(json, Relation.class);
if (relation.getDataInfo() == null)
relation.setDataInfo(new DataInfo());
relation.getDataInfo().setDeletedbyinference(false);
switch (type) {
case SOURCE:
relation.setSource(id);
return mapper.writeValueAsString(relation);
case TARGET:
relation.setTarget(id);
return mapper.writeValueAsString(relation);
default:
throw new IllegalArgumentException("");
}
} catch (IOException e) {
throw new RuntimeException("unable to deserialize json relation: " + json, e);
}
}
}

View File

@ -0,0 +1,75 @@
package eu.dnetlib.dedup.sx
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions.col
object SparkUpdateEntityWithDedupInfo {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
parser.parseArgument(args)
val workingPath: String = parser.get("workingPath")
logger.info(s"Working dir path = $workingPath")
implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity]
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val spark: SparkSession = SparkSession
.builder()
.appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName)
.master(parser.get("master"))
.getOrCreate()
val entityPath = parser.get("entityPath")
val mergeRelPath = parser.get("mergeRelPath")
val dedupRecordPath = parser.get("dedupRecordPath")
val entity = parser.get("entity")
val destination = parser.get("targetPath")
val mergedIds = spark.read.load(mergeRelPath).as[Relation]
.where("relClass == 'merges'")
.select(col("target"))
val entities: Dataset[(String, OafEntity)] = spark
.read
.load(entityPath).as[OafEntity]
.map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder))
val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left")
.map(k => {
val e: OafEntity = k._1._2
val t = k._2
if (t != null && t.getString(0).nonEmpty) {
if (e.getDataInfo == null) {
e.setDataInfo(OafUtils.generateDataInfo())
}
e.getDataInfo.setDeletedbyinference(true)
}
e
})
val dedupRecords :Dataset[OafEntity] = spark.read.load(dedupRecordPath).as[OafEntity]
finalDataset.union(dedupRecords)
.repartition(1200).write
.mode(SaveMode.Overwrite).save(destination)
}
}

View File

@ -144,7 +144,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Update ${entity} and add DedupRecord</name>
<class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
<class>eu.dnetlib.dedup.sx.SparkUpdateEntityWithDedupInfo</class>
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}

View File

@ -19,7 +19,7 @@ public class QueryInformationSystem {
+ " let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept "
+
"let $zenodo := $x//param[./@name='zenodoCommunity']/text() "
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] "
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden' "
+ " return "
+ " <community> "
+ " { $x//CONFIGURATION/context/@id} "

View File

@ -20,8 +20,6 @@ import eu.dnetlib.dhp.schema.oaf.*;
/** Created by miriam on 02/08/2018. */
public class ResultTagger implements Serializable {
private String trust = "0.8";
private boolean clearContext(Result result) {
int tmp = result.getContext().size();
List<Context> clist = result
@ -171,21 +169,24 @@ public class ResultTagger implements Serializable {
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_SUBJECT,
CLASS_NAME_BULKTAG_SUBJECT));
CLASS_NAME_BULKTAG_SUBJECT,
TAGGING_TRUST));
if (datasources.contains(c.getId()))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_DATASOURCE,
CLASS_NAME_BULKTAG_DATASOURCE));
CLASS_NAME_BULKTAG_DATASOURCE,
TAGGING_TRUST));
if (czenodo.contains(c.getId()))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_CZENODO,
CLASS_NAME_BULKTAG_ZENODO));
CLASS_NAME_BULKTAG_ZENODO,
TAGGING_TRUST));
}
return c;
})
@ -211,21 +212,24 @@ public class ResultTagger implements Serializable {
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_SUBJECT,
CLASS_NAME_BULKTAG_SUBJECT));
CLASS_NAME_BULKTAG_SUBJECT,
TAGGING_TRUST));
if (datasources.contains(c))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_DATASOURCE,
CLASS_NAME_BULKTAG_DATASOURCE));
CLASS_NAME_BULKTAG_DATASOURCE,
TAGGING_TRUST));
if (czenodo.contains(c))
dataInfoList
.add(
getDataInfo(
BULKTAG_DATA_INFO_TYPE,
CLASS_ID_CZENODO,
CLASS_NAME_BULKTAG_ZENODO));
CLASS_NAME_BULKTAG_ZENODO,
TAGGING_TRUST));
context.setDataInfo(dataInfoList);
return context;
})
@ -236,11 +240,12 @@ public class ResultTagger implements Serializable {
}
public static DataInfo getDataInfo(
String inference_provenance, String inference_class_id, String inference_class_name) {
String inference_provenance, String inference_class_id, String inference_class_name, String trust) {
DataInfo di = new DataInfo();
di.setInferred(true);
di.setInferenceprovenance(inference_provenance);
di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name));
di.setTrust(trust);
return di;
}

View File

@ -14,4 +14,6 @@ public class TaggingConstants {
public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
public static final String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource";
public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
public static final String TAGGING_TRUST = "0.8";
}

View File

@ -90,6 +90,7 @@ public class CleanGraphSparkJob {
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
readTableFromPath(spark, inputPath, clazz)
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
.write()
@ -98,6 +99,65 @@ public class CleanGraphSparkJob {
.json(outputPath);
}
protected static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.nonNull(o.getCountry())) {
fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
}
} else if (value instanceof Relation) {
// nothing to clean here
} else if (value instanceof Result) {
Result r = (Result) value;
fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
if (Objects.nonNull(r.getSubject())) {
r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
}
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor().forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid().forEach(p -> {
fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
});
}
});
}
if (value instanceof Publication) {
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
} else if (value instanceof OtherResearchProduct) {
} else if (value instanceof Software) {
}
}
return value;
}
private static void fixVocabName(Qualifier q, String vocabularyName) {
if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
q.setSchemeid(vocabularyName);
q.setSchemename(vocabularyName);
}
}
protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
@ -113,6 +173,9 @@ public class CleanGraphSparkJob {
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
r
.setLanguage(

View File

@ -44,6 +44,7 @@ import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -113,6 +115,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
final String nsPrefixBlacklist = parser.get("nsPrefixBlacklist");
log.info("nsPrefixBlacklist: {}", nsPrefixBlacklist);
final Predicate<Oaf> verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist);
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
log.info("processClaims: {}", processClaims);
@ -123,23 +130,25 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
smdbe.execute("queryClaims.sql", smdbe::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject);
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
}
log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
smdbe
.execute(
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
}
log.info("All done.");
}
@ -163,10 +172,20 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
throws Exception {
execute(sqlFile, producer, oaf -> true);
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer,
final Predicate<Oaf> predicate)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> {
if (predicate.test(oaf)) {
emitOaf(oaf);
}
});
dbClient.processResults(sql, consumer);
}

View File

@ -0,0 +1,62 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Splitter;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
/**
* This predicate should be used to skip oaf objects using a blacklist of nsprefixes.
*
* @author michele
*/
public class VerifyNsPrefixPredicate implements Predicate<Oaf> {
final Set<String> invalids = new HashSet<>();
public VerifyNsPrefixPredicate(final String blacklist) {
if (StringUtils.isNotBlank(blacklist)) {
Splitter
.on(",")
.trimResults()
.omitEmptyStrings()
.split(blacklist)
.forEach(invalids::add);
}
}
@Override
public boolean test(final Oaf oaf) {
if (oaf instanceof Datasource) {
return testValue(((Datasource) oaf).getNamespaceprefix().getValue());
} else if (oaf instanceof OafEntity) {
return testValue(((OafEntity) oaf).getId());
} else if (oaf instanceof Relation) {
return testValue(((Relation) oaf).getSource()) && testValue(((Relation) oaf).getTarget());
} else {
return true;
}
}
protected boolean testValue(final String s) {
if (StringUtils.isNotBlank(s)) {
for (final String invalid : invalids) {
if (Pattern.matches("^(\\d\\d\\|)?" + invalid + ".*$", s)) {
return false;
}
}
}
return true;
}
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
@ -35,6 +36,88 @@ object EBIAggregator {
}
def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{
override def zero: DLIUnknown = new DLIUnknown()
override def reduce(b: DLIUnknown, a: (String, DLIUnknown)): DLIUnknown = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIUnknown, wy: DLIUnknown): DLIUnknown = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIUnknown): DLIUnknown = reduction
override def bufferEncoder: Encoder[DLIUnknown] =
Encoders.kryo(classOf[DLIUnknown])
override def outputEncoder: Encoder[DLIUnknown] =
Encoders.kryo(classOf[DLIUnknown])
}
def getDLIDatasetAggregator(): Aggregator[(String, DLIDataset), DLIDataset, DLIDataset] = new Aggregator[(String, DLIDataset), DLIDataset, DLIDataset]{
override def zero: DLIDataset = new DLIDataset()
override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIDataset): DLIDataset = reduction
override def bufferEncoder: Encoder[DLIDataset] =
Encoders.kryo(classOf[DLIDataset])
override def outputEncoder: Encoder[DLIDataset] =
Encoders.kryo(classOf[DLIDataset])
}
def getDLIPublicationAggregator(): Aggregator[(String, DLIPublication), DLIPublication, DLIPublication] = new Aggregator[(String, DLIPublication), DLIPublication, DLIPublication]{
override def zero: DLIPublication = new DLIPublication()
override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIPublication): DLIPublication = reduction
override def bufferEncoder: Encoder[DLIPublication] =
Encoders.kryo(classOf[DLIPublication])
override def outputEncoder: Encoder[DLIPublication] =
Encoders.kryo(classOf[DLIPublication])
}
def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
override def zero: Publication = new Publication()
@ -85,5 +168,27 @@ object EBIAggregator {
}
def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
override def zero: DLIRelation = new DLIRelation()
override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
a._2
}
override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
if(b!= null) b else a
}
override def finish(reduction: DLIRelation): DLIRelation = reduction
override def bufferEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
override def outputEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
}
}

View File

@ -1,8 +1,9 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -12,6 +13,7 @@ import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions._
import scala.collection.JavaConverters._
@ -28,6 +30,64 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
}
def journalToOAF(pj:PMJournal): Journal = {
val j = new Journal
j.setIssnPrinted(pj.getIssn)
j.setVol(pj.getVolume)
j.setName(pj.getTitle)
j.setIss(pj.getIssue)
j.setDataInfo(OafUtils.generateDataInfo())
j
}
def pubmedTOPublication(input:PMArticle):DLIPublication = {
val dnetPublicationId = s"50|${DHPUtils.md5(s"${input.getPmid}::pmid")}"
val p = new DLIPublication
p.setId(dnetPublicationId)
p.setDataInfo(OafUtils.generateDataInfo())
p.setPid(List(OafUtils.createSP(input.getPmid.toLowerCase.trim, "pmid", "dnet:pid_types")).asJava)
p.setCompletionStatus("complete")
val pi = new ProvenaceInfo
pi.setId("dli_________::europe_pmc__")
pi.setName( "Europe PMC")
pi.setCompletionStatus("complete")
pi.setCollectionMode("collected")
p.setDlicollectedfrom(List(pi).asJava)
p.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
if (input.getAuthors != null && input.getAuthors.size() >0) {
var aths: List[Author] = List()
input.getAuthors.asScala.filter(a=> a!= null).foreach(a => {
val c = new Author
c.setFullname(a.getFullName)
c.setName(a.getForeName)
c.setSurname(a.getLastName)
aths = aths ::: List(c)
})
if (aths.nonEmpty)
p.setAuthor(aths.asJava)
}
if (input.getJournal != null)
p.setJournal(journalToOAF(input.getJournal))
p.setTitle(List(OafUtils.createSP(input.getTitle, "main title", "dnet:dataCite_title")).asJava)
p.setDateofacceptance(OafUtils.asField(input.getDate))
val i = new Instance
i.setCollectedfrom(generatePubmedDLICollectedFrom())
i.setDateofacceptance(p.getDateofacceptance)
i.setUrl(List(s"https://pubmed.ncbi.nlm.nih.gov/${input.getPmid}").asJava)
i.setInstancetype(createQualifier("0001", "Article", "dnet:publication_resource", "dnet:publication_resource"))
p.setInstance(List(i).asJava)
p
}
def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
val pmid :String = input._1
val input_json :String = input._2
@ -116,8 +176,16 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
val workingPath = parser.get("workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
implicit val strEncoder:Encoder[String] = Encoders.STRING
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
@ -133,6 +201,46 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
val idPublicationSolved:Dataset[String] = spark.read.load(s"$workingPath/baseline_links_updates").where(col("links").isNotNull).select("pmid").as[String]
val baseline:Dataset[(String, PMArticle)]= spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle].map(p=> (p.getPmid, p))(Encoders.tuple(strEncoder,PMEncoder))
idPublicationSolved.joinWith(baseline, idPublicationSolved("pmid").equalTo(baseline("_1"))).map(k => pubmedTOPublication(k._2._2)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_publication")
val pmaDatasets = spark.read.load("/user/sandro.labruzzo/scholix/EBI/ebi_garr/baseline_dataset").as[PMArticle]
pmaDatasets.map(p => pubmedTOPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_all")
val pubs: Dataset[(String,Publication)] = spark.read.load("/user/sandro.labruzzo/scholix/EBI/publication").as[Publication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,oafpubEncoder))
val pubdate:Dataset[(String,DLIPublication)] = spark.read.load(s"$workingPath/baseline_publication_all").as[DLIPublication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,pubEncoder))
pubs.joinWith(pubdate, pubs("_1").equalTo(pubdate("_1"))).map(k => k._2._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_ebi")
val dt : Dataset[DLIDataset] = spark.read.load(s"$workingPath/dataset").as[DLIDataset]
val update : Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_dataset").as[DLIDataset]
dt.union(update).map(d => (d.getId,d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
rel.union(relupdate)
.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite)
.save(s"$workingPath/baseline_relation_ebi")
}
}

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -10,6 +11,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateEBIDataFrame {
@ -34,54 +36,51 @@ object SparkCreateEBIDataFrame {
val relationMapper = RelationMapper.load
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
logger.info("Extract Publication and relation from publication_xml")
val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new PublicationScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
// logger.info("Extract Publication and relation from publication_xml")
// val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new PublicationScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
//
// val mapper = new ObjectMapper()
// mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
// spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
//
// logger.info("Extract Publication and relation from dataset_xml")
// val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new DatasetScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
val mapper = new ObjectMapper()
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
logger.info("Extract Publication and relation from dataset_xml")
val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new DatasetScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
// spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getPublicationAggregator().toColumn)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDatasetAggregator().toColumn)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getRelationAggregator().toColumn)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
}
}

View File

@ -25,7 +25,8 @@ public class PMAuthor implements Serializable {
}
public String getFullName() {
return String.format("%s, %s", this.foreName, this.lastName);
return String
.format("%s, %s", this.foreName != null ? this.foreName : "", this.lastName != null ? this.lastName : "");
}
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
object SparkSplitOafTODLIEntities {
def getKeyRelation(rel:DLIRelation):String = {
s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
}
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
parser.parseArgument(args)
val workingPath: String = parser.get("workingPath")
logger.info(s"Working dir path = $workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val spark:SparkSession = SparkSession
.builder()
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
.master(parser.get("master"))
.getOrCreate()
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIPublication])
.map(s =>s.asInstanceOf[DLIPublication])
.union(ebi_publication)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIDataset])
.map(s =>s.asInstanceOf[DLIDataset])
.union(ebi_dataset)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
.map(s =>s.asInstanceOf[DLIUnknown])
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIRelation])
.map(s =>s.asInstanceOf[DLIRelation])
.union(ebi_relation)
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
}
}

View File

@ -0,0 +1,73 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
/**
* This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous
* entities like Dataset, Relation, Publication and Unknown
*/
object SparkXMLToOAFDataset {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(SparkXMLToOAFDataset.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val relationMapper = RelationMapper.load
val inputPath: String = parser.get("sourcePath")
val entity: String = parser.get("entity")
val targetPath = parser.get("targetPath")
logger.info(s"Input path is $inputPath")
logger.info(s"Entity path is $entity")
logger.info(s"Target Path is $targetPath")
val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
.map(s => s._2.toString)
.flatMap(s => {
entity match {
case "publication" =>
val p = new PublicationScholexplorerParser
val l =p.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
case "dataset" =>
val d = new DatasetScholexplorerParser
val l =d.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
}
}).filter(s => s!= null)
spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
}
}

View File

@ -317,6 +317,15 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
.collect(Collectors.toList()));
}
// TERRIBLE HACK TO AVOID EMPTY COLLECTED FROM
if (parsedObject.getDlicollectedfrom() == null) {
final KeyValue cf = new KeyValue();
cf.setKey("dli_________::europe_pmc__");
cf.setValue("Europe PMC");
parsedObject.setCollectedfrom(Collections.singletonList(cf));
}
if (StringUtils.isNotBlank(resolvedURL)) {
Instance i = new Instance();
i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));

View File

@ -40,5 +40,11 @@
"paramLongName": "dbschema",
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
"paramRequired": true
},
{
"paramName": "nsbl",
"paramLongName": "nsPrefixBlacklist",
"paramDescription": "a blacklist of nsprefixes (comma separeted)",
"paramRequired": false
}
]

View File

@ -43,7 +43,11 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -131,6 +135,7 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>claims</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportODF_claims"/>
<error to="Kill"/>
@ -182,6 +187,7 @@
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>

View File

@ -38,7 +38,11 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -113,6 +117,7 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>claims</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportODF_claims"/>
<error to="Kill"/>

View File

@ -25,7 +25,11 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -99,6 +103,7 @@
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportDB_claims"/>
<error to="Kill"/>
@ -117,6 +122,7 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--action</arg><arg>claims</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>

View File

@ -28,6 +28,11 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -67,6 +72,7 @@
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>

View File

@ -1,7 +1,7 @@
<configuration>
<!-- OCEAN -->
<!--
<property>
<name>jobTracker</name>
<value>yarnRM</value>
@ -18,26 +18,26 @@
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
-->
<!-- GARR -->
<property>
<name>jobTracker</name>
<value>yarn</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
</property>
<!-- <property>-->
<!-- <name>jobTracker</name>-->
<!-- <value>yarn</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>nameNode</name>-->
<!-- <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>hive_metastore_uris</name>-->
<!-- <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>spark2YarnHistoryServerAddress</name>-->
<!-- <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>-->
<!-- </property>-->
<property>

View File

@ -18,7 +18,7 @@
</property>
</parameters>
<start to="GenerateUpdates"/>
<start to="CreateEBIDataSet"/>
<kill name="Kill">
@ -48,6 +48,28 @@
<error to="Kill"/>
</action>
<action name="CreateEBIDataSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=1000
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="GenerateUpdates"/>
<error to="Kill"/>
</action>
<action name="GenerateUpdates">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
@ -71,27 +93,7 @@
</action>
<action name="CreateEBIDataSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=1000
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,7 +1,4 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true},
{"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true},
{"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true}
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
]

View File

@ -101,12 +101,17 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Import ${entity} and related entities</name>
<class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerGraphImporter</class>
<class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
<arg>--targetPath</arg><arg>${targetEntityPath}</arg>
<arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
<arg>--entity</arg><arg>${entity}</arg>
</spark>
<ok to="End"/>

View File

@ -1,16 +1,8 @@
<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>targetPath</name>
<description>the source path</description>
</property>
<property>
<name>targetDir</name>
<description>the name of the path</description>
<name>workingPath</name>
<description>the working path</description>
</property>
<property>
<name>sparkDriverMemory</name>
@ -20,32 +12,13 @@
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>entities</name>
<description>the entities to be extracted</description>
</property>
</parameters>
<start to="DeleteTargetPath"/>
<start to="ExtractDLIEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteTargetPath">
<fs>
<mkdir path="${targetPath}"/>
<mkdir path="${targetPath}/dataset"/>
<mkdir path="${targetPath}/publication"/>
<mkdir path="${targetPath}/unknown"/>
<mkdir path="${targetPath}/relation"/>
<delete path='${targetPath}/dataset/${targetDir}'/>
<delete path='${targetPath}/publication/${targetDir}'/>
<delete path='${targetPath}/unknown/${targetDir}'/>
<delete path='${targetPath}/relation/${targetDir}'/>
</fs>
<ok to="ExtractDLIEntities"/>
<error to="Kill"/>
</action>
<action name="ExtractDLIEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -53,19 +26,18 @@
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract ${entities}</name>
<class>eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob</class>
<name>Extract DLI Entities</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--targetDir</arg><arg>${targetDir}</arg>
<arg>--entities</arg><arg>${entities}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -7,6 +7,8 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@ -19,9 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -62,10 +62,12 @@ public class CleaningFunctionTest {
assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication);
Publication p_out = OafCleaner.apply(p_in, mapping);
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
assertNotNull(p_out);
assertNotNull(p_out.getPublisher());
assertNull(p_out.getPublisher().getValue());
assertEquals("und", p_out.getLanguage().getClassid());
assertEquals("Undetermined", p_out.getLanguage().getClassname());
@ -88,6 +90,16 @@ public class CleaningFunctionTest {
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
assertNull(p_out.getPublisher());
getAuthorPids(p_defaults).forEach(pid -> {
System.out
.println(
String
.format(
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
pid.getQualifier().getClassname()));
});
// TODO add more assertions to verity the cleaned values
System.out.println(MAPPER.writeValueAsString(p_out));
@ -97,7 +109,7 @@ public class CleaningFunctionTest {
*/
}
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
return pub
.getAuthor()
.stream()
@ -106,6 +118,14 @@ public class CleaningFunctionTest {
.map(s -> s.getQualifier());
}
private Stream<StructuredProperty> getAuthorPids(Result pub) {
return pub
.getAuthor()
.stream()
.map(a -> a.getPid())
.flatMap(p -> p.stream());
}
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));

View File

@ -276,6 +276,17 @@ public class MappersTest {
System.out.println("***************");
}
@Test
void testClaimDedup() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_dedup.xml"));
final List<Oaf> list = new OafToOafMapper(vocs, false).processMdRecord(xml);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(list));
System.out.println("***************");
}
private void assertValidId(final String id) {
assertEquals(49, id.length());
assertEquals('|', id.charAt(2));

View File

@ -0,0 +1,92 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
class VerifyNsPrefixPredicateTest {
private VerifyNsPrefixPredicate predicate;
@BeforeEach
void setUp() throws Exception {
predicate = new VerifyNsPrefixPredicate("corda,nsf,wt");
}
@Test
void testTestValue() {
assertFalse(predicate.testValue("corda__2020"));
assertFalse(predicate.testValue("nsf________"));
assertFalse(predicate.testValue("nsf"));
assertFalse(predicate.testValue("corda"));
assertFalse(predicate.testValue("10|corda_______::fjkdsfjksdhfksj"));
assertFalse(predicate.testValue("20|corda_______::fjkdsfjksdhfksj"));
assertTrue(predicate.testValue("xxxxxx_____"));
assertTrue(predicate.testValue("10|xxxxxx_____::sdasdasaddasad"));
assertTrue(predicate.testValue(null));
assertTrue(predicate.testValue(""));
}
@Test
void testTest_ds_true() {
final Field<String> prefix = new Field<>();
prefix.setValue("xxxxxx______");
final Datasource ds = new Datasource();
ds.setNamespaceprefix(prefix);
assertTrue(predicate.test(ds));
}
@Test
void testTest_ds_false() {
final Field<String> prefix = new Field<>();
prefix.setValue("corda__2020");
final Datasource ds = new Datasource();
ds.setNamespaceprefix(prefix);
assertFalse(predicate.test(ds));
}
@Test
void testTest_rel_true() {
final Relation rel = new Relation();
rel.setSource("10|yyyyyy______:sdfsfsffsdfs");
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
assertTrue(predicate.test(rel));
}
@Test
void testTest_rel_false() {
final Relation rel = new Relation();
rel.setSource("10|corda_______:sdfsfsffsdfs");
rel.setTarget("10|xxxxxx______:sdfsfsffsdfs");
assertFalse(predicate.test(rel));
}
@Test
void testTest_proj_true() {
final Project p = new Project();
p.setId("10|xxxxxx______:sdfsfsffsdfs");
assertTrue(predicate.test(p));
}
@Test
void testTest_proj_false() {
final Project p = new Project();
p.setId("10|corda_____:sdfsfsffsdfs");
assertFalse(predicate.test(p));
}
}

View File

@ -59,6 +59,28 @@
"schemename": "dnet:pid_types"
},
"value": "qwerty"
},
{
"dataInfo": {
"deletedbyinference": false,
"inferenceprovenance": "",
"inferred": false,
"invisible": false,
"provenanceaction": {
"classid": "sysimport:crosswalk:datasetarchive",
"classname": "sysimport:crosswalk:datasetarchive",
"schemeid": "dnet:provenanceActions",
"schemename": "dnet:provenanceActions"
},
"trust": "0.9"
},
"qualifier": {
"classid": "ORCID",
"classname": "ORCID",
"schemeid": "",
"schemename": ""
},
"value": "asdasd"
}
],
"rank": 2,
@ -186,6 +208,9 @@
}
],
"bestaccessright": null,
"publisher": {
"value": null
},
"collectedfrom": [
{
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",

View File

@ -0,0 +1,182 @@
<oai:record xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:dr="http://www.driver-repository.eu/namespace/dr">
<oai:header>
<dri:objIdentifier>dedup_wf_001::534276867e917fe9efe0cca10e363457</dri:objIdentifier>
<dri:dateOfCollection>2020-08-02T22:55:40.866Z</dri:dateOfCollection>
<oaf:datasourceprefix>openaire____</oaf:datasourceprefix>
<dr:dateOfTransformation>2020-08-02T23:53:04.582Z</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<dc:contributor>ATLAS (IHEF, IoP, FNWI)</dc:contributor>
<dc:contributor>Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü</dc:contributor>
<dc:contributor>TR3959</dc:contributor>
<dc:contributor>Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü</dc:contributor>
<dc:contributor>TR3959</dc:contributor>
<dc:source>urn:issn:1748-0221</dc:source>
<dc:source>VOLUME=7;ISSUE=1;ISSN=1748-0221;TITLE=Journal of Instrumentation</dc:source>
<dc:source>ATLAS Collaboration Mitsou, Vasiliki Fiorini, Luca Ros Martínez, Eduardo Castillo
Giménez, María Victoria Fuster Verdú, Juan A. García García, Carmen Cabrera Urbán,
Susana Martí García, Salvador Salt Cairols, José Lacasta Llácer, Carlos Valls Ferrer,
Juan Antonio Higón Rodríguez, Emilio Ferrer Soria, Antonio González de la Hoz, Santiago
Kaci, Mohammed Hernández Jiménez, Yesenia Villaplana Pérez, Miguel 2012 A study of the
material in the ATLAS inner detector using secondary hadronic interactions Journal Of
Instrumentation 7 P01013 1 41</dc:source>
<dc:source>Journal of Instrumentation, 7(1)</dc:source>
<dc:source>Aad, G; Abbott, B; Abdallah, J; Abdelalim, AA; Abdesselam, A; Abdinov, O;  et
al.(2012). A study of the material in the ATLAS inner detector using secondary hadronic
interactions. Journal of Instrumentation, 7(1). doi: 10.1088/1748-0221/7/01/P01013. UC
Santa Cruz: Retrieved from: http://www.escholarship.org/uc/item/05j2j2br</dc:source>
<dc:source>Journal of Instrumentation, 7</dc:source>
<dc:source>VOLUME=7;ISSN=1748-0221;TITLE=Journal of Instrumentation</dc:source>
<dc:source>1748-0221</dc:source>
<dc:source>Journal of Instrumentation 7, P01013 (2012).
doi:10.1088/1748-0221/7/01/P01013</dc:source>
<dc:title>A measurement of the material in the ATLAS inner detector using secondary hadronic
interactions</dc:title>
<dc:language>und</dc:language>
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
(interaction of radiation with matter, interaction of photons with matter, interaction
of hadrons with matter, etc); Particle tracking detectors (Solid-state detectors); Si
microstrip and pad detectors; Large detector systems for particle and astroparticle
physics</dc:subject>
<dc:subject classid="keyword" classname="keyword">of photons with matter, interaction of
hadrons with matter, etc)</dc:subject>
<dc:subject classid="keyword" classname="keyword">Particle Physics - Experiment</dc:subject>
<dc:subject classid="keyword" classname="keyword">Detector Modelling and
Simulations</dc:subject>
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
(interaction of radiation with matter, interaction of photons with matter, interaction
of hadrons with matter, etc)</dc:subject>
<dc:subject classid="keyword" classname="keyword">Large detector systems for particle and
astroparticle physics</dc:subject>
<dc:subject classid="keyword" classname="keyword">Detector modelling and simulations I
(interaction of radiation with matter, interaction</dc:subject>
<dc:subject classid="keyword" classname="keyword">Large Detector Systems</dc:subject>
<dc:subject classid="keyword" classname="keyword">530</dc:subject>
<dc:subject classid="keyword" classname="keyword">Science &amp; Technology</dc:subject>
<dc:subject classid="keyword" classname="keyword">:Ciências Físicas [Ciências
Naturais]</dc:subject>
<dc:subject classid="keyword" classname="keyword">High Energy Physics -
Experiment</dc:subject>
<dc:subject classid="keyword" classname="keyword">Detectors de radiació</dc:subject>
<dc:subject classid="keyword" classname="keyword">Física nuclear</dc:subject>
<dc:subject classid="ddc" classname="ddc">ddc:610</dc:subject>
<dc:subject classid="keyword" classname="keyword">Si microstrip and pad
detectors</dc:subject>
<dc:subject classid="keyword" classname="keyword">Particle tracking detectors (Solid-state
detectors)</dc:subject>
<dc:subject classid="keyword" classname="keyword">Col·lisions (Física nuclear)</dc:subject>
<dc:subject classid="keyword" classname="keyword">Particle Tracking Detectors</dc:subject>
<dc:publisher>IOP Publishing</dc:publisher>
<dc:format>application/pdf</dc:format>
<dc:format>application/pdf</dc:format>
<dc:format>application/pdf</dc:format>
<dc:format>application/pdf</dc:format>
<dc:format>application/pdf</dc:format>
<dc:format>application/pdf</dc:format>
<dc:date>2016-05-02</dc:date>
<dc:description>The ATLAS inner detector is used to reconstruct secondary vertices due to
hadronic interactions of primary collision products, so probing the location and amount
of material in the inner region of ATLAS. Data collected in 7 TeV pp collisions at the
LHC, with a minimum bias trigger, are used for comparisons with simulated events. The
reconstructed secondary vertices have spatial resolutions ranging from ~ 200μm to 1 mm.
The overall material description in the simulation is validated to within an
experimental uncertainty of about 7%. This will lead to a better understanding of the
reconstruction of various objects such as tracks, leptons, jets, and missing transverse
momentum. We acknowledge the support of ANPCyT, Argentina; YerPhI, Armenia; ARC,
Australia; BMWF, Austria; ANAS, Azerbaijan; SSTC, Belarus; CNPq and FAPESP, Brazil;
NSERC, NRC and CFI, Canada; CERN; CONICYT, Chile; CAS, MOST and NSFC, China;
COLCIENCIAS, Colombia; MSMT CR, MPO CR and VSC CR, Czech Republic; DNRF, DNSRC and
Lundbeck Foundation, Denmark; ARTEMIS, European Union; IN2P3-CNRS, CEA-DSM/IRFU, France;
GNAS, Georgia; BMBF, DFG, HGF, MPG and AvH Foundation, Germany; GSRT, Greece; ISF,
MINERVA, GIF, DIP and Benoziyo Center, Israel; INFN, Italy; MEXT and JSPS, Japan; CNRST,
Morocco; FOM and NWO, Netherlands; RCN, Norway; MNiSW, Poland; GRICES and FCT, Portugal;
MERYS (MECTS), Romania; MES of Russia and ROSATOM, Russian Federation; JINR; MSTD,
Serbia; MSSR, Slovakia; ARRS and MVZT, Slovenia; DST/NRF, South Africa; MICINN, Spain;
SRC and Wallenberg Foundation, Sweden; SER, SNSF and Cantons of Bern and Geneva,
Switzerland; NSC, Taiwan; TAEK, Turkey; STFC, the Royal Society and Leverhulme Trust,
United Kingdom; DOE and NSF, United States of America.
info:eu-repo/semantics/publishedVersion</dc:description>
<dc:source>NARCIS</dc:source>
<dc:source>DSpace@Dogus</dc:source>
<dc:source>Lancaster EPrints</dc:source>
<dc:source>CERN Document Server</dc:source>
<dc:source>DESY Publication Database</dc:source>
<dc:source>OpenAIRE</dc:source>
<dc:source>Publikationenserver der Georg-August-Universität Göttingen</dc:source>
<dc:source>arXiv.org e-Print Archive</dc:source>
<dc:source>CORE (RIOXX-UK Aggregator)</dc:source>
<dc:source>eScholarship - University of California</dc:source>
<dc:source>Universidade do Minho: RepositoriUM</dc:source>
<dc:source>Dokuz Eylul University Open Archive System</dc:source>
<dc:source>Repositori d'Objectes Digitals per a l'Ensenyament la Recerca i la
Cultura</dc:source>
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013</dc:relation>
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013.</dc:relation>
<dc:type>Article</dc:type>
<dc:identifier>http://hdl.handle.net/11376/1605</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://www.escholarship.org/uc/item/05j2j2br</dc:identifier>
<dc:type>Unknown</dc:type>
<dc:identifier>http://cds.cern.ch/record/1394292</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://eprints.lancs.ac.uk/68235/</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://hdl.handle.net/10550/36188</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://eprints.gla.ac.uk/65933/1/65933.pdf</dc:identifier>
<dc:type>Preprint</dc:type>
<dc:identifier>http://arxiv.org/abs/1110.6191</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://dare.uva.nl/personal/pure/en/publications/a-study-of-the-material-in-the-atlas-inner-detector-using-secondary-hadronic-interactions(6b7667e2-04e2-4a66-92a8-ff4edbf61a17).html</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://hdl.handle.net/1822/48768</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://resolver.sub.uni-goettingen.de/purl?gs-1/12231</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://bib-pubdb1.desy.de/search?p=id:%22PHPPUBDB-21212%22</dc:identifier>
<dc:identifier>http://bib-pubdb1.desy.de/record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf</dc:identifier>
<dc:identifier>http://bib-pubdb1.desy.de//record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf</dc:identifier>
<dc:identifier>http://bib-pubdb1.desy.de/record/96807</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://arxiv.org/abs/1110.6191</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://hdl.handle.net/11376/1605</dc:identifier>
<dc:identifier>http://dx.doi.org/10.1088/1748-0221/7/01/P01013</dc:identifier>
<dc:type>Article</dc:type>
<dc:identifier>http://hdl.handle.net/2066/93736</dc:identifier>
<dc:creator>ATLAS Collaboration</dc:creator>
<dr:CobjCategory type="publication">0001</dr:CobjCategory>
<oaf:refereed>0002</oaf:refereed>
<oaf:dateAccepted>2016-05-02</oaf:dateAccepted>
<oaf:accessrights>OPEN</oaf:accessrights>
<oaf:language>und</oaf:language>
<oaf:embargoenddate/>
<oaf:hostedBy id="infrastruct_::openaire" name="OpenAIRE"/>
<oaf:collectedFrom id="infrastruct_::openaire" name="OpenAIRE"/>
<oaf:journal eissn="" ep="" iss="1748-0221" issn="1748-0221" lissn="" sp="" vol=""/>
</oai:metadata>
<about>
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription harvestDate="2020-08-02T22:55:40.866Z" altered="true">
<baseURL>file%3A%2F%2F%2Fsrv%2Fclaims%2Frecords%2Fpublication%2Fopenaire</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction schemename="dnet:provenanceActions"
schemeid="dnet:provenanceActions" classname="user:claim" classid="user:claim"/>
</oaf:datainfo>
</about>
</oai:record>