1
0
Fork 0

Merge remote-tracking branch 'origin/beta' into beta

This commit is contained in:
Miriam Baglioni 2024-06-11 10:28:51 +02:00
commit 8fe934810f
45 changed files with 1568 additions and 250 deletions

View File

@ -0,0 +1,104 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
public class MergeEntitiesComparator implements Comparator<Oaf> {
static final List<String> PID_AUTHORITIES = Arrays
.asList(
ModelConstants.ARXIV_ID,
ModelConstants.PUBMED_CENTRAL_ID,
ModelConstants.EUROPE_PUBMED_CENTRAL_ID,
ModelConstants.DATACITE_ID,
ModelConstants.CROSSREF_ID);
static final List<String> RESULT_TYPES = Arrays
.asList(
ModelConstants.ORP_RESULTTYPE_CLASSID,
ModelConstants.SOFTWARE_RESULTTYPE_CLASSID,
ModelConstants.DATASET_RESULTTYPE_CLASSID,
ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
public static final Comparator<Oaf> INSTANCE = new MergeEntitiesComparator();
@Override
public int compare(Oaf left, Oaf right) {
if (left == null && right == null)
return 0;
if (left == null)
return -1;
if (right == null)
return 1;
int res = 0;
// pid authority
int cfp1 = left
.getCollectedfrom()
.stream()
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
.max(Integer::compare)
.orElse(-1);
int cfp2 = right
.getCollectedfrom()
.stream()
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
.max(Integer::compare)
.orElse(-1);
if (cfp1 >= 0 && cfp1 > cfp2) {
return 1;
} else if (cfp2 >= 0 && cfp2 > cfp1) {
return -1;
}
// trust
if (left.getDataInfo() != null && right.getDataInfo() != null) {
res = left.getDataInfo().getTrust().compareTo(right.getDataInfo().getTrust());
}
// result type
if (res == 0) {
if (left instanceof Result && right instanceof Result) {
Result r1 = (Result) left;
Result r2 = (Result) right;
if (r1.getResulttype() == null || r1.getResulttype().getClassid() == null) {
if (r2.getResulttype() != null && r2.getResulttype().getClassid() != null) {
return -1;
}
} else if (r2.getResulttype() == null || r2.getResulttype().getClassid() == null) {
return 1;
}
int rt1 = RESULT_TYPES.indexOf(r1.getResulttype().getClassid());
int rt2 = RESULT_TYPES.indexOf(r2.getResulttype().getClassid());
if (rt1 >= 0 && rt1 > rt2) {
return 1;
} else if (rt2 >= 0 && rt2 > rt1) {
return -1;
}
}
}
// id
if (res == 0) {
if (left instanceof OafEntity && right instanceof OafEntity) {
res = ((OafEntity) left).getId().compareTo(((OafEntity) right).getId());
}
}
return res;
}
}

View File

@ -40,27 +40,12 @@ public class MergeUtils {
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator, public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority) { boolean checkDelegateAuthority) {
TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
int res = 0;
if (o1.getDataInfo() != null && o2.getDataInfo() != null) { ArrayList<T> sortedEntities = new ArrayList<>();
res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust()); oafEntityIterator.forEachRemaining(sortedEntities::add);
} sortedEntities.sort(MergeEntitiesComparator.INSTANCE.reversed());
if (res == 0) { Iterator<T> it = sortedEntities.iterator();
if (o1 instanceof Result && o2 instanceof Result) {
return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
}
}
return res;
});
while (oafEntityIterator.hasNext()) {
sortedEntities.add(oafEntityIterator.next());
}
Iterator<T> it = sortedEntities.descendingIterator();
T merged = it.next(); T merged = it.next();
while (it.hasNext()) { while (it.hasNext()) {
@ -143,7 +128,7 @@ public class MergeUtils {
* https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers * https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers
* such version. * such version.
* <p> * <p>
* Otherwise, it considers a resulttype priority order implemented in {@link ResultTypeComparator} * Otherwise, it considers a resulttype priority order implemented in {@link MergeEntitiesComparator}
* and proceeds with the canonical property merging. * and proceeds with the canonical property merging.
* *
* @param left * @param left
@ -161,8 +146,9 @@ public class MergeUtils {
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) { if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
return right; return right;
} }
// TODO: raise trust to have preferred fields from one or the other?? // TODO: raise trust to have preferred fields from one or the other??
if (new ResultTypeComparator().compare(left, right) < 0) { if (MergeEntitiesComparator.INSTANCE.compare(left, right) > 0) {
return mergeResultFields(left, right); return mergeResultFields(left, right);
} else { } else {
return mergeResultFields(right, left); return mergeResultFields(right, left);
@ -225,9 +211,9 @@ public class MergeUtils {
private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust, private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust,
Function<T, K> keyExtractor, BinaryOperator<T> merger) { Function<T, K> keyExtractor, BinaryOperator<T> merger) {
if (left == null) { if (left == null || left.isEmpty()) {
return right; return right != null ? right : new ArrayList<>();
} else if (right == null) { } else if (right == null || right.isEmpty()) {
return left; return left;
} }
@ -405,7 +391,7 @@ public class MergeUtils {
} }
// should be an instance attribute, get the first non-null value // should be an instance attribute, get the first non-null value
merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage())); merge.setLanguage(coalesceQualifier(merge.getLanguage(), enrich.getLanguage()));
// distinct countries, do not manage datainfo // distinct countries, do not manage datainfo
merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust)); merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
@ -575,6 +561,13 @@ public class MergeUtils {
return m != null ? m : e; return m != null ? m : e;
} }
private static Qualifier coalesceQualifier(Qualifier m, Qualifier e) {
if (m == null || m.getClassid() == null || StringUtils.isBlank(m.getClassid())) {
return e;
}
return m;
}
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) { private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
List<List<Author>> authors = new ArrayList<>(); List<List<Author>> authors = new ArrayList<>();
if (author != null) { if (author != null) {
@ -587,6 +580,10 @@ public class MergeUtils {
} }
private static String instanceKeyExtractor(Instance i) { private static String instanceKeyExtractor(Instance i) {
// three levels of concatenating:
// 1. ::
// 2. @@
// 3. ||
return String return String
.join( .join(
"::", "::",
@ -594,10 +591,10 @@ public class MergeUtils {
kvKeyExtractor(i.getCollectedfrom()), kvKeyExtractor(i.getCollectedfrom()),
qualifierKeyExtractor(i.getAccessright()), qualifierKeyExtractor(i.getAccessright()),
qualifierKeyExtractor(i.getInstancetype()), qualifierKeyExtractor(i.getInstancetype()),
Optional.ofNullable(i.getUrl()).map(u -> String.join("::", u)).orElse(null), Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null),
Optional Optional
.ofNullable(i.getPid()) .ofNullable(i.getPid())
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("::"))) .map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@")))
.orElse(null)); .orElse(null));
} }
@ -706,7 +703,7 @@ public class MergeUtils {
private static String spKeyExtractor(StructuredProperty sp) { private static String spKeyExtractor(StructuredProperty sp) {
return Optional return Optional
.ofNullable(sp) .ofNullable(sp)
.map(s -> Joiner.on("::").join(s, qualifierKeyExtractor(s.getQualifier()))) .map(s -> Joiner.on("||").join(qualifierKeyExtractor(s.getQualifier()), s.getValue()))
.orElse(null); .orElse(null);
} }

View File

@ -1,87 +0,0 @@
package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
public class ResultTypeComparator implements Comparator<Result> {
public static final ResultTypeComparator INSTANCE = new ResultTypeComparator();
@Override
public int compare(Result left, Result right) {
if (left == null && right == null)
return 0;
if (left == null)
return 1;
if (right == null)
return -1;
HashSet<String> lCf = getCollectedFromIds(left);
HashSet<String> rCf = getCollectedFromIds(right);
if (lCf.contains(CROSSREF_ID) && !rCf.contains(CROSSREF_ID)) {
return -1;
}
if (!lCf.contains(CROSSREF_ID) && rCf.contains(CROSSREF_ID)) {
return 1;
}
if (left.getResulttype() == null || left.getResulttype().getClassid() == null) {
if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
return 0;
}
return 1;
} else if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
return -1;
}
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();
if (!lClass.equals(rClass)) {
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
}
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
protected HashSet<String> getCollectedFromIds(Result left) {
return Optional
.ofNullable(left.getCollectedfrom())
.map(
cf -> cf
.stream()
.map(KeyValue::getKey)
.collect(Collectors.toCollection(HashSet::new)))
.orElse(new HashSet<>());
}
}

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>dhp</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-shade-package</artifactId>
<description>This module create a jar of all module dependencies</description>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</mainClass>
</transformer>
<transformer />
<transformer>
<resource>META-INF/cxf/bus-extensions.txt</resource>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/maven/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com.google.common</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.6.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>junit-jupiter-api</artifactId>
<groupId>org.junit.jupiter</groupId>
</exclusion>
<exclusion>
<artifactId>junit-jupiter-params</artifactId>
<groupId>org.junit.jupiter</groupId>
</exclusion>
<exclusion>
<artifactId>junit-jupiter-engine</artifactId>
<groupId>org.junit.jupiter</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>byte-buddy</artifactId>
<groupId>net.bytebuddy</groupId>
</exclusion>
<exclusion>
<artifactId>byte-buddy-agent</artifactId>
<groupId>net.bytebuddy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<site>
<id>DHPSite</id>
<url>${dhp.site.stage.path}/dhp-common</url>
</site>
</distributionManagement>
</project>

View File

@ -103,6 +103,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -156,6 +157,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -95,6 +95,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -125,6 +125,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -95,6 +95,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -103,6 +103,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -155,11 +156,12 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=2560 --conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--inputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg> <arg>--inputGraphTablePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>

