enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
14 changed files with 436 additions and 105 deletions
Showing only changes of commit 618bc1fc72 - Show all commits

View File

@ -11,19 +11,15 @@
<artifactId>dhp-doiboost</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
@ -34,7 +30,6 @@
<artifactId>cxf-rt-transports-http</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
@ -46,6 +41,17 @@
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
</dependencies>

View File

@ -1,16 +0,0 @@
package eu.dnetlib.doiboost
/**
 * One row of the `mag_Journals` tab-separated dump.
 *
 * The field order mirrors the column order (0 to 9) used by the loader draft
 * in `SparkDownloadContentFromCrossref`, which reads every column as a string
 * and converts the numeric ones with `toLong` / `toInt`.
 */
case class Journal(
JournalId: Long,
Rank: Int,
NormalizedName: String,
DisplayName: String,
Issn: String,
Publisher: String,
Webpage: String,
PaperCount: Long,
CitationCount: Long,
CreatedDate: String
)

View File

@ -1,49 +0,0 @@
package eu.dnetlib.doiboost
//import org.apache.spark.SparkConf
//import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
//
//object SparkDownloadContentFromCrossref {
//
//
// def main(args: Array[String]): Unit = {
//
//
// val conf: SparkConf = new SparkConf().setAppName("DownloadContentFromCrossref").setMaster("local[*]")
//
// val spark = SparkSession.builder().config(conf).getOrCreate()
//
//
// val sc = spark.sparkContext
// import spark.implicits._
// spark.read.option("header", "false")
// .option("delimiter", "\t")
// .csv("/Users/sandro/Downloads/doiboost/mag_Journals.txt.gz")
//
//
// val d = spark.read.option("header", "false")
// .option("delimiter", "\t")
// .csv("/Users/sandro/Downloads/doiboost/mag_Journals.txt.gz")
// .map(f =>
// Journal( f.getAs[String](0).toLong, f.getAs[String](1).toInt, f.getAs[String](2),
// f.getAs[String](3), f.getAs[String](4), f.getAs[String](5), f.getAs[String](6),
// f.getAs[String](7).toLong, f.getAs[String](8).toLong, f.getAs[String](9)
// ))
//
// d.show()
//
// d.printSchema()
//
//
//
//
//
//
//
//
// }
//
//
//}
//

View File

@ -0,0 +1,117 @@
package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.schema.oaf._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.slf4j.Logger
import scala.collection.JavaConverters._
/**
 * Converts a Crossref JSON record into an OAF [[Result]].
 *
 * The conversion currently instantiates the right kind of result
 * (publication or dataset) and fills in the DOI as its only pid.
 */
class Crossref2Oaf {

  /** Scheme id/name set on the qualifier of every pid created by this class. */
  val PID_TYPES = "dnet:pid_types"

  /** Crossref `type` -> kind of OAF result to instantiate ("publication" or "dataset"). */
  val mappingCrossrefType = Map(
    "book-section" -> "publication",
    "book" -> "publication",
    "book-chapter" -> "publication",
    "book-part" -> "publication",
    "book-series" -> "publication",
    "book-set" -> "publication",
    "book-track" -> "publication",
    "edited-book" -> "publication",
    "reference-book" -> "publication",
    "monograph" -> "publication",
    "journal-article" -> "publication",
    "dissertation" -> "publication",
    "other" -> "publication",
    "peer-review" -> "publication",
    "proceedings" -> "publication",
    "proceedings-article" -> "publication",
    "reference-entry" -> "publication",
    "report" -> "publication",
    "report-series" -> "publication",
    "standard" -> "publication",
    "standard-series" -> "publication",
    "posted-content" -> "publication",
    "dataset" -> "dataset"
  )

