Implemented Crossref import job

Sandro La Bruzzo 2020-04-01 14:12:33 +02:00
parent 36236dd1c1
commit 205e9521c6
14 changed files with 912 additions and 372 deletions

View File

@@ -75,10 +75,10 @@ public class SparkCreateDedupTest {
         final HashFunction hashFunction = Hashing.murmur3_128();
-        System.out.println( s1.hashCode());
-        System.out.println(hashFunction.hashUnencodedChars(s1).asLong());
-        System.out.println( s2.hashCode());
-        System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
+//        System.out.println( s1.hashCode());
+//        System.out.println(hashFunction.hashUnencodedChars(s1).asLong());
+//        System.out.println( s2.hashCode());
+//        System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
     }

View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-doiboost</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,65 @@
package eu.dnetlib.doiboost;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/**
 * Reads Crossref records from an Elasticsearch index and stores them on HDFS
 * as a SequenceFile (incremental IntWritable key, JSON blob as Text value).
 */
public class CrossrefImporter {

    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(CrossrefImporter.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/import_from_es.json")));
        parser.parseArgument(args);
        System.out.println(parser.get("targetPath"));

        final String hdfsuri = parser.get("namenode");
        System.out.println(hdfsuri);
        final Path hdfswritepath = new Path(parser.get("targetPath"));

        // ====== Init HDFS File System Object
        Configuration conf = new Configuration();
        // Set FileSystem URI
        conf.set("fs.defaultFS", hdfsuri);
        // Because of Maven
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        final ESClient client = new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(hdfswritepath),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            int i = 0;
            long start = System.currentTimeMillis();
            long end;
            final IntWritable key = new IntWritable(i);
            final Text value = new Text();
            while (client.hasNext()) {
                key.set(i++);
                value.set(client.next());
                writer.append(key, value);
                if (i % 100000 == 0) {
                    end = System.currentTimeMillis();
                    final float time = (end - start) / 1000.0f;
                    System.out.println(String.format("Imported %d records; last 100000 imported in %f seconds", i, time));
                    start = System.currentTimeMillis();
                }
            }
        }
    }
}
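For orientation, a SequenceFile written this way can be read back with Hadoop's standard reader; a minimal sketch, assuming the same IntWritable/Text key-value classes and the local test path used in DoiBoostTest (file:///tmp/p.seq, purely illustrative):

    // Illustrative read-back of the imported records; not part of this commit.
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/p.seq");
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
        IntWritable key = new IntWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            String json = value.toString();
            // Print the key and the first characters of the stored Crossref blob.
            System.out.println(key.get() + " -> " + json.substring(0, Math.min(80, json.length())));
        }
    }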

View File

@@ -0,0 +1,103 @@
package eu.dnetlib.doiboost;

import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**
 * Iterates over the "blob" field of every document in an Elasticsearch index,
 * paging through the results with the scroll API.
 */
public class ESClient implements Iterator<String> {

    final static String blobPath = "$.hits[*].hits[*]._source.blob";
    final static String scrollIdPath = "$._scroll_id";

    String scrollId;
    List<String> buffer;
    final String esHost;
    final String esIndex;

    public ESClient(final String esHost, final String esIndex) throws IOException {
        this.esHost = esHost;
        this.esIndex = esIndex;
        // Open the scroll context and fill the first buffer of results.
        final String body = getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), "{\"size\":1000}");
        scrollId = getJPathString(scrollIdPath, body);
        buffer = getBlobs(body);
    }

    private String getResponse(final String url, final String json) {
        CloseableHttpClient client = HttpClients.createDefault();
        try {
            HttpPost httpPost = new HttpPost(url);
            if (json != null) {
                StringEntity entity = new StringEntity(json);
                httpPost.setEntity(entity);
                httpPost.setHeader("Accept", "application/json");
                httpPost.setHeader("Content-type", "application/json");
            }
            CloseableHttpResponse response = client.execute(httpPost);
            return IOUtils.toString(response.getEntity().getContent());
        } catch (Throwable e) {
            throw new RuntimeException("Error on executing request ", e);
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                throw new RuntimeException("Unable to close client ", e);
            }
        }
    }

    private String getJPathString(final String jsonPath, final String json) {
        try {
            Object o = JsonPath.read(json, jsonPath);
            if (o instanceof String)
                return (String) o;
            return null;
        } catch (Exception e) {
            return "";
        }
    }

    private List<String> getBlobs(final String body) {
        final List<String> res = JsonPath.read(body, "$.hits.hits[*]._source.blob");
        return res;
    }

    @Override
    public boolean hasNext() {
        return (buffer != null && !buffer.isEmpty());
    }

    @Override
    public String next() {
        final String nextItem = buffer.remove(0);
        if (buffer.isEmpty()) {
            // Current page exhausted: fetch the next batch using the scroll id.
            final String json_param = String.format("{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}", scrollId);
            final String body = getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param);
            try {
                buffer = getBlobs(body);
            } catch (Throwable e) {
                System.out.println(body);
            }
        }
        return nextItem;
    }
}
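For orientation, a minimal usage sketch of this iterator, reusing the host and index hard-coded in CrossrefImporter (swap in your own Elasticsearch endpoint; illustrative only):

    public static void main(String[] args) throws IOException {
        // Drain the index and print each stored Crossref blob.
        ESClient client = new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
        while (client.hasNext()) {
            System.out.println(client.next());
        }
    }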

View File

@@ -0,0 +1,16 @@
package eu.dnetlib.doiboost
case class Journal(
JournalId: Long,
Rank: Int,
NormalizedName: String,
DisplayName: String,
Issn: String,
Publisher: String,
Webpage: String,
PaperCount: Long,
CitationCount: Long,
CreatedDate: String
)

View File

@@ -0,0 +1,49 @@
package eu.dnetlib.doiboost
//import org.apache.spark.SparkConf
//import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
//
//object SparkDownloadContentFromCrossref {
//
//
// def main(args: Array[String]): Unit = {
//
//
// val conf: SparkConf = new SparkConf().setAppName("DownloadContentFromCrossref").setMaster("local[*]")
//
// val spark = SparkSession.builder().config(conf).getOrCreate()
//
//
// val sc = spark.sparkContext
// import spark.implicits._
// spark.read.option("header", "false")
// .option("delimiter", "\t")
// .csv("/Users/sandro/Downloads/doiboost/mag_Journals.txt.gz")
//
//
// val d = spark.read.option("header", "false")
// .option("delimiter", "\t")
// .csv("/Users/sandro/Downloads/doiboost/mag_Journals.txt.gz")
// .map(f =>
// Journal( f.getAs[String](0).toLong, f.getAs[String](1).toInt, f.getAs[String](2),
// f.getAs[String](3), f.getAs[String](4), f.getAs[String](5), f.getAs[String](6),
// f.getAs[String](7).toLong, f.getAs[String](8).toLong, f.getAs[String](9)
// ))
//
// d.show()
//
// d.printSchema()
//
//
//
//
//
//
//
//
// }
//
//
//}
//

View File

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.action.sharelib.for.java</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,39 @@
<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}/input/crossref'/>
</fs>
<ok to="ImportCrossRef"/>
<error to="Kill"/>
</action>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-n</arg><arg>${nameNode}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,4 @@
[
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequential file to write", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the namenode URI", "paramRequired": true}
]
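These switches correspond to the -t and -n arguments supplied by the ImportCrossRef Oozie action above; a local invocation mirroring DoiBoostTest (paths purely illustrative) would look like:

    // Illustrative local run: write the dump to a local sequence file.
    CrossrefImporter.main(new String[]{"-n", "file:///tmp", "-t", "file:///tmp/p.seq"});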

View File

@@ -0,0 +1,61 @@
package eu.dnetlib.doiboost;

import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class DoiBoostTest {

    @Test
    @Ignore
    public void test() throws Exception {
        // SparkDownloadContentFromCrossref.main(null);
        CrossrefImporter.main(new String[]{
                "-n", "file:///tmp",
                "-t", "file:///tmp/p.seq",
        });
    }

    @Test
    public void testPath() throws Exception {
        final String json = IOUtils.toString(getClass().getResourceAsStream("response.json"));
        final List<String> res = JsonPath.read(json, "$.hits.hits[*]._source.blob");
        System.out.println(res.size());
    }

    @Test
    @Ignore
    public void testParseResponse() throws IOException {
        long end, start = System.currentTimeMillis();
        ESClient client = new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
        int i = 0;
        while (client.hasNext()) {
            Assert.assertNotNull(client.next());
            i++;
            if (i % 1000 == 0) {
                end = System.currentTimeMillis();
                System.out.println("Last 1000 records read in " + ((end - start) / 1000) + "s");
                start = System.currentTimeMillis();
            }
            if (i > 1000000)
                break;
        }
    }
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -19,6 +19,7 @@
         <module>dhp-graph-mapper</module>
         <module>dhp-dedup</module>
         <module>dhp-graph-provision</module>
+        <module>dhp-doiboost</module>
     </modules>
     <pluginRepositories>