View File

@ -95,6 +95,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -103,11 +103,12 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/publication</arg> <arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
@ -156,11 +157,12 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputGraphTablePath</arg><arg>${workingDir}/publication</arg> <arg>--inputGraphTablePath</arg><arg>${workingDir}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>

View File

@ -95,11 +95,12 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=10000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>

View File

@ -103,6 +103,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -155,11 +156,12 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=2560 --conf spark.sql.shuffle.partitions=4000
</spark-opts> </spark-opts>
<arg>--inputGraphTablePath</arg><arg>${workingDir}/software</arg> <arg>--inputGraphTablePath</arg><arg>${workingDir}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.collection.plugin.rest; package eu.dnetlib.dhp.collection.plugin.rest;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
@ -9,6 +10,8 @@ import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import com.google.gson.Gson;
import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
@ -47,6 +50,9 @@ public class RestCollectorPlugin implements CollectorPlugin {
final String entityXpath = api.getParams().get("entityXpath"); final String entityXpath = api.getParams().get("entityXpath");
final String authMethod = api.getParams().get("authMethod"); final String authMethod = api.getParams().get("authMethod");
final String authToken = api.getParams().get("authToken"); final String authToken = api.getParams().get("authToken");
final String requestHeaderMap = api.getParams().get("requestHeaderMap");
Gson gson = new Gson();
Map requestHeaders = gson.fromJson(requestHeaderMap, Map.class);
final String resultSizeValue = Optional final String resultSizeValue = Optional
.ofNullable(api.getParams().get("resultSizeValue")) .ofNullable(api.getParams().get("resultSizeValue"))
.filter(StringUtils::isNotBlank) .filter(StringUtils::isNotBlank)
@ -64,9 +70,6 @@ public class RestCollectorPlugin implements CollectorPlugin {
if (StringUtils.isBlank(resultFormatValue)) { if (StringUtils.isBlank(resultFormatValue)) {
throw new CollectorException("Param 'resultFormatValue' is null or empty"); throw new CollectorException("Param 'resultFormatValue' is null or empty");
} }
if (StringUtils.isBlank(queryParams)) {
throw new CollectorException("Param 'queryParams' is null or empty");
}
if (StringUtils.isBlank(entityXpath)) { if (StringUtils.isBlank(entityXpath)) {
throw new CollectorException("Param 'entityXpath' is null or empty"); throw new CollectorException("Param 'entityXpath' is null or empty");
} }
@ -92,7 +95,8 @@ public class RestCollectorPlugin implements CollectorPlugin {
entityXpath, entityXpath,
authMethod, authMethod,
authToken, authToken,
resultOutputFormat); resultOutputFormat,
requestHeaders);
return StreamSupport return StreamSupport
.stream( .stream(

View File

@ -9,8 +9,11 @@ import java.net.URL;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.transform.OutputKeys; import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer; import javax.xml.transform.Transformer;
@ -18,22 +21,18 @@ import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory; import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath; import javax.xml.xpath.*;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.w3c.dom.Node; import org.w3c.dom.Node;
import org.w3c.dom.NodeList; import org.w3c.dom.NodeList;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils; import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams; import eu.dnetlib.dhp.common.collection.HttpClientParams;
@ -48,20 +47,23 @@ import eu.dnetlib.dhp.common.collection.HttpClientParams;
* *
*/ */
public class RestIterator implements Iterator<String> { public class RestIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(RestIterator.class); private static final Logger log = LoggerFactory.getLogger(RestIterator.class);
public static final String UTF_8 = "UTF-8"; public static final String UTF_8 = "UTF-8";
private static final int MAX_ATTEMPTS = 5; private static final int MAX_ATTEMPTS = 5;
private final HttpClientParams clientParams; private final HttpClientParams clientParams;
private final String BASIC = "basic"; private final String AUTHBASIC = "basic";
private static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
private static final String EMPTY_XML = XML_HEADER + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG
+ ">";
private final String baseUrl; private final String baseUrl;
private final String resumptionType; private final String resumptionType;
private final String resumptionParam; private final String resumptionParam;
private final String resultFormatValue; private final String resultFormatValue;
private String queryParams; private String queryParams = "";
private final int resultSizeValue; private final int resultSizeValue;
private int resumptionInt = 0; // integer resumption token (first record to harvest) private int resumptionInt = 0; // integer resumption token (first record to harvest)
private int resultTotal = -1; private int resultTotal = -1;
@ -89,6 +91,11 @@ public class RestIterator implements Iterator<String> {
*/ */
private final String resultOutputFormat; private final String resultOutputFormat;
/*
* Can be used to set additional request headers, like for content negotiation
*/
private Map<String, String> requestHeaders;
/** /**
* RestIterator class compatible to version 1.3.33 * RestIterator class compatible to version 1.3.33
*/ */
@ -107,7 +114,8 @@ public class RestIterator implements Iterator<String> {
final String entityXpath, final String entityXpath,
final String authMethod, final String authMethod,
final String authToken, final String authToken,
final String resultOutputFormat) { final String resultOutputFormat,
final Map<String, String> requestHeaders) {
this.clientParams = clientParams; this.clientParams = clientParams;
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
@ -119,6 +127,7 @@ public class RestIterator implements Iterator<String> {
this.authMethod = authMethod; this.authMethod = authMethod;
this.authToken = authToken; this.authToken = authToken;
this.resultOutputFormat = resultOutputFormat; this.resultOutputFormat = resultOutputFormat;
this.requestHeaders = requestHeaders != null ? requestHeaders : Maps.newHashMap();
this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue
: ""; : "";
@ -148,7 +157,12 @@ public class RestIterator implements Iterator<String> {
} }
private void initQueue() { private void initQueue() {
this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat; if (queryParams.equals("") && querySize.equals("") && queryFormat.equals("")) {
query = baseUrl;
} else {
query = baseUrl + "?" + queryParams + querySize + queryFormat;
}
log.info("REST calls starting with {}", this.query); log.info("REST calls starting with {}", this.query);
} }
@ -209,9 +223,8 @@ public class RestIterator implements Iterator<String> {
try { try {
String resultJson; String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"; String resultXml = XML_HEADER;
String nextQuery = ""; String nextQuery = "";
final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null; Node resultNode = null;
NodeList nodeList = null; NodeList nodeList = null;
String qUrlArgument = ""; String qUrlArgument = "";
@ -226,37 +239,47 @@ public class RestIterator implements Iterator<String> {
} }
} }
// find pagination page start number in queryParam and remove before start the first query
if ((resumptionType.toLowerCase().equals("pagination") || resumptionType.toLowerCase().equals("page"))
&& (query.contains("paginationStart="))) {
final Matcher m = Pattern.compile("paginationStart=([0-9]+)").matcher(query);
m.find(); // guaranteed to be true for this regex
String[] pageVal = m.group(0).split("=");
pagination = Integer.parseInt(pageVal[1]);
// remove page start number from queryParams
query = query.replaceFirst("&?paginationStart=[0-9]+", "");
}
try { try {
log.info("requesting URL [{}]", query); log.info("requesting URL [{}]", query);
final URL qUrl = new URL(query); final URL qUrl = new URL(query);
log.debug("authMethod: {}", this.authMethod); log.debug("authMethod: {}", this.authMethod);
if ("bearer".equalsIgnoreCase(this.authMethod)) { if (this.authMethod == "bearer") {
log.trace("authMethod before inputStream: {}", resultXml); log.trace("RestIterator.downloadPage():: authMethod before inputStream: " + resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); requestHeaders.put("Authorization", "Bearer " + authToken);
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + this.authToken); // requestHeaders.put("Content-Type", "application/json");
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); } else if (AUTHBASIC.equalsIgnoreCase(this.authMethod)) {
conn.setRequestMethod("GET"); log.trace("RestIterator.downloadPage():: authMethod before inputStream: " + resultXml);
theHttpInputStream = conn.getInputStream(); requestHeaders.put("Authorization", "Basic " + authToken);
} else if (this.BASIC.equalsIgnoreCase(this.authMethod)) { // requestHeaders.put("accept", "application/xml");
log.trace("authMethod before inputStream: {}", resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + this.authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else {
theHttpInputStream = qUrl.openStream();
} }
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestMethod("GET");
this.setRequestHeader(conn);
resultStream = conn.getInputStream();
this.resultStream = theHttpInputStream;
if ("json".equals(this.resultOutputFormat)) { if ("json".equals(this.resultOutputFormat)) {
resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8); resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8);
resultXml = JsonUtils.convertToXML(resultJson); resultXml = JsonUtils.convertToXML(resultJson);
this.resultStream = IOUtils.toInputStream(resultXml, UTF_8); this.resultStream = IOUtils.toInputStream(resultXml, UTF_8);
} }
if (!(emptyXml).equalsIgnoreCase(resultXml)) { if (!isEmptyXml(resultXml)) {
resultNode = (Node) this.xpath resultNode = (Node) this.xpath
.evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE); .evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE);
nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET); nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET);
@ -265,8 +288,7 @@ public class RestIterator implements Iterator<String> {
final StringWriter sw = new StringWriter(); final StringWriter sw = new StringWriter();
this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
final String toEnqueue = sw.toString(); final String toEnqueue = sw.toString();
if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) || isEmptyXml(toEnqueue)) {
|| emptyXml.equalsIgnoreCase(toEnqueue)) {
log log
.warn( .warn(
"The following record resulted in empty item for the feeding queue: {}", resultXml); "The following record resulted in empty item for the feeding queue: {}", resultXml);
@ -294,6 +316,7 @@ public class RestIterator implements Iterator<String> {
throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2");
} }
qUrlArgument = qUrl.getQuery(); qUrlArgument = qUrl.getQuery();
final String[] arrayQUrlArgument = qUrlArgument.split("&"); final String[] arrayQUrlArgument = qUrlArgument.split("&");
for (final String arrayUrlArgStr : arrayQUrlArgument) { for (final String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(this.resumptionParam)) { if (arrayUrlArgStr.startsWith(this.resumptionParam)) {
@ -307,7 +330,7 @@ public class RestIterator implements Iterator<String> {
} }
} }
if (((emptyXml).equalsIgnoreCase(resultXml)) if (isEmptyXml(resultXml)
|| ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) { || ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) {
// resumptionStr = ""; // resumptionStr = "";
if (nodeList != null) { if (nodeList != null) {
@ -326,13 +349,13 @@ public class RestIterator implements Iterator<String> {
case "pagination": case "pagination":
case "page": // pagination, iterate over page numbers case "page": // pagination, iterate over page numbers
this.pagination += 1; if (nodeList != null && nodeList.getLength() > 0) {
if (nodeList != null) {
this.discoverResultSize += nodeList.getLength(); this.discoverResultSize += nodeList.getLength();
} else { } else {
this.resultTotal = this.discoverResultSize; this.resultTotal = this.discoverResultSize;
this.pagination = this.discoverResultSize; this.pagination = this.discoverResultSize;
} }
this.pagination += 1;
this.resumptionInt = this.pagination; this.resumptionInt = this.pagination;
this.resumptionStr = Integer.toString(this.resumptionInt); this.resumptionStr = Integer.toString(this.resumptionInt);
break; break;
@ -380,7 +403,8 @@ public class RestIterator implements Iterator<String> {
try { try {
if (this.resultTotal == -1) { if (this.resultTotal == -1) {
this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode)); this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode));
if ("page".equalsIgnoreCase(this.resumptionType) && !this.BASIC.equalsIgnoreCase(this.authMethod)) { if ("page".equalsIgnoreCase(this.resumptionType)
&& !this.AUTHBASIC.equalsIgnoreCase(this.authMethod)) {
this.resultTotal += 1; this.resultTotal += 1;
} // to correct the upper bound } // to correct the upper bound
log.info("resultTotal was -1 is now: " + this.resultTotal); log.info("resultTotal was -1 is now: " + this.resultTotal);
@ -409,6 +433,10 @@ public class RestIterator implements Iterator<String> {
} }
private boolean isEmptyXml(String s) {
return EMPTY_XML.equalsIgnoreCase(s);
}
private boolean isInteger(final String s) { private boolean isInteger(final String s) {
boolean isValidInteger = false; boolean isValidInteger = false;
try { try {
@ -433,6 +461,22 @@ public class RestIterator implements Iterator<String> {
} }
} }
/**
* setRequestHeader
*
* setRequestProperty: Sets the general request property. If a property with the key already exists, overwrite its value with the new value.
* @param conn
*/
private void setRequestHeader(HttpURLConnection conn) {
if (requestHeaders != null) {
for (String key : requestHeaders.keySet()) {
conn.setRequestProperty(key, requestHeaders.get(key));
}
log.debug("Set Request Header with: " + requestHeaders);
}
}
public String getResultFormatValue() { public String getResultFormatValue() {
return this.resultFormatValue; return this.resultFormatValue;
} }

View File

@ -8,7 +8,10 @@ import java.io.StringWriter;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventReader; import javax.xml.stream.XMLEventReader;
@ -19,6 +22,7 @@ import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.StartElement; import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent; import javax.xml.stream.events.XMLEvent;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -58,13 +62,23 @@ public class XMLIterator implements Iterator<String> {
private String element; private String element;
private List<String> elements;
private InputStream inputStream; private InputStream inputStream;
public XMLIterator(final String element, final InputStream inputStream) { public XMLIterator(final String element, final InputStream inputStream) {
super(); super();
this.element = element; this.element = element;
if (element.contains(",")) {
elements = Arrays
.stream(element.split(","))
.filter(StringUtils::isNoneBlank)
.map(String::toLowerCase)
.collect(Collectors.toList());
}
this.inputStream = inputStream; this.inputStream = inputStream;
this.parser = getParser(); this.parser = getParser();
try { try {
this.current = findElement(parser); this.current = findElement(parser);
} catch (XMLStreamException e) { } catch (XMLStreamException e) {
@ -113,7 +127,7 @@ public class XMLIterator implements Iterator<String> {
final XMLEvent event = parser.nextEvent(); final XMLEvent event = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking. // TODO: replace with depth tracking instead of close tag tracking.
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) { if (event.isEndElement() && isCheckTag(event.asEndElement().getName().getLocalPart())) {
writer.add(event); writer.add(event);
break; break;
} }
@ -142,31 +156,48 @@ public class XMLIterator implements Iterator<String> {
XMLEvent peek = parser.peek(); XMLEvent peek = parser.peek();
if (peek != null && peek.isStartElement()) { if (peek != null && peek.isStartElement()) {
String name = peek.asStartElement().getName().getLocalPart(); String name = peek.asStartElement().getName().getLocalPart();
if (element.equals(name)) { if (isCheckTag(name))
return peek; return peek;
} }
}
while (parser.hasNext()) { while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent(); XMLEvent event = parser.nextEvent();
if (event != null && event.isStartElement()) { if (event != null && event.isStartElement()) {
String name = event.asStartElement().getName().getLocalPart(); String name = event.asStartElement().getName().getLocalPart();
if (element.equals(name)) { if (isCheckTag(name))
return event; return event;
} }
} }
}
return null; return null;
} }
private XMLEventReader getParser() { private XMLEventReader getParser() {
try { try {
return inputFactory.get().createXMLEventReader(sanitize(inputStream)); XMLInputFactory xif = inputFactory.get();
xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
return xif.createXMLEventReader(sanitize(inputStream));
} catch (XMLStreamException e) { } catch (XMLStreamException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private boolean isCheckTag(final String tagName) {
if (elements != null) {
final String found = elements
.stream()
.filter(e -> e.equalsIgnoreCase(tagName))
.findFirst()
.orElse(null);
if (found != null)
return true;
} else {
if (element.equalsIgnoreCase(tagName)) {
return true;
}
}
return false;
}
private Reader sanitize(final InputStream in) { private Reader sanitize(final InputStream in) {
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder(); final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE); charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);

View File

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.collection.plugin.file;
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class FileGZipMultipleNodeTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor();
private FileGZipCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "incollection,article";
@BeforeEach
public void setUp() throws IOException {
final String gzipFile = Objects
.requireNonNull(
this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/dblp.gz"))
.getFile();
api.setBaseUrl(gzipFile);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
api.setParams(params);
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileGZipCollectorPlugin(fs);
}
@Test
void test() throws CollectorException {
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
}

View File

@ -36,11 +36,11 @@ public class OsfPreprintCollectorTest {
private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']"; private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']";
private final String resumptionParam = "page"; private final String resumptionParam = "page";
private final String resumptionType = "page"; private final String resumptionType = "scan";
private final String resumptionXpath = "/*/*[local-name()='links']/*[local-name()='next']"; private final String resumptionXpath = "substring-before(substring-after(/*/*[local-name()='links']/*[local-name()='next'], 'page='), '&')";
private final String resultSizeParam = ""; private final String resultSizeParam = "page[size]";
private final String resultSizeValue = ""; private final String resultSizeValue = "100";
private final String resultFormatParam = "format"; private final String resultFormatParam = "format";
private final String resultFormatValue = "json"; private final String resultFormatValue = "json";
@ -74,7 +74,7 @@ public class OsfPreprintCollectorTest {
final AtomicInteger i = new AtomicInteger(0); final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport()); final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.limit(200).forEach(s -> { stream.limit(2000).forEach(s -> {
Assertions.assertTrue(s.length() > 0); Assertions.assertTrue(s.length() > 0);
i.incrementAndGet(); i.incrementAndGet();
log.info(s); log.info(s);

View File

@ -4,6 +4,11 @@
package eu.dnetlib.dhp.collection.plugin.rest; package eu.dnetlib.dhp.collection.plugin.rest;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -12,6 +17,8 @@ import org.junit.jupiter.api.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
@ -25,18 +32,18 @@ class RestCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(RestCollectorPluginTest.class); private static final Logger log = LoggerFactory.getLogger(RestCollectorPluginTest.class);
private final String baseUrl = "https://share.osf.io/api/v2/search/creativeworks/_search"; private final String baseUrl = "https://ddh-openapi.worldbank.org/search";
private final String resumptionType = "count"; private final String resumptionType = "discover";
private final String resumptionParam = "from"; private final String resumptionParam = "skip";
private final String entityXpath = "//hits/hits"; private final String entityXpath = "//*[local-name()='data']";
private final String resumptionXpath = "//hits"; private final String resumptionXpath = "";
private final String resultTotalXpath = "//hits/total"; private final String resultTotalXpath = "//*[local-name()='count']";
private final String resultFormatParam = "format"; private final String resultFormatParam = "";
private final String resultFormatValue = "json"; private final String resultFormatValue = "json";
private final String resultSizeParam = "size"; private final String resultSizeParam = "top";
private final String resultSizeValue = "10"; private final String resultSizeValue = "10";
// private String query = "q=%28sources%3ASocArXiv+AND+type%3Apreprint%29"; // private String query = "q=%28sources%3ASocArXiv+AND+type%3Apreprint%29";
private final String query = "q=%28sources%3AengrXiv+AND+type%3Apreprint%29"; private final String query = "";
// private String query = "=(sources:engrXiv AND type:preprint)"; // private String query = "=(sources:engrXiv AND type:preprint)";
private final String protocolDescriptor = "rest_json2xml"; private final String protocolDescriptor = "rest_json2xml";
@ -56,6 +63,7 @@ class RestCollectorPluginTest {
params.put("resultSizeValue", resultSizeValue); params.put("resultSizeValue", resultSizeValue);
params.put("queryParams", query); params.put("queryParams", query);
params.put("entityXpath", entityXpath); params.put("entityXpath", entityXpath);
params.put("requestHeaderMap", "{\"User-Agent\": \"OpenAIRE DEV\"}");
api.setBaseUrl(baseUrl); api.setBaseUrl(baseUrl);
api.setParams(params); api.setParams(params);
@ -78,4 +86,19 @@ class RestCollectorPluginTest {
log.info("{}", i.intValue()); log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0); Assertions.assertTrue(i.intValue() > 0);
} }
@Disabled
@Test
void testUrl() throws IOException {
String url_s = "https://ddh-openapi.worldbank.org/search?&top=10";
URL url = new URL(url_s);
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("User-Agent", "OpenAIRE");
Gson gson = new Gson();
System.out.println("Request header");
System.out.println(gson.toJson(conn.getHeaderFields()));
InputStream inputStream = conn.getInputStream();
}
} }

View File

@ -44,7 +44,7 @@ public class RestIteratorTest {
final RestIterator iterator = new RestIterator(clientParams, baseUrl, resumptionType, resumptionParam, final RestIterator iterator = new RestIterator(clientParams, baseUrl, resumptionType, resumptionParam,
resumptionXpath, resultTotalXpath, resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValue, resumptionXpath, resultTotalXpath, resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValue,
query, entityXpath, authMethod, authToken, resultOffsetParam); query, entityXpath, authMethod, authToken, resultOffsetParam, null);
int i = 20; int i = 20;
while (iterator.hasNext() && i > 0) { while (iterator.hasNext() && i > 0) {
String result = iterator.next(); String result = iterator.next();

View File

@ -0,0 +1,103 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
class DatasetMergerTest implements Serializable {
private List<Tuple2<String, Dataset>> datasets;
private String testEntityBasePath;
private DataInfo dataInfo;
private final String dedupId = "50|doi_________::3d18564ef27ebe9ef3bd8b4dec67e148";
private Dataset dataset_top;
@BeforeEach
public void setUp() throws Exception {
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
datasets = readSample(testEntityBasePath + "/dataset_merge.json", Dataset.class);
dataset_top = getTopPub(datasets);
dataInfo = setDI();
}
@Test
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
// verify id
assertEquals(dedupId, pub_merged.getId());
assertEquals(2, pub_merged.getInstance().size());
}
public DataInfo setDI() {
DataInfo dataInfo = new DataInfo();
dataInfo.setTrust("0.9");
dataInfo.setDeletedbyinference(false);
dataInfo.setInferenceprovenance("testing");
dataInfo.setInferred(true);
return dataInfo;
}
public Dataset getTopPub(List<Tuple2<String, Dataset>> publications) {
Double maxTrust = 0.0;
Dataset maxPub = new Dataset();
for (Tuple2<String, Dataset> publication : publications) {
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
if (pubTrust > maxTrust) {
maxTrust = pubTrust;
maxPub = publication._2();
}
}
return maxPub;
}
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res
.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
new ObjectMapper().readValue(line, clazz)));
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
}

View File

@ -93,14 +93,14 @@ class EntityMergerTest implements Serializable {
assertEquals(pub_top.getJournal().getConferencedate(), pub_merged.getJournal().getConferencedate()); assertEquals(pub_top.getJournal().getConferencedate(), pub_merged.getJournal().getConferencedate());
assertEquals(pub_top.getJournal().getConferenceplace(), pub_merged.getJournal().getConferenceplace()); assertEquals(pub_top.getJournal().getConferenceplace(), pub_merged.getJournal().getConferenceplace());
assertEquals("OPEN", pub_merged.getBestaccessright().getClassid()); assertEquals("OPEN", pub_merged.getBestaccessright().getClassid());
assertEquals(pub_top.getResulttype(), pub_merged.getResulttype()); assertEquals(pub_top.getResulttype().getClassid(), pub_merged.getResulttype().getClassid());
assertEquals(pub_top.getLanguage(), pub_merged.getLanguage()); assertEquals(pub_top.getLanguage().getClassid(), pub_merged.getLanguage().getClassid());
assertEquals(pub_top.getPublisher(), pub_merged.getPublisher()); assertEquals("Elsevier BV", pub_merged.getPublisher().getValue());
assertEquals(pub_top.getEmbargoenddate(), pub_merged.getEmbargoenddate()); assertEquals(pub_top.getEmbargoenddate().getValue(), pub_merged.getEmbargoenddate().getValue());
assertEquals(pub_top.getResourcetype().getClassid(), ""); assertEquals(pub_top.getResourcetype().getClassid(), "");
assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation()); assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation());
assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance()); assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance());
assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection()); // assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
assertEquals(3, pub_merged.getInstance().size()); assertEquals(3, pub_merged.getInstance().size());
assertEquals(2, pub_merged.getCountry().size()); assertEquals(2, pub_merged.getCountry().size());
assertEquals(0, pub_merged.getSubject().size()); assertEquals(0, pub_merged.getSubject().size());

File diff suppressed because one or more lines are too long

View File

@ -172,7 +172,7 @@ public class SparkBulkTagJob {
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "project"); .json(outputPath + "project");
readPath(spark, outputPath + "project", Datasource.class) readPath(spark, outputPath + "project", Project.class)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")

View File

@ -398,6 +398,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info)); o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info)); o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
o.setCountry(prepareQualifierSplitting(rs.getString("country"))); o.setCountry(prepareQualifierSplitting(rs.getString("country")));
o.setOrganizationType(Organization.OrganizationType.valueOf(rs.getString("typology")));
o.setDataInfo(info); o.setDataInfo(info);
o.setLastupdatetimestamp(lastUpdateTimestamp); o.setLastupdatetimestamp(lastUpdateTimestamp);

View File

@ -156,6 +156,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -190,6 +191,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -224,6 +226,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -258,6 +261,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -292,6 +296,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -326,6 +331,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -360,6 +366,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -394,6 +401,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -116,17 +116,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=10000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg> <arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>8000</arg> <arg>--numPartitions</arg><arg>10000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -143,17 +145,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=4000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg> <arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>4000</arg> <arg>--numPartitions</arg><arg>8000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -170,11 +174,13 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg> <arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
@ -197,17 +203,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg> <arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>300</arg> <arg>--numPartitions</arg><arg>1000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -224,17 +232,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/datasource</arg> <arg>--inputPath</arg><arg>${inputPath}/datasource</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>100</arg> <arg>--numPartitions</arg><arg>200</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -251,17 +261,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/organization</arg> <arg>--inputPath</arg><arg>${inputPath}/organization</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>400</arg> <arg>--numPartitions</arg><arg>1000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -278,17 +290,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/project</arg> <arg>--inputPath</arg><arg>${inputPath}/project</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>100</arg> <arg>--numPartitions</arg><arg>1000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>
@ -305,17 +319,19 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/relation</arg> <arg>--inputPath</arg><arg>${inputPath}/relation</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg> <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg> <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>10000</arg> <arg>--numPartitions</arg><arg>15000</arg>
</spark> </spark>
<ok to="join_import"/> <ok to="join_import"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -28,7 +28,8 @@ SELECT
(array_remove(array_cat(ARRAY[o.ec_internationalorganization], array_agg(od.ec_internationalorganization)), NULL))[1] AS ecinternationalorganization, (array_remove(array_cat(ARRAY[o.ec_internationalorganization], array_agg(od.ec_internationalorganization)), NULL))[1] AS ecinternationalorganization,
(array_remove(array_cat(ARRAY[o.ec_enterprise], array_agg(od.ec_enterprise)), NULL))[1] AS ecenterprise, (array_remove(array_cat(ARRAY[o.ec_enterprise], array_agg(od.ec_enterprise)), NULL))[1] AS ecenterprise,
(array_remove(array_cat(ARRAY[o.ec_smevalidated], array_agg(od.ec_smevalidated)), NULL))[1] AS ecsmevalidated, (array_remove(array_cat(ARRAY[o.ec_smevalidated], array_agg(od.ec_smevalidated)), NULL))[1] AS ecsmevalidated,
(array_remove(array_cat(ARRAY[o.ec_nutscode], array_agg(od.ec_nutscode)), NULL))[1] AS ecnutscode (array_remove(array_cat(ARRAY[o.ec_nutscode], array_agg(od.ec_nutscode)), NULL))[1] AS ecnutscode,
org_types.name AS typology
FROM organizations o FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id) LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id)
@ -37,6 +38,7 @@ FROM organizations o
LEFT OUTER JOIN oa_duplicates d ON (o.id = d.local_id AND d.reltype != 'is_different') LEFT OUTER JOIN oa_duplicates d ON (o.id = d.local_id AND d.reltype != 'is_different')
LEFT OUTER JOIN organizations od ON (d.oa_original_id = od.id) LEFT OUTER JOIN organizations od ON (d.oa_original_id = od.id)
LEFT OUTER JOIN other_ids idup ON (od.id = idup.id) LEFT OUTER JOIN other_ids idup ON (od.id = idup.id)
LEFT OUTER JOIN org_types ON (org_types.val = o.type)
WHERE WHERE
o.status = 'approved' OR o.status = 'suggested' o.status = 'approved' OR o.status = 'suggested'
GROUP BY GROUP BY
@ -44,4 +46,5 @@ GROUP BY
o.name, o.name,
o.creation_date, o.creation_date,
o.modification_date, o.modification_date,
o.country; o.country,
org_types.name;

View File

@ -25,6 +25,22 @@ object SparkApplyHostedByMapToResult {
val i = p.getInstance().asScala val i = p.getInstance().asScala
if (i.size == 1) { if (i.size == 1) {
val inst: Instance = i.head val inst: Instance = i.head
patchInstance(p, ei, inst)
} else {
val cf = i.map(ii => ii.getCollectedfrom.getValue)
if (cf.contains("Crossref")) {
i.foreach(ii => {
patchInstance(p, ei, ii)
})
}
}
}
p
})(Encoders.bean(classOf[Publication]))
}
private def patchInstance(p: Publication, ei: EntityInfo, inst: Instance): Unit = {
inst.getHostedby.setKey(ei.getHostedById) inst.getHostedby.setKey(ei.getHostedById)
inst.getHostedby.setValue(ei.getName) inst.getHostedby.setValue(ei.getName)
if (ei.getOpenAccess) { if (ei.getOpenAccess) {
@ -39,11 +55,6 @@ object SparkApplyHostedByMapToResult {
inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold) inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.gold)
p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance())); p.setBestaccessright(OafMapperUtils.createBestAccessRights(p.getInstance()));
} }
}
}
p
})(Encoders.bean(classOf[Publication]))
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {

View File

@ -0,0 +1,186 @@
package eu.dnetlib.dhp.oa.oaipmh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
public class IrishOaiExporterJob {
private static final Logger log = LoggerFactory.getLogger(IrishOaiExporterJob.class);
protected static final int NUM_CONNECTIONS = 20;
public static final String TMP_OAI_TABLE = "temp_oai_data";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IrishOaiExporterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
final String dbUrl = parser.get("dbUrl");
final String dbUser = parser.get("dbUser");
final String dbPwd = parser.get("dbPwd");
final int numConnections = Optional
.ofNullable(parser.get("numConnections"))
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
log.info("inputPath: '{}'", inputPath);
log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser);
log.info("dbPwd: '{}'", "xxx");
log.info("numPartitions: '{}'", numConnections);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
final SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class[] {
SerializableSolrInputDocument.class
});
final Encoder<TupleWrapper> encoderTuple = Encoders.bean(TupleWrapper.class);
final Encoder<OaiRecordWrapper> encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class);
final String date = LocalDateTime.now().toString();
log.info("Creating temporary table...");
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<OaiRecordWrapper> docs = spark
.read()
.schema(encoderTuple.schema())
.json(inputPath)
.as(encoderTuple)
.map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
.map((MapFunction<String, OaiRecordWrapper>) r -> asIrishOaiResult(r, date), encoderOaiRecord)
.filter((FilterFunction<OaiRecordWrapper>) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId()));
docs
.repartition(numConnections)
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties);
});
log.info("Temporary table created.");
log.info("Updating OAI records...");
try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) {
try (final Statement st = con.createStatement()) {
final String query = IOUtils
.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql"));
st.execute(query);
}
}
log.info("DONE.");
}
protected static OaiRecordWrapper asIrishOaiResult(final String xml, final String date) {
try {
final Document doc = DocumentHelper.parseText(xml);
final OaiRecordWrapper r = new OaiRecordWrapper();
if (isValid(doc)) {
r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim());
r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML()));
r.setDate(date);
r.setSets(new ArrayList<>());
}
return r;
} catch (final Exception e) {
log.error("Error parsing record: " + xml, e);
throw new RuntimeException("Error parsing record: " + xml, e);
}
}
protected static boolean isValid(final Document doc) {
final Node n = doc.selectSingleNode("//*[local-name()='entity']/*[local-name()='result']");
if (n != null) {
for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) {
if ("true".equals(((Node) o).getText().trim())) {
return false;
}
}
// verify the main country of the result
for (final Object o : n.selectNodes("./*[local-name()='country']")) {
if ("IE".equals(((Node) o).valueOf("@classid").trim())) {
return true;
}
}
// verify the countries of the related organizations
for (final Object o : n.selectNodes(".//*[local-name()='rel']")) {
final String relType = ((Node) o).valueOf("./*[local-name() = 'to']/@type").trim();
final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim();
if ("organization".equals(relType) && "IE".equals(relCountry)) {
return true;
}
}
}
return false;
}
protected static byte[] gzip(final String str) {
if (StringUtils.isBlank(str)) {
return null;
}
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
IOUtils.write(str.getBytes(Charset.defaultCharset()), gzip);
}
return baos.toByteArray();
} catch (final IOException e) {
throw new RuntimeException("error in gzip", e);
}
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.oa.oaipmh;
import java.io.Serializable;
import java.util.List;
public class OaiRecordWrapper implements Serializable {
private static final long serialVersionUID = 8997046455575004880L;
private String id;
private byte[] body;
private String date;
private List<String> sets;
public OaiRecordWrapper() {
}
public String getId() {
return this.id;
}
public void setId(final String id) {
this.id = id;
}
public byte[] getBody() {
return this.body;
}
public void setBody(final byte[] body) {
this.body = body;
}
public String getDate() {
return this.date;
}
public void setDate(final String date) {
this.date = date;
}
public List<String> getSets() {
return this.sets;
}
public void setSets(final List<String> sets) {
this.sets = sets;
}
}