  /**
   * Crossref `type` or `subtype` -> category string ("<code> <label>").
   * Looked up first by type and then by subtype, see [[convert]].
   */
  val mappingCrossrefSubType = Map(
    "book-section" -> "0013 Part of book or chapter of book",
    "book" -> "0002 Book",
    "book-chapter" -> "0013 Part of book or chapter of book",
    "book-part" -> "0013 Part of book or chapter of book",
    "book-series" -> "0002 Book",
    "book-set" -> "0002 Book",
    "book-track" -> "0002 Book",
    "edited-book" -> "0002 Book",
    "reference-book" -> "0002 Book",
    "monograph" -> "0002 Book",
    "journal-article" -> "0001 Article",
    "dissertation" -> "0006 Doctoral thesis",
    "other" -> "0038 Other literature type",
    "peer-review" -> "0015 Review",
    "proceedings" -> "0004 Conference object",
    "proceedings-article" -> "0004 Conference object",
    "reference-entry" -> "0013 Part of book or chapter of book",
    "report" -> "0017 Report",
    "report-series" -> "0017 Report",
    "standard" -> "0038 Other literature type",
    "standard-series" -> "0038 Other literature type",
    "dataset" -> "0021 Dataset",
    "preprint" -> "0016 Preprint"
  )

  /**
   * Parses a Crossref record and builds the corresponding OAF result.
   *
   * @param input  the Crossref record as a JSON string
   * @param logger logger receiving the resolved result kind, category and DOI
   * @return a `Publication` or `Dataset` carrying the DOI as pid, or `null`
   *         when the record has no `type` or its `type` is not mapped
   *         (`null` is kept because Java callers test for it)
   */
  def convert(input: String, logger: Logger): Result = {
    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
    val json: json4s.JValue = parse(input)

    val objectType = (json \ "type").extractOrElse[String](null)
    val objectSubType = (json \ "subtype").extractOrElse[String](null)

    val result: Result =
      if (objectType == null) null
      else generateItemFromType(objectType, objectSubType)

    if (result != null) {
      // The type wins over the subtype; anything unknown falls back to "other literature".
      // The category is only logged for now, it is not stored on the result.
      val cOBJCategory = mappingCrossrefSubType.getOrElse(
        objectType,
        mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"))
      logger.info(mappingCrossrefType(objectType))
      logger.info(cOBJCategory)

      // NOTE(review): `extract` is expected to fail when the record has no DOI — confirm
      // whether such records should be skipped instead.
      val doi: String = (json \ "DOI").extract[String]
      result.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
      logger.info(doi)
    }
    result
  }

  /**
   * Builds a structured property whose qualifier uses `classId` as both class
   * id and class name, and `schemeId` as both scheme id and scheme name.
   *
   * @param value    the property value
   * @param classId  qualifier class id / class name
   * @param schemeId qualifier scheme id / scheme name
   * @return the populated structured property
   */
  def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
    val qualifier = new Qualifier
    qualifier.setClassid(classId)
    qualifier.setClassname(classId)
    qualifier.setSchemeid(schemeId)
    qualifier.setSchemename(schemeId)

    val sp = new StructuredProperty
    sp.setValue(value)
    sp.setQualifier(qualifier)
    sp
  }

  /**
   * Instantiates the empty OAF result matching a Crossref type.
   *
   * @param objectType    the Crossref `type`
   * @param objectSubType the Crossref `subtype` (currently not used for the decision)
   * @return a new `Publication` or `Dataset`, or `null` when the type is not mapped
   */
  def generateItemFromType(objectType: String, objectSubType: String): Result =
    mappingCrossrefType.get(objectType) match {
      case Some(kind) if kind.equalsIgnoreCase("publication") => new Publication()
      case Some(kind) if kind.equalsIgnoreCase("dataset") => new Dataset()
      case _ => null
    }
}

View File

