forked from D-Net/dnet-hadoop
[graph cleaning] applying coar based vocabularies in bulk
This commit is contained in:
parent
a24178cb93
commit
11a1207f9c
|
@ -21,10 +21,15 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
|
@ -35,6 +40,12 @@ public class GroupEntitiesSparkJob {
|
|||
|
||||
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
|
||||
|
||||
private ArgumentApplicationParser parser;
|
||||
|
||||
public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
|
||||
this.parser = parser;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
|
@ -51,6 +62,17 @@ public class GroupEntitiesSparkJob {
|
|||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
|
||||
}
|
||||
|
||||
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
|
||||
throws ISLookUpException {
|
||||
|
||||
String graphInputPath = parser.get("graphInputPath");
|
||||
log.info("graphInputPath: {}", graphInputPath);
|
||||
|
||||
|
@ -60,19 +82,21 @@ public class GroupEntitiesSparkJob {
|
|||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
|
||||
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
|
||||
log.info("filterInvisible: {}", filterInvisible);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
|
||||
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
|
||||
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -81,7 +105,7 @@ public class GroupEntitiesSparkJob {
|
|||
String inputPath,
|
||||
String checkpointPath,
|
||||
String outputPath,
|
||||
boolean filterInvisible) {
|
||||
boolean filterInvisible, VocabularyGroup vocs) {
|
||||
|
||||
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
|
||||
|
||||
|
@ -106,10 +130,14 @@ public class GroupEntitiesSparkJob {
|
|||
}
|
||||
|
||||
Dataset<?> groupedEntities = allEntities
|
||||
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
||||
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
|
||||
.map(
|
||||
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
|
||||
(MapFunction<OafEntity, OafEntity>) entity -> GraphCleaningFunctions
|
||||
.applyCoarVocabularies(entity, vocs),
|
||||
OAFENTITY_KRYO_ENC)
|
||||
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
||||
.reduceGroups((ReduceFunction<OafEntity>) OafMapperUtils::mergeEntities)
|
||||
.map(
|
||||
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
|
||||
t._2().getClass().getName(), t._2()),
|
||||
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE;
|
||||
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
|
||||
|
||||
import java.time.LocalDate;
|
||||
|
@ -784,4 +786,97 @@ public class GraphCleaningFunctions extends CleaningFunctions {
|
|||
return s;
|
||||
}
|
||||
|
||||
public static OafEntity applyCoarVocabularies(OafEntity entity, VocabularyGroup vocs) {
|
||||
|
||||
if (entity instanceof Result) {
|
||||
final Result result = (Result) entity;
|
||||
|
||||
Optional
|
||||
.ofNullable(result.getInstance())
|
||||
.ifPresent(
|
||||
instances -> instances
|
||||
.forEach(
|
||||
instance -> {
|
||||
if (Objects.isNull(instance.getInstanceTypeMapping())) {
|
||||
List<InstanceTypeMapping> mapping = Lists.newArrayList();
|
||||
mapping
|
||||
.add(
|
||||
OafMapperUtils
|
||||
.instanceTypeMapping(
|
||||
instance.getInstancetype().getClassname(),
|
||||
OPENAIRE_COAR_RESOURCE_TYPES_3_1));
|
||||
instance.setInstanceTypeMapping(mapping);
|
||||
}
|
||||
Optional<InstanceTypeMapping> optionalItm = instance
|
||||
.getInstanceTypeMapping()
|
||||
.stream()
|
||||
.filter(GraphCleaningFunctions::originalResourceType)
|
||||
.findFirst();
|
||||
if (optionalItm.isPresent()) {
|
||||
InstanceTypeMapping coarItm = optionalItm.get();
|
||||
Optional
|
||||
.ofNullable(
|
||||
vocs
|
||||
.lookupTermBySynonym(
|
||||
OPENAIRE_COAR_RESOURCE_TYPES_3_1, coarItm.getOriginalType()))
|
||||
.ifPresent(type -> {
|
||||
coarItm.setTypeCode(type.getClassid());
|
||||
coarItm.setTypeLabel(type.getClassname());
|
||||
});
|
||||
final List<InstanceTypeMapping> mappings = Lists.newArrayList();
|
||||
if (vocs.vocabularyExists(OPENAIRE_USER_RESOURCE_TYPES)) {
|
||||
Optional
|
||||
.ofNullable(
|
||||
vocs
|
||||
.lookupTermBySynonym(
|
||||
OPENAIRE_USER_RESOURCE_TYPES, coarItm.getTypeCode()))
|
||||
.ifPresent(
|
||||
type -> mappings
|
||||
.add(
|
||||
OafMapperUtils
|
||||
.instanceTypeMapping(coarItm.getTypeCode(), type)));
|
||||
}
|
||||
if (!mappings.isEmpty()) {
|
||||
instance.getInstanceTypeMapping().addAll(mappings);
|
||||
}
|
||||
}
|
||||
}));
|
||||
result.setMetaResourceType(getMetaResourceType(result.getInstance(), vocs));
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static boolean originalResourceType(InstanceTypeMapping itm) {
|
||||
return StringUtils.isNotBlank(itm.getOriginalType()) &&
|
||||
OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(itm.getVocabularyName()) &&
|
||||
StringUtils.isBlank(itm.getTypeCode()) &&
|
||||
StringUtils.isBlank(itm.getTypeLabel());
|
||||
}
|
||||
|
||||
private static Qualifier getMetaResourceType(final List<Instance> instances, final VocabularyGroup vocs) {
|
||||
|
||||
if (vocs.vocabularyExists(OPENAIRE_META_RESOURCE_TYPE)) {
|
||||
Optional<InstanceTypeMapping> instanceTypeMapping = instances
|
||||
.stream()
|
||||
.flatMap(
|
||||
i -> Optional.ofNullable(i.getInstanceTypeMapping()).map(Collection::stream).orElse(Stream.empty()))
|
||||
.filter(t -> OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(t.getVocabularyName()))
|
||||
.findFirst();
|
||||
|
||||
if (!instanceTypeMapping.isPresent()) {
|
||||
return null;
|
||||
} else {
|
||||
final String typeCode = instanceTypeMapping.get().getTypeCode();
|
||||
return Optional
|
||||
.ofNullable(vocs.lookupTermBySynonym(OPENAIRE_META_RESOURCE_TYPE, typeCode))
|
||||
.orElseThrow(
|
||||
() -> new IllegalStateException("unable to find a synonym for '" + typeCode + "' in " +
|
||||
OPENAIRE_META_RESOURCE_TYPE));
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("vocabulary '" + OPENAIRE_META_RESOURCE_TYPE + "' not available");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -140,15 +140,28 @@ public class OafMapperUtils {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static InstanceTypeMapping instanceTypeMapping(String originalType, Qualifier term) {
|
||||
public static InstanceTypeMapping instanceTypeMapping(String originalType, String code, String label,
|
||||
String vocabularyName) {
|
||||
final InstanceTypeMapping m = new InstanceTypeMapping();
|
||||
m.setVocabularyName(term.getSchemeid());
|
||||
m.setVocabularyName(vocabularyName);
|
||||
m.setOriginalType(originalType);
|
||||
m.setTypeCode(term.getClassid());
|
||||
m.setTypeLabel(term.getClassname());
|
||||
m.setTypeCode(code);
|
||||
m.setTypeLabel(label);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static InstanceTypeMapping instanceTypeMapping(String originalType, Qualifier term) {
|
||||
return instanceTypeMapping(originalType, term.getClassid(), term.getClassname(), term.getSchemeid());
|
||||
}
|
||||
|
||||
public static InstanceTypeMapping instanceTypeMapping(String originalType) {
|
||||
return instanceTypeMapping(originalType, null, null, null);
|
||||
}
|
||||
|
||||
public static InstanceTypeMapping instanceTypeMapping(String originalType, String vocabularyName) {
|
||||
return instanceTypeMapping(originalType, null, null, vocabularyName);
|
||||
}
|
||||
|
||||
public static Qualifier unknown(final String schemeid, final String schemename) {
|
||||
return qualifier(UNKNOWN, "Unknown", schemeid, schemename);
|
||||
}
|
||||
|
|
|
@ -28,5 +28,11 @@
|
|||
"paramLongName": "filterInvisible",
|
||||
"paramDescription": "if true filters out invisible entities",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "isu",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "url to the ISLookup Service",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -125,9 +125,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
|
||||
final String type = getResultType(doc, instances);
|
||||
|
||||
final Qualifier metaResourceType = getMetaResourceType(instances);
|
||||
|
||||
return createOafs(doc, type, metaResourceType, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
|
||||
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
|
||||
} catch (DocumentException e) {
|
||||
log.error("Error with record:\n" + xml);
|
||||
return Lists.newArrayList();
|
||||
|
@ -153,30 +151,6 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
return type;
|
||||
}
|
||||
|
||||
protected Qualifier getMetaResourceType(final List<Instance> instances) {
|
||||
|
||||
if (vocs.vocabularyExists(OPENAIRE_META_RESOURCE_TYPE)) {
|
||||
Optional<InstanceTypeMapping> instanceTypeMapping = instances
|
||||
.stream()
|
||||
.flatMap(i -> i.getInstanceTypeMapping().stream())
|
||||
.filter(t -> OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(t.getVocabularyName()))
|
||||
.findFirst();
|
||||
|
||||
if (!instanceTypeMapping.isPresent()) {
|
||||
return null;
|
||||
} else {
|
||||
final String typeCode = instanceTypeMapping.get().getTypeCode();
|
||||
return Optional
|
||||
.ofNullable(vocs.lookupTermBySynonym(OPENAIRE_META_RESOURCE_TYPE, typeCode))
|
||||
.orElseThrow(
|
||||
() -> new IllegalStateException("unable to find a synonym for '" + typeCode + "' in " +
|
||||
OPENAIRE_META_RESOURCE_TYPE));
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("vocabulary '" + OPENAIRE_META_RESOURCE_TYPE + "' not available");
|
||||
}
|
||||
}
|
||||
|
||||
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
|
||||
final String dsId = doc.valueOf(xpathId);
|
||||
final String dsName = doc.valueOf(xpathName);
|
||||
|
@ -191,14 +165,13 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
protected List<Oaf> createOafs(
|
||||
final Document doc,
|
||||
final String type,
|
||||
final Qualifier metaResourceType,
|
||||
final List<Instance> instances,
|
||||
final KeyValue collectedFrom,
|
||||
final DataInfo info,
|
||||
final long lastUpdateTimestamp) {
|
||||
|
||||
final OafEntity entity = createEntity(
|
||||
doc, type, metaResourceType, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
|
||||
final Set<String> originalId = Sets.newHashSet(entity.getOriginalId());
|
||||
originalId.add(entity.getId());
|
||||
|
@ -231,7 +204,6 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
|
||||
private OafEntity createEntity(final Document doc,
|
||||
final String type,
|
||||
final Qualifier metaResourceType,
|
||||
final List<Instance> instances,
|
||||
final KeyValue collectedFrom,
|
||||
final DataInfo info,
|
||||
|
@ -239,12 +211,12 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
switch (type.toLowerCase()) {
|
||||
case "publication":
|
||||
final Publication p = new Publication();
|
||||
populateResultFields(p, metaResourceType, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
p.setJournal(prepareJournal(doc, info));
|
||||
return p;
|
||||
case "dataset":
|
||||
final Dataset d = new Dataset();
|
||||
populateResultFields(d, metaResourceType, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
d.setStoragedate(prepareDatasetStorageDate(doc, info));
|
||||
d.setDevice(prepareDatasetDevice(doc, info));
|
||||
d.setSize(prepareDatasetSize(doc, info));
|
||||
|
@ -255,7 +227,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
return d;
|
||||
case "software":
|
||||
final Software s = new Software();
|
||||
populateResultFields(s, metaResourceType, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
|
||||
s.setLicense(prepareSoftwareLicenses(doc, info));
|
||||
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
|
||||
|
@ -265,7 +237,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
case "otherresearchproducts":
|
||||
default:
|
||||
final OtherResearchProduct o = new OtherResearchProduct();
|
||||
populateResultFields(o, metaResourceType, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
|
||||
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
|
||||
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
|
||||
o.setTool(prepareOtherResearchProductTools(doc, info));
|
||||
|
@ -402,13 +374,11 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
|
||||
private void populateResultFields(
|
||||
final Result r,
|
||||
final Qualifier metaResourceType,
|
||||
final Document doc,
|
||||
final List<Instance> instances,
|
||||
final KeyValue collectedFrom,
|
||||
final DataInfo info,
|
||||
final long lastUpdateTimestamp) {
|
||||
r.setMetaResourceType(metaResourceType);
|
||||
r.setDataInfo(info);
|
||||
r.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
|
||||
|
@ -555,26 +525,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
.ofNullable(findOriginalType(doc))
|
||||
.map(originalType -> {
|
||||
final List<InstanceTypeMapping> mappings = Lists.newArrayList();
|
||||
|
||||
if (vocs.vocabularyExists(OPENAIRE_COAR_RESOURCE_TYPES_3_1)) {
|
||||
|
||||
// TODO verify what the vocabs return when a synonym is not defined
|
||||
Optional
|
||||
.ofNullable(vocs.lookupTermBySynonym(OPENAIRE_COAR_RESOURCE_TYPES_3_1, originalType))
|
||||
.ifPresent(coarTerm -> {
|
||||
mappings.add(OafMapperUtils.instanceTypeMapping(originalType, coarTerm));
|
||||
if (vocs.vocabularyExists(OPENAIRE_USER_RESOURCE_TYPES)) {
|
||||
|
||||
// TODO verify what the vocabs return when a synonym is not defined
|
||||
Optional
|
||||
.ofNullable(
|
||||
vocs.lookupTermBySynonym(OPENAIRE_USER_RESOURCE_TYPES, coarTerm.getClassid()))
|
||||
.ifPresent(
|
||||
type -> mappings.add(OafMapperUtils.instanceTypeMapping(originalType, type)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
mappings.add(OafMapperUtils.instanceTypeMapping(originalType, OPENAIRE_COAR_RESOURCE_TYPES_3_1));
|
||||
return mappings;
|
||||
})
|
||||
.orElse(new ArrayList<>());
|
||||
|
|
|
@ -1,15 +1,23 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.group;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
|
@ -17,20 +25,36 @@ import org.apache.spark.sql.Dataset;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.InstanceTypeMapping;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class GroupEntitiesSparkJobTest {
|
||||
|
||||
@Mock
|
||||
private ISLookUpService isLookUpService;
|
||||
|
||||
private VocabularyGroup vocabularies;
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static ObjectMapper mapper = new ObjectMapper()
|
||||
|
@ -45,10 +69,10 @@ public class GroupEntitiesSparkJobTest {
|
|||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
|
||||
workingDir = Files.createTempDirectory(GroupEntitiesSparkJobTest.class.getSimpleName());
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
|
||||
conf.setAppName(GroupEntitiesSparkJobTest.class.getSimpleName());
|
||||
conf.setMaster("local");
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
@ -56,10 +80,17 @@ public class GroupEntitiesSparkJobTest {
|
|||
}
|
||||
|
||||
@BeforeEach
|
||||
public void beforeEach() throws IOException, URISyntaxException {
|
||||
public void beforeEach() throws IOException, URISyntaxException, ISLookUpException {
|
||||
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
|
||||
checkpointPath = workingDir.resolve("grouped_entity");
|
||||
outputPath = workingDir.resolve("dispatched_entity");
|
||||
|
||||
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
|
||||
lenient()
|
||||
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
|
||||
.thenReturn(synonyms());
|
||||
|
||||
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
|
@ -71,18 +102,17 @@ public class GroupEntitiesSparkJobTest {
|
|||
@Test
|
||||
@Order(1)
|
||||
void testGroupEntities() throws Exception {
|
||||
GroupEntitiesSparkJob.main(new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-graphInputPath",
|
||||
dataInputPath.toString(),
|
||||
"-checkpointPath",
|
||||
checkpointPath.toString(),
|
||||
"-outputPath",
|
||||
outputPath.toString(),
|
||||
"-filterInvisible",
|
||||
Boolean.FALSE.toString()
|
||||
});
|
||||
new GroupEntitiesSparkJob(
|
||||
args(
|
||||
"/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json",
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--graphInputPath", dataInputPath.toString(),
|
||||
"--checkpointPath", checkpointPath.toString(),
|
||||
"--outputPath", outputPath.toString(),
|
||||
"--filterInvisible", Boolean.FALSE.toString(),
|
||||
"--isLookupUrl", "lookupurl"
|
||||
})).run(false, isLookUpService);
|
||||
|
||||
Dataset<OafEntity> checkpointTable = spark
|
||||
.read()
|
||||
|
@ -109,6 +139,14 @@ public class GroupEntitiesSparkJobTest {
|
|||
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
||||
|
||||
assertEquals(3, output.count());
|
||||
|
||||
List<String> resultTypes = output
|
||||
.map((MapFunction<Result, String>) value -> value.getResulttype().getClassid(), Encoders.STRING())
|
||||
.distinct()
|
||||
.collectAsList();
|
||||
|
||||
assertEquals(2, resultTypes.size());
|
||||
|
||||
assertEquals(
|
||||
2,
|
||||
output
|
||||
|
@ -121,5 +159,68 @@ public class GroupEntitiesSparkJobTest {
|
|||
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
||||
.filter((FilterFunction<String>) s -> s.equals("dataset"))
|
||||
.count());
|
||||
|
||||
Result result = output
|
||||
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
|
||||
.first();
|
||||
|
||||
result.getInstance().forEach(instance -> {
|
||||
Optional<InstanceTypeMapping> coarType = instance
|
||||
.getInstanceTypeMapping()
|
||||
.stream()
|
||||
.filter(itm -> OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(itm.getVocabularyName()))
|
||||
.filter(itm -> "journal-article".equals(itm.getOriginalType()))
|
||||
.findFirst();
|
||||
|
||||
assertTrue(coarType.isPresent());
|
||||
assertEquals("http://purl.org/coar/resource_type/c_2df8fbb1", coarType.get().getTypeCode());
|
||||
assertEquals("research article", coarType.get().getTypeLabel());
|
||||
});
|
||||
|
||||
final Dataset<Result> filtered = output.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'");
|
||||
assertEquals(1, filtered.count());
|
||||
result = filtered.first();
|
||||
|
||||
result
|
||||
.getInstance()
|
||||
.stream()
|
||||
.flatMap(instance -> instance.getInstanceTypeMapping().stream())
|
||||
.filter(itm -> OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(itm.getVocabularyName()))
|
||||
.filter(itm -> "Patent".equals(itm.getOriginalType()))
|
||||
.forEach(itm -> {
|
||||
assertEquals("http://purl.org/coar/resource_type/c_15cd", itm.getTypeCode());
|
||||
assertEquals("patent", itm.getTypeLabel());
|
||||
});
|
||||
}
|
||||
|
||||
private List<String> vocs() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")));
|
||||
}
|
||||
|
||||
private List<String> synonyms() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")));
|
||||
}
|
||||
|
||||
private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException {
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs));
|
||||
parser.parseArgument(args);
|
||||
return parser;
|
||||
}
|
||||
|
||||
private static String classPathResourceAsString(String path) throws IOException {
|
||||
return IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
GroupEntitiesSparkJobTest.class.getResourceAsStream(path)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -92,13 +92,14 @@ class GenerateEntitiesApplicationTest {
|
|||
private List<String> vocs() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(
|
||||
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||
GenerateEntitiesApplicationTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||
}
|
||||
|
||||
private List<String> synonyms() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(
|
||||
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
|
||||
GenerateEntitiesApplicationTest.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -122,7 +122,7 @@ class MappersTest {
|
|||
assertTrue(instance.getPid().isEmpty());
|
||||
|
||||
assertNotNull(instance.getInstanceTypeMapping());
|
||||
assertEquals(2, instance.getInstanceTypeMapping().size());
|
||||
assertEquals(1, instance.getInstanceTypeMapping().size());
|
||||
|
||||
Optional<InstanceTypeMapping> coarType = instance
|
||||
.getInstanceTypeMapping()
|
||||
|
@ -131,8 +131,8 @@ class MappersTest {
|
|||
.findFirst();
|
||||
|
||||
assertTrue(coarType.isPresent());
|
||||
assertEquals("http://purl.org/coar/resource_type/c_5794", coarType.get().getTypeCode());
|
||||
assertEquals("conference paper", coarType.get().getTypeLabel());
|
||||
assertNull(coarType.get().getTypeCode());
|
||||
assertNull(coarType.get().getTypeLabel());
|
||||
|
||||
Optional<InstanceTypeMapping> userType = instance
|
||||
.getInstanceTypeMapping()
|
||||
|
@ -140,9 +140,7 @@ class MappersTest {
|
|||
.filter(itm -> ModelConstants.OPENAIRE_USER_RESOURCE_TYPES.equals(itm.getVocabularyName()))
|
||||
.findFirst();
|
||||
|
||||
assertTrue(userType.isPresent());
|
||||
assertEquals("Article", userType.get().getTypeCode());
|
||||
assertEquals("Article", userType.get().getTypeLabel());
|
||||
assertFalse(userType.isPresent());
|
||||
|
||||
assertFalse(instance.getAlternateIdentifier().isEmpty());
|
||||
assertEquals("doi", instance.getAlternateIdentifier().get(0).getQualifier().getClassid());
|
||||
|
@ -710,14 +708,10 @@ class MappersTest {
|
|||
assertEquals("0001", p_cleaned.getInstance().get(0).getRefereed().getClassid());
|
||||
assertEquals("peerReviewed", p_cleaned.getInstance().get(0).getRefereed().getClassname());
|
||||
|
||||
assertNotNull(p_cleaned.getMetaResourceType());
|
||||
assertEquals("Research Literature", p_cleaned.getMetaResourceType().getClassid());
|
||||
assertEquals("Research Literature", p_cleaned.getMetaResourceType().getClassname());
|
||||
assertEquals(ModelConstants.OPENAIRE_META_RESOURCE_TYPE, p_cleaned.getMetaResourceType().getSchemeid());
|
||||
assertEquals(ModelConstants.OPENAIRE_META_RESOURCE_TYPE, p_cleaned.getMetaResourceType().getSchemename());
|
||||
assertNull(p_cleaned.getMetaResourceType());
|
||||
|
||||
assertNotNull(p_cleaned.getInstance().get(0).getInstanceTypeMapping());
|
||||
assertEquals(2, p_cleaned.getInstance().get(0).getInstanceTypeMapping().size());
|
||||
assertEquals(1, p_cleaned.getInstance().get(0).getInstanceTypeMapping().size());
|
||||
|
||||
assertTrue(
|
||||
p_cleaned
|
||||
|
@ -728,8 +722,7 @@ class MappersTest {
|
|||
.anyMatch(
|
||||
t -> "journal-article".equals(t.getOriginalType()) &&
|
||||
ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(t.getVocabularyName()) &&
|
||||
"http://purl.org/coar/resource_type/c_2df8fbb1".equals(t.getTypeCode()) &&
|
||||
"research article".equals(t.getTypeLabel())));
|
||||
Objects.isNull(t.getTypeCode()) && Objects.isNull(t.getTypeLabel())));
|
||||
|
||||
assertTrue(
|
||||
p_cleaned
|
||||
|
@ -737,11 +730,8 @@ class MappersTest {
|
|||
.get(0)
|
||||
.getInstanceTypeMapping()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
t -> "journal-article".equals(t.getOriginalType()) &&
|
||||
ModelConstants.OPENAIRE_USER_RESOURCE_TYPES.equals(t.getVocabularyName()) &&
|
||||
"Article".equals(t.getTypeCode()) &&
|
||||
"Article".equals(t.getTypeLabel())));
|
||||
.noneMatch(
|
||||
t -> ModelConstants.OPENAIRE_USER_RESOURCE_TYPES.equals(t.getVocabularyName())));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
2
pom.xml
2
pom.xml
|
@ -888,7 +888,7 @@
|
|||
<mockito-core.version>3.3.3</mockito-core.version>
|
||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||
<vtd.version>[2.12,3.0)</vtd.version>
|
||||
<dhp-schemas.version>[4.17.2-SNAPSHOT]</dhp-schemas.version>
|
||||
<dhp-schemas.version>[4.17.2]</dhp-schemas.version>
|
||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||
|
|
Loading…
Reference in New Issue