View File

@ -0,0 +1,32 @@
[
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "The path of the input records on HDFS",
"paramRequired": true
},
{
"paramName": "nc",
"paramLongName": "numConnections",
"paramDescription": "number of connections to the postgres db (for the write operation)",
"paramRequired": false
},
{
"paramName": "du",
"paramLongName": "dbUrl",
"paramDescription": "the url of the database",
"paramRequired": true
},
{
"paramName": "dusr",
"paramLongName": "dbUser",
"paramDescription": "the user of the database",
"paramRequired": true
},
{
"paramName": "dpwd",
"paramLongName": "dbPwd",
"paramDescription": "the password for the user of the database",
"paramRequired": true
}
]

View File

@ -0,0 +1,12 @@
BEGIN;
DELETE FROM oai_data;
INSERT INTO oai_data(id, body, date, sets) SELECT
id,
body,
date::timestamp,
sets
FROM temp_oai_data;
COMMIT;

View File

@ -0,0 +1,106 @@
<workflow-app name="irish-oaipmh-provision" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>The path of the input records on HDFS</description>
</property>
<property>
<name>numConnections</name>
<description>number of connections to the postgres db (for the write operation)</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the password for the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="oaiphm_provision"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="irish_oaiphm_provision">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Irish OAI-PHM provision</name>
<class>eu.dnetlib.dhp.oa.oaipmh.IrishOaiExporterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--numConnections</arg><arg>${numConnections}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.oa.oaipmh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@Disabled
public class DbSerializationTest {
private static SparkSession spark;
public static final String dbUrl = "jdbc:postgresql://localhost:5432/db_test";
public static final String dbUser = null;
public static final String dbPwd = null;
@BeforeAll
public static void beforeAll() throws IOException {
final SparkConf conf = new SparkConf();
conf.setAppName("TEST");
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
spark = SparkSession
.builder()
.appName("TEST")
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
}
@Test
public void testDatabaseSerialization() throws Exception {
final Properties connectionProperties = new Properties();
if (dbUser != null) {
connectionProperties.put("user", dbUser);
}
if (dbPwd != null) {
connectionProperties.put("password", dbPwd);
}
runWithSparkSession(new SparkConf(), false, spark -> {
final List<OaiRecordWrapper> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final OaiRecordWrapper r = new OaiRecordWrapper();
r.setId("record_" + i);
r.setBody("jsahdjkahdjahdajad".getBytes());
r.setDate(LocalDateTime.now().toString());
r.setSets(Arrays.asList());
list.add(r);
}
final Dataset<OaiRecordWrapper> docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class));
docs
.write()
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties);
});
try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) {
try (final Statement st = con.createStatement()) {
final String query = IOUtils.toString(getClass().getResourceAsStream("oai-finalize.sql"));
st.execute(query);
}
}
}
}

