enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
3 changed files with 7 additions and 4 deletions
Showing only changes of commit f53e42bda7 - Show all commits

View File

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version> <version>1.2.1-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -57,6 +57,7 @@ public class OrcidDownloader extends OrcidDSManager {
} catch (Throwable e) { } catch (Throwable e) {
Log.warn("Downloading " + orcidId, e.getMessage()); Log.warn("Downloading " + orcidId, e.getMessage());
} }
return new String(""); return new String("");
} }
@ -147,6 +148,9 @@ public class OrcidDownloader extends OrcidDSManager {
+ downloadedRecordsCounter + downloadedRecordsCounter
+ " saved: " + " saved: "
+ savedRecordsCounter); + savedRecordsCounter);
if (parsedRecordsCounter > REQ_MAX_TEST) {
break;
}
} }
} }
long endDownload = System.currentTimeMillis(); long endDownload = System.currentTimeMillis();
@ -194,7 +198,6 @@ public class OrcidDownloader extends OrcidDSManager {
Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage()); Log.warn("[" + orcidId + "] Parsing date: ", e.getMessage());
return true; return true;
} }
return modifiedDateDt.after(lastUpdateDt); return modifiedDateDt.after(lastUpdateDt);
} }
} }

View File

@ -16,7 +16,7 @@ class MAGMappingTest {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
@Test //@Test
def testMAGCSV(): Unit = { def testMAGCSV(): Unit = {
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
@ -31,7 +31,7 @@ class MAGMappingTest {
import spark.implicits._ import spark.implicits._
val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers] val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers]
logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}") logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}")
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Papers] //implicit val mapEncoder = org.apache.spark.sql.Encoders.bean[Papers]
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) => val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) =>
var r = if (p1==null) p2 else p1 var r = if (p1==null) p2 else p1
if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate)) if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate))