Merge branch 'stable_ids' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids

Miriam Baglioni 2021-06-14 10:19:49 +02:00
commit cf360d7c97
17 changed files with 917 additions and 156 deletions

View File

@@ -22,8 +22,8 @@
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-validator</groupId> <groupId>com.github.sisyphsu</groupId>
<artifactId>commons-validator</artifactId> <artifactId>dateparser</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
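
The module now pulls in com.github.sisyphsu:dateparser in place of commons-validator; this library backs the new date cleaning below. A minimal usage sketch (not part of this commit; the sample value is taken from the unit tests further down):

import java.util.Date;
import com.github.sisyphsu.dateparser.DateParserUtils;

// DateParserUtils auto-detects many heterogeneous date layouts and returns a java.util.Date
Date parsed = DateParserUtils.parseDate("Mon Jan 2 15:04:05 MST 2006");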

View File

@@ -1,15 +1,23 @@
package eu.dnetlib.dhp.schema.oaf.utils; package eu.dnetlib.dhp.schema.oaf.utils;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.GenericValidator; import org.jetbrains.annotations.NotNull;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -18,8 +26,9 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class GraphCleaningFunctions extends CleaningFunctions { public class GraphCleaningFunctions extends CleaningFunctions {
public static final String ORCID_CLEANING_REGEX = ".*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9x]{4})";
public static final int ORCID_LEN = 19;
public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)"; public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)";
public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
public static final String INVALID_AUTHOR_REGEX = ".*deactivated.*"; public static final String INVALID_AUTHOR_REGEX = ".*deactivated.*";
public static final String TITLE_FILTER_REGEX = "[.*test.*\\W\\d]"; public static final String TITLE_FILTER_REGEX = "[.*test.*\\W\\d]";
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 10; public static final int TITLE_FILTER_RESIDUAL_LENGTH = 10;
@@ -119,14 +128,42 @@ public class GraphCleaningFunctions extends CleaningFunctions {
} else if (value instanceof Relation) { } else if (value instanceof Relation) {
Relation r = (Relation) value; Relation r = (Relation) value;
if (!isValidDate(r.getValidationDate())) { Optional<String> validationDate = doCleanDate(r.getValidationDate());
if (validationDate.isPresent()) {
r.setValidationDate(validationDate.get());
r.setValidated(true);
} else {
r.setValidationDate(null); r.setValidationDate(null);
r.setValidated(false); r.setValidated(false);
} }
} else if (value instanceof Result) { } else if (value instanceof Result) {
Result r = (Result) value; Result r = (Result) value;
if (Objects.nonNull(r.getDateofacceptance())) {
Optional<String> date = cleanDateField(r.getDateofacceptance());
if (date.isPresent()) {
r.getDateofacceptance().setValue(date.get());
} else {
r.setDateofacceptance(null);
}
}
if (Objects.nonNull(r.getRelevantdate())) {
r
.setRelevantdate(
r
.getRelevantdate()
.stream()
.filter(Objects::nonNull)
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {
sp.setValue(GraphCleaningFunctions.cleanDate(sp.getValue()));
return sp;
})
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.collect(Collectors.toList()));
}
if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null); r.setPublisher(null);
} }
@@ -222,6 +259,14 @@ public class GraphCleaningFunctions extends CleaningFunctions {
if (Objects.isNull(i.getRefereed())) { if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
} }
if (Objects.nonNull(i.getDateofacceptance())) {
Optional<String> date = cleanDateField(i.getDateofacceptance());
if (date.isPresent()) {
i.getDateofacceptance().setValue(date.get());
} else {
i.setDateofacceptance(null);
}
}
} }
} }
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
@@ -237,7 +282,27 @@ public class GraphCleaningFunctions extends CleaningFunctions {
} }
} }
if (Objects.nonNull(r.getAuthor())) { if (Objects.nonNull(r.getAuthor())) {
final List<Author> authors = Lists.newArrayList(); r
.setAuthor(
r
.getAuthor()
.stream()
.filter(a -> Objects.nonNull(a))
.filter(a -> StringUtils.isNotBlank(a.getFullname()))
.filter(a -> StringUtils.isNotBlank(a.getFullname().replaceAll("[\\W]", "")))
.collect(Collectors.toList()));
boolean nullRank = r
.getAuthor()
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : r.getAuthor()) {
author.setRank(i++);
}
}
for (Author a : r.getAuthor()) { for (Author a : r.getAuthor()) {
if (Objects.isNull(a.getPid())) { if (Objects.isNull(a.getPid())) {
a.setPid(Lists.newArrayList()); a.setPid(Lists.newArrayList());
@@ -251,40 +316,52 @@ public class GraphCleaningFunctions extends CleaningFunctions {
.filter(p -> Objects.nonNull(p.getQualifier())) .filter(p -> Objects.nonNull(p.getQualifier()))
.filter(p -> StringUtils.isNotBlank(p.getValue())) .filter(p -> StringUtils.isNotBlank(p.getValue()))
.map(p -> { .map(p -> {
p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, "")); // hack to distinguish orcid from orcid_pending
String pidProvenance = Optional
.ofNullable(p.getDataInfo())
.map(
d -> Optional
.ofNullable(d.getProvenanceaction())
.map(Qualifier::getClassid)
.orElse(""))
.orElse("");
if (p
.getQualifier()
.getClassid()
.toLowerCase()
.contains(ModelConstants.ORCID)) {
if (pidProvenance
.equals(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY)) {
p.getQualifier().setClassid(ModelConstants.ORCID);
} else {
p.getQualifier().setClassid(ModelConstants.ORCID_PENDING);
}
final String orcid = p
.getValue()
.trim()
.toLowerCase()
.replaceAll(ORCID_CLEANING_REGEX, "$1-$2-$3-$4");
if (orcid.length() == ORCID_LEN) {
p.setValue(orcid);
} else {
p.setValue("");
}
}
return p; return p;
}) })
.filter(p -> StringUtils.isNotBlank(p.getValue())) .filter(p -> StringUtils.isNotBlank(p.getValue()))
.collect( .collect(
Collectors Collectors
.toMap( .toMap(
StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1, p -> p.getQualifier().getClassid() + p.getValue(),
Function.identity(),
(p1, p2) -> p1,
LinkedHashMap::new)) LinkedHashMap::new))
.values() .values()
.stream() .stream()
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
if (StringUtils.isBlank(a.getFullname())) {
if (StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname())) {
a.setFullname(a.getSurname() + ", " + a.getName());
}
}
if (StringUtils.isNotBlank(a.getFullname()) && isValidAuthorName(a)) {
authors.add(a);
}
} }
boolean nullRank = authors
.stream()
.anyMatch(a -> Objects.isNull(a.getRank()));
if (nullRank) {
int i = 1;
for (Author author : authors) {
author.setRank(i++);
}
}
r.setAuthor(authors);
} }
if (value instanceof Publication) { if (value instanceof Publication) {
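
The author pid block above also normalises ORCID values: the qualifier classid is switched between orcid and orcid_pending based on the pid provenance, and the value itself is collapsed through ORCID_CLEANING_REGEX and kept only when it reaches ORCID_LEN (19 characters). A minimal sketch of that value normalisation (the raw value below is a hypothetical input; the expected output matches the MappersTest assertions further down):

// hypothetical raw pid value; any four dash-separated 4-character groups are collapsed to the canonical form
String raw = "https://orcid.org/0000-0001-6651-1178".trim().toLowerCase();
String orcid = raw.replaceAll(GraphCleaningFunctions.ORCID_CLEANING_REGEX, "$1-$2-$3-$4");
// orcid -> "0000-0001-6651-1178" (length 19); values that do not reach ORCID_LEN are blanked and filtered out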
@@ -300,10 +377,34 @@ public class GraphCleaningFunctions extends CleaningFunctions {
return value; return value;
} }
protected static boolean isValidDate(String date) { private static Optional<String> cleanDateField(Field<String> dateofacceptance) {
return Stream return Optional
.of(ModelSupport.DATE_TIME_FORMATS) .ofNullable(dateofacceptance)
.anyMatch(format -> GenericValidator.isDate(date, format, false)); .map(Field::getValue)
.map(GraphCleaningFunctions::cleanDate)
.filter(Objects::nonNull);
}
protected static Optional<String> doCleanDate(String date) {
return Optional.ofNullable(cleanDate(date));
}
public static String cleanDate(final String inputDate) {
if (StringUtils.isBlank(inputDate)) {
return null;
}
try {
final LocalDate date = DateParserUtils
.parseDate(inputDate.trim())
.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDate();
return DateTimeFormatter.ofPattern(ModelSupport.DATE_FORMAT).format(date);
} catch (DateTimeParseException e) {
return null;
}
} }
// HELPERS // HELPERS
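
cleanDate replaces the former GenericValidator-based isValidDate check: any value the dateparser library can interpret is normalised to the ModelSupport.DATE_FORMAT pattern (yyyy-MM-dd, as the expected values in the tests below show), while unparsable input is expected to come back empty. A minimal usage sketch, with expected values taken from OafMapperUtilsTest:

// expected values as asserted in OafMapperUtilsTest (below)
Optional<String> cleaned = GraphCleaningFunctions.doCleanDate("2016 Apr 05"); // Optional["2016-04-05"]
String normalised = GraphCleaningFunctions.cleanDate("oct 7, 1970");          // "1970-10-07"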

View File

@@ -4,9 +4,12 @@ package eu.dnetlib.dhp.schema.oaf.utils;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException; import java.io.IOException;
import java.time.format.DateTimeParseException; import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@@ -26,11 +29,111 @@ public class OafMapperUtilsTest {
@Test @Test
public void testDateValidation() { public void testDateValidation() {
assertTrue(GraphCleaningFunctions.isValidDate("2016-05-07T12:41:19.202Z")); assertTrue(GraphCleaningFunctions.doCleanDate("2016-05-07T12:41:19.202Z ").isPresent());
assertTrue(GraphCleaningFunctions.isValidDate("2020-09-10 11:08:52")); assertTrue(GraphCleaningFunctions.doCleanDate("2020-09-10 11:08:52 ").isPresent());
assertTrue(GraphCleaningFunctions.isValidDate("2016-04-05")); assertTrue(GraphCleaningFunctions.doCleanDate(" 2016-04-05").isPresent());
assertFalse(GraphCleaningFunctions.isValidDate("2016 April 05"));
assertEquals("2016-04-05", GraphCleaningFunctions.doCleanDate("2016 Apr 05").get());
assertEquals("2009-05-08", GraphCleaningFunctions.doCleanDate("May 8, 2009 5:57:51 PM").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct 7, 1970").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct 7, '70").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct. 7, 1970").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct. 7, 70").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 2 15:04:05 2006").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 2 15:04:05 MST 2006").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 02 15:04:05 -0700 2006").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Monday, 02-Jan-06 15:04:05 MST").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon, 02 Jan 2006 15:04:05 MST").get());
assertEquals("2017-07-11", GraphCleaningFunctions.doCleanDate("Tue, 11 Jul 2017 16:28:13 +0200 (CEST)").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon, 02 Jan 2006 15:04:05 -0700").get());
assertEquals("2018-01-04", GraphCleaningFunctions.doCleanDate("Thu, 4 Jan 2018 17:53:36 +0000").get());
assertEquals("2015-08-10", GraphCleaningFunctions.doCleanDate("Mon Aug 10 15:44:11 UTC+0100 2015").get());
assertEquals(
"2015-07-03",
GraphCleaningFunctions.doCleanDate("Fri Jul 03 2015 18:04:07 GMT+0100 (GMT Daylight Time)").get());
assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012 10:09am").get());
assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012 at 10:09am PST-08").get());
assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012, 10:10:09").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("October 7, 1970").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("October 7th, 1970").get());
assertEquals("2006-02-12", GraphCleaningFunctions.doCleanDate("12 Feb 2006, 19:17").get());
assertEquals("2006-02-12", GraphCleaningFunctions.doCleanDate("12 Feb 2006 19:17").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("7 oct 70").get());
assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("7 oct 1970").get());
assertEquals("2013-02-03", GraphCleaningFunctions.doCleanDate("03 February 2013").get());
assertEquals("2013-07-01", GraphCleaningFunctions.doCleanDate("1 July 2013").get());
assertEquals("2013-02-03", GraphCleaningFunctions.doCleanDate("2013-Feb-03").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("3/31/2014").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("03/31/2014").get());
assertEquals("1971-08-21", GraphCleaningFunctions.doCleanDate("08/21/71").get());
assertEquals("1971-01-08", GraphCleaningFunctions.doCleanDate("8/1/71").get());
assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("4/8/2014 22:05").get());
assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("04/08/2014 22:05").get());
assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("4/8/14 22:05").get());
assertEquals("2014-02-04", GraphCleaningFunctions.doCleanDate("04/2/2014 03:00:51").get());
assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 12:00:00 AM").get());
assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 01:00:01 PM").get());
assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 01:00 PM").get());
assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 1:00 PM").get());
assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 12:00 AM").get());
assertEquals("2014-02-04", GraphCleaningFunctions.doCleanDate("4/02/2014 03:00:51").get());
assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("03/19/2012 10:11:59").get());
assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("03/19/2012 10:11:59.3186369").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("2014/3/31").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("2014/03/31").get());
assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014/4/8 22:05").get());
assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014/04/08 22:05").get());
assertEquals("2014-04-02", GraphCleaningFunctions.doCleanDate("2014/04/2 03:00:51").get());
assertEquals("2014-04-02", GraphCleaningFunctions.doCleanDate("2014/4/02 03:00:51").get());
assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("2012/03/19 10:11:59").get());
assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("2012/03/19 10:11:59.3186369").get());
assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014年04月08日").get());
assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("2006-01-02T15:04:05+0000").get());
assertEquals("2009-08-13", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09-07:00").get());
assertEquals("2009-08-12", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09").get());
assertEquals("2009-08-12", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09Z").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 17:24:37.3186369").get());
assertEquals("2012-08-03", GraphCleaningFunctions.doCleanDate("2012-08-03 18:31:59.257000000").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 17:24:37.123").get());
assertEquals("2013-04-01", GraphCleaningFunctions.doCleanDate("2013-04-01 22:43").get());
assertEquals("2013-04-01", GraphCleaningFunctions.doCleanDate("2013-04-01 22:43:22").get());
assertEquals("2014-12-16", GraphCleaningFunctions.doCleanDate("2014-12-16 06:20:00 UTC").get());
assertEquals("2014-12-16", GraphCleaningFunctions.doCleanDate("2014-12-16 06:20:00 GMT").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 05:24:37 PM").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:43 +0800").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:43 +0800 +08").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:44 +09:00").get());
assertEquals("2012-08-03", GraphCleaningFunctions.doCleanDate("2012-08-03 18:31:59.257000000 +0000 UTC").get());
assertEquals("2015-09-30", GraphCleaningFunctions.doCleanDate("2015-09-30 18:48:56.35272715 +0000 UTC").get());
assertEquals("2015-02-18", GraphCleaningFunctions.doCleanDate("2015-02-18 00:12:00 +0000 GMT").get());
assertEquals("2015-02-18", GraphCleaningFunctions.doCleanDate("2015-02-18 00:12:00 +0000 UTC").get());
assertEquals(
"2015-02-08", GraphCleaningFunctions.doCleanDate("2015-02-08 03:02:00 +0300 MSK m=+0.000000001").get());
assertEquals(
"2015-02-08", GraphCleaningFunctions.doCleanDate("2015-02-08 03:02:00.001 +0300 MSK m=+0.000000001").get());
assertEquals("2017-07-19", GraphCleaningFunctions.doCleanDate("2017-07-19 03:21:51+00:00").get());
assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26").get());
assertEquals("2014-04-01", GraphCleaningFunctions.doCleanDate("2014-04").get());
assertEquals("2014-01-01", GraphCleaningFunctions.doCleanDate("2014").get());
assertEquals("2014-05-11", GraphCleaningFunctions.doCleanDate("2014-05-11 08:20:13,787").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("3.31.2014").get());
assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("03.31.2014").get());
assertEquals("1971-08-21", GraphCleaningFunctions.doCleanDate("08.21.71").get());
assertEquals("2014-03-01", GraphCleaningFunctions.doCleanDate("2014.03").get());
assertEquals("2014-03-30", GraphCleaningFunctions.doCleanDate("2014.03.30").get());
assertEquals("2014-06-01", GraphCleaningFunctions.doCleanDate("20140601").get());
assertEquals("2014-07-22", GraphCleaningFunctions.doCleanDate("20140722105203").get());
assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("1332151919").get());
assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367189").get());
assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367111222").get());
assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367111222333").get());
}
@Test
public void testDate() {
System.out.println(GraphCleaningFunctions.cleanDate("23-FEB-1998"));
} }
@Test @Test

View File

@@ -10,87 +10,11 @@ import java.util.*;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import net.sf.saxon.s9api.*; import net.sf.saxon.s9api.*;
public class DateCleaner implements ExtensionFunction, Serializable { public class DateCleaner implements ExtensionFunction, Serializable {
private final static List<Pattern> dateRegex = Arrays
.asList(
// Y-M-D
Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
// M-D-Y
Pattern
.compile(
"((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d",
Pattern.MULTILINE),
// D-M-Y
Pattern
.compile(
"(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})",
Pattern.MULTILINE),
// Y
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE));
private final static Pattern incompleteDateRegex = Pattern
.compile("^((18|19|20)\\d\\d){1}([- \\\\ \\/](0?[1-9]|1[012]))?", Pattern.MULTILINE);
private final static List<DateTimeFormatter> dformats = Arrays
.asList(
DateTimeFormatter
.ofPattern(
"[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]",
Locale.ENGLISH),
DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN));
public String clean(final String inputDate) {
Optional<String> cleanedDate = dateRegex
.stream()
.map(
p -> {
final Matcher matcher = p.matcher(inputDate);
if (matcher.find())
return matcher.group(0);
else
return null;
})
.filter(Objects::nonNull)
.map(m -> {
Optional<String> cleanDate = dformats
.stream()
.map(f -> {
try {
LocalDate parsedDate = LocalDate.parse(m, f);
if (parsedDate != null)
return parsedDate.toString();
else
return null;
} catch (Throwable e) {
return null;
}
}
)
.filter(Objects::nonNull)
.findAny();
return cleanDate.orElse(null);
})
.filter(Objects::nonNull)
.findAny();
if (cleanedDate.isPresent())
return cleanedDate.get();
final Matcher matcher = incompleteDateRegex.matcher(inputDate);
if (matcher.find()) {
final Integer year = Integer.parseInt(matcher.group(1));
final Integer month = Integer.parseInt(matcher.group(4) == null ? "01" : matcher.group(4));
return String.format("%d-%02d-01", year, month);
}
return null;
}
@Override @Override
public QName getName() { public QName getName() {
return new QName(QNAME_BASE_URI + "/dateISO", "dateISO"); return new QName(QNAME_BASE_URI + "/dateISO", "dateISO");
@@ -117,4 +41,9 @@ public class DateCleaner implements ExtensionFunction, Serializable {
final String currentValue = xdmValues[0].itemAt(0).getStringValue(); final String currentValue = xdmValues[0].itemAt(0).getStringValue();
return new XdmAtomicValue(clean(currentValue)); return new XdmAtomicValue(clean(currentValue));
} }
// for backward compatibility with the existing unit tests
public String clean(String date) {
return GraphCleaningFunctions.cleanDate(date);
}
} }

View File

@@ -51,11 +51,11 @@ public class TransformationJobTest extends AbstractVocabularyTest {
@DisplayName("Test Date cleaner") @DisplayName("Test Date cleaner")
public void testDateCleaner() throws Exception { public void testDateCleaner() throws Exception {
DateCleaner dc = new DateCleaner(); DateCleaner dc = new DateCleaner();
assertEquals(dc.clean("20/09/1982"), "1982-09-20"); assertEquals("1982-09-20", dc.clean("20/09/1982"));
assertEquals(dc.clean("20-09-2002"), "2002-09-20"); assertEquals("2002-09-20", dc.clean("20-09-2002"));
assertEquals(dc.clean("2002-09-20"), "2002-09-20"); assertEquals("2002-09-20", dc.clean("2002-09-20"));
assertEquals(dc.clean("2002-9"), "2002-09-01"); assertEquals("2002-09-01", dc.clean("2002-9"));
assertEquals(dc.clean("2021"), "2021-01-01"); assertEquals("2021-01-01", dc.clean("2021"));
} }
@Test @Test

View File

@@ -1,24 +1,65 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.journal;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.keyValue;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.util.*; import java.util.ArrayList;
import java.util.stream.Collectors; import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentFactory; import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Node; import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.AccessRight;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@@ -35,7 +76,9 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/"; protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/";
protected static final Qualifier ORCID_PID_TYPE = qualifier( protected static final Qualifier ORCID_PID_TYPE = qualifier(
ORCID_PENDING, ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES); ModelConstants.ORCID_PENDING,
ModelConstants.ORCID_CLASSNAME,
DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Qualifier MAG_PID_TYPE = qualifier( protected static final Qualifier MAG_PID_TYPE = qualifier(
"MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES); "MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
@@ -43,6 +86,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final Map<String, String> nsContext = new HashMap<>(); protected static final Map<String, String> nsContext = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class);
static { static {
nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
@@ -61,6 +106,9 @@ public abstract class AbstractMdRecordToOafMapper {
} }
public List<Oaf> processMdRecord(final String xml) { public List<Oaf> processMdRecord(final String xml) {
// log.info("Processing record: " + xml);
try { try {
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
@@ -100,10 +148,10 @@ public abstract class AbstractMdRecordToOafMapper {
} }
protected String getResultType(final Document doc, final List<Instance> instances) { protected String getResultType(final Document doc, final List<Instance> instances) {
String type = doc.valueOf("//dr:CobjCategory/@type"); final String type = doc.valueOf("//dr:CobjCategory/@type");
if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
String instanceType = instances final String instanceType = instances
.stream() .stream()
.map(i -> i.getInstancetype().getClassid()) .map(i -> i.getInstancetype().getClassid())
.findFirst() .findFirst()
@@ -158,8 +206,12 @@ public abstract class AbstractMdRecordToOafMapper {
return oafs; return oafs;
} }
private OafEntity createEntity(Document doc, String type, List<Instance> instances, KeyValue collectedFrom, private OafEntity createEntity(final Document doc,
DataInfo info, long lastUpdateTimestamp) { final String type,
final List<Instance> instances,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "publication": case "publication":
final Publication p = new Publication(); final Publication p = new Publication();
@@ -219,9 +271,7 @@ public abstract class AbstractMdRecordToOafMapper {
getRelation( getRelation(
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate)); docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate));
res res
.add( .add(getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate));
getRelation(
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate));
} }
} }
@@ -411,10 +461,10 @@ public abstract class AbstractMdRecordToOafMapper {
return Lists.newArrayList(id); return Lists.newArrayList(id);
} }
} }
List<String> idList = doc final List<String> idList = doc
.selectNodes( .selectNodes(
"normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())"); "normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())");
Set<String> originalIds = Sets.newHashSet(idList); final Set<String> originalIds = Sets.newHashSet(idList);
if (originalIds.isEmpty()) { if (originalIds.isEmpty()) {
throw new IllegalStateException("missing originalID on " + doc.asXML()); throw new IllegalStateException("missing originalID on " + doc.asXML());
@@ -423,8 +473,8 @@ public abstract class AbstractMdRecordToOafMapper {
} }
protected AccessRight prepareAccessRight(final Node node, final String xpath, final String schemeId) { protected AccessRight prepareAccessRight(final Node node, final String xpath, final String schemeId) {
Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId); final Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId);
AccessRight accessRight = new AccessRight(); final AccessRight accessRight = new AccessRight();
accessRight.setClassid(qualifier.getClassid()); accessRight.setClassid(qualifier.getClassid());
accessRight.setClassname(qualifier.getClassname()); accessRight.setClassname(qualifier.getClassname());
accessRight.setSchemeid(qualifier.getSchemeid()); accessRight.setSchemeid(qualifier.getSchemeid());

View File

@@ -0,0 +1,154 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import scala.Tuple2;
public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);
private static final Namespace DRI_NS_PREFIX = new Namespace("dri",
"http://www.driver-repository.eu/namespace/dri");
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateHdfsMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
final String mdFormat = parser.get("mdFormat");
final String mdLayout = parser.get("mdLayout");
final String mdInterpretation = parser.get("mdInterpretation");
final String hdfsPath = parser.get("hdfsPath");
final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
});
}
public static void processPaths(final SparkSession spark,
final String outputPath,
final Set<String> paths,
final String type) throws Exception {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
log.info("Found " + paths.size() + " not empty mdstores");
paths.forEach(log::info);
final String[] validPaths = paths
.stream()
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.toArray(size -> new String[size]);
spark
.read()
.parquet(validPaths)
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
.toJavaRDD()
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
// .coalesce(1)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
private static String enrichRecord(final Row r) {
final String xml = r.getAs("body");
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
final String collDate = dateFormat.format(new Date((Long) r.getAs("dateOfCollection")));
final String tranDate = dateFormat.format(new Date((Long) r.getAs("dateOfTransformation")));
try {
final Document doc = new SAXReader().read(new StringReader(xml));
final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id"));
head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate);
head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate);
return doc.asXML();
} catch (final Exception e) {
log.error("Error patching record: " + xml);
throw new RuntimeException("Error patching record: " + xml, e);
}
}
private static Set<String> mdstorePaths(final String mdstoreManagerUrl,
final String format,
final String layout,
final String interpretation)
throws Exception {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
}
}
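
mdstorePaths keeps only the non-empty mdstores matching the requested format, layout and interpretation, and points at the parquet store of each one's current version. A sketch of the path derived for a single entry (the hdfsPath and currentVersion values are made up for illustration):

// hypothetical MDStoreWithInfo fields: hdfsPath=/data/mdstores/md-1, currentVersion=md-1-version-2
String storePath = "/data/mdstores/md-1" + "/" + "md-1-version-2" + "/store";
// -> "/data/mdstores/md-1/md-1-version-2/store", later read back via spark.read().parquet(...)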

View File

@@ -10,9 +10,6 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;

View File

@@ -0,0 +1,32 @@
[
{
"paramName": "p",
"paramLongName": "hdfsPath",
"paramDescription": "the path where storing the sequential file",
"paramRequired": true
},
{
"paramName": "u",
"paramLongName": "mdstoreManagerUrl",
"paramDescription": "the MdstoreManager url",
"paramRequired": true
},
{
"paramName": "f",
"paramLongName": "mdFormat",
"paramDescription": "metadata format",
"paramRequired": true
},
{
"paramName": "l",
"paramLongName": "mdLayout",
"paramDescription": "metadata layout",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "mdInterpretation",
"paramDescription": "metadata interpretation",
"paramRequired": true
}
]
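
The paramName/paramLongName pairs above map one-to-one to the command-line arguments consumed by MigrateHdfsMdstoresApplication through ArgumentApplicationParser. A hypothetical invocation sketch (all values are placeholders, mirroring the OAF import step in the workflow below):

// placeholder values only
final String[] args = {
    "--hdfsPath", "/tmp/contentPath/oaf_records_hdfs",
    "--mdstoreManagerUrl", "http://mdstore-manager.example.org",
    "--mdFormat", "OAF",
    "--mdLayout", "store",
    "--mdInterpretation", "cleaned"
};
MigrateHdfsMdstoresApplication.main(args);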

View File

@@ -40,6 +40,16 @@
<value>false</value> <value>false</value>
<description>should import content from the aggregator or reuse a previous version</description> <description>should import content from the aggregator or reuse a previous version</description>
</property> </property>
<property>
<name>reuseODF_hdfs</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>reuseOAF_hdfs</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property> <property>
<name>contentPath</name> <name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description> <description>path location to store (or reuse) content from the aggregator</description>
@@ -289,7 +299,7 @@
<decision name="reuse_oaf"> <decision name="reuse_oaf">
<switch> <switch>
<case to="ImportOAF">${wf:conf('reuseOAF') eq false}</case> <case to="ImportOAF">${wf:conf('reuseOAF') eq false}</case>
<case to="wait_import">${wf:conf('reuseOAF') eq true}</case> <case to="reuse_odf_hdfs">${wf:conf('reuseOAF') eq true}</case>
<default to="ImportOAF"/> <default to="ImportOAF"/>
</switch> </switch>
</decision> </decision>
@@ -324,6 +334,74 @@
<arg>--mdLayout</arg><arg>store</arg> <arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>intersection</arg> <arg>--mdInterpretation</arg><arg>intersection</arg>
</java> </java>
<ok to="ImportODF_hdfs"/>
<error to="Kill"/>
</action>
<decision name="reuse_odf_hdfs">
<switch>
<case to="ImportODF_hdfs">${wf:conf('reuseODF_hdfs') eq false}</case>
<case to="reuse_oaf_hdfs">${wf:conf('reuseODF_hdfs') eq true}</case>
<default to="ImportODF_hdfs"/>
</switch>
</decision>
<action name="ImportODF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportODF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>ODF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="reuse_oaf_hdfs"/>
<error to="Kill"/>
</action>
<decision name="reuse_oaf_hdfs">
<switch>
<case to="ImportOAF_hdfs">${wf:conf('reuseOAF_hdfs') eq false}</case>
<case to="wait_import">${wf:conf('reuseOAF_hdfs') eq true}</case>
<default to="ImportOAF_hdfs"/>
</switch>
</decision>
<action name="ImportOAF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportOAF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/oaf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="wait_import"/> <ok to="wait_import"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@@ -426,7 +504,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records</arg> <arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg> <arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg> <arg>--shouldHashId</arg><arg>${shouldHashId}</arg>

View File

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@@ -0,0 +1,157 @@
<workflow-app name="Test Import of Hdfs Stores" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>mdstoreManagerUrl</name>
<description>the address of the Mdstore Manager</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="ImportODF_hdfs"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportODF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportODF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>ODF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="GenerateEntities"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${workingDir}/graph_raw</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -129,6 +129,8 @@ public class GraphCleaningFunctionsTest {
assertEquals("CLOSED", p_cleaned.getBestaccessright().getClassid()); assertEquals("CLOSED", p_cleaned.getBestaccessright().getClassid());
assertNull(p_out.getPublisher()); assertNull(p_out.getPublisher());
assertEquals("1970-10-07", p_cleaned.getDateofacceptance().getValue());
final List<Instance> pci = p_cleaned.getInstance(); final List<Instance> pci = p_cleaned.getInstance();
assertNotNull(pci); assertNotNull(pci);
assertEquals(1, pci.size()); assertEquals(1, pci.size());

View File

@@ -1,7 +1,11 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
import java.io.IOException; import java.io.IOException;
@@ -21,7 +25,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest; import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -183,8 +195,8 @@ public class MappersTest {
.findFirst() .findFirst()
.get(); .get();
assertEquals("0000-0001-6651-1178", pid.getValue()); assertEquals("0000-0001-6651-1178", pid.getValue());
assertEquals("ORCID", pid.getQualifier().getClassid()); assertEquals(ModelConstants.ORCID_PENDING, pid.getQualifier().getClassid());
assertEquals("Open Researcher and Contributor ID", pid.getQualifier().getClassname()); assertEquals(ModelConstants.ORCID_CLASSNAME, pid.getQualifier().getClassname());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid()); assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename()); assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename());
assertEquals("Votsi,Nefta", author.get().getFullname()); assertEquals("Votsi,Nefta", author.get().getFullname());
@@ -409,7 +421,10 @@ public class MappersTest {
assertEquals(1, d.getTitle().size()); assertEquals(1, d.getTitle().size());
assertEquals( assertEquals(
"Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", "Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia",
d.getTitle().get(0).getValue()); d
.getTitle()
.get(0)
.getValue());
assertNotNull(d.getDescription()); assertNotNull(d.getDescription());
assertEquals(1, d.getDescription().size()); assertEquals(1, d.getDescription().size());
@@ -435,7 +450,7 @@ public class MappersTest {
assertNotNull(d.getInstance()); assertNotNull(d.getInstance());
assertTrue(d.getInstance().size() == 1); assertTrue(d.getInstance().size() == 1);
Instance i = d.getInstance().get(0); final Instance i = d.getInstance().get(0);
assertNotNull(i.getAccessright()); assertNotNull(i.getAccessright());
assertEquals(ModelConstants.DNET_ACCESS_MODES, i.getAccessright().getSchemeid()); assertEquals(ModelConstants.DNET_ACCESS_MODES, i.getAccessright().getSchemeid());
@@ -633,7 +648,55 @@ public class MappersTest {
System.out.println(p.getTitle().get(0).getValue()); System.out.println(p.getTitle().get(0).getValue());
} }
@Test
void testOdfFromHdfs() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("odf_from_hdfs.xml"));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
assertEquals(1, list.size());
System.out.println(list.get(0).getClass());
assertTrue(list.get(0) instanceof Dataset);
final Dataset p = (Dataset) list.get(0);
assertValidId(p.getId());
assertTrue(p.getOriginalId().size() == 1);
assertEquals("df76e73f-0483-49a4-a9bb-63f2f985574a", p.getOriginalId().get(0));
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(p.getAuthor().size() > 0);
final Optional<Author> author = p
.getAuthor()
.stream()
.findFirst();
assertTrue(author.isPresent());
assertEquals("Museum Sønderjylland", author.get().getFullname());
assertTrue(p.getSubject().size() > 0);
assertTrue(p.getInstance().size() > 0);
assertNotNull(p.getTitle());
assertFalse(p.getTitle().isEmpty());
assertNotNull(p.getInstance());
assertTrue(p.getInstance().size() > 0);
p
.getInstance()
.stream()
.forEach(i -> {
assertNotNull(i.getAccessright());
assertEquals("UNKNOWN", i.getAccessright().getClassid());
});
assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
}
private void assertValidId(final String id) { private void assertValidId(final String id) {
System.out.println(id);
assertEquals(49, id.length()); assertEquals(49, id.length());
assertEquals('|', id.charAt(2)); assertEquals('|', id.charAt(2));
assertEquals(':', id.charAt(15)); assertEquals(':', id.charAt(15));

View File

@@ -301,7 +301,7 @@
}, },
"trust": "0.9" "trust": "0.9"
}, },
"value": "2016-01-01" "value": "7 oct 1970"
}, },
"dateofcollection": "", "dateofcollection": "",
"dateoftransformation": "2020-04-22T12:34:08.009Z", "dateoftransformation": "2020-04-22T12:34:08.009Z",

View File

@@ -0,0 +1,77 @@
<record xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:datacite="http://datacite.org/schema/kernel-3"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri">
<header xmlns="http://www.openarchives.org/OAI/2.0/">
<identifier>df76e73f-0483-49a4-a9bb-63f2f985574a</identifier>
<datestamp>2020-09-30T08:17:54Z</datestamp>
<setSpec>eudat-b2find</setSpec>
<dr:dateOfTransformation>2021-05-20T13:43:52.888Z</dr:dateOfTransformation>
<dri:objIdentifier>test________::92fe3efa47883b2f3401e6a4bd92e9d7</dri:objIdentifier>
<dri:dateOfCollection>2020-05-21T05:26:15.93Z</dri:dateOfCollection>
<dri:dateOfTransformation>2020-08-01T11:06:26.977Z</dri:dateOfTransformation>
</header>
<metadata>
<resource xmlns="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4.3/metadata.xsd">
<creators>
<creator>
<creatorName>Museum Sønderjylland</creatorName>
</creator>
</creators>
<titles>
<title>200202-124 Hjelmvrå</title>
</titles>
<descriptions>
<description descriptionType="Abstract">This record describes
ancient sites and monuments as well archaeological excavations
undertaken by Danish museums. Excerpt of the Danish description of
events: 1995-04-26: Ved en besigtigelse ud for stedet fandt Nørgård
en større mængde skår i skovens udkant, liggende i nogle
drængrøfter1995-04-26: Leif Nørgård, der er leder af Sønderjyllands
Amatørarkæologer, havde ved en samtale med en tidligere ansat på
motorvejsprojektet gennem Sønderjylland fået at vide, at man på
dette sted havde fundet "urner".1995-04-26: Ved en besigtigelse ud
for stedet fandt Nørgård en større mængde skår i skovens udkant,
liggende i nogle drængrøfter1995-04-26: Leif Nørgård, der er leder
af Sønderjyllands Amatørarkæologer, havde ved en samtale med en
tidligere ansat på motorvejsprojektet gennem Sønderjylland fået at
vide, at man på dette sted havde fundet "urner".</description>
</descriptions>
<geoLocations>
<geoLocation>
<geoLocationPlace>(9.376 LON, 55.220 LAT)</geoLocationPlace>
</geoLocation>
</geoLocations>
<subjects>
<subject>Enkeltfund</subject>
<subject>Settlement</subject>
<subject>Single find</subject>
<subject>Archaeology</subject>
</subjects>
<alternateIdentifiers
xmlns="http://datacite.org/schema/kernel-3">
<alternateIdentifier
xmlns="http://datacite.org/schema/kernel-4"
alternateIdentifierType="URL">http://www.kulturarv.dk/fundogfortidsminder/Lokalitet/136540/</alternateIdentifier>
</alternateIdentifiers>
<publicationYear>2020</publicationYear>
<publisher>Slots- og Kulturstyrelsen (www.slks.dk)</publisher>
<language>Danish</language>
<rightsList>
<rights>Public</rights>
</rightsList>
<resourceType resourceTypeGeneral="Other">Dataset</resourceType>
</resource>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2020-01-01</oaf:dateAccepted>
<oaf:accessrights>UNKNOWN</oaf:accessrights>
<oaf:language>Danish</oaf:language>
<oaf:hostedBy name="B2FIND"
id="re3data_____::r3d100012377" />
<oaf:collectedFrom name="B2FIND"
id="re3data_____::r3d100012377" />
</metadata>
</record>

12
pom.xml
View File

@@ -200,11 +200,11 @@
<version>${dhp.commons.lang.version}</version> <version>${dhp.commons.lang.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-validator</groupId> <groupId>com.github.sisyphsu</groupId>
<artifactId>commons-validator</artifactId> <artifactId>dateparser</artifactId>
<version>1.7</version> <version>1.0.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
@@ -736,7 +736,7 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[2.5.11]</dhp-schemas.version> <dhp-schemas.version>[2.5.12]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version> <dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version> <dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version> <dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>