View File

@ -0,0 +1,88 @@
package eu.dnetlib.dhp.oa.oaipmh;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.junit.jupiter.api.Test;
public class IrishOaiExporterJobTest {
@Test
void testAsIrishOaiResult() throws Exception {
final String xml = IOUtils.toString(getClass().getResourceAsStream("record_IE.xml"));
final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml, LocalDateTime.now().toString());
assertNotNull(res.getId());
assertNotNull(res.getBody());
assertNotNull(res.getSets());
assertNotNull(res.getDate());
assertEquals("dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0", res.getId());
assertTrue(res.getBody().length > 0);
assertTrue(res.getSets().isEmpty());
}
@Test
void testIsValid_IE() throws DocumentException {
final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IE.xml"));
assertTrue(IrishOaiExporterJob.isValid(doc));
}
@Test
void testIsValid_invalid_country() throws DocumentException {
final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IT.xml"));
assertFalse(IrishOaiExporterJob.isValid(doc));
}
@Test
void testIsValid_deleted() throws DocumentException {
final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IE_deleted.xml"));
assertFalse(IrishOaiExporterJob.isValid(doc));
}
@Test
void testGzip_simple() {
final String message = "<test />";
final byte[] bytes = IrishOaiExporterJob.gzip(message);
assertNotNull(bytes);
assertTrue(bytes.length > 0);
assertEquals(message, gunzip(bytes));
}
@Test
void testGzip_empty() {
assertNull(IrishOaiExporterJob.gzip(""));
assertNull(IrishOaiExporterJob.gzip(null));
}
public static String gunzip(final byte[] compressed) {
if ((compressed == null) || (compressed.length == 0)) {
return null;
}
if (!isCompressed(compressed)) {
return new String(compressed);
}
try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
return IOUtils.toString(gis, Charset.defaultCharset());
} catch (final IOException e) {
throw new RuntimeException("error in gunzip", e);
}
}
private static boolean isCompressed(final byte[] compressed) {
return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC)
&& (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
}
}

