added accumulators; the record's last modified date is now included in the saved data; the lambda file is partitioned into 20 parts before downloading starts

Enrico Ottonello 2020-05-18 19:51:29 +02:00
parent 0b29bb7e3b
commit fc80e8c7de
4 changed files with 124 additions and 7 deletions

File: SparkOrcidGenerateAuthors.java

@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@@ -20,6 +21,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,23 +63,53 @@ public class SparkOrcidGenerateAuthors {
isSparkSessionManaged,
spark -> {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv");
+LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords");
+LongAccumulator modifiedRecordsAcc = sc.sc().longAccumulator("modifiedRecords");
+LongAccumulator downloadedRecordsAcc = sc.sc().longAccumulator("downloadedRecords");
+LongAccumulator alreadyDownloadedRecords = sc.sc().longAccumulator("alreadyDownloadedRecords");
+JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "lamdafiles");
+JavaRDD<String> downloadedRDD = sc.textFile(workingPath + "downloaded");
+Function<String, String> getOrcidIdFunction = line -> {
+try {
+String[] values = line.split(",");
+return values[0].substring(1);
+} catch (Exception e) {
+return new String("");
+}
+};
+List<String> downloadedRecords = downloadedRDD.map(getOrcidIdFunction).collect();
Function<String, Boolean> isModifiedAfterFilter = line -> {
String[] values = line.split(",");
String orcidId = values[0];
+parsedRecordsAcc.add(1);
if (isModified(orcidId, values[3])) {
+modifiedRecordsAcc.add(1);
return true;
}
return false;
};
+Function<String, Boolean> isNotDownloadedFilter = line -> {
+String[] values = line.split(",");
+String orcidId = values[0];
+if (downloadedRecords.contains(orcidId)) {
+alreadyDownloadedRecords.add(1);
+return false;
+}
+return true;
+};
Function<String, Tuple2<String, String>> downloadRecordFunction = line -> {
String[] values = line.split(",");
String orcidId = values[0];
-return downloadRecord(orcidId, token);
+String modifiedDate = values[3];
+return downloadRecord(orcidId, modifiedDate, token, downloadedRecordsAcc);
};
lamdaFileRDD
.filter(isModifiedAfterFilter)
+.filter(isNotDownloadedFilter)
.map(downloadRecordFunction)
.rdd()
.saveAsTextFile(workingPath.concat(outputAuthorsPath));
@@ -101,9 +133,11 @@ public class SparkOrcidGenerateAuthors {
return modifiedDateDt.after(lastUpdateDt);
}
-private static Tuple2<String, String> downloadRecord(String orcidId, String token) {
+private static Tuple2<String, String> downloadRecord(String orcidId, String modifiedDate, String token,
+LongAccumulator downloadedRecordsAcc) {
final DownloadedRecordData data = new DownloadedRecordData();
data.setOrcidId(orcidId);
+data.setModifiedDate(modifiedDate);
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
httpGet.addHeader("Accept", "application/vnd.orcid+xml");
@@ -117,6 +151,7 @@ public class SparkOrcidGenerateAuthors {
"Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode());
return data.toTuple2();
}
+downloadedRecordsAcc.add(1);
data
.setCompressedData(
ArgumentApplicationParser.compressArgument(IOUtils.toString(response.getEntity().getContent())));
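The hunks above register four accumulators and increment them inside the filter and download functions, but the excerpt does not show where their values are read back. A minimal, self-contained sketch of the same Spark accumulator pattern; the class name, sample input and local master are illustrative, not part of this commit:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorPatternSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("acc-sketch").master("local[*]").getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Register on the driver, increment inside a transformation, read the value on the driver
        // after an action has run (the same pattern used for parsedRecordsAcc above).
        LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords");
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("id1,a,b,2020-05-01", "id2,a,b,2020-05-02"));
        lines.filter(line -> {
            parsedRecordsAcc.add(1);
            return true;
        }).count();
        System.out.println("parsedRecords: " + parsedRecordsAcc.value()); // 2
        spark.stop();
    }
}

Spark only guarantees exactly-once accumulator updates for code running inside actions; updates made in transformations such as filter and map can be re-applied when tasks are retried, so counters incremented this way are best treated as approximate diagnostics.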

File: SparkPartitionLambdaFile.java (new file)

@@ -0,0 +1,50 @@
+package eu.dnetlib.doiboost.orcid;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+public class SparkPartitionLambdaFile {
+public static void main(String[] args) throws IOException, Exception {
+Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class);
+final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+IOUtils
+.toString(
+SparkOrcidGenerateAuthors.class
+.getResourceAsStream(
+"/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json")));
+parser.parseArgument(args);
+Boolean isSparkSessionManaged = Optional
+.ofNullable(parser.get("isSparkSessionManaged"))
+.map(Boolean::valueOf)
+.orElse(Boolean.TRUE);
+final String workingPath = parser.get("workingPath");
+SparkConf conf = new SparkConf();
+runWithSparkSession(
+conf,
+isSparkSessionManaged,
+spark -> {
+JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "last_modified.csv");
+lamdaFileRDD
+.repartition(20)
+.saveAsTextFile(workingPath.concat("lamdafiles"));
+});
+}
+}
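This is the partitioning step from the commit message: the single last_modified.csv lambda file is rewritten as 20 part files under lamdafiles, and SparkOrcidGenerateAuthors now reads that directory instead of the CSV, so the download work is spread over more tasks. A small sketch for checking the resulting input partitioning; the working path and the local session setup are assumptions for illustration:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class LambdaPartitionCheck {
    public static void main(String[] args) {
        // Illustrative path; in the workflow it comes from the ${workingPath} parameter.
        String workingPath = "/data/orcid/";
        SparkSession spark = SparkSession.builder().appName("lambda-partition-check").master("local[*]").getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Each of the 20 part files written by SparkPartitionLambdaFile yields at least one input
        // split, so reading the directory back should report 20 or more partitions.
        JavaRDD<String> lamdaFileRDD = sc.textFile(workingPath + "lamdafiles");
        System.out.println("input partitions: " + lamdaFileRDD.getNumPartitions());
        spark.stop();
    }
}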

File: DownloadedRecordData.java

@@ -12,6 +12,7 @@ import scala.Tuple2;
public class DownloadedRecordData implements Serializable {
private String orcidId;
+private String modifiedDate;
private String statusCode;
private String compressedData;
private String errorMessage;
@@ -19,6 +20,7 @@ public class DownloadedRecordData implements Serializable {
public Tuple2<String, String> toTuple2() {
JsonObject data = new JsonObject();
data.addProperty("statusCode", getStatusCode());
+data.addProperty("modifiedDate", getModifiedDate());
if (getCompressedData() != null) {
data.addProperty("compressedData", getCompressedData());
}
@@ -45,7 +47,11 @@ public class DownloadedRecordData implements Serializable {
}
public int getStatusCode() {
-return Integer.parseInt(statusCode);
+try {
+return Integer.parseInt(statusCode);
+} catch (Exception e) {
+return -2;
+}
}
public void setStatusCode(int statusCode) {
@@ -60,4 +66,11 @@ public class DownloadedRecordData implements Serializable {
this.compressedData = compressedData;
}
+public String getModifiedDate() {
+return modifiedDate;
+}
+public void setModifiedDate(String modifiedDate) {
+this.modifiedDate = modifiedDate;
+}
}
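For context on the guarded getStatusCode(): statusCode is stored as a String and is only filled in once a download gets far enough to record a status, so a record that fails earlier can still hold null, and the old Integer.parseInt call would throw while toTuple2() serialises it. A short illustrative fragment, assuming DownloadedRecordData from the hunks above is available; the ORCID iD and date are made-up values:

// Not part of the commit: shows the fallback on a record that never received an HTTP status.
DownloadedRecordData data = new DownloadedRecordData();
data.setOrcidId("0000-0002-1825-0097");           // made-up ORCID iD
data.setModifiedDate("2020-05-18T17:00:00.000Z"); // made-up last-modified value from the lambda file
// statusCode was never set, so Integer.parseInt(statusCode) throws NumberFormatException;
// the new try/catch returns -2 instead, and toTuple2() can still serialise the record.
int code = data.getStatusCode(); // -2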

File: Oozie workflow definition

@@ -37,14 +37,14 @@
<error to="Kill"/>
</action>
-<action name="Gen_Orcid_Authors">
+<action name="Split_Lambda_File">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
-<name>Gen_Orcid_Authors</name>
-<class>eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors</class>
+<name>Split_Lambda_File</name>
+<class>eu.dnetlib.doiboost.orcid.SparkPartitionLambdaFile</class>
<jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 24 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot; --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
</spark-opts>
@@ -56,5 +56,24 @@
<error to="Kill"/>
</action>
+<action name="Gen_Orcid_Authors">
+<spark xmlns="uri:oozie:spark-action:0.2">
+<job-tracker>${jobTracker}</job-tracker>
+<name-node>${nameNode}</name-node>
+<master>yarn</master>
+<mode>cluster</mode>
+<name>Gen_Orcid_Authors</name>
+<class>eu.dnetlib.doiboost.orcid.SparkOrcidGenerateAuthors</class>
+<jar>dhp-doiboost-1.2.1-SNAPSHOT.jar</jar>
+<spark-opts>--num-executors 20 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot; --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
+</spark-opts>
+<arg>-w</arg><arg>${workingPath}/</arg>
+<arg>-o</arg><arg>authors/</arg>
+<arg>-t</arg><arg>${token}</arg>
+</spark>
+<ok to="End"/>
+<error to="Kill"/>
+</action>
<end name="End"/>
</workflow-app>