@ -1,14 +1,19 @@
package eu.dnetlib.doiboost;
package eu.dnetlib.doiboost.crossref;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.util.zip.Inflater;
public class CrossrefImporter {
@ -18,13 +23,18 @@ public class CrossrefImporter {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefImporter.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/import_from_es.json")));
Logger logger = LoggerFactory.getLogger(CrossrefImporter.class);
parser.parseArgument(args);
System.out.println(parser.get("targetPath"));
final String hdfsuri = parser.get("namenode");
System.out.println(hdfsuri);
logger.info("HDFS URI"+hdfsuri);
Path hdfswritepath = new Path(parser.get("targetPath"));
logger.info("TargetPath: "+hdfsuri);
final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp"))?Long.parseLong(parser.get("timestamp")):-1;
if(timestamp>0)
logger.info("Timestamp added "+timestamp);
// ====== Init HDFS File System Object
@ -37,13 +47,12 @@ public class CrossrefImporter {
ESClient client = new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
ESClient client = timestamp>0?new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref", timestamp):new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
int i = 0;
long start= System.currentTimeMillis();
long end = 0;
@ -53,13 +62,32 @@ public class CrossrefImporter {
key.set(i++);
value.set(client.next());
writer.append(key, value);
if (i % 100000 == 0) {
if (i % 1000000 == 0) {
end = System.currentTimeMillis();
final float time = (end - start) / 1000;
System.out.println(String.format("Imported %d records last 100000 imported in %f seconds", i, time));
final float time = (end - start) / 1000.0F;
logger.info(String.format("Imported %d records last 100000 imported in %f seconds", i, time));
start = System.currentTimeMillis();
}
}
}
}
/**
 * Decodes a Base64 string and inflates the decoded bytes into text.
 *
 * @param blob the Base64 representation of a deflate-compressed payload
 * @return the decompressed payload, decoded with the platform default charset
 * @throws RuntimeException wrapping any failure (invalid or truncated data,
 *         null input, ...); the message carries the offending blob
 */
public static String decompressBlob(final String blob) {
    try {
        final byte[] compressed = Base64.decodeBase64(blob.getBytes());
        final Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            final ByteArrayOutputStream bos = new ByteArrayOutputStream(compressed.length);
            final byte[] buffer = new byte[8192];
            while (!inflater.finished()) {
                final int size = inflater.inflate(buffer);
                // inflate() returns 0 without finishing when the input is exhausted or a
                // preset dictionary is required; looping again would never terminate.
                if (size == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported compressed stream");
                }
                bos.write(buffer, 0, size);
            }
            // NOTE(review): decoding relies on the platform default charset — confirm the
            // payload encoding and pin it explicitly if it is known.
            return new String(bos.toByteArray());
        } finally {
            // Always release the native zlib resources, even when inflating fails.
            inflater.end();
        }
    } catch (Throwable e) {
        throw new RuntimeException("Wrong record:" + blob, e);
    }
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.doiboost;
package eu.dnetlib.doiboost.crossref;
import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;
@ -7,34 +7,45 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
public class ESClient implements Iterator<String> {
private final static Logger logger = LoggerFactory.getLogger(ESClient.class);
final static String blobPath = "$.hits[*].hits[*]._source.blob";
final static String scrollIdPath = "$._scroll_id";
final static String JSON_NO_TS ="{\"size\":1000}";
final static String JSON_WITH_TS ="{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}";
final static String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}";
String scrollId;
private final String scrollId;
List<String> buffer;
private List<String> buffer;
private final String esHost;
final String esHost;
final String esIndex;
public ESClient(final String esHost, final String esIndex) throws IOException {
this.esHost = esHost;
this.esIndex = esIndex;
final String body =getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), "{\"size\":1000}");
final String body =getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), JSON_NO_TS);
scrollId= getJPathString(scrollIdPath, body);
buffer = getBlobs(body);
}
/**
 * Opens a scroll search on the given index, restricted to records whose
 * {@code timestamp} is greater than or equal to the supplied value, and
 * keeps the scroll id together with the first page of blobs.
 *
 * @param esHost    host name of the search node (port 9200 is used)
 * @param esIndex   name of the index to scan
 * @param timestamp lower bound (inclusive) applied to the {@code timestamp} field
 * @throws IOException declared by the constructor signature
 */
public ESClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
    this.esHost = esHost;
    final String searchUrl = String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex);
    final String rangeQuery = String.format(JSON_WITH_TS, timestamp);
    final String firstPage = getResponse(searchUrl, rangeQuery);
    scrollId = getJPathString(scrollIdPath, firstPage);
    buffer = getBlobs(firstPage);
}
private String getResponse(final String url,final String json ) {
CloseableHttpClient client = HttpClients.createDefault();
try {
@ -77,7 +88,6 @@ public class ESClient implements Iterator<String> {
return res;
}
@Override
public boolean hasNext() {
return (buffer!= null && !buffer.isEmpty());
@ -88,15 +98,14 @@ public class ESClient implements Iterator<String> {
public String next() {
final String nextItem = buffer.remove(0);
if (buffer.isEmpty()) {
final String json_param = String.format("{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}", scrollId);
final String json_param = String.format(JSON_SCROLL, scrollId);
final String body =getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param);
try {
buffer = getBlobs(body);
} catch (Throwable e) {
System.out.println(body);
logger.error("Error on get next page: body:"+body);
}
}
return nextItem;
}

View File

@ -0,0 +1,60 @@
package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
/** Author and first page of a cited work, as read from a Crossref `reference` entry. */
case class Reference(author: String, firstPage: String)
/**
 * Spark entry point that reads the Crossref sequence-file dump, decompresses
 * the first record and logs it.
 */
object SparkMapDumpIntoOAF {

  /**
   * @param args command-line arguments, parsed against
   *             `convert_map_to_oaf_params.json` (`master` and `sourcePath` are read)
   */
  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
    val sparkConf: SparkConf = new SparkConf()

    val argumentSpec = IOUtils.toString(
      SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))
    val parser = new ArgumentApplicationParser(argumentSpec)
    parser.parseArgument(args)

    val spark: SparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
      .master(parser.get("master"))
      .getOrCreate()

    // Only the first value of the sequence file is inspected.
    val firstBlob: String = spark.sparkContext
      .sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
      .map(_._2.toString)
      .first()

    val item = CrossrefImporter.decompressBlob(firstBlob)
    log.info(item)

    // Draft of the reference extraction, kept disabled:
    // lazy val json: json4s.JValue = parse(item)
    //
    //
    // val references = for {
    // JArray(references) <- json \\ "reference"
    // JObject(reference) <- references
    // JField("first-page", JString(firstPage)) <- reference
    // JField("author", JString(author)) <- reference
    // } yield Reference(author, firstPage)
    //
    //
    //
    //
    // logger.info((json \ "created" \ "timestamp").extractOrElse("missing"))
    // logger.info(references.toString())
    //
    // logger.info((json \ "type").extractOrElse("missing"))
  }
}

View File

@ -28,7 +28,7 @@
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.CrossrefImporter</main-class>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-n</arg><arg>${nameNode}</arg>
</java>

View File

@ -0,0 +1,5 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequential file to read", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -1,4 +1,5 @@
[
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true}
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequential file to write", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the URI of the HDFS namenode", "paramRequired": true},
{"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false}
]

View File

@ -1,24 +1,68 @@
package eu.dnetlib.doiboost;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.doiboost.crossref.Crossref2Oaf;
import eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class DoiBoostTest {
Logger logger = LoggerFactory.getLogger(DoiBoostTest.class);
/**
 * Runs the Spark dump inspection locally; it reads the sequence file at
 * file:///data/doiboost/crossref_dump.seq, so that file must exist on the machine.
 */
@Test
public void test() throws Exception {
    // Other entry points exercised from here in the past:
    //SparkDownloadContentFromCrossref.main(null);
    //CrossrefImporter.main("-n file:///tmp -t file:///tmp/p.seq -ts 1586110000749".split(" "));
    final String[] arguments = "-m local[*] -s file:///data/doiboost/crossref_dump.seq".split(" ");
    SparkMapDumpIntoOAF.main(arguments);
}
/**
 * Converts the bundled pc.json Crossref record and checks that a result is
 * produced; the converted object is logged as indented JSON.
 */
@Test
public void testConvertCrossRef2Oaf() throws IOException {
    final String record = IOUtils.toString(getClass().getResourceAsStream("pc.json"));
    Assertions.assertNotNull(record);
    Assertions.assertFalse(StringUtils.isBlank(record));

    final Crossref2Oaf converter = new Crossref2Oaf();
    final Result converted = converter.convert(record, logger);
    Assertions.assertNotNull(converted);

    final ObjectMapper prettyPrinter = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    logger.info(prettyPrinter.writeValueAsString(converted));
}
@Test
public void testPath() throws Exception {
final String json = IOUtils.toString(getClass().getResourceAsStream("response.json"));
final List<String> res = JsonPath.read(json, "$.hits.hits[*]._source.blob");
final List<String > res = JsonPath.read(json, "$.hits.hits[*]._source.blob");
System.out.println(res.size());
}
}

View File