View File

@ -0,0 +1,89 @@
<record rank="null">
<result>
<header>
<objIdentifier>dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0</objIdentifier>
<dateOfCollection>2023-03-31T18:37:45.599Z</dateOfCollection>
<dateOfTransformation>2023-03-31T18:45:52.701Z</dateOfTransformation>
</header>
<metadata>
<entity schemaLocation="http://namespace.openaire.eu/oaf https://www.openaire.eu/schema/1.0/oaf-1.0.xsd">
<result>
<collectedfrom name="Waterford Institute of Technology - Open Access Repository" id="opendoar____::50c1f44e426560f3f2cdcb3e19e39903" />
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<originalId>50|od______6005::55a12e2e0fee45ce8005633c6c17fe9f</originalId>
<originalId>oai:repository.wit.ie:3029</originalId>
<originalId>50|od_______934::e7162a5632264cd622ee7180ca66fdce</originalId>
<originalId>oai:generic.eprints.org:3029</originalId>
<originalId>50|od_______934::55a12e2e0fee45ce8005633c6c17fe9f</originalId>
<measure id="influence" score="3.1177596E-9" class="C5" />
<measure id="popularity" score="1.2305752E-9" class="C5" />
<measure id="influence_alt" score="0" class="C5" />
<measure id="popularity_alt" score="0.0" class="C5" />
<measure id="impulse" score="0" class="C5" />
<fulltext>http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf</fulltext>
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<bestaccessright classid="OPEN" classname="Open Access" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<creator rank="1" name="Tadhg" surname="Blommerde">Blommerde, Tadhg</creator>
<creator rank="2" name="Patrick" surname="Lynch">Lynch, Patrick</creator>
<country classid="IE" classname="Ireland" schemeid="dnet:countries" schemename="dnet:countries" />
<dateofacceptance>2015-04-28</dateofacceptance>
<description>There is general consensus that service innovations are prerequisite to sustained competitive advantage and are an essential mechanism for responding to changes in customer needs and the operating environment of firms (Giannopoulou et al., 2011; Stryja et al., 2013). Services have been described as ubiquitous in their role of generating economic growth and wellbeing and represent over 70% of employment and GDP in developed nations (Janssen et al., 2012; Mustak, 2014). As a consequence, service innovations must be a core ambition of all countries, regions, and firms wishing to remain competitive (van Ark et al., 2003). While acknowledging the importance of once-off innovations, more critical still is the capability to repeatedly introduce and exploit service innovations (Siguaw et al., 2006). This is generally referred to as service innovation capability (SIC) and describes the repeatable routines and behaviours that organisations have in place to transform ideas and knowledge into innovations (Basterretxea and Martínez, 2012). However, despite links between SIC and continuous, sustainable, and consistent service innovations, there is evidence that many organisations struggle with its effective management (Adams et al., 2006; den Hertog et al., 2010). This is often attributed to the lack of formal guidance available and the absence of metrics to determine an organisations SIC performance (Hogan et al., 2011; Szczygielski, 2011). Maturity modelling research in this discipline remains at an embryonic stage, thus far presenting only conceptual and opaque discussions that fail to address the necessity for an assessment and strategic management framework (Gryszkiewicz et al., 2013; Hipp and Grupp, 2005). Therefore, the purpose of this ongoing research project is to evaluate the maturity of an organisations SIC to inform its effective management and enhancement. To achieve this it dimensionalises the concept into four constituent capabilities, specifically, strategising, customer involvement, knowledge management, and networking (Blommerde and Lynch, 2014). The study then tracks the maturity of these capabilities as they progress through eight evolutionary plateaus towards a fully developed or optimal state. This is accomplished through a capability maturity model that enables organisations to rapidly diagnose key areas of strength and weakness to systematically cultivate behaviours that leverage their untapped innovative potential (Wendler, 2012; Essmann and du Preez, 2010). As a result of the immense knowledge vacuum characteristic of this discipline, it is anticipated that this ongoing research project will make a substantial contribution to both academic understanding and take strides towards filling the void in practical support (Rapaccini et al., 2013). It expands the service innovation literature by detailing key service innovation levers, bolsters the discipline through clear definitions of terminology, provides a powerful explanation of the development of SICs, and operationalises the dynamic capabilities view through a novel self-assessment reference model (Jochem et al., 2011). The next step in the project is the evaluation of the, as yet, conceptual service innovation capability maturity model. Adopting a positivistic philosophical stance, the study proposes the use of structural equation modelling on data gathered through an extensive survey to confirm the model and support theoretical assumptions.</description>
<subject classid="keyword" classname="keyword" schemeid="dnet:result_subject" schemename="dnet:result_subject" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">RIKON (Research in Inovation, Knowledge &amp; Organisational Networks)</subject>
<language classid="eng" classname="English" schemeid="dnet:languages" schemename="dnet:languages" />
<format>application/pdf</format>
<resulttype classid="publication" classname="publication" schemeid="dnet:result_typologies" schemename="dnet:result_typologies" />
<resourcetype classid="UNKNOWN" classname="UNKNOWN" schemeid="dnet:dataCite_resource" schemename="dnet:dataCite_resource" />
<isgreen>false</isgreen>
<isindiamondjournal>false</isindiamondjournal>
<publiclyfunded>true</publiclyfunded>
<context id="eu-conexus" label="European University for Smart Urban Coastal Sustainability" type="community" />
<context id="tunet" label="TU-NET" type="community" />
<datainfo>
<inferred>true</inferred>
<deletedbyinference>false</deletedbyinference>
<trust>0.8</trust>
<inferenceprovenance>dedup-result-decisiontree-v4</inferenceprovenance>
<provenanceaction classid="sysimport:dedup" classname="Inferred by OpenAIRE" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions" />
</datainfo>
<rels>
<rel inferred="true" trust="0.85" inferenceprovenance="propagation" provenanceaction="result:organization:instrepo">
<to class="hasAuthorInstitution" scheme="dnet:result_organization_relations" type="organization">openorgs____::54cd984fc7d3b153ec2181f985041f02</to>
<country classid="IE" classname="Ireland" schemeid="dnet:countries" schemename="dnet:countries" />
<legalshortname>WIT</legalshortname>
<legalname>South East Technological University</legalname>
</rel>
</rels>
<children>
<result objidentifier="od_______934::e7162a5632264cd622ee7180ca66fdce">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
</result>
<result objidentifier="od______6005::55a12e2e0fee45ce8005633c6c17fe9f">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="Waterford Institute of Technology - Open Access Repository" id="opendoar____::50c1f44e426560f3f2cdcb3e19e39903" />
</result>
<result objidentifier="od_______934::55a12e2e0fee45ce8005633c6c17fe9f">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
</result>
<instance>
<accessright classid="OPEN" classname="Open Access" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<hostedby name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<dateofacceptance>2015-04-28</dateofacceptance>
<instancetype classid="0004" classname="Conference object" schemeid="dnet:publication_resource" schemename="dnet:publication_resource" />
<refereed classid="0002" classname="nonPeerReviewed" schemeid="dnet:review_levels" schemename="dnet:review_levels" />
<fulltext>http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf</fulltext>
<webresource>
<url>http://repository.wit.ie/3029/</url>
</webresource>
</instance>
</children>
</result>
</entity>
</metadata>
</result>
</record>

