Merge remote-tracking branch 'origin/stable_ids' into stable_id_scholexplorer

This commit is contained in:
Sandro La Bruzzo 2021-06-24 17:20:50 +02:00
commit ad50415167
8 changed files with 137 additions and 85 deletions

View File

@ -224,24 +224,20 @@ public class GraphCleaningFunctions extends CleaningFunctions {
if (Objects.nonNull(r.getInstance())) { if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) { for (Instance i : r.getInstance()) {
if (Objects.nonNull(i.getPid())) {
i.setPid(processPidCleaning(i.getPid()));
}
if (Objects.nonNull(i.getAlternateIdentifier())) {
i.setAlternateIdentifier(processPidCleaning(i.getAlternateIdentifier()));
}
Optional Optional
.ofNullable(i.getPid()) .ofNullable(i.getPid())
.ifPresent(pid -> { .ifPresent(pid -> {
final Set<StructuredProperty> pids = pid final Set<StructuredProperty> pids = Sets.newHashSet(pid);
.stream()
.filter(Objects::nonNull)
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect(Collectors.toCollection(HashSet::new));
Optional Optional
.ofNullable(i.getAlternateIdentifier()) .ofNullable(i.getAlternateIdentifier())
.ifPresent(altId -> { .ifPresent(altId -> {
final Set<StructuredProperty> altIds = altId final Set<StructuredProperty> altIds = Sets.newHashSet(altId);
.stream()
.filter(Objects::nonNull)
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect(Collectors.toCollection(HashSet::new));
i.setAlternateIdentifier(Lists.newArrayList(Sets.difference(altIds, pids))); i.setAlternateIdentifier(Lists.newArrayList(Sets.difference(altIds, pids)));
}); });
}); });

View File

@ -144,22 +144,7 @@ public class PrepareProgramme {
JavaRDD<CSVProgramme> h2020Programmes = programme JavaRDD<CSVProgramme> h2020Programmes = programme
.toJavaRDD() .toJavaRDD()
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme)) .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
.reduceByKey((a, b) -> { .reduceByKey(PrepareProgramme::groupProgrammeByCode)
if (!a.getLanguage().equals("en")) {
if (b.getLanguage().equalsIgnoreCase("en")) {
a.setTitle(b.getTitle());
a.setLanguage(b.getLanguage());
}
}
if (StringUtils.isEmpty(a.getShortTitle())) {
if (!StringUtils.isEmpty(b.getShortTitle())) {
a.setShortTitle(b.getShortTitle());
}
}
return a;
})
.map(p -> { .map(p -> {
CSVProgramme csvProgramme = p._2(); CSVProgramme csvProgramme = p._2();
String programmeTitle = csvProgramme.getTitle().trim(); String programmeTitle = csvProgramme.getTitle().trim();
@ -176,20 +161,31 @@ public class PrepareProgramme {
return csvProgramme; return csvProgramme;
}); });
// prepareClassification(h2020Programmes); final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1); JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
rdd rdd
.map(csvProgramme -> { .map(OBJECT_MAPPER::writeValueAsString)
String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
return tmp;
})
.saveAsTextFile(outputPath); .saveAsTextFile(outputPath);
} }
/**
 * Reduce function used to collapse the programme records sharing the same code
 * into a single record.
 *
 * <p>The first record is updated in place and returned:
 * <ul>
 * <li>when {@code a} is not in English and {@code b} is, the title and language of
 * {@code b} replace those of {@code a};</li>
 * <li>when {@code a} has no short title and {@code b} has one, the short title of
 * {@code b} is copied into {@code a}.</li>
 * </ul>
 *
 * <p>The language is compared case-insensitively on both records, and a record without
 * a language is treated as non-English instead of raising a NullPointerException.
 *
 * @param a the accumulated record; it is modified and returned
 * @param b the record to merge into {@code a}; it is only read
 * @return {@code a}, after the merge
 */
private static CSVProgramme groupProgrammeByCode(CSVProgramme a, CSVProgramme b) {
    // Constant-first comparison: null-safe, and the same case rule applies to both records.
    final boolean aIsEnglish = "en".equalsIgnoreCase(a.getLanguage());
    final boolean bIsEnglish = "en".equalsIgnoreCase(b.getLanguage());
    if (!aIsEnglish && bIsEnglish) {
        a.setTitle(b.getTitle());
        a.setLanguage(b.getLanguage());
    }
    // Fill the short title only when it is missing, never overwrite an existing one.
    if (StringUtils.isEmpty(a.getShortTitle()) && !StringUtils.isEmpty(b.getShortTitle())) {
        a.setShortTitle(b.getShortTitle());
    }
    return a;
}
private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) { private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
Object[] codedescription = h2020Programmes Object[] codedescription = h2020Programmes
.map( .map(
@ -240,10 +236,10 @@ public class PrepareProgramme {
if (!ent.contains("Euratom")) { if (!ent.contains("Euratom")) {
String parent; String parent;
String tmp_key = tmp[0] + "."; String tmpKey = tmp[0] + ".";
for (int i = 1; i < tmp.length - 1; i++) { for (int i = 1; i < tmp.length - 1; i++) {
tmp_key += tmp[i] + "."; tmpKey += tmp[i] + ".";
parent = map.get(tmp_key)._1().toLowerCase().trim(); parent = map.get(tmpKey)._1().toLowerCase().trim();
if (parent.contains("|")) { if (parent.contains("|")) {
parent = parent.substring(parent.lastIndexOf("|") + 1).trim(); parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
} }

View File

@ -27,6 +27,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2; import scala.Tuple2;
@ -37,6 +38,22 @@ public class GenerateEntitiesApplication {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
 * Operation mode of the application, selected through the optional {@code mode} argument.
 * The argument value is converted with {@code Mode.valueOf}, so it must match one of the
 * constant names below exactly; when the argument is absent, {@link #graph} is used.
 */
enum Mode {
/**
 * Groups all the objects by id and merges the objects sharing the same id
 * before they are saved.
 */
claim,
/**
 * Default mode: the objects are saved as they are, without grouping by id.
 */
graph
}
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
@ -67,13 +84,19 @@ public class GenerateEntitiesApplication {
.orElse(true); .orElse(true);
log.info("shouldHashId: {}", shouldHashId); log.info("shouldHashId: {}", shouldHashId);
final Mode mode = Optional
.ofNullable(parser.get("mode"))
.map(Mode::valueOf)
.orElse(Mode.graph);
log.info("mode: {}", mode);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId); generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode);
}); });
} }
@ -82,7 +105,8 @@ public class GenerateEntitiesApplication {
final VocabularyGroup vocs, final VocabularyGroup vocs,
final String sourcePaths, final String sourcePaths,
final String targetPath, final String targetPath,
final boolean shouldHashId) { final boolean shouldHashId,
final Mode mode) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List<String> existingSourcePaths = Arrays final List<String> existingSourcePaths = Arrays
@ -106,7 +130,23 @@ public class GenerateEntitiesApplication {
.flatMap(list -> list.iterator())); .flatMap(list -> list.iterator()));
} }
inputRdd switch (mode) {
case claim:
save(
inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2))
.map(Tuple2::_2),
targetPath);
break;
case graph:
save(inputRdd, targetPath);
break;
}
}
private static void save(final JavaRDD<Oaf> rdd, final String targetPath) {
rdd
.map( .map(
oaf -> oaf.getClass().getSimpleName().toLowerCase() oaf -> oaf.getClass().getSimpleName().toLowerCase()
+ "|" + "|"

View File

@ -13,7 +13,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -36,7 +35,6 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Organization;
@ -54,6 +52,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class); private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class);
private static final DataInfo DATA_INFO_CLAIM = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
private static final List<KeyValue> COLLECTED_FROM_CLAIM = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
public static final String SOURCE_TYPE = "source_type"; public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type"; public static final String TARGET_TYPE = "target_type";
@ -443,25 +448,19 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
} }
public List<Oaf> processClaims(final ResultSet rs) { public List<Oaf> processClaims(final ResultSet rs) {
final DataInfo info = dataInfo(
false, null, false, false,
qualifier(USER_CLAIM, USER_CLAIM, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS), "0.9");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
try { try {
if (rs.getString(SOURCE_TYPE).equals("context")) { final String sourceType = rs.getString(SOURCE_TYPE);
final String targetType = rs.getString(TARGET_TYPE);
if (sourceType.equals("context")) {
final Result r; final Result r;
if (rs.getString(TARGET_TYPE).equals("dataset")) { if (targetType.equals("dataset")) {
r = new Dataset(); r = new Dataset();
r.setResulttype(DATASET_DEFAULT_RESULTTYPE); r.setResulttype(DATASET_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("software")) { } else if (targetType.equals("software")) {
r = new Software(); r = new Software();
r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE); r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
} else if (rs.getString(TARGET_TYPE).equals("other")) { } else if (targetType.equals("other")) {
r = new OtherResearchProduct(); r = new OtherResearchProduct();
r.setResulttype(ORP_DEFAULT_RESULTTYPE); r.setResulttype(ORP_DEFAULT_RESULTTYPE);
} else { } else {
@ -470,57 +469,72 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
} }
r.setId(createOpenaireId(50, rs.getString("target_id"), false)); r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp); r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info)); r.setContext(prepareContext(rs.getString("source_id"), DATA_INFO_CLAIM));
r.setDataInfo(info); r.setDataInfo(DATA_INFO_CLAIM);
r.setCollectedfrom(collectedFrom); r.setCollectedfrom(COLLECTED_FROM_CLAIM);
return Arrays.asList(r); return Arrays.asList(r);
} else { } else {
final String validationDate = rs.getString("curation_date"); final String validationDate = rs.getString("curation_date");
final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false); final String sourceId = createOpenaireId(sourceType, rs.getString("source_id"), false);
final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false); final String targetId = createOpenaireId(targetType, rs.getString("target_id"), false);
final Relation r1 = new Relation(); final Relation r1 = new Relation();
final Relation r2 = new Relation(); final Relation r2 = new Relation();
r1.setValidated(true); if (StringUtils.isNotBlank(validationDate)) {
r1.setValidationDate(validationDate); r1.setValidated(true);
r1.setCollectedfrom(collectedFrom); r1.setValidationDate(validationDate);
r2.setValidated(true);
r2.setValidationDate(validationDate);
}
r1.setCollectedfrom(COLLECTED_FROM_CLAIM);
r1.setSource(sourceId); r1.setSource(sourceId);
r1.setTarget(targetId); r1.setTarget(targetId);
r1.setDataInfo(info); r1.setDataInfo(DATA_INFO_CLAIM);
r1.setLastupdatetimestamp(lastUpdateTimestamp); r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setValidationDate(validationDate); r2.setCollectedfrom(COLLECTED_FROM_CLAIM);
r2.setValidated(true);
r2.setCollectedfrom(collectedFrom);
r2.setSource(targetId); r2.setSource(targetId);
r2.setTarget(sourceId); r2.setTarget(sourceId);
r2.setDataInfo(info); r2.setDataInfo(DATA_INFO_CLAIM);
r2.setLastupdatetimestamp(lastUpdateTimestamp); r2.setLastupdatetimestamp(lastUpdateTimestamp);
if (rs.getString(SOURCE_TYPE).equals("project")) { final String semantics = rs.getString("semantics");
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setRelType(RESULT_PROJECT); switch (semantics) {
r2.setSubRelType(OUTCOME); case "resultResult_relationship_isRelatedTo":
r2.setRelClass(IS_PRODUCED_BY); r1.setRelType(RESULT_RESULT);
} else { r1.setSubRelType(RELATIONSHIP);
r1.setRelType(RESULT_RESULT); r1.setRelClass(IS_RELATED_TO);
r1.setSubRelType(RELATIONSHIP);
r1.setRelClass(IS_RELATED_TO);
r2.setRelType(RESULT_RESULT); r2.setRelType(RESULT_RESULT);
r2.setSubRelType(RELATIONSHIP); r2.setSubRelType(RELATIONSHIP);
r2.setRelClass(IS_RELATED_TO); r2.setRelClass(IS_RELATED_TO);
break;
case "resultProject_outcome_produces":
if (!"project".equals(sourceType)) {
throw new IllegalStateException(
String
.format(
"invalid claim, sourceId: %s, targetId: %s, semantics: %s",
sourceId, targetId, semantics));
}
r1.setRelType(RESULT_PROJECT);
r1.setSubRelType(OUTCOME);
r1.setRelClass(PRODUCES);
r2.setRelType(RESULT_PROJECT);
r2.setSubRelType(OUTCOME);
r2.setRelClass(IS_PRODUCED_BY);
break;
default:
throw new IllegalArgumentException("claim semantics not managed: " + semantics);
} }
return Arrays.asList(r1, r2); return Arrays.asList(r1, r2);
} }
} catch (final Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -28,5 +28,11 @@
"paramLongName": "shouldHashId", "paramLongName": "shouldHashId",
"paramDescription": "should ids be hashed?", "paramDescription": "should ids be hashed?",
"paramRequired": false "paramRequired": false
},
{
"paramName": "m",
"paramLongName": "mode",
"paramDescription": "operation mode",
"paramRequired": false
} }
] ]

View File

@ -460,6 +460,7 @@
<arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg> <arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg> <arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
<arg>--mode</arg><arg>claim</arg>
</spark> </spark>
<ok to="GenerateGraph_claims"/> <ok to="GenerateGraph_claims"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -26,7 +26,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;

View File

@ -21,7 +21,7 @@
}, },
{ {
"field": "semantics", "field": "semantics",
"type": "not_used", "type": "string",
"value": "resultProject_outcome_produces" "value": "resultProject_outcome_produces"
} }
] ]