Compare commits

...

20 Commits

Author SHA1 Message Date
Miriam Baglioni 3c3e3537e0 Merge branch 'singleCommunityDump' into dump 2021-04-23 12:12:43 +02:00
Miriam Baglioni 1416a49b63 merge branch with master 2021-04-23 12:11:15 +02:00
Miriam Baglioni 72e5aa3b42 refactoring 2021-04-23 12:10:30 +02:00
Miriam Baglioni 4ae6fba01d refactoring 2021-04-23 12:09:19 +02:00
Miriam Baglioni 7d1b8b7f64 merge upstream 2021-04-23 11:55:49 +02:00
Miriam Baglioni 8981a82011 - 2021-04-23 11:55:20 +02:00
Miriam Baglioni eb0762622c added decision node to upload on zenodo or not 2021-04-23 11:54:54 +02:00
Miriam Baglioni a469d79b84 test for the creation of relationships between context and projects when the funding contains h2020 2021-04-23 11:52:27 +02:00
Miriam Baglioni 251178aca8 the new json schema for the result 2021-04-23 11:51:27 +02:00
Miriam Baglioni 7cf1f49d5e if the funding does not start with H2020 but contains it, the nsp should be corda__h2020 2021-04-23 11:50:26 +02:00
Miriam Baglioni 7465fa3f20 dumping only the communities with status "all". We decided those with status "manager" will be available on demand 2021-04-23 11:49:45 +02:00
Claudio Atzori 906d50563c Merge pull request 'properly invalidating impala metadata' (#105) from antonis.lempesis/dnet-hadoop:master into master
Reviewed-on: D-Net/dnet-hadoop#105
2021-04-15 15:06:22 +02:00
Antonis Lempesis 03d36fadea properly invalidating impala metadata 2021-04-15 13:34:22 +03:00
Miriam Baglioni bc501f41f6 added test class for community removal from the set to be dumped 2021-04-13 16:40:24 +02:00
Miriam Baglioni 80a7170794 - 2021-04-13 16:39:55 +02:00
Miriam Baglioni 08e731916b removed parameter communityMap when sending data to Zenodo 2021-04-13 16:38:59 +02:00
Miriam Baglioni 50d13a1d74 changed the workflow for the dump of a single community 2021-04-13 16:33:00 +02:00
Miriam Baglioni 8c4c74a640 changed logic to be able to create a dump for a single community at a time 2021-04-13 16:32:19 +02:00
Miriam Baglioni 6179deb836 removed the part after part-x- in the file name generated by spark. It was too long and created problems while creating the tar entries 2021-04-13 16:30:59 +02:00
miconis dcff9cecdf bug fix: ids in self mergerels are not marked deletedbyinference=true 2021-04-12 15:55:27 +02:00
20 changed files with 314 additions and 30 deletions

View File

@@ -90,6 +90,13 @@ public class MakeTarArchive implements Serializable {
String p_string = p.toString();
if (!p_string.endsWith("_SUCCESS")) {
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
if (name.startsWith("part-") & name.length() > 10) {
String tmp = name.substring(0, 10);
if (name.contains(".")) {
tmp += name.substring(name.indexOf("."));
}
name = tmp;
}
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
entry.setSize(fileStatus.getLen());
current_size += fileStatus.getLen();
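
For reference, a minimal standalone sketch of the name-shortening introduced here (commit 6179deb836): Spark part files such as "part-00000-4dd27e04-c000.json.gz" are reduced to "part-00000" plus their extension before the tar entry is created, keeping entry names short. The method and example names below are ours, and the sketch uses the logical && where the hunk uses &, which behaves the same on boolean operands.

public static String shortenPartName(String name) {
    if (name.startsWith("part-") && name.length() > 10) {
        String tmp = name.substring(0, 10);              // keep "part-NNNNN"
        if (name.contains(".")) {
            tmp += name.substring(name.indexOf("."));    // keep the extension, e.g. ".json.gz"
        }
        return tmp;
    }
    return name;
}
// shortenPartName("part-00000-4dd27e04-c000.json.gz") -> "part-00000.json.gz"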

View File

@@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -91,6 +92,7 @@ public class SparkUpdateEntity extends AbstractSparkAction {
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.where("source != target")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
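
The added source != target filter implements the fix from commit dcff9cecdf: without it, self-merge relations produced by dedup caused dedup representatives to mark themselves deletedbyinference=true. A hedged sketch of the resulting selection (the wrapper class and imports are ours; relation class and column names follow the hunk):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MergedIdsSketch {
    // ids of records merged into a dedup representative, excluding self-merges
    public static Dataset<Row> mergedTargets(Dataset<Row> rel) {
        return rel
            .where("relClass == 'merges'")
            .where("source != target")   // the fix: a representative never merges itself away
            .select("target")
            .distinct();
    }
}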

View File

@@ -37,7 +37,8 @@ public class DumpProducts implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
execDump(
spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
});
}

View File

@@ -20,7 +20,7 @@ public class QueryInformationSystem {
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
+
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" and ($x//context/param[./@name = 'status']/text() = 'manager' or $x//context/param[./@name = 'status']/text() = 'all') "
" and ($x//context/param[./@name = 'status']/text() = 'all') "
+
" return " +
"<community> " +

View File

@@ -1,6 +1,7 @@
package eu.dnetlib.dhp.oa.graph.dump;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
@@ -9,6 +10,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.jetbrains.annotations.NotNull;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
@@ -48,15 +50,12 @@ public class SendToZenodoHDFS implements Serializable {
.orElse(false);
final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
final String communityMapPath = parser.get("communityMapPath");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
CommunityMap communityMap = Utils.readCommunityMap(fileSystem, communityMapPath);
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(hdfsPath), true);
@@ -87,11 +86,6 @@
if (!p_string.endsWith("_SUCCESS")) {
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
log.info("Sending information for community: " + name);
if (communityMap.containsKey(name.substring(0, name.lastIndexOf(".")))) {
name = communityMap.get(name.substring(0, name.lastIndexOf("."))).replace(" ", "_") + ".tar";
}
FSDataInputStream inputStream = fileSystem.open(p);
zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
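
With the communityMap lookup removed, each file produced by the previous steps (typically one .tar per community) is uploaded to Zenodo under the name it already carries on HDFS. A hedged sketch of how the loop reads after the removals; the wrapper method is ours, and ZenodoAPIClient is assumed to be the type behind the zenodoApiClient variable (same package as the MissingConceptDoiException imported above), with uploadIS being the call shown in the hunk.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import eu.dnetlib.dhp.common.api.ZenodoAPIClient; // assumed client type

public class UploadSketch {
    static void uploadAll(FileSystem fileSystem, RemoteIterator<LocatedFileStatus> files,
        ZenodoAPIClient zenodoApiClient) throws Exception {
        while (files.hasNext()) {
            LocatedFileStatus fileStatus = files.next();
            Path p = fileStatus.getPath();
            String name = p.getName(); // e.g. "beopen.tar": no community-map renaming any more
            if (!name.endsWith("_SUCCESS")) {
                FSDataInputStream inputStream = fileSystem.open(p);
                zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
            }
        }
    }
}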

View File

@@ -34,12 +34,14 @@ public class CommunitySplit implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execSplit(spark, inputPath, outputPath, Utils.getCommunityMap(spark, communityMapPath).keySet());
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
execSplit(spark, inputPath, outputPath, communityMap);
});
}
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
Set<String> communities) {
CommunityMap communities) {
Dataset<CommunityResult> result = Utils
.readPath(spark, inputPath + "/publication", CommunityResult.class)
@@ -48,8 +50,9 @@ public class CommunitySplit implements Serializable {
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
communities
.keySet()
.stream()
.forEach(c -> printResult(c, result, outputPath));
.forEach(c -> printResult(c, result, outputPath + "/" + communities.get(c).replace(" ", "_")));
}
@@ -61,7 +64,7 @@
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + c);
.json(outputPath);
}
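
After this change the per-community output directories are named after the community label rather than its identifier, with spaces replaced by underscores, and printResult writes straight to the path it is handed. A small self-contained example of the resulting directory naming (the ids, labels and base path below are illustrative):

import java.util.HashMap;
import java.util.Map;

public class SplitPathSketch {
    public static void main(String[] args) {
        Map<String, String> communityMap = new HashMap<>(); // community id -> label
        communityMap.put("beopen", "Transport Research");
        communityMap.put("dh-ch", "Digital Humanities and Cultural Heritage");

        String outputPath = "/tmp/dump/split";
        communityMap
            .keySet()
            .forEach(c -> System.out.println(
                c + " -> " + outputPath + "/" + communityMap.get(c).replace(" ", "_")));
        // beopen -> /tmp/dump/split/Transport_Research
        // dh-ch -> /tmp/dump/split/Digital_Humanities_and_Cultural_Heritage
    }
}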

View File

@@ -0,0 +1,85 @@
package eu.dnetlib.dhp.oa.graph.dump.community;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class RemoveCommunities implements Serializable {
private static final Logger log = LoggerFactory.getLogger(RemoveCommunities.class);
private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Configuration conf;
private final BufferedWriter writer;
private final CommunityMap communityMap;
public RemoveCommunities(String path, String hdfsNameNode) throws IOException {
conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
Path hdfsPath = new Path(path);
// FSDataInputStream p = fileSystem.open(hdfsPath);
// ObjectMapper mapper = new ObjectMapper();
communityMap = OBJECT_MAPPER.readValue((InputStream) fileSystem.open(hdfsPath), CommunityMap.class);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsPath)) {
fileSystem.delete(hdfsPath);
}
fsDataOutputStream = fileSystem.create(hdfsPath);
writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
RemoveCommunities.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_rc_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String nameNode = parser.get("nameNode");
log.info("nameNode: {}", nameNode);
final String outputPath = parser.get("path");
log.info("outputPath: {}", outputPath);
final String communityId = parser.get("communityId");
final RemoveCommunities scm = new RemoveCommunities(outputPath, nameNode);
scm.removeCommunities(communityId);
}
private void removeCommunities(String communityId) throws IOException {
Set<String> toRemove = communityMap.keySet().stream().map(key -> {
if (key.equals(communityId))
return null;
return key;
}).filter(Objects::nonNull).collect(Collectors.toSet());
toRemove.forEach(key -> communityMap.remove(key));
writer.write(OBJECT_MAPPER.writeValueAsString(communityMap));
writer.close();
}
}
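
The new RemoveCommunities step rewrites the serialized community map so that only the community passed as communityId survives; the workflow further down runs it before forking the dump when singleDeposition is true. The map-then-filter in removeCommunities is equivalent to a removeIf over the key set, as this hedged, self-contained sketch shows (CommunityMap is treated here as a plain Map<String, String>, judging from the readValue and keySet usage above; ids and labels are illustrative):

import java.util.HashMap;
import java.util.Map;

public class KeepSingleCommunitySketch {
    public static void main(String[] args) {
        Map<String, String> communityMap = new HashMap<>();
        communityMap.put("beopen", "Transport Research");
        communityMap.put("egi", "EGI Federation");

        String communityId = "beopen";
        // drop every entry except the requested community
        communityMap.keySet().removeIf(key -> !key.equals(communityId));

        System.out.println(communityMap); // {beopen=Transport Research}
    }
}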

View File

@@ -159,7 +159,7 @@ public class QueryInformationSystem {
if (funding == null) {
return null;
}
if (funding.toLowerCase().startsWith("h2020")) {
if (funding.toLowerCase().contains("h2020")) {
nsp = "corda__h2020::";
} else {
nsp = "corda_______::";

View File

@@ -1,6 +1,18 @@
<workflow-app name="dump_community_products" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>singleDeposition</name>
<description>Indicates if each file in the directory should be uploaded in its own deposition</description>
</property>
<property>
<name>upload</name>
<description>true if the dump should be uploaded to Zenodo</description>
</property>
<property>
<name>communityId</name>
<description>the id of the community to be dumped when a single-community dump is requested</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
@@ -123,6 +135,24 @@
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="single_deposition"/>
<error to="Kill"/>
</action>
<decision name="single_deposition">
<switch>
<case to="remove_communities">${wf:conf('singleDeposition') eq true}</case>
<default to="fork_dump"/>
</switch>
</decision>
<action name="remove_communities">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.community.RemoveCommunities</main-class>
<arg>--path</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--communityId</arg><arg>${communityId}</arg>
</java>
<ok to="fork_dump"/>
<error to="Kill"/>
</action>
@@ -405,10 +435,16 @@
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/split</arg>
</java>
<ok to="send_zenodo"/>
<ok to="should_upload"/>
<error to="Kill"/>
</action>
<decision name="should_upload">
<switch>
<case to="send_zenodo">${wf:conf('upload') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
@@ -417,7 +453,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>

View File

@@ -596,7 +596,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>

View File

@@ -1,6 +1,23 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AccessRight":{
"type":"object",
"properties":{
"code": {
"type": "string",
"description": "COAR access mode code: http://vocabularies.coar-repositories.org/documentation/access_rights/"
},
"label": {
"type": "string",
"description": "Label for the access mode"
},
"scheme": {
"type": "string",
"description": "Scheme of reference for access right code. Always set to COAR access rights vocabulary: http://vocabularies.coar-repositories.org/documentation/access_rights/"
}
}
},
"ControlledField": {
"type": "object",
"properties": {
@@ -266,6 +283,57 @@
]
}
},
"instance":{
"type":"array",
"items":{
"type":"object",
"properties":{
"accessright":{
"allOf":[
{
"$ref":"#/definitions/AccessRight"
},
{
"description":"The accessright of this materialization of the result"
}
]
},
"articleprocessingcharge":{
"type":"object",
"properties":{
"amount":{
"type":"string"
},
"currency":{
"type":"string"
}
}
},
"license":{
"type":"string"
},
"publicationdate":{
"type":"string"
},
"refereed":{
"type":"string"
},
"type":{
"type":"string",
"description":"The specific sub-type of this materialization of the result (see https://api.openaire.eu/vocabularies/dnet:result_typologies following the links)"
},
"url":{
"description":"Description of url",
"type":"array",
"items":{
"type":"string",
"description":"urls where it is possible to access the materialization of the result"
}
}
},
"description":"One of the materialization for this result"
}
},
"programmingLanguage": {
"type": "string",
"description": "Only for results with type 'software': the programming language"
@@ -302,7 +370,7 @@
"subject": {
"allOf": [
{"$ref": "#/definitions/ControlledField"},
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary)."},
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary)."}
]
}
}

View File

@@ -548,7 +548,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>

View File

@@ -35,7 +35,12 @@
"paramLongName":"dumpType",
"paramDescription": "the type of the dump (complete for the whole graph, community for the products related to communities, funder for the results with at least a link to project",
"paramRequired": false
}
}, {
"paramName":"cid",
"paramLongName":"communityId",
"paramDescription": "the id of the community to be dumped",
"paramRequired": false
}
]

View File

@@ -0,0 +1,25 @@
[
{
"paramName":"ci",
"paramLongName":"communityId",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"nn",
"paramLongName":"nameNode",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "path",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}
]

View File

@@ -25,6 +25,12 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "cid",
"paramLongName": "communityId",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@@ -12,12 +12,6 @@
"paramDescription": "The id of the concept record for a new version",
"paramRequired": false
},
{
"paramName":"cmp",
"paramLongName":"communityMapPath",
"paramDescription": "the path to the serialization of the community map",
"paramRequired": false
},
{
"paramName":"di",
"paramLongName":"depositionId",

View File

@@ -9,7 +9,7 @@ import com.github.victools.jsonschema.generator.*;
import eu.dnetlib.dhp.schema.dump.oaf.graph.*;
@Disabled
//@Disabled
public class GenerateJsonSchema {
@Test
@@ -21,7 +21,7 @@ public class GenerateJsonSchema {
configBuilder.forFields().withDescriptionResolver(field -> "Description of " + field.getDeclaredName());
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(Relation.class);
JsonNode jsonSchema = generator.generateSchema(GraphResult.class);
System.out.println(jsonSchema.toString());
}

View File

@@ -0,0 +1,60 @@
package eu.dnetlib.dhp.oa.graph.dump.community;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.MakeTar;
import eu.dnetlib.dhp.oa.graph.dump.MakeTarTest;
public class RemoveCommunityTest {
private static String workingDir;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(MakeTarTest.class.getSimpleName())
.toString();
}
@Test
public void testRemove() throws Exception {
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new Path(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
.getPath()),
new Path(workingDir + "/communityMap"));
String path = workingDir + "/communityMap";
RemoveCommunities.main(new String[] {
"-nameNode", "local",
"-path", path,
"-communityId", "beopen"
}
);
CommunityMap cm = new ObjectMapper()
.readValue(new FileInputStream(workingDir + "/communityMap"), CommunityMap.class);
Assertions.assertEquals(1, cm.size());
Assertions.assertTrue(cm.containsKey("beopen"));
}
}

View File

@@ -380,7 +380,7 @@ public class CreateRelationTest {
" <param name=\"rule\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">675858</param>\n" +
" <param name=\"url\"/>\n" +
" <param name=\"funding\">H2020-EINFRA-2015-1</param>\n" +
" <param name=\"funding\">EC | H2020 | RIA</param>\n" +
" <param name=\"funder\">EC</param>\n" +
" <param name=\"acronym\">West-Life</param>\n" +
" </concept>\n" +

View File

@@ -10,6 +10,7 @@ export SOURCE=$1
export SHADOW=$2
echo "Updating shadow database"
impala-shell -q "invalidate metadata"
impala-shell -d ${SOURCE} -q "invalidate metadata"
impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${SOURCE}.\1;/" | impala-shell -c -f -
impala-shell -q "create database if not exists ${SHADOW}"