View File

@ -0,0 +1,89 @@
<record rank="null">
<result>
<header>
<objIdentifier>dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0</objIdentifier>
<dateOfCollection>2023-03-31T18:37:45.599Z</dateOfCollection>
<dateOfTransformation>2023-03-31T18:45:52.701Z</dateOfTransformation>
</header>
<metadata>
<entity schemaLocation="http://namespace.openaire.eu/oaf https://www.openaire.eu/schema/1.0/oaf-1.0.xsd">
<result>
<collectedfrom name="Waterford Institute of Technology - Open Access Repository" id="opendoar____::50c1f44e426560f3f2cdcb3e19e39903" />
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<originalId>50|od______6005::55a12e2e0fee45ce8005633c6c17fe9f</originalId>
<originalId>oai:repository.wit.ie:3029</originalId>
<originalId>50|od_______934::e7162a5632264cd622ee7180ca66fdce</originalId>
<originalId>oai:generic.eprints.org:3029</originalId>
<originalId>50|od_______934::55a12e2e0fee45ce8005633c6c17fe9f</originalId>
<measure id="influence" score="3.1177596E-9" class="C5" />
<measure id="popularity" score="1.2305752E-9" class="C5" />
<measure id="influence_alt" score="0" class="C5" />
<measure id="popularity_alt" score="0.0" class="C5" />
<measure id="impulse" score="0" class="C5" />
<fulltext>http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf</fulltext>
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<bestaccessright classid="OPEN" classname="Open Access" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<creator rank="1" name="Tadhg" surname="Blommerde">Blommerde, Tadhg</creator>
<creator rank="2" name="Patrick" surname="Lynch">Lynch, Patrick</creator>
<country classid="IE" classname="Ireland" schemeid="dnet:countries" schemename="dnet:countries" />
<dateofacceptance>2015-04-28</dateofacceptance>
<description>There is general consensus that service innovations are prerequisite to sustained competitive advantage and are an essential mechanism for responding to changes in customer needs and the operating environment of firms (Giannopoulou et al., 2011; Stryja et al., 2013). Services have been described as ubiquitous in their role of generating economic growth and wellbeing and represent over 70% of employment and GDP in developed nations (Janssen et al., 2012; Mustak, 2014). As a consequence, service innovations must be a core ambition of all countries, regions, and firms wishing to remain competitive (van Ark et al., 2003). While acknowledging the importance of once-off innovations, more critical still is the capability to repeatedly introduce and exploit service innovations (Siguaw et al., 2006). This is generally referred to as service innovation capability (SIC) and describes the repeatable routines and behaviours that organisations have in place to transform ideas and knowledge into innovations (Basterretxea and Martínez, 2012). However, despite links between SIC and continuous, sustainable, and consistent service innovations, there is evidence that many organisations struggle with its effective management (Adams et al., 2006; den Hertog et al., 2010). This is often attributed to the lack of formal guidance available and the absence of metrics to determine an organisations SIC performance (Hogan et al., 2011; Szczygielski, 2011). Maturity modelling research in this discipline remains at an embryonic stage, thus far presenting only conceptual and opaque discussions that fail to address the necessity for an assessment and strategic management framework (Gryszkiewicz et al., 2013; Hipp and Grupp, 2005). Therefore, the purpose of this ongoing research project is to evaluate the maturity of an organisations SIC to inform its effective management and enhancement. To achieve this it dimensionalises the concept into four constituent capabilities, specifically, strategising, customer involvement, knowledge management, and networking (Blommerde and Lynch, 2014). The study then tracks the maturity of these capabilities as they progress through eight evolutionary plateaus towards a fully developed or optimal state. This is accomplished through a capability maturity model that enables organisations to rapidly diagnose key areas of strength and weakness to systematically cultivate behaviours that leverage their untapped innovative potential (Wendler, 2012; Essmann and du Preez, 2010). As a result of the immense knowledge vacuum characteristic of this discipline, it is anticipated that this ongoing research project will make a substantial contribution to both academic understanding and take strides towards filling the void in practical support (Rapaccini et al., 2013). It expands the service innovation literature by detailing key service innovation levers, bolsters the discipline through clear definitions of terminology, provides a powerful explanation of the development of SICs, and operationalises the dynamic capabilities view through a novel self-assessment reference model (Jochem et al., 2011). The next step in the project is the evaluation of the, as yet, conceptual service innovation capability maturity model. Adopting a positivistic philosophical stance, the study proposes the use of structural equation modelling on data gathered through an extensive survey to confirm the model and support theoretical assumptions.</description>
<subject classid="keyword" classname="keyword" schemeid="dnet:result_subject" schemename="dnet:result_subject" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">RIKON (Research in Inovation, Knowledge &amp; Organisational Networks)</subject>
<language classid="eng" classname="English" schemeid="dnet:languages" schemename="dnet:languages" />
<format>application/pdf</format>
<resulttype classid="publication" classname="publication" schemeid="dnet:result_typologies" schemename="dnet:result_typologies" />
<resourcetype classid="UNKNOWN" classname="UNKNOWN" schemeid="dnet:dataCite_resource" schemename="dnet:dataCite_resource" />
<isgreen>false</isgreen>
<isindiamondjournal>false</isindiamondjournal>
<publiclyfunded>true</publiclyfunded>
<context id="eu-conexus" label="European University for Smart Urban Coastal Sustainability" type="community" />
<context id="tunet" label="TU-NET" type="community" />
<datainfo>
<inferred>true</inferred>
<deletedbyinference>true</deletedbyinference>
<trust>0.8</trust>
<inferenceprovenance>dedup-result-decisiontree-v4</inferenceprovenance>
<provenanceaction classid="sysimport:dedup" classname="Inferred by OpenAIRE" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions" />
</datainfo>
<rels>
<rel inferred="true" trust="0.85" inferenceprovenance="propagation" provenanceaction="result:organization:instrepo">
<to class="hasAuthorInstitution" scheme="dnet:result_organization_relations" type="organization">openorgs____::54cd984fc7d3b153ec2181f985041f02</to>
<country classid="IE" classname="Ireland" schemeid="dnet:countries" schemename="dnet:countries" />
<legalshortname>WIT</legalshortname>
<legalname>South East Technological University</legalname>
</rel>
</rels>
<children>
<result objidentifier="od_______934::e7162a5632264cd622ee7180ca66fdce">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
</result>
<result objidentifier="od______6005::55a12e2e0fee45ce8005633c6c17fe9f">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="Waterford Institute of Technology - Open Access Repository" id="opendoar____::50c1f44e426560f3f2cdcb3e19e39903" />
</result>
<result objidentifier="od_______934::55a12e2e0fee45ce8005633c6c17fe9f">
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">A service innovation capability maturity model for SMEs</title>
<dateofacceptance>2015-04-28</dateofacceptance>
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
</result>
<instance>
<accessright classid="OPEN" classname="Open Access" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<collectedfrom name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<hostedby name="SETU Open Access Repository" id="opendoar____::4daa3db355ef2b0e64b472968cb70f0d" />
<dateofacceptance>2015-04-28</dateofacceptance>
<instancetype classid="0004" classname="Conference object" schemeid="dnet:publication_resource" schemename="dnet:publication_resource" />
<refereed classid="0002" classname="nonPeerReviewed" schemeid="dnet:review_levels" schemename="dnet:review_levels" />
<fulltext>http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf</fulltext>
<webresource>
<url>http://repository.wit.ie/3029/</url>
</webresource>
</instance>
</children>
</result>
</entity>
</metadata>
</result>
</record>

