Miriam Baglioni 2024-01-18 11:18:53 +01:00
parent 5dcb7019a5
commit 3ad0d6edfc
15 changed files with 1174 additions and 1018 deletions

.gitignore

@@ -30,3 +30,4 @@ spark-warehouse
/job.properties
/*/job.properties
/*/*/job.properties
/*/*/*/job.properties

File diff suppressed because it is too large

GenerateJsonSchema.java

@@ -1,5 +1,6 @@
import java.io.IOException;
import eu.dnetlib.dhp.oa.model.Result;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -24,7 +25,7 @@ class GenerateJsonSchema {
configBuilder.forFields().withDescriptionResolver(field -> "Description of " + field.getDeclaredName());
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(GraphResult.class);
JsonNode jsonSchema = generator.generateSchema(CommunityResult.class);
System.out.println(jsonSchema.toString());
}
@@ -41,7 +42,7 @@ class GenerateJsonSchema {
.without(Option.NONPUBLIC_NONSTATIC_FIELDS_WITHOUT_GETTERS);
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(Project.class);
JsonNode jsonSchema = generator.generateSchema(Result.class);
System.out.println(jsonSchema.toString());
}
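Note: the two tests above now target CommunityResult and Result instead of GraphResult and Project. For reference, a minimal stand-alone sketch of the same victools generator setup, assuming the victools dependency is on the classpath; the CommunityResult import path is an assumption:

import com.fasterxml.jackson.databind.JsonNode;
import com.github.victools.jsonschema.generator.Option;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;

import eu.dnetlib.dhp.oa.model.community.CommunityResult; // package assumed

public class GenerateSchemaSketch {

	public static void main(String[] args) {
		// Plain-JSON preset, draft-07, private fields without getters excluded,
		// mirroring the configuration visible in the test above.
		SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(
			SchemaVersion.DRAFT_7, OptionPreset.PLAIN_JSON)
				.without(Option.NONPUBLIC_NONSTATIC_FIELDS_WITHOUT_GETTERS);
		SchemaGeneratorConfig config = configBuilder.build();
		SchemaGenerator generator = new SchemaGenerator(config);

		// Generate and print the JSON schema of the dump model class.
		JsonNode jsonSchema = generator.generateSchema(CommunityResult.class);
		System.out.println(jsonSchema.toPrettyString());
	}
}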

ResultMapper.java

@@ -2,6 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump;
import static eu.dnetlib.dhp.oa.graph.dump.Constants.*;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import java.io.Serializable;
import java.util.*;
@@ -29,6 +31,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
public class ResultMapper implements Serializable {
private static final String NULL = "null";
public static <E extends eu.dnetlib.dhp.schema.oaf.OafEntity> Result map(
E in, Map<String, String> communityMap, String dumpType)
@@ -60,7 +63,7 @@ public class ResultMapper implements Serializable {
mapDescription(out, input);
mapEmbargo(out, input);
mapFormat(out, input);
out.setId(input.getId().substring(3));
out.setId(getEntityId(input.getId(), ENTITY_ID_SEPARATOR));
mapOriginalId(out, input);
mapInstance(dumpType, out, input);
mapLanguage(out, input);
@@ -100,7 +103,7 @@ public class ResultMapper implements Serializable {
break;
}
}
private static void mapContext(Map<String, String> communityMap, CommunityResult out,
@@ -175,7 +178,7 @@ public class ResultMapper implements Serializable {
input
.getCollectedfrom()
.stream()
.map(cf -> CfHbKeyValue.newInstance(cf.getKey().substring(3), cf.getValue()))
.map(cf -> CfHbKeyValue.newInstance(getEntityId(cf.getKey(), ENTITY_ID_SEPARATOR), cf.getValue()))
.collect(Collectors.toList()));
}
@@ -207,6 +210,7 @@ public class ResultMapper implements Serializable {
// .getProvenanceaction()
// .getClassid()
// .equalsIgnoreCase("subject:sdg"))))
.filter(s -> !s.getValue().equalsIgnoreCase(NULL))
.forEach(s -> subjectList.add(getSubject(s))));
out.setSubjects(subjectList);
@@ -541,14 +545,18 @@ public class ResultMapper implements Serializable {
instance
.setCollectedfrom(
CfHbKeyValue
.newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue()));
.newInstance(
getEntityId(i.getCollectedfrom().getKey(), ENTITY_ID_SEPARATOR),
i.getCollectedfrom().getValue()));
if (Optional.ofNullable(i.getHostedby()).isPresent() &&
Optional.ofNullable(i.getHostedby().getKey()).isPresent() &&
StringUtils.isNotBlank(i.getHostedby().getKey()))
instance
.setHostedby(
CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue()));
CfHbKeyValue
.newInstance(
getEntityId(i.getHostedby().getKey(), ENTITY_ID_SEPARATOR), i.getHostedby().getValue()));
return instance;

Utils.java

@@ -37,6 +37,7 @@ import scala.Tuple2;
public class Utils {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String ENTITY_ID_SEPARATOR = "|";
private Utils() {
}
@@ -83,6 +84,10 @@ public class Utils {
return new Gson().fromJson(sb.toString(), CommunityMap.class);
}
public static String getEntityId(String id, String separator) {
return id.substring(id.indexOf(separator) + 1);
}
public static Dataset<String> getEntitiesId(SparkSession spark, String inputPath) {
Dataset<String> dumpedIds = Utils
.readPath(spark, inputPath + "/publication", GraphResult.class)

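The new getEntityId helper is what replaces the hard-coded substring(3) calls throughout this commit: instead of assuming a prefix of fixed length, it keeps whatever follows the first separator. A small self-contained sketch of the intended behaviour, using a made-up OpenAIRE-style identifier:

public class GetEntityIdSketch {

	// Same logic as Utils.getEntityId: keep what follows the first separator.
	static String getEntityId(String id, String separator) {
		return id.substring(id.indexOf(separator) + 1);
	}

	public static void main(String[] args) {
		// Made-up identifier with the usual "<two-digit type prefix>|<local id>" shape.
		String id = "50|doi_________::ab12cd34";

		// Old approach: fixed offset, only correct while the prefix is exactly two
		// characters followed by the separator.
		System.out.println(id.substring(3));      // doi_________::ab12cd34

		// New approach: independent of the prefix length.
		System.out.println(getEntityId(id, "|")); // doi_________::ab12cd34
	}
}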
CommunitySplit.java

@@ -67,7 +67,7 @@ public class CommunitySplit implements Serializable {
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(outputPath + "/" + communities.get(c).replace(" ", "_"));
.text(outputPath + "/" + c.replace(" ", "_"));
});
}
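The fix above names the output directory after c itself rather than communities.get(c). A rough sketch of the per-community write, assuming c ranges over community identifiers and that the filter predicate is only a placeholder:

import java.util.Set;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

public class CommunitySplitSketch {

	// One gzip-compressed text output per community; the directory name is derived
	// from the community identifier, with spaces replaced by underscores.
	static void split(Dataset<String> results, Set<String> communities, String outputPath) {
		communities
			.forEach(
				c -> results
					.filter((FilterFunction<String>) r -> r.contains("\"" + c + "\"")) // placeholder predicate
					.write()
					.option("compression", "gzip")
					.mode(SaveMode.Overwrite)
					.text(outputPath + "/" + c.replace(" ", "_")));
	}
}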

SparkPrepareResultProject.java

@@ -2,6 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump.community;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import java.io.Serializable;
import java.io.StringReader;
@@ -110,7 +112,7 @@ public class SparkPrepareResultProject implements Serializable {
Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
ResultProject rp = new ResultProject();
if (substring)
rp.setResultId(s.substring(3));
rp.setResultId(getEntityId(s, ENTITY_ID_SEPARATOR));
else
rp.setResultId(s);
eu.dnetlib.dhp.schema.oaf.Project p = first._1();
@@ -142,7 +144,7 @@ public class SparkPrepareResultProject implements Serializable {
private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) {
Project p = Project
.newInstance(
op.getId().substring(3),
getEntityId(op.getId(), ENTITY_ID_SEPARATOR),
op.getCode().getValue(),
Optional
.ofNullable(op.getAcronym())

Extractor.java

@@ -2,6 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import java.io.Serializable;
import java.util.*;
@@ -84,7 +86,7 @@ public class Extractor implements Serializable {
.orElse(null))
.orElse(null);
Relation r = getRelation(
value.getId().substring(3), contextId,
getEntityId(value.getId(), ENTITY_ID_SEPARATOR), contextId,
Constants.RESULT_ENTITY,
Constants.CONTEXT_ENTITY,
ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP, provenance);
@@ -94,7 +96,7 @@ public class Extractor implements Serializable {
hashCodes.add(r.hashCode());
}
r = getRelation(
contextId, value.getId().substring(3),
contextId, getEntityId(value.getId(), ENTITY_ID_SEPARATOR),
Constants.CONTEXT_ENTITY,
Constants.RESULT_ENTITY,
ModelConstants.IS_RELATED_TO,
@@ -163,8 +165,8 @@ public class Extractor implements Serializable {
eu.dnetlib.dhp.oa.graph.dump.Constants.HARVESTED,
eu.dnetlib.dhp.oa.graph.dump.Constants.DEFAULT_TRUST));
Relation r = getRelation(
value.getId().substring(3),
cf.getKey().substring(3), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY,
getEntityId(value.getId(), ENTITY_ID_SEPARATOR),
getEntityId(cf.getKey(), ENTITY_ID_SEPARATOR), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY,
resultDatasource, ModelConstants.PROVISION,
provenance);
if (!hashCodes.contains(r.hashCode())) {
@@ -174,7 +176,7 @@ public class Extractor implements Serializable {
}
r = getRelation(
cf.getKey().substring(3), value.getId().substring(3),
getEntityId(cf.getKey(), ENTITY_ID_SEPARATOR), getEntityId(value.getId(), ENTITY_ID_SEPARATOR),
Constants.DATASOURCE_ENTITY, Constants.RESULT_ENTITY,
datasourceResult, ModelConstants.PROVISION,
provenance);
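Both endpoints of the extracted relations are now trimmed with getEntityId, and the existing hashCodes set still guards against emitting the same relation twice. A compact illustration of that deduplication pattern, with a plain string standing in for the dump Relation:

import java.util.HashSet;
import java.util.Set;

public class RelationDedupSketch {

	public static void main(String[] args) {
		Set<Integer> hashCodes = new HashSet<>();

		// Made-up (source, target) pairs; the second one is a duplicate.
		String[][] pairs = {
			{ "doi_________::ab12cd34", "openaire____::datasource1" },
			{ "doi_________::ab12cd34", "openaire____::datasource1" }
		};

		for (String[] p : pairs) {
			// Stand-in for building the Relation between result and datasource.
			String r = p[0] + " -> " + p[1];

			// Emit each relation only once, remembering the hash codes already written.
			if (!hashCodes.contains(r.hashCode())) {
				System.out.println(r);
				hashCodes.add(r.hashCode());
			}
		}
	}
}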

SparkDumpEntitiesJob.java

@@ -2,6 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import java.io.Serializable;
import java.io.StringReader;
@@ -216,7 +218,7 @@ public class SparkDumpEntitiesJob implements Serializable {
return null;
Datasource datasource = new Datasource();
datasource.setId(d.getId().substring(3));
datasource.setId(getEntityId(d.getId(), ENTITY_ID_SEPARATOR));
Optional
.ofNullable(d.getOriginalId())
@@ -406,7 +408,7 @@ public class SparkDumpEntitiesJob implements Serializable {
Optional
.ofNullable(p.getId())
.ifPresent(id -> project.setId(id.substring(3)));
.ifPresent(id -> project.setId(getEntityId(id, ENTITY_ID_SEPARATOR)));
Optional
.ofNullable(p.getWebsiteurl())
@@ -619,7 +621,7 @@ public class SparkDumpEntitiesJob implements Serializable {
Optional
.ofNullable(org.getId())
.ifPresent(value -> organization.setId(value.substring(3)));
.ifPresent(value -> organization.setId(getEntityId(value, ENTITY_ID_SEPARATOR)));
Optional
.ofNullable(org.getPid())

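SparkDumpEntitiesJob applies the same trimming inside Optional.ofNullable(...).ifPresent(...), so entities without an identifier are simply skipped rather than causing a NullPointerException. A tiny illustration of that null-safe pattern; the AtomicReference below is just a local stand-in for the model setter:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class NullSafeTrimSketch {

	static String getEntityId(String id, String separator) {
		return id.substring(id.indexOf(separator) + 1);
	}

	public static void main(String[] args) {
		AtomicReference<String> dumpedId = new AtomicReference<>(); // stand-in for project.setId(...)

		String present = "40|corda_______::0123abcd"; // made-up project identifier
		String missing = null;

		// The prefix is trimmed only when a value is actually there; null is skipped silently.
		Optional.ofNullable(present).ifPresent(id -> dumpedId.set(getEntityId(id, "|")));
		Optional.ofNullable(missing).ifPresent(id -> dumpedId.set(getEntityId(id, "|")));

		System.out.println(dumpedId.get()); // corda_______::0123abcd
	}
}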
SparkDumpRelationJob.java

@@ -2,6 +2,8 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import java.io.Serializable;
import java.util.Collections;
@@ -85,11 +87,11 @@ public class SparkDumpRelationJob implements Serializable {
.map((MapFunction<Relation, eu.dnetlib.dhp.oa.model.graph.Relation>) relation -> {
eu.dnetlib.dhp.oa.model.graph.Relation relNew = new eu.dnetlib.dhp.oa.model.graph.Relation();
relNew
.setSource(relation.getSource().substring(3));
.setSource(getEntityId(relation.getSource(), ENTITY_ID_SEPARATOR));
relNew.setSourceType(ModelSupport.idPrefixEntity.get(relation.getSource().substring(0, 2)));
relNew
.setTarget(relation.getTarget().substring(3));
.setTarget(getEntityId(relation.getTarget(), ENTITY_ID_SEPARATOR));
relNew.setTargetType(ModelSupport.idPrefixEntity.get(relation.getTarget().substring(0, 2)));
relNew

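Note that in the relation dump only the identifiers are trimmed; the source and target types are still resolved from the two-character prefix of the untrimmed id via ModelSupport.idPrefixEntity. A small sketch of that double derivation; the prefix-to-type map below is a local stand-in with assumed values, not the real ModelSupport constant:

import java.util.HashMap;
import java.util.Map;

public class RelationEndpointSketch {

	// Local stand-in for ModelSupport.idPrefixEntity; the values are assumptions.
	static final Map<String, String> ID_PREFIX_ENTITY = new HashMap<>();
	static {
		ID_PREFIX_ENTITY.put("10", "datasource");
		ID_PREFIX_ENTITY.put("20", "organization");
		ID_PREFIX_ENTITY.put("40", "project");
		ID_PREFIX_ENTITY.put("50", "result");
	}

	static String getEntityId(String id, String separator) {
		return id.substring(id.indexOf(separator) + 1);
	}

	public static void main(String[] args) {
		String source = "50|doi_________::ab12cd34"; // made-up relation source

		// The type comes from the raw id's two-character prefix...
		String sourceType = ID_PREFIX_ENTITY.get(source.substring(0, 2));
		// ...while the dumped identifier drops the prefix and the separator.
		String sourceId = getEntityId(source, "|");

		System.out.println(sourceType + " / " + sourceId); // result / doi_________::ab12cd34
	}
}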
SparkOrganizationRelation.java

@@ -2,6 +2,9 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
import static eu.dnetlib.dhp.schema.common.ModelSupport.idPrefixMap;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,8 +30,10 @@ import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.model.Provenance;
import eu.dnetlib.dhp.oa.model.graph.RelType;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
/**
@@ -103,7 +108,7 @@ public class SparkOrganizationRelation implements Serializable {
.as(Encoders.bean(MergedRels.class));
mergedRelsDataset.map((MapFunction<MergedRels, MergedRels>) mergedRels -> {
if (organizationMap.containsKey(mergedRels.getOrganizationId())) {
if (organizationMap.containsKey(getEntityId(mergedRels.getOrganizationId(), ENTITY_ID_SEPARATOR))) {
return mergedRels;
}
return null;
@@ -135,12 +140,13 @@ public class SparkOrganizationRelation implements Serializable {
private static Consumer<MergedRels> getMergedRelsConsumer(CommunityEntityMap organizationMap,
List<eu.dnetlib.dhp.oa.model.graph.Relation> relList, CommunityMap communityMap) {
return mergedRels -> {
String oId = mergedRels.getOrganizationId();
String oId = getEntityId(mergedRels.getOrganizationId(), ENTITY_ID_SEPARATOR);
organizationMap
.get(oId)
.forEach(community -> {
if (communityMap.containsKey(community)) {
addRelations(relList, community, mergedRels.getRepresentativeId());
addRelations(
relList, community, getEntityId(mergedRels.getRepresentativeId(), ENTITY_ID_SEPARATOR));
}
});
@@ -158,8 +164,8 @@ public class SparkOrganizationRelation implements Serializable {
eu.dnetlib.dhp.oa.model.graph.Relation
.newInstance(
id, Constants.CONTEXT_ENTITY,
organization.substring(3),
ModelSupport.idPrefixEntity.get(organization.substring(0, 2)),
organization,
ModelSupport.idPrefixEntity.get(idPrefixMap.get(Organization.class)),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
@@ -170,7 +176,7 @@ public class SparkOrganizationRelation implements Serializable {
.add(
eu.dnetlib.dhp.oa.model.graph.Relation
.newInstance(
organization.substring(3), ModelSupport.idPrefixEntity.get(organization.substring(0, 2)),
organization, ModelSupport.idPrefixEntity.get(idPrefixMap.get(Organization.class)),
id, Constants.CONTEXT_ENTITY,
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance

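Since the organization id handed to addRelations is now already trimmed, its two-character type prefix is gone, so the entity type is derived from the Organization class through idPrefixMap instead of organization.substring(0, 2). A rough sketch of that class-based lookup chain; both maps below are simplified stand-ins for the ModelSupport constants:

import java.util.HashMap;
import java.util.Map;

public class OrganizationTypeLookupSketch {

	static class Organization { // stand-in for eu.dnetlib.dhp.schema.oaf.Organization
	}

	// Simplified stand-ins for ModelSupport.idPrefixMap and ModelSupport.idPrefixEntity.
	static final Map<Class<?>, String> ID_PREFIX_MAP = new HashMap<>();
	static final Map<String, String> ID_PREFIX_ENTITY = new HashMap<>();
	static {
		ID_PREFIX_MAP.put(Organization.class, "20"); // assumed prefix for organizations
		ID_PREFIX_ENTITY.put("20", "organization");
	}

	public static void main(String[] args) {
		// The trimmed id no longer carries the "20|" prefix, so the type cannot be parsed
		// from it and is derived from the entity class instead.
		String trimmedOrganizationId = "openorgs____::0123abcd"; // made-up identifier
		String type = ID_PREFIX_ENTITY.get(ID_PREFIX_MAP.get(Organization.class));

		System.out.println(type + " / " + trimmedOrganizationId); // organization / openorgs____::0123abcd
	}
}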
ZenodoAPIClient.java

@@ -204,11 +204,12 @@ public class ZenodoAPIClient implements Serializable {
.build();
log.info("URL: " + request.url().toString());
log.info("Headers: " + request.headers().toString());
// log.info("Headers: " + request.headers().toString());
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
System.out.println("Unexpected code " + response + response.body().string());
log.info("Unexpected code " + response + response.body().string());
System.out.println("Unexpected code " + response + response.body().string());
return response.code();
}
}
@@ -354,13 +355,14 @@ public class ZenodoAPIClient implements Serializable {
.build();
log.info("URL: " + request.url().toString());
log.info("Headers: " + request.headers().toString());
// log.info("Headers: " + request.headers().toString());
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
if (!response.isSuccessful()) {
log.info("Unexpected code " + response + response.body().string());
throw new IOException("Unexpected code " + response + response.body().string());
}
ZenodoModel zenodoModel = new Gson()
.fromJson(response.body().string(), ZenodoModel.class);
bucket = zenodoModel.getLinks().getBucket();
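The Zenodo client now logs the body of an unsuccessful response before throwing. One detail to keep in mind when reproducing this with OkHttp: a response body can only be consumed once, so the sketch below reads it into a variable first. Endpoint and token are placeholders, not the client's actual configuration:

import java.io.IOException;

import com.google.gson.Gson;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class ZenodoCallSketch {

	public static void main(String[] args) throws IOException {
		OkHttpClient httpClient = new OkHttpClient();

		Request request = new Request.Builder()
			.url("https://zenodo.org/api/deposit/depositions") // placeholder endpoint
			.addHeader("Authorization", "Bearer <ACCESS_TOKEN>") // placeholder token
			.get()
			.build();

		try (Response response = httpClient.newCall(request).execute()) {
			// Read the body once: OkHttp response bodies cannot be consumed twice.
			String body = response.body().string();

			if (!response.isSuccessful()) {
				throw new IOException("Unexpected code " + response + body);
			}

			// On success, map the JSON payload (here just parsed generically and echoed).
			System.out.println(new Gson().fromJson(body, Object.class));
		}
	}
}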

workflow.xml

@@ -1,4 +1,4 @@
<workflow-app name="sub-dump_subset" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="dump_country" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>


@@ -5,6 +5,8 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;