1
0
Fork 0

Merge branch 'beta' into orcid_import

This commit is contained in:
Claudio Atzori 2023-12-01 12:28:14 +01:00
commit 622fafbd2e
125 changed files with 3644 additions and 761 deletions

View File

@ -135,6 +135,24 @@ public class VocabularyGroup implements Serializable {
return vocs.get(vocId.toLowerCase()).getSynonymAsQualifier(syn);
}
public Qualifier lookupTermBySynonym(final String vocId, final String syn) {
return find(vocId)
.map(
vocabulary -> Optional
.ofNullable(vocabulary.getTerm(syn))
.map(
term -> OafMapperUtils
.qualifier(term.getId(), term.getName(), vocabulary.getId(), vocabulary.getName()))
.orElse(
Optional
.ofNullable(vocabulary.getTermBySynonym(syn))
.map(
term -> OafMapperUtils
.qualifier(term.getId(), term.getName(), vocabulary.getId(), vocabulary.getName()))
.orElse(null)))
.orElse(null);
}
/**
* getSynonymAsQualifierCaseSensitive
*

View File

@ -21,10 +21,15 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
/**
@ -35,6 +40,12 @@ public class GroupEntitiesSparkJob {
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private ArgumentApplicationParser parser;
public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@ -51,6 +62,17 @@ public class GroupEntitiesSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
}
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
throws ISLookUpException {
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
@ -60,19 +82,21 @@ public class GroupEntitiesSparkJob {
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
});
}
@ -81,7 +105,7 @@ public class GroupEntitiesSparkJob {
String inputPath,
String checkpointPath,
String outputPath,
boolean filterInvisible) {
boolean filterInvisible, VocabularyGroup vocs) {
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
@ -106,10 +130,14 @@ public class GroupEntitiesSparkJob {
}
Dataset<?> groupedEntities = allEntities
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
(MapFunction<OafEntity, OafEntity>) entity -> GraphCleaningFunctions
.applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) OafMapperUtils::mergeEntities)
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t._2().getClass().getName(), t._2()),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
import java.net.MalformedURLException;
@ -870,4 +872,97 @@ public class GraphCleaningFunctions extends CleaningFunctions {
return s;
}
public static OafEntity applyCoarVocabularies(OafEntity entity, VocabularyGroup vocs) {
if (entity instanceof Result) {
final Result result = (Result) entity;
Optional
.ofNullable(result.getInstance())
.ifPresent(
instances -> instances
.forEach(
instance -> {
if (Objects.isNull(instance.getInstanceTypeMapping())) {
List<InstanceTypeMapping> mapping = Lists.newArrayList();
mapping
.add(
OafMapperUtils
.instanceTypeMapping(
instance.getInstancetype().getClassname(),
OPENAIRE_COAR_RESOURCE_TYPES_3_1));
instance.setInstanceTypeMapping(mapping);
}
Optional<InstanceTypeMapping> optionalItm = instance
.getInstanceTypeMapping()
.stream()
.filter(GraphCleaningFunctions::originalResourceType)
.findFirst();
if (optionalItm.isPresent()) {
InstanceTypeMapping coarItm = optionalItm.get();
Optional
.ofNullable(
vocs
.lookupTermBySynonym(
OPENAIRE_COAR_RESOURCE_TYPES_3_1, coarItm.getOriginalType()))
.ifPresent(type -> {
coarItm.setTypeCode(type.getClassid());
coarItm.setTypeLabel(type.getClassname());
});
final List<InstanceTypeMapping> mappings = Lists.newArrayList();
if (vocs.vocabularyExists(OPENAIRE_USER_RESOURCE_TYPES)) {
Optional
.ofNullable(
vocs
.lookupTermBySynonym(
OPENAIRE_USER_RESOURCE_TYPES, coarItm.getTypeCode()))
.ifPresent(
type -> mappings
.add(
OafMapperUtils
.instanceTypeMapping(coarItm.getTypeCode(), type)));
}
if (!mappings.isEmpty()) {
instance.getInstanceTypeMapping().addAll(mappings);
}
}
}));
result.setMetaResourceType(getMetaResourceType(result.getInstance(), vocs));
}
return entity;
}
private static boolean originalResourceType(InstanceTypeMapping itm) {
return StringUtils.isNotBlank(itm.getOriginalType()) &&
OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(itm.getVocabularyName()) &&
StringUtils.isBlank(itm.getTypeCode()) &&
StringUtils.isBlank(itm.getTypeLabel());
}
private static Qualifier getMetaResourceType(final List<Instance> instances, final VocabularyGroup vocs) {
if (vocs.vocabularyExists(OPENAIRE_META_RESOURCE_TYPE)) {
Optional<InstanceTypeMapping> instanceTypeMapping = instances
.stream()
.flatMap(
i -> Optional.ofNullable(i.getInstanceTypeMapping()).map(Collection::stream).orElse(Stream.empty()))
.filter(t -> OPENAIRE_COAR_RESOURCE_TYPES_3_1.equals(t.getVocabularyName()))
.findFirst();
if (!instanceTypeMapping.isPresent()) {
return null;
} else {
final String typeCode = instanceTypeMapping.get().getTypeCode();
return Optional
.ofNullable(vocs.lookupTermBySynonym(OPENAIRE_META_RESOURCE_TYPE, typeCode))
.orElseThrow(
() -> new IllegalStateException("unable to find a synonym for '" + typeCode + "' in " +
OPENAIRE_META_RESOURCE_TYPE));
}
} else {
throw new IllegalStateException("vocabulary '" + OPENAIRE_META_RESOURCE_TYPE + "' not available");
}
}
}

View File

@ -14,7 +14,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@ -141,6 +140,28 @@ public class OafMapperUtils {
.collect(Collectors.toList());
}
public static InstanceTypeMapping instanceTypeMapping(String originalType, String code, String label,
String vocabularyName) {
final InstanceTypeMapping m = new InstanceTypeMapping();
m.setVocabularyName(vocabularyName);
m.setOriginalType(originalType);
m.setTypeCode(code);
m.setTypeLabel(label);
return m;
}
public static InstanceTypeMapping instanceTypeMapping(String originalType, Qualifier term) {
return instanceTypeMapping(originalType, term.getClassid(), term.getClassname(), term.getSchemeid());
}
public static InstanceTypeMapping instanceTypeMapping(String originalType) {
return instanceTypeMapping(originalType, null, null, null);
}
public static InstanceTypeMapping instanceTypeMapping(String originalType, String vocabularyName) {
return instanceTypeMapping(originalType, null, null, vocabularyName);
}
public static Qualifier unknown(final String schemeid, final String schemename) {
return qualifier(UNKNOWN, "Unknown", schemeid, schemename);
}

View File

@ -28,5 +28,11 @@
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramDescription": "url to the ISLookup Service",
"paramRequired": true
}
]

View File

@ -166,7 +166,7 @@ object DataciteToOAFTransformation {
resourceTypeGeneral: String,
schemaOrg: String,
vocabularies: VocabularyGroup
): (Qualifier, Qualifier) = {
): (Qualifier, Qualifier, String) = {
if (resourceType != null && resourceType.nonEmpty) {
val typeQualifier =
vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType)
@ -176,7 +176,7 @@ object DataciteToOAFTransformation {
vocabularies.getSynonymAsQualifier(
ModelConstants.DNET_RESULT_TYPOLOGIES,
typeQualifier.getClassid
)
), resourceType
)
}
if (schemaOrg != null && schemaOrg.nonEmpty) {
@ -188,7 +188,7 @@ object DataciteToOAFTransformation {
vocabularies.getSynonymAsQualifier(
ModelConstants.DNET_RESULT_TYPOLOGIES,
typeQualifier.getClassid
)
), schemaOrg
)
}
@ -203,7 +203,7 @@ object DataciteToOAFTransformation {
vocabularies.getSynonymAsQualifier(
ModelConstants.DNET_RESULT_TYPOLOGIES,
typeQualifier.getClassid
)
), resourceTypeGeneral
)
}
@ -216,12 +216,19 @@ object DataciteToOAFTransformation {
schemaOrg: String,
vocabularies: VocabularyGroup
): Result = {
val typeQualifiers: (Qualifier, Qualifier) =
val typeQualifiers: (Qualifier, Qualifier, String) =
getTypeQualifier(resourceType, resourceTypeGeneral, schemaOrg, vocabularies)
if (typeQualifiers == null)
return null
val i = new Instance
i.setInstancetype(typeQualifiers._1)
// ADD ORIGINAL TYPE
val itm = new InstanceTypeMapping
itm.setOriginalType(typeQualifiers._3)
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
typeQualifiers._2.getClassname match {
case "dataset" =>
val r = new OafDataset

View File

@ -176,7 +176,7 @@ object BioDBToOAF {
i.setUrl(List(s"${resolvedURL(input.pidType)}${input.pid}").asJava)
}
if (input.pidType.equalsIgnoreCase("clinicaltrials.gov"))
if (input.pidType.equalsIgnoreCase("clinicaltrials.gov")) {
i.setInstancetype(
OafMapperUtils.qualifier(
"0037",
@ -185,7 +185,11 @@ object BioDBToOAF {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
else
val itm = new InstanceTypeMapping
itm.setOriginalType(input.pidType)
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
} else {
i.setInstancetype(
OafMapperUtils.qualifier(
"0046",
@ -194,6 +198,11 @@ object BioDBToOAF {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
val itm = new InstanceTypeMapping
itm.setOriginalType("Bioentity")
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
}
if (input.datasource == null || input.datasource.isEmpty)
return null
@ -265,6 +274,10 @@ object BioDBToOAF {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
val itm = new InstanceTypeMapping
itm.setOriginalType("Bioentity")
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
i.setCollectedfrom(collectedFromMap("uniprot"))
d.setInstance(List(i).asJava)
@ -471,6 +484,10 @@ object BioDBToOAF {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
val itm = new InstanceTypeMapping
itm.setOriginalType("Bioentity")
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
i.setCollectedfrom(collectedFromMap("pdb"))
d.setInstance(List(i).asJava)
@ -571,6 +588,11 @@ object BioDBToOAF {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
val itm = new InstanceTypeMapping
itm.setOriginalType("Bioentity")
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
i.setInstanceTypeMapping(List(itm).asJava)
i.setCollectedfrom(collectedFromMap("ebi"))
d.setInstance(List(i).asJava)

View File

@ -188,12 +188,24 @@ object PubMedToOaf {
val cojbCategory =
getVocabularyTerm(ModelConstants.DNET_PUBLICATION_RESOURCE, vocabularies, ja.get.getValue)
pubmedInstance.setInstancetype(cojbCategory)
// ADD ORIGINAL TYPE to the publication
val itm = new InstanceTypeMapping
itm.setOriginalType(ja.get.getValue)
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
pubmedInstance.setInstanceTypeMapping(List(itm).asJava)
} else {
val i_type = article.getPublicationTypes.asScala
.map(s => getVocabularyTerm(ModelConstants.DNET_PUBLICATION_RESOURCE, vocabularies, s.getValue))
.find(q => q != null)
if (i_type.isDefined)
pubmedInstance.setInstancetype(i_type.get)
.map(s => (s.getValue,getVocabularyTerm(ModelConstants.DNET_PUBLICATION_RESOURCE, vocabularies, s.getValue)))
.find(q => q._2 != null)
if (i_type.isDefined) {
pubmedInstance.setInstancetype(i_type.get._2)
// ADD ORIGINAL TYPE to the publication
val itm = new InstanceTypeMapping
itm.setOriginalType(i_type.get._1)
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
pubmedInstance.setInstanceTypeMapping(List(itm).asJava)
}
else
return null
}

View File

@ -107,7 +107,7 @@ case object Crossref2Oaf {
.map(f => f.id)
}
def mappingResult(result: Result, json: JValue, cobjCategory: String): Result = {
def mappingResult(result: Result, json: JValue, cobjCategory: String, originalType:String): Result = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
//MAPPING Crossref DOI into PID
@ -283,6 +283,11 @@ case object Crossref2Oaf {
ModelConstants.DNET_PUBLICATION_RESOURCE
)
)
//ADD ORIGINAL TYPE to the mapping
val itm = new InstanceTypeMapping
itm.setOriginalType(originalType)
itm.setVocabularyName(ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)
instance.setInstanceTypeMapping(List(itm).asJava)
result.setResourcetype(
OafMapperUtils.qualifier(
cobjCategory.substring(0, 4),
@ -367,7 +372,7 @@ case object Crossref2Oaf {
objectType,
mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")
)
mappingResult(result, json, cOBJCategory)
mappingResult(result, json, cOBJCategory, originalType)
if (result == null || result.getId == null)
return List()

View File

@ -71,6 +71,9 @@ public class PropagationConstant {
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization";
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization";
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";

View File

@ -0,0 +1,80 @@
package eu.dnetlib.dhp.api;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import org.jetbrains.annotations.NotNull;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
public class QueryCommunityAPI {
private static String get(String geturl) throws IOException {
URL url = new URL(geturl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setDoOutput(true);
conn.setRequestMethod("GET");
int responseCode = conn.getResponseCode();
String body = getBody(conn);
conn.disconnect();
if (responseCode != HttpURLConnection.HTTP_OK)
throw new IOException("Unexpected code " + responseCode + body);
return body;
}
public static String communities(String baseURL) throws IOException {
return get(baseURL + "communities");
}
public static String community(String id, String baseURL) throws IOException {
return get(baseURL + id);
}
public static String communityDatasource(String id, String baseURL) throws IOException {
return get(baseURL + id + "/contentproviders");
}
public static String communityPropagationOrganization(String id, String baseURL) throws IOException {
return get(baseURL + id + "/propagationOrganizations");
}
public static String communityProjects(String id, String page, String size, String baseURL) throws IOException {
return get(baseURL + id + "/projects/" + page + "/" + size);
}
@NotNull
private static String getBody(HttpURLConnection conn) throws IOException {
String body = "{}";
try (BufferedReader br = new BufferedReader(
new InputStreamReader(conn.getInputStream(), "utf-8"))) {
StringBuilder response = new StringBuilder();
String responseLine = null;
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
body = response.toString();
}
return body;
}
}

View File

@ -0,0 +1,170 @@
package eu.dnetlib.dhp.api;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.management.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.api.model.*;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.Provider;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
/**
* @author miriam.baglioni
* @Date 09/10/23
*/
public class Utils implements Serializable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final VerbResolver resolver = VerbResolverFactory.newInstance();
private static final Logger log = LoggerFactory.getLogger(Utils.class);
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
final Map<String, Community> communities = Maps.newHashMap();
List<Community> validCommunities = new ArrayList<>();
getValidCommunities(baseURL)
.forEach(community -> {
try {
CommunityModel cm = MAPPER
.readValue(QueryCommunityAPI.community(community.getId(), baseURL), CommunityModel.class);
validCommunities.add(getCommunity(cm));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
validCommunities.forEach(community -> {
try {
DatasourceList dl = MAPPER
.readValue(
QueryCommunityAPI.communityDatasource(community.getId(), baseURL), DatasourceList.class);
community.setProviders(dl.stream().map(d -> {
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
return null;
Provider p = new Provider();
p.setOpenaireId("10|" + d.getOpenaireId());
p.setSelectionConstraints(d.getSelectioncriteria());
if (p.getSelectionConstraints() != null)
p.getSelectionConstraints().setSelection(resolver);
return p;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
validCommunities.forEach(community -> {
if (community.isValid())
communities.put(community.getId(), community);
});
return new CommunityConfiguration(communities);
}
private static Community getCommunity(CommunityModel cm) {
Community c = new Community();
c.setId(cm.getId());
c.setZenodoCommunities(cm.getOtherZenodoCommunities());
if (!StringUtils.isNullOrEmpty(cm.getZenodoCommunity()))
c.getZenodoCommunities().add(cm.getZenodoCommunity());
c.setSubjects(cm.getSubjects());
c.getSubjects().addAll(cm.getFos());
c.getSubjects().addAll(cm.getSdg());
if (cm.getAdvancedConstraints() != null) {
c.setConstraints(cm.getAdvancedConstraints());
c.getConstraints().setSelection(resolver);
}
if (cm.getRemoveConstraints() != null) {
c.setRemoveConstraints(cm.getRemoveConstraints());
c.getRemoveConstraints().setSelection(resolver);
}
return c;
}
public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
return MAPPER
.readValue(QueryCommunityAPI.communities(baseURL), CommunitySummary.class)
.stream()
.filter(
community -> !community.getStatus().equals("hidden") &&
(community.getType().equals("ri") || community.getType().equals("community")))
.collect(Collectors.toList());
}
/**
* it returns for each organization the list of associated communities
*/
public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException {
CommunityEntityMap organizationMap = new CommunityEntityMap();
getValidCommunities(baseURL)
.forEach(community -> {
String id = community.getId();
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.communityPropagationOrganization(id, baseURL), OrganizationList.class);
associatedOrgs.forEach(o -> {
if (!organizationMap
.keySet()
.contains(
"20|" + o))
organizationMap.put("20|" + o, new ArrayList<>());
organizationMap.get("20|" + o).add(community.getId());
});
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return organizationMap;
}
public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
getValidCommunities(baseURL)
.forEach(community -> {
int page = -1;
int size = 100;
ContentModel cm = new ContentModel();
do {
page++;
try {
cm = MAPPER
.readValue(
QueryCommunityAPI
.communityProjects(
community.getId(), String.valueOf(page), String.valueOf(size), baseURL),
ContentModel.class);
if (cm.getContent().size() > 0) {
cm.getContent().forEach(p -> {
if (!projectMap.keySet().contains("40|" + p.getOpenaireId()))
projectMap.put("40|" + p.getOpenaireId(), new ArrayList<>());
projectMap.get("40|" + p.getOpenaireId()).add(community.getId());
});
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} while (!cm.getLast());
});
return projectMap;
}
}

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.api.model;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
@JsonAutoDetect
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityContentprovider {
private String openaireId;
private SelectionConstraints selectioncriteria;
private String enabled;
public String getEnabled() {
return enabled;
}
public void setEnabled(String enabled) {
this.enabled = enabled;
}
public String getOpenaireId() {
return openaireId;
}
public void setOpenaireId(final String openaireId) {
this.openaireId = openaireId;
}
public SelectionConstraints getSelectioncriteria() {
return this.selectioncriteria;
}
public void setSelectioncriteria(SelectionConstraints selectioncriteria) {
this.selectioncriteria = selectioncriteria;
}
}

View File

@ -1,13 +1,13 @@
package eu.dnetlib.dhp.resulttocommunityfromorganization;
package eu.dnetlib.dhp.api.model;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class OrganizationMap extends HashMap<String, List<String>> {
public class CommunityEntityMap extends HashMap<String, List<String>> {
public OrganizationMap() {
public CommunityEntityMap() {
super();
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityModel implements Serializable {
private String id;
private String type;
private String status;
private String zenodoCommunity;
private List<String> subjects;
private List<String> otherZenodoCommunities;
private List<String> fos;
private List<String> sdg;
private SelectionConstraints advancedConstraints;
private SelectionConstraints removeConstraints;
public String getZenodoCommunity() {
return zenodoCommunity;
}
public void setZenodoCommunity(String zenodoCommunity) {
this.zenodoCommunity = zenodoCommunity;
}
public List<String> getSubjects() {
return subjects;
}
public void setSubjects(List<String> subjects) {
this.subjects = subjects;
}
public List<String> getOtherZenodoCommunities() {
return otherZenodoCommunities;
}
public void setOtherZenodoCommunities(List<String> otherZenodoCommunities) {
this.otherZenodoCommunities = otherZenodoCommunities;
}
public List<String> getFos() {
return fos;
}
public void setFos(List<String> fos) {
this.fos = fos;
}
public List<String> getSdg() {
return sdg;
}
public void setSdg(List<String> sdg) {
this.sdg = sdg;
}
public SelectionConstraints getRemoveConstraints() {
return removeConstraints;
}
public void setRemoveConstraints(SelectionConstraints removeConstraints) {
this.removeConstraints = removeConstraints;
}
public SelectionConstraints getAdvancedConstraints() {
return advancedConstraints;
}
public void setAdvancedConstraints(SelectionConstraints advancedConstraints) {
this.advancedConstraints = advancedConstraints;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
public CommunitySummary() {
super();
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* @author miriam.baglioni
* @Date 09/10/23
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class ContentModel implements Serializable {
private List<ProjectModel> content;
private Integer totalPages;
private Boolean last;
private Integer number;
public List<ProjectModel> getContent() {
return content;
}
public void setContent(List<ProjectModel> content) {
this.content = content;
}
public Integer getTotalPages() {
return totalPages;
}
public void setTotalPages(Integer totalPages) {
this.totalPages = totalPages;
}
public Boolean getLast() {
return last;
}
public void setLast(Boolean last) {
this.last = last;
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
}

View File

@ -0,0 +1,13 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
import eu.dnetlib.dhp.api.model.CommunityContentprovider;
public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
public DatasourceList() {
super();
}
}

View File

@ -0,0 +1,16 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
* @author miriam.baglioni
* @Date 09/10/23
*/
public class OrganizationList extends ArrayList<String> implements Serializable {
public OrganizationList() {
super();
}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* @author miriam.baglioni
* @Date 09/10/23
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProjectModel implements Serializable {
private String openaireId;
public String getOpenaireId() {
return openaireId;
}
public void setOpenaireId(String openaireId) {
this.openaireId = openaireId;
}
}

View File

@ -9,7 +9,6 @@ import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -21,8 +20,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@ -54,50 +56,39 @@ public class SparkBulkTagJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Boolean isTest = Optional
.ofNullable(parser.get("isTest"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
log.info("isTest: {} ", isTest);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
CommunityConfiguration cc;
String taggingConf = parser.get("taggingConf");
String taggingConf = Optional
.ofNullable(parser.get("taggingConf"))
.map(String::valueOf)
.orElse(null);
if (isTest) {
if (taggingConf != null) {
cc = CommunityConfigurationFactory.newInstance(taggingConf);
} else {
cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
cc = Utils.getCommunityConfiguration(baseURL);
log.info(OBJECT_MAPPER.writeValueAsString(cc));
}
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
});
}
@ -106,10 +97,7 @@ public class SparkBulkTagJob {
Dataset<String> datasources = readPath(
spark, inputPath
.substring(
0,
inputPath.lastIndexOf("/"))
+ "/datasource",
+ "datasource",
Datasource.class)
.filter((FilterFunction<Datasource>) ds -> isOKDatasource(ds))
.map((MapFunction<Datasource, String>) ds -> ds.getId(), Encoders.STRING());
@ -117,10 +105,10 @@ public class SparkBulkTagJob {
Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
for (String ds : datasources.collectAsList()) {
final String dsId = ds.substring(3);
if (!dsm.containsKey(dsId)) {
// final String dsId = ds.substring(3);
if (!dsm.containsKey(ds)) {
ArrayList<Pair<String, SelectionConstraints>> eoscList = new ArrayList<>();
dsm.put(dsId, eoscList);
dsm.put(ds, eoscList);
}
}
@ -142,22 +130,30 @@ public class SparkBulkTagJob {
String inputPath,
String outputPath,
ProtoMap protoMappingParams,
Class<R> resultClazz,
CommunityConfiguration communityConfiguration) {
ResultTagger resultTagger = new ResultTagger();
readPath(spark, inputPath, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
.forEach(e -> {
removeOutputDir(spark, outputPath + e.name());
ResultTagger resultTagger = new ResultTagger();
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
readPath(spark, inputPath + e.name(), resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
});
}
public static <R> Dataset<R> readPath(

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import com.google.gson.Gson;
@ -13,7 +14,7 @@ public class Community implements Serializable {
private String id;
private List<String> subjects = new ArrayList<>();
private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
private List<String> zenodoCommunities = new ArrayList<>();
private SelectionConstraints constraints = new SelectionConstraints();
private SelectionConstraints removeConstraints = new SelectionConstraints();
@ -26,7 +27,7 @@ public class Community implements Serializable {
return !getSubjects().isEmpty()
|| !getProviders().isEmpty()
|| !getZenodoCommunities().isEmpty()
|| getConstraints().getCriteria() != null;
|| (Optional.ofNullable(getConstraints()).isPresent() && getConstraints().getCriteria() != null);
}
public String getId() {
@ -53,11 +54,11 @@ public class Community implements Serializable {
this.providers = providers;
}
public List<ZenodoCommunity> getZenodoCommunities() {
public List<String> getZenodoCommunities() {
return zenodoCommunities;
}
public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
public void setZenodoCommunities(List<String> zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities;
}

View File

@ -81,7 +81,7 @@ public class CommunityConfiguration implements Serializable {
this.removeConstraintsMap = removeConstraintsMap;
}
CommunityConfiguration(final Map<String, Community> communities) {
public CommunityConfiguration(final Map<String, Community> communities) {
this.communities = communities;
init();
}
@ -117,10 +117,10 @@ public class CommunityConfiguration implements Serializable {
add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
}
// get zenodo communities
for (ZenodoCommunity zc : c.getZenodoCommunities()) {
for (String zc : c.getZenodoCommunities()) {
add(
zc.getZenodoCommunityId(),
new Pair<>(id, zc.getSelCriteria()),
zc,
new Pair<>(id, null),
zenodocommunityMap);
}
selectionConstraintsMap.put(id, c.getConstraints());

View File

@ -143,16 +143,16 @@ public class CommunityConfigurationFactory {
return providerList;
}
private static List<ZenodoCommunity> parseZenodoCommunities(final Node node) {
private static List<String> parseZenodoCommunities(final Node node) {
final List<Node> list = node.selectNodes("./zenodocommunities/zenodocommunity");
final List<ZenodoCommunity> zenodoCommunityList = new ArrayList<>();
final List<String> zenodoCommunityList = new ArrayList<>();
for (Node n : list) {
ZenodoCommunity zc = new ZenodoCommunity();
zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
// ZenodoCommunity zc = new ZenodoCommunity();
// zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
// zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
zenodoCommunityList.add(zc);
zenodoCommunityList.add(n.selectSingleNode("./zenodoid").getText());
}
log.info("size of the zenodo community list " + zenodoCommunityList.size());

View File

@ -4,6 +4,8 @@ package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
import eu.dnetlib.dhp.bulktag.criteria.Selection;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
@ -12,6 +14,7 @@ public class Constraint implements Serializable {
private String field;
private String value;
// private String element;
@JsonIgnore
private Selection selection;
public String getVerb() {
@ -38,10 +41,11 @@ public class Constraint implements Serializable {
this.value = value;
}
public void setSelection(Selection sel) {
selection = sel;
}
//@JsonIgnore
// public void setSelection(Selection sel) {
// selection = sel;
// }
@JsonIgnore
public void setSelection(VerbResolver resolver)
throws InvocationTargetException, NoSuchMethodException, InstantiationException,
IllegalAccessException {
@ -52,11 +56,4 @@ public class Constraint implements Serializable {
return selection.apply(metadata);
}
// public String getElement() {
// return element;
// }
//
// public void setElement(String element) {
// this.element = element;
// }
}

View File

@ -1,34 +0,0 @@
package eu.dnetlib.dhp.bulktag.community;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.dom4j.DocumentException;
import org.xml.sax.SAXException;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class QueryInformationSystem {
public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl)
throws ISLookUpException, DocumentException, SAXException, IOException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
final List<String> res = isLookUp
.quickSearchProfile(
IOUtils
.toString(
QueryInformationSystem.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/query.xq")));
final String xmlConf = "<communities>" + Joiner.on(" ").join(res) + "</communities>";
return CommunityConfigurationFactory.newInstance(xmlConf);
}
}

View File

@ -82,19 +82,23 @@ public class ResultTagger implements Serializable {
// communities contains all the communities to be not added to the context
final Set<String> removeCommunities = new HashSet<>();
// if (conf.getRemoveConstraintsMap().keySet().size() > 0)
conf
.getRemoveConstraintsMap()
.keySet()
.forEach(communityId -> {
if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
conf
.getRemoveConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
removeCommunities.add(communityId);
});
.forEach(
communityId -> {
// log.info("Remove constraints for " + communityId);
if (conf.getRemoveConstraintsMap().keySet().contains(communityId) &&
conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
conf
.getRemoveConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
removeCommunities.add(communityId);
});
// communities contains all the communities to be added as context for the result
final Set<String> communities = new HashSet<>();
@ -124,10 +128,10 @@ public class ResultTagger implements Serializable {
if (Objects.nonNull(result.getInstance())) {
for (Instance i : result.getInstance()) {
if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) {
collfrom.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
collfrom.add(i.getCollectedfrom().getKey());
}
if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) {
hostdby.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
hostdby.add(i.getHostedby().getKey());
}
}

View File

@ -7,11 +7,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
@JsonAutoDetect
public class SelectionConstraints implements Serializable {
private List<Constraints> criteria;

View File

@ -9,9 +9,7 @@ import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@ -20,6 +18,8 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
@ -48,10 +48,10 @@ public class PrepareResultCommunitySet {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final OrganizationMap organizationMap = new Gson()
.fromJson(
parser.get("organizationtoresultcommunitymap"),
OrganizationMap.class);
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(baseURL);
log.info("organizationMap: {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf();
@ -70,7 +70,7 @@ public class PrepareResultCommunitySet {
SparkSession spark,
String inputPath,
String outputPath,
OrganizationMap organizationMap) {
CommunityEntityMap organizationMap) {
Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
relation.createOrReplaceTempView("relation");
@ -115,7 +115,7 @@ public class PrepareResultCommunitySet {
}
private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
OrganizationMap organizationMap) {
CommunityEntityMap organizationMap) {
return value -> {
String rId = value.getResultId();
Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());

View File

@ -2,7 +2,7 @@
package eu.dnetlib.dhp.resulttocommunityfromorganization;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Arrays;
@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
@ -53,29 +54,14 @@ public class SparkResultToCommunityFromOrganizationJob {
final String possibleupdatespath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", possibleupdatespath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
@SuppressWarnings("unchecked")
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
if (saveGraph) {
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
}
execPropagation(spark, inputPath, outputPath, possibleupdatespath);
});
}
@ -83,22 +69,32 @@ public class SparkResultToCommunityFromOrganizationJob {
SparkSession spark,
String inputPath,
String outputPath,
Class<R> resultClazz,
String possibleUpdatesPath) {
Dataset<ResultCommunityList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultCommunityList.class);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
ModelSupport.entityTypes
.keySet()
.parallelStream()
.forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
removeOutputDir(spark, outputPath + e.name());
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
}
});
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() {

View File

@ -0,0 +1,123 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultCommunitySet.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_preparecommunitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String baseURL = parser.get("baseURL");
log.info("baseUEL: {}", baseURL);
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(baseURL);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareInfo(spark, inputPath, outputPath, projectsMap);
});
}
private static void prepareInfo(
SparkSession spark,
String inputPath,
String outputPath,
CommunityEntityMap projectMap) {
final StructType structureSchema = new StructType()
.add(
"dataInfo", new StructType()
.add("deletedbyinference", DataTypes.BooleanType)
.add("invisible", DataTypes.BooleanType))
.add("source", DataTypes.StringType)
.add("target", DataTypes.StringType)
.add("relClass", DataTypes.StringType);
spark
.read()
.schema(structureSchema)
.json(inputPath)
.filter(
"dataInfo.deletedbyinference != true " +
"and relClass == '" + ModelConstants.IS_PRODUCED_BY + "'")
.select(
new Column("source").as("resultId"),
new Column("target").as("projectId"))
.groupByKey((MapFunction<Row, String>) r -> (String) r.getAs("resultId"), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Row, ResultProjectList>) (k, v) -> {
ResultProjectList rpl = new ResultProjectList();
rpl.setResultId(k);
ArrayList<String> cl = new ArrayList<>();
cl.addAll(projectMap.get(v.next().getAs("projectId")));
v.forEachRemaining(r -> {
projectMap
.get(r.getAs("projectId"))
.forEach(c -> {
if (!cl.contains(c))
cl.add(c);
});
});
if (cl.size() == 0)
return null;
rpl.setCommunityList(cl);
return rpl;
}, Encoders.bean(ResultProjectList.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -0,0 +1,26 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import java.io.Serializable;
import java.util.ArrayList;
public class ResultProjectList implements Serializable {
private String resultId;
private ArrayList<String> communityList;
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public ArrayList<String> getCommunityList() {
return communityList;
}
public void setCommunityList(ArrayList<String> communityList) {
this.communityList = communityList;
}
}

View File

@ -0,0 +1,163 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 11/10/23
*/
public class SparkResultToCommunityFromProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromProject.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityFromProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_communitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String possibleupdatespath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", possibleupdatespath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
execPropagation(spark, inputPath, outputPath, possibleupdatespath);
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String inputPath,
String outputPath,
String possibleUpdatesPath) {
Dataset<ResultProjectList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultProjectList.class);
ModelSupport.entityTypes
.keySet()
.parallelStream()
.forEach(e -> {
if (ModelSupport.isResult(e)) {
removeOutputDir(spark, outputPath + e.name());
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
}
});
}
private static <R extends Result> MapFunction<Tuple2<R, ResultProjectList>, R> resultCommunityFn() {
return value -> {
R ret = value._1();
Optional<ResultProjectList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
// ArrayList<String> communitySet = rcl.get().getCommunityList();
List<String> contextList = ret
.getContext()
.stream()
.map(Context::getId)
.collect(Collectors.toList());
@SuppressWarnings("unchecked")
R res = (R) ret.getClass().newInstance();
res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>();
for (String cId : rcl.get().getCommunityList()) {
if (!contextList.contains(cId)) {
Context newContext = new Context();
newContext.setId(cId);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext);
} else {
ret
.getContext()
.stream()
.filter(c -> c.getId().equals(cId))
.findFirst()
.get()
.getDataInfo()
.add(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
}
return ret;
};
}
}

View File

@ -1,10 +1,5 @@
[
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
@ -17,12 +12,7 @@
"paramDescription": "the json path associated to each selection field",
"paramRequired": true
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
@ -35,17 +25,19 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "test",
"paramLongName": "isTest",
"paramDescription": "Parameter intended for testing purposes only. True if the reun is relatesd to a test and so the taggingConf parameter should be loaded",
"paramRequired": false
},
{
"paramName": "tg",
"paramLongName": "taggingConf",
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
"paramRequired": false
},
{
"paramName": "bu",
"paramLongName": "baseURL",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": false
}
]

View File

@ -4,10 +4,6 @@
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property>
<name>pathMap</name>
<description>the json path associated to each selection field</description>
@ -44,7 +40,7 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="exec_bulktag"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -102,16 +98,9 @@
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_exec_bulktag"/>
<join name="copy_wait" to="exec_bulktag"/>
<fork name="fork_exec_bulktag">
<path start="bulktag_publication"/>
<path start="bulktag_dataset"/>
<path start="bulktag_otherresearchproduct"/>
<path start="bulktag_software"/>
</fork>
<action name="bulktag_publication">
<action name="exec_bulktag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
@ -128,98 +117,15 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${outputPath}/</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--production</arg><arg>${production}</arg>
</spark>
<ok to="wait"/>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="bulktag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-dataset</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-orp</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-software</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="End"/>
<end name="End"/>

View File

@ -1,62 +0,0 @@
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
let $subj := $x//CONFIGURATION/context/param[./@name='subject']/text()
let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept
let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept
let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept
let $fos := $x//CONFIGURATION/context/param[./@name='fos']/text()
let $sdg := $x//CONFIGURATION/context/param[./@name='sdg']/text()
let $zenodo := $x//param[./@name='zenodoCommunity']/text()
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden'
return
<community>
{ $x//CONFIGURATION/context/@id}
<removeConstraints>
{$x//CONFIGURATION/context/param[./@name='removeConstraints']/text() }
</removeConstraints>
<advancedConstraints>
{$x//CONFIGURATION/context/param[./@name='advancedConstraints']/text() }
</advancedConstraints>
<subjects>
{for $y in tokenize($subj,',')
return
<subject>{$y}</subject>}
{for $y in tokenize($fos,',')
return
<subject>{$y}</subject>}
{for $y in tokenize($sdg,',')
return
<subject>{$y}</subject>}
</subjects>
<datasources>
{for $d in $datasources
where $d/param[./@name='enabled']/text()='true'
return
<datasource>
<openaireId>
{$d//param[./@name='openaireId']/text()}
</openaireId>
<selcriteria>
{$d/param[./@name='selcriteria']/text()}
</selcriteria>
</datasource> }
</datasources>
<zenodocommunities>
{for $zc in $zenodo
return
<zenodocommunity>
<zenodoid>
{$zc}
</zenodoid>
</zenodocommunity>}
{for $zc in $communities
return
<zenodocommunity>
<zenodoid>
{$zc/param[./@name='zenodoid']/text()}
</zenodoid>
<selcriteria>
{$zc/param[./@name='selcriteria']/text()}
</selcriteria>
</zenodocommunity>}
</zenodocommunities>
</community>

View File

@ -5,24 +5,7 @@
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{
"paramName":"test",
"paramLongName":"isTest",
"paramDescription": "true if it is executing a test",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
@ -35,12 +18,6 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "preparedInfoPath",

View File

@ -5,12 +5,6 @@
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"ocm",
"paramLongName":"organizationtoresultcommunitymap",
"paramDescription": "the map for the association organization communities",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
@ -28,6 +22,12 @@
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "bu",
"paramLongName": "baseURL",
"paramDescription": "the base URL to the community API to use",
"paramRequired": false
}
]

View File

@ -4,10 +4,7 @@
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>organizationtoresultcommunitymap</name>
<description>organization community map</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
@ -25,7 +22,7 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="prepare_result_communitylist"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -93,33 +90,28 @@
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.PrepareResultCommunitySet</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--organizationtoresultcommunitymap</arg><arg>${organizationtoresultcommunitymap}</arg>
<arg>--production</arg><arg>${production}</arg>
</spark>
<ok to="fork-join-exec-propagation"/>
<ok to="exec-propagation"/>
<error to="Kill"/>
</action>
<fork name="fork-join-exec-propagation">
<path start="join_propagate_publication"/>
<path start="join_propagate_dataset"/>
<path start="join_propagate_otherresearchproduct"/>
<path start="join_propagate_software"/>
</fork>
<action name="join_propagate_publication">
<action name="exec-propagation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
@ -127,115 +119,26 @@
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${outputPath}/</arg>
</spark>
<ok to="wait2"/>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="join_propagate_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Dataset</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-ORP</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Software</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<join name="wait2" to="End"/>
<end name="End"/>

View File

@ -0,0 +1,28 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "p",
"paramLongName": "preparedInfoPath",
"paramDescription": "the path where prepared info have been stored",
"paramRequired": true
}
]

View File

@ -0,0 +1,28 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "bu",
"paramLongName": "baseURL",
"paramDescription": "the path used to store temporary output files",
"paramRequired": false
}
]

View File

@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
<property>
<name>spark2MaxExecutors</name>
<value>50</value>
</property>
</configuration>

View File

@ -0,0 +1,144 @@
<workflow-app name="community_to_result_propagation_project" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="copy_entities"/>
<error to="Kill"/>
</action>
<fork name="copy_entities">
<path start="copy_relation"/>
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
</fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="prepare_result_communitylist"/>
<action name="prepare_result_communitylist">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare-Community-Result-Organization</name>
<class>eu.dnetlib.dhp.resulttocommunityfromproject.PrepareResultCommunitySet</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--production</arg><arg>${production}</arg>
</spark>
<ok to="exec-propagation"/>
<error to="Kill"/>
</action>
<action name="exec-propagation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromproject</name>
<class>eu.dnetlib.dhp.resulttocommunityfromproject.SparkResultToCommunityFromProject</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${outputPath}/</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -31,8 +31,6 @@ public class BulkTagJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
+ " \"title\" : \"$['title'][*]['value']\","
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
@ -42,7 +40,9 @@ public class BulkTagJobTest {
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"," +
"\"sdg\" : \"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"," +
"\"hostedby\" : \"$['instance'][*]['hostedby']['key']\" , " +
"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"} ";
"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"," +
"\"publisher\":\"$['publisher'].value\"," +
"\"publicationyear\":\"$['dateofacceptance'].value\"} ";
private static SparkSession spark;
@ -98,14 +98,11 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(),
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -133,19 +130,16 @@ public class BulkTagJobTest {
@Test
void bulktagBySubjectNoPreviousContextTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext/")
.getPath();
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -230,19 +224,19 @@ public class BulkTagJobTest {
void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance")
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
.getPath();
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -311,18 +305,18 @@ public class BulkTagJobTest {
@Test
void bulktagByDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource")
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -384,25 +378,25 @@ public class BulkTagJobTest {
void bulktagByZenodoCommunityTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity")
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/orp",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<OtherResearchProduct> tmp = sc
.textFile(workingDir.toString() + "/orp")
.textFile(workingDir.toString() + "/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, tmp.count());
@ -505,18 +499,18 @@ public class BulkTagJobTest {
@Test
void bulktagBySubjectDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -539,6 +533,7 @@ public class BulkTagJobTest {
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions.assertEquals(7, idExplodeCommunity.count());
Assertions
@ -636,14 +631,14 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/software_10.json.gz").getPath(),
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -732,18 +727,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -774,19 +769,19 @@ public class BulkTagJobTest {
void bulkTagOtherJupyter() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -829,18 +824,18 @@ public class BulkTagJobTest {
public void bulkTagDatasetJupyter() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/dataset")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -878,18 +873,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/software")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1096,18 +1091,18 @@ public class BulkTagJobTest {
void galaxyOtherTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1214,18 +1209,18 @@ public class BulkTagJobTest {
void galaxySoftwareTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/software")
"/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1333,19 +1328,19 @@ public class BulkTagJobTest {
void twitterDatasetTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/dataset")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1373,19 +1368,19 @@ public class BulkTagJobTest {
void twitterOtherTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1418,19 +1413,19 @@ public class BulkTagJobTest {
void twitterSoftwareTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/software")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1455,19 +1450,19 @@ public class BulkTagJobTest {
void EoscContextTagTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json")
"/eu/dnetlib/dhp/bulktag/eosc/dataset/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1533,16 +1528,16 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1568,4 +1563,41 @@ public class BulkTagJobTest {
}
@Test
void newConfTest() throws Exception {
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-outputPath", workingDir.toString() + "/",
// "-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", pathMap,
"-taggingConf", taggingConf
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
Assertions.assertEquals(0, spark.sql(query).count());
}
}

View File

@ -47,7 +47,7 @@ class CommunityConfigurationFactoryTest {
sc.setVerb("not_contains");
sc.setField("contributor");
sc.setValue("DARIAH");
sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
sc.setSelection(resolver);// .getSelectionCriteria(sc.getVerb(), sc.getValue()));
String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
Assertions.assertFalse(sc.verifyCriteria(metadata));
}

View File

@ -72,15 +72,13 @@ public class ResultToCommunityJobTest {
SparkResultToCommunityFromOrganizationJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", getClass()
.getResource("/eu/dnetlib/dhp/resulttocommunityfromorganization/sample")
.getResource("/eu/dnetlib/dhp/resulttocommunityfromorganization/sample/")
.getPath(),
"-hive_metastore_uris", "",
"-saveGraph", "true",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-outputPath", workingDir.toString() + "/",
"-preparedInfoPath", preparedInfoPath
});

View File

@ -0,0 +1,133 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static org.apache.spark.sql.functions.desc;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Dataset;
public class ResultToCommunityJobTest {
private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(ResultToCommunityJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(ResultToCommunityJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(OrcidPropagationJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testSparkResultToCommunityFromProjectJob() throws Exception {
final String preparedInfoPath = getClass()
.getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/preparedInfo")
.getPath();
SparkResultToCommunityFromProject
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", getClass()
.getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/sample/")
.getPath(),
"-outputPath", workingDir.toString() + "/",
"-preparedInfoPath", preparedInfoPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
/**
* {"resultId":"50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f","communityList":["aurora"]}
* {"resultId":"50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e","communityList":["aurora"]}
* {"resultId":"50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1","communityList":["sdsn-gr"]}
* {"resultId":"50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1","communityList":["netherlands"]}
*/
List<Context> context = tmp
.filter(r -> r.getId().equals("50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f"))
.first()
.getContext();
Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));
context = tmp
.filter(r -> r.getId().equals("50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e"))
.first()
.getContext();
Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));
Assertions
.assertEquals(
0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1")).count());
Assertions
.assertEquals(
0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1")).count());
Assertions
.assertEquals(
2, tmp.filter(r -> r.getContext().stream().anyMatch(c -> c.getId().equals("aurora"))).count());
}
private static boolean containsResultCommunityProject(Context c) {
return c
.getDataInfo()
.stream()
.anyMatch(di -> di.getProvenanceaction().getClassid().equals("result:community:project"));
}
}

View File

@ -26,7 +26,7 @@
<subjects/>
<datasources>
<datasource>
<openaireId>re3data_____::a507cdacc5bbcc08761c92185dee5cab</openaireId>
<openaireId>10|re3data_____::a507cdacc5bbcc08761c92185dee5cab</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -140,39 +140,39 @@
</subjects>
<datasources>
<datasource>
<openaireId>re3data_____::9ebe127e5f3a0bf401875690f3bb6b81</openaireId>
<openaireId>10|re3data_____::9ebe127e5f3a0bf401875690f3bb6b81</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::c6cd4b532e12868c1d760a8d7cda6815</openaireId>
<openaireId>10|doajarticles::c6cd4b532e12868c1d760a8d7cda6815</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::a6de4499bb87bf3c01add0a9e2c9ed0b</openaireId>
<openaireId>10|doajarticles::a6de4499bb87bf3c01add0a9e2c9ed0b</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::6eb31d13b12bc06bbac06aef63cf33c9</openaireId>
<openaireId>10|doajarticles::6eb31d13b12bc06bbac06aef63cf33c9</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0da84e9dfdc8419576169e027baa8028</openaireId>
<openaireId>10|doajarticles::0da84e9dfdc8419576169e027baa8028</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>re3data_____::84e123776089ce3c7a33db98d9cd15a8</openaireId>
<openaireId>10|re3data_____::84e123776089ce3c7a33db98d9cd15a8</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>openaire____::c5502a43e76feab55dd00cf50f519125</openaireId>
<openaireId>10|openaire____::c5502a43e76feab55dd00cf50f519125</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>re3data_____::a48f09c562b247a9919acfe195549b47</openaireId>
<openaireId>10|re3data_____::a48f09c562b247a9919acfe195549b47</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::97275a23ca44226c9964043c8462be96</openaireId>
<openaireId>10|opendoar____::97275a23ca44226c9964043c8462be96</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -287,55 +287,55 @@
</subjects>
<datasources>
<datasource>
<openaireId>doajarticles::8cec81178926caaca531afbd8eb5d64c</openaireId>
<openaireId>10|doajarticles::8cec81178926caaca531afbd8eb5d64c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0f7a7f30b5400615cae1829f3e743982</openaireId>
<openaireId>10|doajarticles::0f7a7f30b5400615cae1829f3e743982</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::9740f7f5af3e506d2ad2c215cdccd51a</openaireId>
<openaireId>10|doajarticles::9740f7f5af3e506d2ad2c215cdccd51a</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::9f3fbaae044fa33cb7069b72935a3254</openaireId>
<openaireId>10|doajarticles::9f3fbaae044fa33cb7069b72935a3254</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::cb67f33eb9819f5c624ce0313957f6b3</openaireId>
<openaireId>10|doajarticles::cb67f33eb9819f5c624ce0313957f6b3</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::e21c97cbb7a209afc75703681c462906</openaireId>
<openaireId>10|doajarticles::e21c97cbb7a209afc75703681c462906</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::554cde3be9e5c4588b4c4f9f503120cb</openaireId>
<openaireId>10|doajarticles::554cde3be9e5c4588b4c4f9f503120cb</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>tubitakulakb::11e22f49e65b9fd11d5b144b93861a1b</openaireId>
<openaireId>10|tubitakulakb::11e22f49e65b9fd11d5b144b93861a1b</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::57c5d3837da943e93b28ec4db82ec7a5</openaireId>
<openaireId>10|doajarticles::57c5d3837da943e93b28ec4db82ec7a5</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::a186f5ddb8e8c7ecc992ef51cf3315b1</openaireId>
<openaireId>10|doajarticles::a186f5ddb8e8c7ecc992ef51cf3315b1</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::e21c97cbb7a209afc75703681c462906</openaireId>
<openaireId>10|doajarticles::e21c97cbb7a209afc75703681c462906</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::dca64612dfe0963fffc119098a319957</openaireId>
<openaireId>10|doajarticles::dca64612dfe0963fffc119098a319957</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::dd70e44479f0ade25aa106aef3e87a0a</openaireId>
<openaireId>10|doajarticles::dd70e44479f0ade25aa106aef3e87a0a</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -406,27 +406,27 @@
</subjects>
<datasources>
<datasource>
<openaireId>re3data_____::5b9bf9171d92df854cf3c520692e9122</openaireId>
<openaireId>10|re3data_____::5b9bf9171d92df854cf3c520692e9122</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::c7d3de67dc77af72f6747157441252ec</openaireId>
<openaireId>10|doajarticles::c7d3de67dc77af72f6747157441252ec</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>re3data_____::8515794670370f49c1d176c399c714f5</openaireId>
<openaireId>10|re3data_____::8515794670370f49c1d176c399c714f5</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::d640648c84b10d425f96f11c3de468f3</openaireId>
<openaireId>10|doajarticles::d640648c84b10d425f96f11c3de468f3</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a</openaireId>
<openaireId>10|doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>rest________::fb1a3d4523c95e63496e3bc7ba36244b</openaireId>
<openaireId>10|rest________::fb1a3d4523c95e63496e3bc7ba36244b</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -743,27 +743,27 @@
</subjects>
<datasources>
<datasource>
<openaireId>opendoar____::1a551829d50f1400b0dab21fdd969c04</openaireId>
<openaireId>10|opendoar____::1a551829d50f1400b0dab21fdd969c04</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::49af6c4e558a7569d80eee2e035e2bd7</openaireId>
<openaireId>10|opendoar____::49af6c4e558a7569d80eee2e035e2bd7</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::0266e33d3f546cb5436a10798e657d97</openaireId>
<openaireId>10|opendoar____::0266e33d3f546cb5436a10798e657d97</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06</openaireId>
<openaireId>10|opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::41bfd20a38bb1b0bec75acf0845530a7</openaireId>
<openaireId>10|opendoar____::41bfd20a38bb1b0bec75acf0845530a7</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>opendoar____::87ae6fb631f7c8a627e8e28785d9992d</openaireId>
<openaireId>10|opendoar____::87ae6fb631f7c8a627e8e28785d9992d</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -983,11 +983,11 @@
<subjects/>
<datasources>
<datasource>
<openaireId>opendoar____::7e7757b1e12abcb736ab9a754ffb617a</openaireId>
<openaireId>10|opendoar____::7e7757b1e12abcb736ab9a754ffb617a</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains","field":"contributor","value":"DARIAH"}]}]}</selcriteria>
</datasource>
<datasource>
<openaireId>opendoar____::96da2f590cd7246bbde0051047b0d6f7</openaireId>
<openaireId>10|opendoar____::96da2f590cd7246bbde0051047b0d6f7</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains","field":"contributor","value":"DARIAH"}]}]}</selcriteria>
</datasource>
</datasources>
@ -1166,87 +1166,87 @@
</subjects>
<datasources>
<datasource>
<openaireId>doajarticles::1c5bdf8fca58937894ad1441cca99b76</openaireId>
<openaireId>10|doajarticles::1c5bdf8fca58937894ad1441cca99b76</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::b37a634324a45c821687e6e80e6f53b4</openaireId>
<openaireId>10|doajarticles::b37a634324a45c821687e6e80e6f53b4</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::4bf64f2a104040e4e055cd9594b2d77c</openaireId>
<openaireId>10|doajarticles::4bf64f2a104040e4e055cd9594b2d77c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::479ca537c12755d1868bbf02938a900c</openaireId>
<openaireId>10|doajarticles::479ca537c12755d1868bbf02938a900c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::55f31df96a60e2309f45b7c265fcf7a2</openaireId>
<openaireId>10|doajarticles::55f31df96a60e2309f45b7c265fcf7a2</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::c52a09891a5301f9986ebbfe3761810c</openaireId>
<openaireId>10|doajarticles::c52a09891a5301f9986ebbfe3761810c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::379807bc7f6c71a227ef1651462c414c</openaireId>
<openaireId>10|doajarticles::379807bc7f6c71a227ef1651462c414c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::36069db531a00b85a2e8fb301f4bdc19</openaireId>
<openaireId>10|doajarticles::36069db531a00b85a2e8fb301f4bdc19</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::b6a898da311ded96fabf49c520b80d5d</openaireId>
<openaireId>10|doajarticles::b6a898da311ded96fabf49c520b80d5d</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::d0753d9180b35a271d8b4a31f449749f</openaireId>
<openaireId>10|doajarticles::d0753d9180b35a271d8b4a31f449749f</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::172050a92511838393a3fe237ae47e31</openaireId>
<openaireId>10|doajarticles::172050a92511838393a3fe237ae47e31</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::301ed96c62abb160a3e29796efe5c95c</openaireId>
<openaireId>10|doajarticles::301ed96c62abb160a3e29796efe5c95c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0f4f805b3d842f2c7f1b077c3426fa59</openaireId>
<openaireId>10|doajarticles::0f4f805b3d842f2c7f1b077c3426fa59</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::ba73728b84437b8d48ae287b867c7215</openaireId>
<openaireId>10|doajarticles::ba73728b84437b8d48ae287b867c7215</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::86faef424d804309ccf45f692523aa48</openaireId>
<openaireId>10|doajarticles::86faef424d804309ccf45f692523aa48</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::73bd758fa41671de70964c3ecba013af</openaireId>
<openaireId>10|doajarticles::73bd758fa41671de70964c3ecba013af</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::e661fc0bdb24af42b740a08f0ddc6cf4</openaireId>
<openaireId>10|doajarticles::e661fc0bdb24af42b740a08f0ddc6cf4</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::a6d3052047d5dbfbd43d95b4afb0f3d7</openaireId>
<openaireId>10|doajarticles::a6d3052047d5dbfbd43d95b4afb0f3d7</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::ca61df07089acc53a1569bde6673d82a</openaireId>
<openaireId>10|doajarticles::ca61df07089acc53a1569bde6673d82a</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::237dd6f1606600459d0297abd8ed9976</openaireId>
<openaireId>10|doajarticles::237dd6f1606600459d0297abd8ed9976</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::fba6191177ede7c51ea1cdf58eae7f8b</openaireId>
<openaireId>10|doajarticles::fba6191177ede7c51ea1cdf58eae7f8b</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -1345,87 +1345,87 @@
</subjects>
<datasources>
<datasource>
<openaireId>doajarticles::c6f0ed5fa41e98863e7c73501fe4bd6d</openaireId>
<openaireId>10|doajarticles::c6f0ed5fa41e98863e7c73501fe4bd6d</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::ae4c7286c79590f19fdca670156ce816</openaireId>
<openaireId>10|doajarticles::ae4c7286c79590f19fdca670156ce816</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0f664bce92ce953e0c7a92068c46bfb3</openaireId>
<openaireId>10|doajarticles::0f664bce92ce953e0c7a92068c46bfb3</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::00017183dc4c858fb77541985323a4ef</openaireId>
<openaireId>10|doajarticles::00017183dc4c858fb77541985323a4ef</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::93b306f458cce3d7aaaf58c0a725f4f9</openaireId>
<openaireId>10|doajarticles::93b306f458cce3d7aaaf58c0a725f4f9</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::9dbf8fbf3e9fe0fe1fc01e55fbd90bfc</openaireId>
<openaireId>10|doajarticles::9dbf8fbf3e9fe0fe1fc01e55fbd90bfc</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::a2bda8785c863279bba4b8f34827b4c9</openaireId>
<openaireId>10|doajarticles::a2bda8785c863279bba4b8f34827b4c9</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::019a1fcb42c3fea1c1b689df76330b58</openaireId>
<openaireId>10|doajarticles::019a1fcb42c3fea1c1b689df76330b58</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::0daa8281938831e9c82bfed8b55a2975</openaireId>
<openaireId>10|doajarticles::0daa8281938831e9c82bfed8b55a2975</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::f67ad6d268162079b3abd51a24468744</openaireId>
<openaireId>10|doajarticles::f67ad6d268162079b3abd51a24468744</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::c6f0ed5fa41e98863e7c73501fe4bd6d</openaireId>
<openaireId>10|doajarticles::c6f0ed5fa41e98863e7c73501fe4bd6d</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::ad114356e196a4a3d84dda59c720dacd</openaireId>
<openaireId>10|doajarticles::ad114356e196a4a3d84dda59c720dacd</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::01e8a54fdecaaf354c67a2dd74ae7d4f</openaireId>
<openaireId>10|doajarticles::01e8a54fdecaaf354c67a2dd74ae7d4f</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::449305f096b10a9464449ff2d0e10e06</openaireId>
<openaireId>10|doajarticles::449305f096b10a9464449ff2d0e10e06</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::982c0c0ac378256254cce2fa6572bb6c</openaireId>
<openaireId>10|doajarticles::982c0c0ac378256254cce2fa6572bb6c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::49d6ed47138884566ce93cf0ccb12c02</openaireId>
<openaireId>10|doajarticles::49d6ed47138884566ce93cf0ccb12c02</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::a98e820dbc2e8ee0fc84ab66f263267c</openaireId>
<openaireId>10|doajarticles::a98e820dbc2e8ee0fc84ab66f263267c</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::50b1ce37427b36368f8f0f1317e47f83</openaireId>
<openaireId>10|doajarticles::50b1ce37427b36368f8f0f1317e47f83</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::f0ec29b7450b2ac5d0ad45327eeb531a</openaireId>
<openaireId>10|doajarticles::f0ec29b7450b2ac5d0ad45327eeb531a</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::d8d421d3b0349a7aaa93758b27a54e84</openaireId>
<openaireId>10|doajarticles::d8d421d3b0349a7aaa93758b27a54e84</openaireId>
<selcriteria/>
</datasource>
<datasource>
<openaireId>doajarticles::7ffc35ac5133da01d421ccf8af5b70bc</openaireId>
<openaireId>10|doajarticles::7ffc35ac5133da01d421ccf8af5b70bc</openaireId>
<selcriteria/>
</datasource>
</datasources>
@ -1454,81 +1454,81 @@
</subjects>
<datasources>
<datasource>
<openaireId>opendoar____::358aee4cc897452c00244351e4d91f69</openaireId>
<openaireId>10|opendoar____::358aee4cc897452c00244351e4d91f69</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>re3data_____::7b0ad08687b2c960d5aeef06f811d5e6</openaireId>
<openaireId>10|re3data_____::7b0ad08687b2c960d5aeef06f811d5e6</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>driver______::bee53aa31dc2cbb538c10c2b65fa5824</openaireId>
<openaireId>10|driver______::bee53aa31dc2cbb538c10c2b65fa5824</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>openaire____::437f4b072b1aa198adcbc35910ff3b98</openaireId>
<openaireId>10|openaire____::437f4b072b1aa198adcbc35910ff3b98</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>openaire____::081b82f96300b6a6e3d282bad31cb6e2</openaireId>
<openaireId>10|openaire____::081b82f96300b6a6e3d282bad31cb6e2</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>openaire____::9e3be59865b2c1c335d32dae2fe7b254</openaireId>
<openaireId>10|openaire____::9e3be59865b2c1c335d32dae2fe7b254</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>opendoar____::8b6dd7db9af49e67306feb59a8bdc52c</openaireId>
<openaireId>10|opendoar____::8b6dd7db9af49e67306feb59a8bdc52c</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>share_______::4719356ec8d7d55d3feb384ce879ad6c</openaireId>
<openaireId>10|share_______::4719356ec8d7d55d3feb384ce879ad6c</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>share_______::bbd802baad85d1fd440f32a7a3a2c2b1</openaireId>
<openaireId>10|share_______::bbd802baad85d1fd440f32a7a3a2c2b1</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>opendoar____::6f4922f45568161a8cdf4ad2299f6d23</openaireId>
<openaireId>10|opendoar____::6f4922f45568161a8cdf4ad2299f6d23</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCoV"}]}]}
</selcriteria>
</datasource>
<datasource>
<openaireId>re3data_____::7980778c78fb4cf0fab13ce2159030dc</openaireId>
<openaireId>10|re3data_____::7980778c78fb4cf0fab13ce2159030dc</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCov"}]}]}</selcriteria>
</datasource>
<datasource>
<openaireId>re3data_____::978378def740bbf2bfb420de868c460b</openaireId>
<openaireId>10|re3data_____::978378def740bbf2bfb420de868c460b</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains_caseinsensitive","field":"title","value":"2019-nCov"}]}]}</selcriteria>
</datasource>
</datasources>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Some files were not shown because too many files have changed in this diff Show More