View File

@ -0,0 +1,66 @@
<record rank="null">
<result>
<header>
<objIdentifier>od_______310::02365c51a0ed7cbb54b2bbc7c0426d1b</objIdentifier>
<dateOfCollection>2024-04-06T06:05:16+0000</dateOfCollection>
<dateOfTransformation>2024-04-06T06:56:01.776Z</dateOfTransformation>
</header>
<metadata>
<entity schemaLocation="http://namespace.openaire.eu/oaf https://www.openaire.eu/schema/1.0/oaf-1.0.xsd">
<result>
<collectedfrom name="Flore (Florence Research Repository)" id="opendoar____::06eb61b839a0cefee4967c67ccb099dc" />
<originalId>50|od_______310::02365c51a0ed7cbb54b2bbc7c0426d1b</originalId>
<originalId>oai:flore.unifi.it:2158/608965</originalId>
<pid classid="handle" classname="Handle" schemeid="dnet:pid_types" schemename="dnet:pid_types" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">2158/608965</pid>
<measure id="influence" score="3.1177596E-9" class="C5" />
<measure id="popularity" score="7.6231693E-10" class="C5" />
<measure id="influence_alt" score="0" class="C5" />
<measure id="popularity_alt" score="0.0" class="C5" />
<measure id="impulse" score="0" class="C5" />
<title classid="main title" classname="main title" schemeid="dnet:dataCite_title" schemename="dnet:dataCite_title" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">Estorsione (art. 629)</title>
<bestaccessright classid="UNKNOWN" classname="not available" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<creator rank="1" name="Francesco" surname="Macri" orcid_pending="0000-0001-7658-6173">MACRI', FRANCESCO</creator>
<country classid="IT" classname="Italy" schemeid="dnet:countries" schemename="dnet:countries" />
<dateofacceptance>2011-01-01</dateofacceptance>
<language classid="und" classname="Undetermined" schemeid="dnet:languages" schemename="dnet:languages" />
<relevantdate classid="Accepted" classname="Accepted" schemeid="dnet:dataCite_date" schemename="dnet:dataCite_date" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">2011-01-01</relevantdate>
<relevantdate classid="issued" classname="issued" schemeid="dnet:dataCite_date" schemename="dnet:dataCite_date" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">2011-01-01</relevantdate>
<relevantdate classid="available" classname="available" schemeid="dnet:dataCite_date" schemename="dnet:dataCite_date" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">2015-04-28</relevantdate>
<publisher>UTET</publisher>
<resulttype classid="publication" classname="publication" schemeid="dnet:result_typologies" schemename="dnet:result_typologies" />
<resourcetype classid="book part" classname="book part" schemeid="dnet:dataCite_resource" schemename="dnet:dataCite_resource" />
<datainfo>
<inferred>false</inferred>
<deletedbyinference>false</deletedbyinference>
<trust>0.9</trust>
<inferenceprovenance>null</inferenceprovenance>
<provenanceaction classid="sysimport:crosswalk:repository" classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions" />
</datainfo>
<rels>
<rel inferred="true" trust="0.85" inferenceprovenance="propagation" provenanceaction="result:organization:instrepo">
<to class="hasAuthorInstitution" scheme="dnet:result_organization_relations" type="organization">openorgs____::41406edad82942e9e0b29317b8a847e2</to>
<legalshortname>University of Florence</legalshortname>
<country classid="IT" classname="Italy" schemeid="dnet:countries" schemename="dnet:countries" />
<legalname>University of Florence</legalname>
</rel>
</rels>
<children>
<instance>
<accessright classid="UNKNOWN" classname="not available" schemeid="dnet:access_modes" schemename="dnet:access_modes" />
<collectedfrom name="Flore (Florence Research Repository)" id="opendoar____::06eb61b839a0cefee4967c67ccb099dc" />
<hostedby name="Flore (Florence Research Repository)" id="opendoar____::06eb61b839a0cefee4967c67ccb099dc" />
<dateofacceptance>2011-01-01</dateofacceptance>
<instancetype classid="0013" classname="Part of book or chapter of book" schemeid="dnet:publication_resource" schemename="dnet:publication_resource" />
<pid classid="handle" classname="Handle" schemeid="dnet:pid_types" schemename="dnet:pid_types" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">2158/608965</pid>
<alternateidentifier classid="urn" classname="urn" schemeid="dnet:pid_types" schemename="dnet:pid_types" inferred="false" provenanceaction="sysimport:crosswalk:repository" trust="0.9">http://hdl.handle.net/2158/608965</alternateidentifier>
<refereed classid="0002" classname="nonPeerReviewed" schemeid="dnet:review_levels" schemename="dnet:review_levels" />
<webresource>
<url>https://hdl.handle.net/2158/608965</url>
</webresource>
</instance>
</children>
</result>
</entity>
</metadata>
</result>
</record>

View File

