This commit is contained in:
Miriam Baglioni 2020-12-15 18:59:23 +01:00
parent 24bbd30e80
commit 5df74ebe9c
7 changed files with 144 additions and 71 deletions

View File

@@ -7,10 +7,10 @@ import com.google.common.collect.Maps;
public class Constants {
public static String PUBLICATION_URL = "https://beta.risis.openaire.eu/search/publication?articleId=";
public static String DATASET_URL = "https://beta.risis.openaire.eu/search/dataset?datasetId=";
public static String SOFTWARE_URL = "https://beta.risis.openaire.eu/search/software?softwareId=";
public static String ORP_URL = "https://beta.risis.openaire.eu/search/other?orpId=";
public static String PUBLICATION_URL = "https://science-innovation-policy.openaire.eu/search/publication?articleId=";
public static String DATASET_URL = "https://science-innovation-policy.openaire.eu/search/dataset?datasetId=";
public static String SOFTWARE_URL = "https://science-innovation-policy.openaire.eu/search/software?softwareId=";
public static String ORP_URL = "https://science-innovation-policy.openaire.eu/search/other?orpId=";
public static String DEFAULT_LICENCE_ID = "notspecified";
public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
public static final Map<String, String> coarCodeLabelMap = Maps.newHashMap();
@@ -32,6 +32,8 @@ public class Constants {
public static String ORCID = "orcid";
public static String UNKNOWN = "unknown";
static {
accessRightsCoarMap.put("OPEN", "c_abf2");
accessRightsCoarMap.put("RESTRICTED", "c_16ec");
@@ -47,7 +49,6 @@ public class Constants {
coarCodeLabelMap.put("c_f1cf", "EMBARGO");
}
static {
gcatCatalogue.put("OPEN", "OPEN");
gcatCatalogue.put("RESTRICTED", "RESTRICTED");

View File

@@ -10,6 +10,7 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry;
@@ -20,6 +21,7 @@ import eu.dnetlib.dhp.schema.dump.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import org.omg.CORBA.UNKNOWN;
public class Mapper implements Serializable {
@@ -27,7 +29,8 @@ public class Mapper implements Serializable {
.asList("zenodo", "hal", "figshare", "digital-csic", "dans", "datacite");
private static final List<String> access = Arrays.asList("open", "closed", "embargoed", "restricted");
public static <I extends eu.dnetlib.dhp.schema.oaf.Result> CatalogueEntry map(I input) {
public static <I extends eu.dnetlib.dhp.schema.oaf.Result> CatalogueEntry map(I input,
Map<String, LongAccumulator> map) {
final CatalogueEntry out = new CatalogueEntry();
Optional<Qualifier> ort = Optional.ofNullable(input.getResulttype());
@@ -204,7 +207,13 @@ public class Mapper implements Serializable {
.newInstance(
"Language", Optional
.ofNullable(input.getLanguage())
.map(value -> value.getClassname())
.map(value -> {
String lang = value.getClassname();
if(lang.toLowerCase().equals(Constants.UNKNOWN)){
return "";
}
return lang;
})
.orElse("")));
List<StructuredProperty> iTitle = Optional
@@ -217,7 +226,7 @@ public class Mapper implements Serializable {
.orElse(new ArrayList<>());
if (iTitle.size() > 0) {
out.setTitle(iTitle.get(0).getValue());
out.setTitle(textReplacement(iTitle.get(0).getValue()));
} else {
out.setTitle("");
}
@@ -277,7 +286,7 @@ public class Mapper implements Serializable {
s -> {
String classId = s.getQualifier().getClassid();
String prefix = "";
if (!classId.equals("keyword") &&
if (!(classId.equals("keyword") || classId.toLowerCase().equals(Constants.UNKNOWN)) &&
StringUtils.isNotEmpty(classId)) {
prefix = classId + ".";
}
@@ -326,7 +335,10 @@ public class Mapper implements Serializable {
out.setExtras(externals);
}
if (out == null)
map.get("void_records").add(1);
map.get("dumped_records").add(1);
return out;
}
@@ -344,6 +356,12 @@ public class Mapper implements Serializable {
}
public static String textReplacement(String text){
return text
.replace("", "\"").replace("", "\"")
.replace("", "\"").replace("", "\"");
}
private static String getAuthor(Author v) {
String author = v.getFullname();
Optional<List<StructuredProperty>> oPid = Optional.ofNullable(v.getPid());
@@ -375,7 +393,7 @@ public class Mapper implements Serializable {
private static void getDescription(CatalogueEntry out, List<KeyValue> externals, List<Field<String>> value) {
Iterator<Field<String>> it = value.iterator();
if (it.hasNext()) {
out.setNotes(it.next().getValue());
out.setNotes(textReplacement(it.next().getValue()));
} else {
out.setNotes("");
}
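
Note on the textReplacement hunk above: the characters being replaced were lost in this page's rendering (they appear as empty string literals). A minimal standalone sketch of the likely intent, assuming the stripped characters are the typographic quote marks, written with Unicode escapes so they stay visible:

public class QuoteNormalizer {

    // Assumed reconstruction: curly double quotes (U+201C, U+201D) and curly
    // single quotes (U+2018, U+2019) are normalized to a plain ASCII double
    // quote; the actual characters in the commit did not survive rendering.
    public static String textReplacement(String text) {
        return text
            .replace("\u201C", "\"").replace("\u201D", "\"")
            .replace("\u2018", "\"").replace("\u2019", "\"");
    }

    public static void main(String[] args) {
        System.out.println(textReplacement("\u201CSample title\u201D")); // -> "Sample title"
    }
}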

View File

@@ -58,27 +58,34 @@ public class SendToCatalogue implements Serializable {
// log.info("Copying information for : " + name);
// fileSystem.copyToLocalFile(p, new Path("/tmp/" + name));
try {
// InputStream in = new GZIPInputStream(new FileInputStream("/tmp/" + name));
// try {
// InputStream in = new GZIPInputStream(new FileInputStream("/tmp/" + name));
// BufferedReader reader = new BufferedReader(
// new InputStreamReader(in));
BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(p)));
String line;
while ((line = reader.readLine()) != null) {
if (HttpStatus.SC_CREATED != gCatAPIClient.publish(line)) {
log.error("entry not created for item " + line);
}
}
reader.close();
// in.close();
FSDataInputStream in = fileSystem.open(p);
} finally {
GZIPInputStream gis = new GZIPInputStream(in);
BufferedReader reader = new BufferedReader(new InputStreamReader(gis));
String line;
while ((line = reader.readLine()) != null) {
try {
gCatAPIClient.publish(line);
} catch (Exception e) {
log.error("ERROR_FOR " + line);
}
}
reader.close();
// in.close();
// } finally {
// log.info("deleting information for: " + name);
// File f = new File("/tmp/" + name);
// if (f.exists()) {
// f.delete();
}
// }
}
}
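
Because removed and added lines are interleaved in the hunk above, here is a minimal sketch of how the rewritten loop reads after this change: each dump part file is opened as a gzip stream directly from HDFS, and a failed publish only logs the offending record instead of aborting the whole upload. GCatAPIClient and the logger come from the surrounding class; the helper method name is illustrative.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: stream one gzipped dump part from HDFS and publish it line by line.
private static void publishPart(FileSystem fileSystem, Path p, GCatAPIClient gCatAPIClient) throws Exception {
    FSDataInputStream in = fileSystem.open(p);
    GZIPInputStream gis = new GZIPInputStream(in);
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(gis))) {
        String line;
        while ((line = reader.readLine()) != null) {
            try {
                gCatAPIClient.publish(line); // one catalogue entry per line
            } catch (Exception e) {
                log.error("ERROR_FOR " + line); // skip the record, keep going
            }
        }
    }
}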

View File

@@ -4,10 +4,7 @@ package eu.dnetlib.dhp.oa.graph.dump.gcat;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@@ -17,6 +14,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,8 +64,17 @@ public class SparkDumpRISISCatalogue implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, inputClazz, communityName);// , dumpClazz);
Map<String, LongAccumulator> map = new HashMap<>();
map.put("dumped_records", spark.sparkContext().longAccumulator("dumped_records"));
map.put("send_to_dump_records", spark.sparkContext().longAccumulator("send_to_dump_records"));
map.put("skipped_records", spark.sparkContext().longAccumulator("skipped_records"));
map.put("void_records", spark.sparkContext().longAccumulator("void_records"));
execDump(
spark, inputPath, outputPath, inputClazz, communityName, map);// ,
// dumpClazz);
log.info("records send to dump: {}", map.get("send_to_dump_records").value());
log.info("skipped records : {}", map.get("skipped_records").value());
log.info("dumped_records : {}", map.get("dumped_records").value());
});
}
@@ -76,17 +83,18 @@ public class SparkDumpRISISCatalogue implements Serializable {
String inputPath,
String outputPath,
Class<I> inputClazz,
String communityName) {// Class<O> dumpClazz) {
String communityName,
Map<String, LongAccumulator> map) {// Class<O> dumpClazz) {
// Set<String> communities = communityMap.keySet();
Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
tmp
Utils
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<I, CatalogueEntry>) value -> execMap(value, communityName),
Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class))
(MapFunction<I, CatalogueEntry>) value -> execMap(
value, communityName, map),
Encoders.bean(CatalogueEntry.class))
.filter(Objects::nonNull)
.repartition(1)
.coalesce(1)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@@ -94,28 +102,47 @@ public class SparkDumpRISISCatalogue implements Serializable {
}
private static <I extends Result> CatalogueEntry execMap(I value, String community) {
{
private static <I extends Result> CatalogueEntry execMap(I value, String community,
Map<String, LongAccumulator> map) {
Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
if (!inputContext.isPresent()) {
return null;
}
List<String> toDumpFor = inputContext.get().stream().map(c -> {
String id = c.getId();
if (id.contains("::")) {
id = id.substring(0, id.indexOf("::"));
}
if (community.equals(id)) {
return id;
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
if (toDumpFor.size() == 0) {
return null;
}
return Mapper.map(value);
if (value.getDataInfo().getDeletedbyinference() || value.getDataInfo().getInvisible()) {
// map.get("skipped_records").add(1);
return null;
}
Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
if (!inputContext.isPresent()) {
map.get("skipped_records").add(1);
return null;
}
if (inputContext.get().stream().map(c -> {
String id = c.getId();
if (id.contains("::")) {
return id.substring(0, id.indexOf("::"));
}
return id;
}).collect(Collectors.toList()).contains(community)) {
map.get("send_to_dump_records").add(1);
return Mapper.map(value, map);
}
map.get("skipped_records").add(1);
return null;
// List<String> toDumpFor = inputContext.get().stream().map(c -> {
// String id = c.getId();
// if (id.contains("::")) {
// id = id.substring(0, id.indexOf("::"));
// }
// if (community.equals(id)) {
// dumpedRecords.add(1);
// return id;
// }
// return null;
// }).filter(Objects::nonNull).collect(Collectors.toList());
// if (toDumpFor.size() == 0) {
// skippedRecords.add(1);
// return null;
// }
// return Mapper.map(value);
}
}
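
A minimal, self-contained sketch of the accumulator wiring introduced in this file: named LongAccumulators are registered on the driver, incremented inside the map function on the executors, and their totals are read back on the driver once the write action has finished. The toy job and counter names below are illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("accumulator-sketch")
            .getOrCreate();

        // Register named accumulators on the driver.
        Map<String, LongAccumulator> map = new HashMap<>();
        map.put("dumped_records", spark.sparkContext().longAccumulator("dumped_records"));
        map.put("skipped_records", spark.sparkContext().longAccumulator("skipped_records"));

        // Increment the counters on the executors; the action triggers the updates.
        spark
            .range(10)
            .javaRDD()
            .foreach(i -> map.get(i % 2 == 0 ? "dumped_records" : "skipped_records").add(1));

        // Accumulator values are only meaningful on the driver after the action has run.
        System.out.println("dumped_records  : " + map.get("dumped_records").value());
        System.out.println("skipped_records : " + map.get("skipped_records").value());

        spark.stop();
    }
}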

View File

@@ -98,25 +98,25 @@ public class DumpJobTest {
public void testDataset() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/gcat/dataset_for_dump.json")
.getPath();
.getResource("/eu/dnetlib/dhp/oa/graph/dump/gcat/dataset_for_dump.json")
.getPath();
SparkDumpRISISCatalogue.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/result",
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-communityName", "science-innovation-policy"
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/result",
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-communityName", "science-innovation-policy"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry> tmp = sc
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.gcat.CatalogueEntry.class));
Assertions.assertEquals(2, verificationDataset.count());
verificationDataset.show(false);

View File

@@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.graph.gcat;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.*;
import java.net.URISyntaxException;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
@@ -63,7 +61,8 @@ public class GCatAPIClientTest {
// and '_'.
// You can validate your name using the regular expression : ^[a-z0-9_\\-]{2,100}$
String json = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_dat_prova_20201130.json"));
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/gcat/gcat_software_20201130.json"));
System.out.println("Creating item...");
Assertions.assertEquals(HttpStatus.SC_CREATED, client.publish(json));
System.out.println("item created, now listing...");
@@ -85,13 +84,34 @@ public class GCatAPIClientTest {
@Test
public void bulkPublishDATS() throws IOException, URISyntaxException {
BufferedReader reader = new BufferedReader(new FileReader(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/gcat/dats_20201126")
.getPath()));
String line;
int count = 1;
while ((line = reader.readLine()) != null) {
Assertions.assertEquals(HttpStatus.SC_CREATED, client.publish(line));
System.out.println(count);
count++;
}
}
@Test
public void bulkPublishCompressedSW() throws IOException, URISyntaxException {
BufferedReader reader = new BufferedReader(
new InputStreamReader(new GZIPInputStream(new FileInputStream(getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/gcat/software_20201130.gz")
.getPath()))));
String line;
int count = 1;
while ((line = reader.readLine()) != null) {
Assertions.assertEquals(HttpStatus.SC_CREATED, client.publish(line));
System.out.println(count);
count++;
}
}