forked from D-Net/dnet-hadoop
fixed issues

parent f34e63ffc0
commit 21399fc433

@@ -109,12 +109,15 @@ public class SparkPatchRefereed implements Serializable {
 	private static <R extends Result> R updateRefereed(Tuple2<R, ResultInstance> value) {
 		R r = value._1();
 		Optional<ResultInstance> oInstanceList = Optional.ofNullable(value._2());
-		List<Instance> resultInstance = r.getInstance();
+		Optional<List<Instance>> oresultInstance = Optional.ofNullable(r.getInstance());
+		if (!oresultInstance.isPresent()) {
+			return r;
+		}
 		if (oInstanceList.isPresent()) {
 			List<Instance> instanceList = oInstanceList.get().getInstanceList();
-			resultInstance.forEach(i -> checkEquivalence(instanceList, i));
+			oresultInstance.get().forEach(i -> checkEquivalence(instanceList, i));
 		} else {
-			r.getInstance().forEach(i -> i.setRefereed(Constants.DEFAULT_REFEREED));
+			oresultInstance.get().forEach(i -> i.setRefereed(Constants.DEFAULT_REFEREED));
 		}
 		return r;
 	}
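Review note: this hunk makes updateRefereed tolerate results whose instance list is null, returning the result unchanged instead of throwing a NullPointerException on the forEach. A minimal, self-contained sketch of the null-guard idiom (not the dnet-hadoop code; plain JDK types stand in for Result/Instance):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Sketch only: the Optional-based null guard introduced by the patch.
// A possibly-null list is wrapped in Optional and the method exits early,
// mirroring the new "if (!oresultInstance.isPresent()) { return r; }" branch.
public class NullGuardSketch {

	static List<String> tagAll(List<String> maybeNull) {
		Optional<List<String>> guarded = Optional.ofNullable(maybeNull);
		if (!guarded.isPresent()) {
			return Collections.emptyList(); // early exit instead of an NPE
		}
		guarded.get().replaceAll(s -> s + " [refereed]");
		return guarded.get();
	}

	public static void main(String[] args) {
		System.out.println(tagAll(null)); // []
		System.out.println(tagAll(new ArrayList<>(Collections.singletonList("instance-1")))); // [instance-1 [refereed]]
	}
}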
@@ -151,6 +154,14 @@ public class SparkPatchRefereed implements Serializable {
 	}
 
 	private static boolean equals(List<String> url, List<String> url1) {
+		if (url == null && url1 == null) {
+			return true;
+		}
+
+		if ((url1 == null && url != null) || (url != null && url1 == null)) {
+			return false;
+		}
+
 		if (url.size() != url1.size()) {
 			return false;
 		}
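Review note: the added guards make equals null-safe before the size comparison. Once the first check has ruled out the both-null case, the second condition `(url1 == null && url != null) || (url != null && url1 == null)` reduces to `url == null || url1 == null`. A behavior-equivalent sketch of just the portion visible in this hunk (an illustration, not the committed code):

import java.util.Arrays;
import java.util.List;

// Sketch only: condensed form of the null handling added above. Covers the
// prefix of equals() shown in the hunk (null checks plus the size check);
// element-by-element comparison would follow in the real method.
public class NullSafePrefixSketch {

	static boolean sizeCompatible(List<String> url, List<String> url1) {
		if (url == null && url1 == null) {
			return true;
		}
		if (url == null || url1 == null) { // exactly one side is null here
			return false;
		}
		return url.size() == url1.size();
	}

	public static void main(String[] args) {
		System.out.println(sizeCompatible(null, null)); // true
		System.out.println(sizeCompatible(null, Arrays.asList("u"))); // false
		System.out.println(sizeCompatible(Arrays.asList("u"), Arrays.asList("u", "v"))); // false
	}
}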
@@ -20,6 +20,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
@@ -33,7 +34,7 @@ public class SparkPrepareResultInstanceList implements Serializable {
 
 		String jsonConfiguration = IOUtils
 			.toString(
 				SparkPrepareResultInstanceList.class
 					.getResourceAsStream(
 						"/eu/dnetlib/dhp/patchrefereed/prepare_parameters.json"));
 
@@ -85,7 +86,7 @@ public class SparkPrepareResultInstanceList implements Serializable {
 
 		Dataset<R> result = readPath(spark, inputPath, resultClazz);
 
-		//log.info("number of results: {}", result.count());
+		// log.info("number of results: {}", result.count());
 		result.map(r -> {
 			ResultInstance ri = null;
 			List<Instance> instanceList = Optional
@@ -220,6 +220,9 @@
                             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
                         <arg>--inputPath</arg><arg>${inputPathProd}/publication</arg>
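Review note: the three --conf lines added to each Spark action in this workflow disable speculative execution for Spark tasks and for the underlying Hadoop map/reduce attempts, presumably so duplicate task attempts cannot write the same output partition twice. The same settings expressed programmatically, as a sketch (the app name is a placeholder, not from the commit):

import org.apache.spark.SparkConf;

// Sketch only: the three workflow --conf flags as a programmatic SparkConf.
// spark.speculation governs Spark's own task speculation; the two
// spark.hadoop.* keys are forwarded into the Hadoop configuration.
public class SpeculationOffSketch {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
			.setAppName("patchRefereed-sketch") // placeholder name
			.set("spark.speculation", "false")
			.set("spark.hadoop.mapreduce.map.speculative", "false")
			.set("spark.hadoop.mapreduce.reduce.speculative", "false");
		System.out.println(conf.toDebugString());
	}
}

The same change is repeated below for the dataset, software, and otherresearchproduct actions; only the action names (which keep the original file's "pathcRefereed" spelling) and input/table arguments differ.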
@@ -234,7 +237,7 @@
                     <spark xmlns="uri:oozie:spark-action:0.2">
                         <master>yarn-cluster</master>
                         <mode>cluster</mode>
-                        <name>pathcRefereed-publication</name>
+                        <name>pathcRefereed-dataset</name>
                         <class>eu.dnetlib.dhp.patchrefereed.SparkPatchRefereed</class>
                         <jar>dhp-patch-${projectVersion}.jar</jar>
                         <spark-opts>
@@ -246,6 +249,9 @@
                             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--inputPath</arg><arg>${inputPathProd}/dataset</arg>
                         <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
@@ -260,7 +266,7 @@
                     <spark xmlns="uri:oozie:spark-action:0.2">
                         <master>yarn-cluster</master>
                         <mode>cluster</mode>
-                        <name>pathcRefereed-publication</name>
+                        <name>pathcRefereed-software</name>
                         <class>eu.dnetlib.dhp.patchrefereed.SparkPatchRefereed</class>
                         <jar>dhp-patch-${projectVersion}.jar</jar>
                         <spark-opts>
@@ -272,6 +278,9 @@
                             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--inputPath</arg><arg>${inputPathProd}/software</arg>
                         <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
@@ -286,7 +295,7 @@
                     <spark xmlns="uri:oozie:spark-action:0.2">
                         <master>yarn-cluster</master>
                         <mode>cluster</mode>
-                        <name>pathcRefereed-publication</name>
+                        <name>pathcRefereed-orp</name>
                         <class>eu.dnetlib.dhp.patchrefereed.SparkPatchRefereed</class>
                         <jar>dhp-patch-${projectVersion}.jar</jar>
                         <spark-opts>
@@ -298,6 +307,9 @@
                             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                            --conf spark.speculation=false
+                            --conf spark.hadoop.mapreduce.map.speculative=false
+                            --conf spark.hadoop.mapreduce.reduce.speculative=false
                         </spark-opts>
                         <arg>--inputPath</arg><arg>${inputPathProd}/otherresearchproduct</arg>
                         <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.oaf.Publication;
+import jdk.nashorn.internal.ir.annotations.Ignore;
 
 public class PatchRefereedTest {
 
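Review note: jdk.nashorn.internal.ir.annotations.Ignore is a JDK-internal annotation that IDE auto-import often picks by mistake; it has no effect on a test framework. If the intent was to skip a test, a sketch of the conventional form follows (assuming JUnit 5 is on the test classpath; this hunk does not show which JUnit version the module uses):

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

// Sketch only (assumes JUnit 5): the standard way to skip a test, in place
// of the JDK-internal nashorn Ignore annotation imported above.
public class SkipTestSketch {

	@Disabled("fixture under revision") // reason string is a placeholder
	@Test
	void skippedTest() {
		// never runs while @Disabled is present
	}
}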
@@ -182,11 +183,11 @@ public class PatchRefereedTest {
 			new String[] {
 				"-isSparkSessionManaged", Boolean.FALSE.toString(),
 				"-inputPath",
-				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/prod_publication.json").getPath(),
+				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/prod_publication2.json").getPath(),
 				"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
 				"-outputPath", workingDir.toString() + "/publication",
 				"-preparedInfoPath",
-				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/preparedInfo.json").getPath()
+				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/preparedInfo4.json").getPath()
 			});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
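Review note: the test drives the job through a main-style String[] of flags and now points at fixtures relocated under simpleTest/. A sketch of how such flag arrays are typically consumed via ArgumentApplicationParser in this codebase (constructor and accessor signatures are assumed from its usage elsewhere, not verified in this hunk; the parameter-spec resource is reused from the hunk above purely for illustration):

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

// Sketch only: assumed usage pattern — the parser is built from a JSON
// parameter spec on the classpath, then fed the String[] that the test
// passes to main(). Signatures here are assumptions, not verified.
public class ArgsParsingSketch {

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				ArgsParsingSketch.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/patchrefereed/prepare_parameters.json"));

		ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		String inputPath = parser.get("inputPath"); // e.g. .../simpleTest/prod_publication2.json
		String preparedInfoPath = parser.get("preparedInfoPath");
		System.out.println(inputPath + " | " + preparedInfoPath);
	}
}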
@@ -224,7 +225,7 @@ public class PatchRefereedTest {
 				"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
 				"-outputPath", workingDir.toString() + "/publication",
 				"-preparedInfoPath",
-				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/preparedInfo.json").getPath()
+				getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/preparedInfo4.json").getPath()
 			});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());