@ -72,6 +72,8 @@ function copydb() {
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 2 exit 2
else
return 2
fi fi
fi fi
@ -90,19 +92,30 @@ function copydb() {
-pb \ -pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH} ${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
# Check the exit status of the "hadoop distcp" command. if [ $? -eq 0 ]; then # Check the exit status of the "hadoop distcp" command.
if [ $? -eq 0 ]; then echo -e "\nSuccessfully copied the files of '${db}' from Ocean to Impala cluster.\n"
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n" echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 3 exit 3
else
return 3
fi fi
fi fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well.. # Give WRITE and EXECUTE permissions to the DBs' directory only, in order to be able to create more tables later, on top of that DB.
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod u+wx ${IMPALA_HDFS_DB_BASE_PATH}/${db}.db
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the costly recursive operation as well, using the "-R" param.
if [ $? -ne 0 ]; then # Check the exit status..
echo -e "\n\nERROR: FAILED TO ASSIGN WRITE AND EXECUTE PERMISSIONS TO THE DIRECTORY OF DB: '${db}'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4
else
return 4
fi
fi
echo -e "\nCreating schema for db: '${db}'\n" echo -e "\nCreating schema for db: '${db}'\n"
@ -131,7 +144,7 @@ function copydb() {
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside. if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n" echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4 exit 5
fi fi
else else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
@ -139,7 +152,7 @@ function copydb() {
if [ -n "$log_errors" ]; then if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n" echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 5 exit 6
fi fi
fi fi
fi fi
@ -185,7 +198,7 @@ function copydb() {
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n" echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 6 exit 7
fi fi
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
@ -215,7 +228,7 @@ function copydb() {
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n" echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 7 exit 8
fi fi
fi fi

View File

@ -72,6 +72,8 @@ function copydb() {
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 2 exit 2
else
return 2
fi fi
fi fi
@ -90,19 +92,30 @@ function copydb() {
-pb \ -pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH} ${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
# Check the exit status of the "hadoop distcp" command. if [ $? -eq 0 ]; then # Check the exit status of the "hadoop distcp" command.
if [ $? -eq 0 ]; then echo -e "\nSuccessfully copied the files of '${db}' from Ocean to Impala cluster.\n"
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n" echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 3 exit 3
else
return 3
fi fi
fi fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well.. # Give WRITE and EXECUTE permissions to the DBs' directory only, in order to be able to create more tables later, on top of that DB.
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod u+wx ${IMPALA_HDFS_DB_BASE_PATH}/${db}.db
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the costly recursive operation as well, using the "-R" param.
if [ $? -ne 0 ]; then # Check the exit status..
echo -e "\n\nERROR: FAILED TO ASSIGN WRITE AND EXECUTE PERMISSIONS TO THE DIRECTORY OF DB: '${db}'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4
else
return 4
fi
fi
echo -e "\nCreating schema for db: '${db}'\n" echo -e "\nCreating schema for db: '${db}'\n"
@ -131,7 +144,7 @@ function copydb() {
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside. if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n" echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4 exit 5
fi fi
else else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
@ -139,7 +152,7 @@ function copydb() {
if [ -n "$log_errors" ]; then if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n" echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 5 exit 6
fi fi
fi fi
fi fi
@ -185,7 +198,7 @@ function copydb() {
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n" echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 6 exit 7
fi fi
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
@ -215,7 +228,7 @@ function copydb() {
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n" echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 7 exit 8
fi fi
fi fi

View File

@ -72,6 +72,8 @@ function copydb() {
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 2 exit 2
else
return 2
fi fi
fi fi
@ -90,19 +92,30 @@ function copydb() {
-pb \ -pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH} ${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
# Check the exit status of the "hadoop distcp" command. if [ $? -eq 0 ]; then # Check the exit status of the "hadoop distcp" command.
if [ $? -eq 0 ]; then echo -e "\nSuccessfully copied the files of '${db}' from Ocean to Impala cluster.\n"
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n" echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 3 exit 3
else
return 3
fi fi
fi fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well.. # Give WRITE and EXECUTE permissions to the DBs' directory only, in order to be able to create more tables later, on top of that DB.
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod u+wx ${IMPALA_HDFS_DB_BASE_PATH}/${db}.db
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the costly recursive operation as well, using the "-R" param.
if [ $? -ne 0 ]; then # Check the exit status..
echo -e "\n\nERROR: FAILED TO ASSIGN WRITE AND EXECUTE PERMISSIONS TO THE DIRECTORY OF DB: '${db}'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4
else
return 4
fi
fi
echo -e "\nCreating schema for db: '${db}'\n" echo -e "\nCreating schema for db: '${db}'\n"
@ -131,7 +144,7 @@ function copydb() {
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside. if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n" echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4 exit 5
fi fi
else else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
@ -139,7 +152,7 @@ function copydb() {
if [ -n "$log_errors" ]; then if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n" echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 5 exit 6
fi fi
fi fi
fi fi
@ -185,7 +198,7 @@ function copydb() {
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n" echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 6 exit 7
fi fi
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
@ -215,7 +228,7 @@ function copydb() {
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n" echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 7 exit 8
fi fi
fi fi

View File

@ -74,6 +74,8 @@ function copydb() {
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 2 exit 2
else
return 2
fi fi
fi fi
@ -92,19 +94,30 @@ function copydb() {
-pb \ -pb \
${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH} ${OCEAN_HDFS_NODE}/user/hive/warehouse/${db}.db ${IMPALA_HDFS_DB_BASE_PATH}
# Check the exit status of the "hadoop distcp" command. if [ $? -eq 0 ]; then # Check the exit status of the "hadoop distcp" command.
if [ $? -eq 0 ]; then echo -e "\nSuccessfully copied the files of '${db}' from Ocean to Impala cluster.\n"
echo -e "\nSuccessfully copied the files of '${db}'.\n"
else else
echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n" echo -e "\n\nERROR: FAILED TO TRANSFER THE FILES OF '${db}', WITH 'hadoop distcp'. GOT EXIT STATUS: $?\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 3 exit 3
else
return 3
fi fi
fi fi
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the following costly operation as well.. # Give WRITE and EXECUTE permissions to the DBs' directory only, in order to be able to create more tables later, on top of that DB.
#hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod -R 777 ${TEMP_SUBDIR_FULLPATH}/${db}.db hdfs dfs -conf ${IMPALA_CONFIG_FILE} -chmod u+wx ${IMPALA_HDFS_DB_BASE_PATH}/${db}.db
# In case we ever use this script for a writable DB (using inserts/updates), we should perform the costly recursive operation as well, using the "-R" param.
if [ $? -ne 0 ]; then # Check the exit status..
echo -e "\n\nERROR: FAILED TO ASSIGN WRITE AND EXECUTE PERMISSIONS TO THE DIRECTORY OF DB: '${db}'. GOT EXIT STATUS: $?\n\n"
rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4
else
return 4
fi
fi
echo -e "\nCreating schema for db: '${db}'\n" echo -e "\nCreating schema for db: '${db}'\n"
@ -133,7 +146,7 @@ function copydb() {
if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside. if [ -z "$CURRENT_PRQ_FILE" ]; then # If there is not parquet-file inside.
echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n" echo -e "\nERROR: THE TABLE \"${i}\" HAD NO FILES TO GET THE SCHEMA FROM! IT'S EMPTY!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 4 exit 5
fi fi
else else
impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log impala-shell --user ${HADOOP_USER_NAME} -i ${IMPALA_HOSTNAME} -q "create table ${db}.${i} like parquet '${CURRENT_PRQ_FILE}' stored as parquet;" |& tee error.log
@ -141,7 +154,7 @@ function copydb() {
if [ -n "$log_errors" ]; then if [ -n "$log_errors" ]; then
echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n" echo -e "\n\nERROR: THERE WAS A PROBLEM WHEN CREATING TABLE '${i}'!\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 5 exit 6
fi fi
fi fi
fi fi
@ -187,7 +200,7 @@ function copydb() {
if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then if [[ $new_num_of_views_to_retry -eq $previous_num_of_views_to_retry ]]; then
echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n" echo -e "\n\nERROR: THE NUMBER OF VIEWS TO RETRY HAS NOT BEEN REDUCED! THE SCRIPT IS LIKELY GOING TO AN INFINITE-LOOP! EXITING..\n\n"
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 6 exit 7
fi fi
elif [[ $new_num_of_views_to_retry -gt 0 ]]; then elif [[ $new_num_of_views_to_retry -gt 0 ]]; then
echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n" echo -e "\nTo be retried \"create_view_statements\" (${new_num_of_views_to_retry}):\n\n${all_create_view_statements[@]}\n"
@ -217,7 +230,7 @@ function copydb() {
echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n" echo -e "\n\nERROR: 1 OR MORE ENTITIES OF DB '${db}' FAILED TO BE COPIED TO IMPALA CLUSTER!\n\n"
rm -f error.log rm -f error.log
if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then if [[ SHOULD_EXIT_WHOLE_SCRIPT_UPON_ERROR -eq 1 ]]; then
exit 7 exit 8
fi fi
fi fi

View File

@ -960,7 +960,7 @@
<commons.logging.version>1.1.3</commons.logging.version> <commons.logging.version>1.1.3</commons.logging.version>
<commons-validator.version>1.7</commons-validator.version> <commons-validator.version>1.7</commons-validator.version>
<dateparser.version>1.0.7</dateparser.version> <dateparser.version>1.0.7</dateparser.version>
<dhp-schemas.version>[6.1.2]</dhp-schemas.version> <dhp-schemas.version>[6.1.3-SNAPSHOT]</dhp-schemas.version>
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version> <dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version> <dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<dhp.guava.version>11.0.2</dhp.guava.version> <dhp.guava.version>11.0.2</dhp.guava.version>