@ -0,0 +1,120 @@
{
"DOI": "10.1101/030080",
"issued": {
"date-parts": [
[
2015,
10,
28
]
]
},
"abstract": "<jats:p>Abstract Key Message<jats:italic>Agrobacterium tumefaciens</jats:italic>was used to transform radiata pine shoots and to efficiently produce stable genetically modified pine plants. Abstract Micropropagated shoot explants from<jats:italic>Pinus radiata</jats:italic>D. Don were used to produce stable transgenic plants by<jats:italic>Agrobacterium tumefaciens</jats:italic>mediated transformation. Using this method any genotype that can be micropropagated could produce stable transgenic lines. As over 80% of<jats:italic>P. radiata</jats:italic>genotypes tested can be micropropagated, this effectively means that any line chosen for superior characteristics could be transformed. There are well established protocols for progressing such germplasm to field deployment. Here we used open and control pollinated seed lines and embryogenic clones. The method developed was faster than other methods previously developed using mature cotyledons. PCR positive shoots could be obtain within 6 months of<jats:italic>Agrobacterium</jats:italic>cocultivation compared with 12 months for cotyledon methods. Transformed shoots were obtained using either kanamycin or geneticin as the selectable marker gene. Shoots were recovered from selection, were tested and were not chimeric, indicating that the selection pressure was optimal for this explant type. GFP was used as a vital marker, and the bar gene, (for resistance to the herbicide Buster\\u00ae/Basta\\u00ae) was used to produce lines that could potentially be used in commercial application. As expected, a range of expression phenotypes were identified for both these reporter genes and the analyses for expression were relatively easy.</jats:p>",
"prefix": "10.1101",
"author": [
{
"affiliation": [],
"given": "Jan E",
"family": "Grant",
"sequence": "first"
},
{
"affiliation": [],
"given": "Pauline A",
"family": "Cooper",
"sequence": "additional"
},
{
"affiliation": [],
"given": "Tracy M",
"family": "Dale",
"sequence": "additional"
}
],
"reference-count": 0,
"member": "246",
"source": "Crossref",
"score": 1.0,
"deposited": {
"timestamp": 1483495053000,
"date-time": "2017-01-04T01:57:33Z",
"date-parts": [
[
2017,
1,
4
]
]
},
"indexed": {
"timestamp": 1550234353119,
"date-time": "2019-02-15T12:39:13Z",
"date-parts": [
[
2019,
2,
15
]
]
},
"type": "posted-content",
"URL": "http://dx.doi.org/10.1101/030080",
"is-referenced-by-count": 2,
"link": [
{
"URL": "https://syndication.highwire.org/content/doi/10.1101/030080",
"intended-application": "similarity-checking",
"content-version": "vor",
"content-type": "unspecified"
}
],
"accepted": {
"date-parts": [
[
2015,
10,
28
]
]
},
"references-count": 0,
"institution": {
"acronym": [
"-"
],
"place": [
"-"
],
"name": "bioRxiv"
},
"posted": {
"date-parts": [
[
2015,
10,
28
]
]
},
"publisher": "Cold Spring Harbor Laboratory",
"content-domain": {
"domain": [],
"crossmark-restriction": false
},
"created": {
"timestamp": 1446095513000,
"date-time": "2015-10-29T05:11:53Z",
"date-parts": [
[
2015,
10,
29
]
]
},
"title": [
"Genetic transformation of micropropagated shoots ofPinus radiataD.Don"
],
"group-title": "Plant Biology",
"subtype": "preprint"
}

View File

@ -0,0 +1,10 @@
# Set root logger level to INFO and its only appender to A1.
log4j.rootLogger=INFO, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.logger.org = ERROR
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@ -36,14 +36,11 @@ public class DataciteClientIterator implements Iterator<String> {
final String body =getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), String.format("{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}", timestamp));
scrollId= getJPathString(scrollIdPath, body);
buffer = getBlobs(body);
}
public String getResponse(final String url,final String json ) {
CloseableHttpClient client = HttpClients.createDefault();
try {
HttpPost httpPost = new HttpPost(url);
if (json!= null) {
StringEntity entity = new StringEntity(json);
@ -63,7 +60,6 @@ public class DataciteClientIterator implements Iterator<String> {
throw new RuntimeException("Unable to close client ",e);
}
}
}
private String getJPathString(final String jsonPath, final String json) {