forked from D-Net/dnet-hadoop
Merge remote-tracking branch 'origin/stable_ids' into stable_id_scholexplorer
This commit is contained in:
commit
3100166d29
|
@ -0,0 +1,63 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.doiboost.crossref;
|
||||||
|
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||||
|
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
public class ExtractCrossrefRecords {
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
ExtractCrossrefRecords.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
final String hdfsServerUri = parser.get("hdfsServerUri");
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
final String crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz");
|
||||||
|
|
||||||
|
Path hdfsreadpath = new Path(hdfsServerUri.concat(crossrefFileNameTarGz));
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
|
||||||
|
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||||
|
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||||
|
FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);
|
||||||
|
FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
|
||||||
|
try (TarArchiveInputStream tais = new TarArchiveInputStream(
|
||||||
|
new GzipCompressorInputStream(crossrefFileStream))) {
|
||||||
|
TarArchiveEntry entry = null;
|
||||||
|
while ((entry = tais.getNextTarEntry()) != null) {
|
||||||
|
if (!entry.isDirectory()) {
|
||||||
|
try (
|
||||||
|
FSDataOutputStream out = fs
|
||||||
|
.create(new Path(outputPath.concat(entry.getName()).concat(".gz")));
|
||||||
|
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
||||||
|
|
||||||
|
IOUtils.copy(tais, gzipOs);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Log.info("Crossref dump reading completed");
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
package eu.dnetlib.doiboost.crossref
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import org.apache.spark.{SparkConf, SparkContext}
|
||||||
|
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.JsonAST.JArray
|
||||||
|
import org.json4s.jackson.JsonMethods.{compact, parse, render}
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
import scala.io.Source
|
||||||
|
|
||||||
|
object GenerateCrossrefDataset {
|
||||||
|
|
||||||
|
val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass)
|
||||||
|
|
||||||
|
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
|
||||||
|
|
||||||
|
def extractDump(input:String):List[String] = {
|
||||||
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
|
lazy val json: json4s.JValue = parse(input)
|
||||||
|
|
||||||
|
val a = (json \ "items").extract[JArray]
|
||||||
|
a.arr.map(s => compact(render(s)))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def crossrefElement(meta: String): CrossrefDT = {
|
||||||
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
|
lazy val json: json4s.JValue = parse(meta)
|
||||||
|
val doi:String = (json \ "DOI").extract[String]
|
||||||
|
val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
|
||||||
|
CrossrefDT(doi, meta, timestamp)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val conf = new SparkConf
|
||||||
|
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
|
||||||
|
parser.parseArgument(args)
|
||||||
|
val master = parser.get("master")
|
||||||
|
val sourcePath = parser.get("sourcePath")
|
||||||
|
val targetPath = parser.get("targetPath")
|
||||||
|
|
||||||
|
val spark: SparkSession = SparkSession.builder().config(conf)
|
||||||
|
.appName(GenerateCrossrefDataset.getClass.getSimpleName)
|
||||||
|
.master(master)
|
||||||
|
.getOrCreate()
|
||||||
|
val sc: SparkContext = spark.sparkContext
|
||||||
|
|
||||||
|
import spark.implicits._
|
||||||
|
|
||||||
|
|
||||||
|
def extractDump(input:String):List[String] = {
|
||||||
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
|
lazy val json: json4s.JValue = parse(input)
|
||||||
|
|
||||||
|
val a = (json \ "items").extract[JArray]
|
||||||
|
a.arr.map(s => compact(render(s)))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
|
||||||
|
.map(meta => crossrefElement(meta))
|
||||||
|
.toDS()//.as[CrossrefDT]
|
||||||
|
.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
[
|
||||||
|
{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
|
||||||
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
|
||||||
|
{"paramName":"f", "paramLongName":"crossrefFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true},
|
||||||
|
{"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false},
|
||||||
|
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the name of the activities orcid file", "paramRequired": true}
|
||||||
|
|
||||||
|
]
|
|
@ -0,0 +1,21 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "s",
|
||||||
|
"paramLongName": "sourcePath",
|
||||||
|
"paramDescription": "the source mdstore path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"paramName": "t",
|
||||||
|
"paramLongName": "targetPath",
|
||||||
|
"paramDescription": "the target mdstore path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "m",
|
||||||
|
"paramLongName": "master",
|
||||||
|
"paramDescription": "the master name",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,42 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<value>/user/spark/spark2ApplicationHistory</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,92 @@
|
||||||
|
<workflow-app name="read Crossref dump from HDFS" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>crossrefDumpPath</name>
|
||||||
|
<description>the working dir base path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>inputPathCrossref</name>
|
||||||
|
<description>the working dir base path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<description>memory for driver process</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<value>2</value>
|
||||||
|
<description>number of cores used by single executor</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<start to="ImportCrossRef"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="ImportCrossRef">
|
||||||
|
<java>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
|
||||||
|
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingDir}/files/</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="generateCrossrefDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="generateCrossrefDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>SparkGenerateCrossrefDataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
<arg>--sourcePath</arg><arg>${workingDir}/files</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds_updated</arg>
|
||||||
|
|
||||||
|
</spark>
|
||||||
|
<ok to="removeFiles"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="removeFiles">
|
||||||
|
<fs>
|
||||||
|
<delete path="${workingDir}/files"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="renameDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="renameDataset">
|
||||||
|
<fs>
|
||||||
|
<delete path="${inputPathCrossref}/crossref_ds"/>
|
||||||
|
<move source="${inputPathCrossref}/crossref_ds_updated"
|
||||||
|
target="${inputPathCrossref}/crossref_ds"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -41,17 +41,21 @@
|
||||||
<description>the Crossref input path</description>
|
<description>the Crossref input path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>crossrefTimestamp</name>
|
<name>crossrefDumpPath</name>
|
||||||
<description>Timestamp for the Crossref incremental Harvesting</description>
|
<description>the Crossref dump path</description>
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>esServer</name>
|
|
||||||
<description>elasticsearch server url for the Crossref Harvesting</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>esIndex</name>
|
|
||||||
<description>elasticsearch index name for the Crossref Harvesting</description>
|
|
||||||
</property>
|
</property>
|
||||||
|
<!-- <property>-->
|
||||||
|
<!-- <name>crossrefTimestamp</name>-->
|
||||||
|
<!-- <description>Timestamp for the Crossref incremental Harvesting</description>-->
|
||||||
|
<!-- </property>-->
|
||||||
|
<!-- <property>-->
|
||||||
|
<!-- <name>esServer</name>-->
|
||||||
|
<!-- <description>elasticsearch server url for the Crossref Harvesting</description>-->
|
||||||
|
<!-- </property>-->
|
||||||
|
<!-- <property>-->
|
||||||
|
<!-- <name>esIndex</name>-->
|
||||||
|
<!-- <description>elasticsearch index name for the Crossref Harvesting</description>-->
|
||||||
|
<!-- </property>-->
|
||||||
|
|
||||||
<!-- MAG Parameters -->
|
<!-- MAG Parameters -->
|
||||||
<property>
|
<property>
|
||||||
|
@ -106,6 +110,7 @@
|
||||||
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
|
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
|
||||||
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
|
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
|
||||||
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
|
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
|
||||||
|
<case to="GenerateCrossrefDataset">${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'}</case>
|
||||||
<default to="ImportCrossRef"/>
|
<default to="ImportCrossRef"/>
|
||||||
</switch>
|
</switch>
|
||||||
</decision>
|
</decision>
|
||||||
|
@ -114,55 +119,104 @@
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
<!-- <action name="ImportCrossRef">-->
|
||||||
|
<!-- <java>-->
|
||||||
|
<!-- <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>-->
|
||||||
|
<!-- <arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>-->
|
||||||
|
<!-- <arg>--namenode</arg><arg>${nameNode}</arg>-->
|
||||||
|
<!-- <arg>--esServer</arg><arg>${esServer}</arg>-->
|
||||||
|
<!-- <arg>--esIndex</arg><arg>${esIndex}</arg>-->
|
||||||
|
<!-- <arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>-->
|
||||||
|
<!-- </java>-->
|
||||||
|
<!-- <ok to="GenerateCrossrefDataset"/>-->
|
||||||
|
<!-- <error to="Kill"/>-->
|
||||||
|
<!-- </action>-->
|
||||||
|
|
||||||
<action name="ImportCrossRef">
|
<action name="ImportCrossRef">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
<arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
|
<name-node>${nameNode}</name-node>
|
||||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
|
||||||
<arg>--esServer</arg><arg>${esServer}</arg>
|
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
||||||
<arg>--esIndex</arg><arg>${esIndex}</arg>
|
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
|
||||||
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
|
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="GenerateCrossrefDataset"/>
|
<ok to="GenerateCrossrefDataset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
<action name="GenerateCrossrefDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>SparkGenerateCrossrefDataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=2
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
<arg>--sourcePath</arg><arg>${crossrefDumpPath}/files/</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
|
||||||
|
|
||||||
|
</spark>
|
||||||
|
<ok to="removeFiles"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="removeFiles">
|
||||||
|
<fs>
|
||||||
|
<delete path="${crossrefDumpPath}/files"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="ResetMagWorkingPath"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<!-- CROSSREF SECTION -->
|
<!-- CROSSREF SECTION -->
|
||||||
|
|
||||||
<action name="GenerateCrossrefDataset">
|
<!-- <action name="GenerateCrossrefDataset">-->
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
|
||||||
<master>yarn-cluster</master>
|
<!-- <master>yarn-cluster</master>-->
|
||||||
<mode>cluster</mode>
|
<!-- <mode>cluster</mode>-->
|
||||||
<name>GenerateCrossrefDataset</name>
|
<!-- <name>GenerateCrossrefDataset</name>-->
|
||||||
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
|
<!-- <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>-->
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
<!-- <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
|
||||||
<spark-opts>
|
<!-- <spark-opts>-->
|
||||||
--executor-memory=${sparkExecutorMemory}
|
<!-- --executor-memory=${sparkExecutorMemory}-->
|
||||||
--executor-cores=${sparkExecutorCores}
|
<!-- --executor-cores=${sparkExecutorCores}-->
|
||||||
--driver-memory=${sparkDriverMemory}
|
<!-- --driver-memory=${sparkDriverMemory}-->
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
<!-- --conf spark.sql.shuffle.partitions=3840-->
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
<!-- --conf spark.extraListeners=${spark2ExtraListeners}-->
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
<!-- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
<!-- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
<!-- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
|
||||||
</spark-opts>
|
<!-- </spark-opts>-->
|
||||||
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
|
<!-- <arg>--workingPath</arg><arg>${inputPathCrossref}</arg>-->
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<!-- <arg>--master</arg><arg>yarn-cluster</arg>-->
|
||||||
</spark>
|
<!-- </spark>-->
|
||||||
<ok to="RenameDataset"/>
|
<!-- <ok to="RenameDataset"/>-->
|
||||||
<error to="Kill"/>
|
<!-- <error to="Kill"/>-->
|
||||||
</action>
|
<!-- </action>-->
|
||||||
|
|
||||||
<action name="RenameDataset">
|
<!-- <action name="RenameDataset">-->
|
||||||
<fs>
|
<!-- <fs>-->
|
||||||
<delete path="${inputPathCrossref}/crossref_ds"/>
|
<!-- <delete path="${inputPathCrossref}/crossref_ds"/>-->
|
||||||
<move source="${inputPathCrossref}/crossref_ds_updated"
|
<!-- <move source="${inputPathCrossref}/crossref_ds_updated"-->
|
||||||
target="${inputPathCrossref}/crossref_ds"/>
|
<!-- target="${inputPathCrossref}/crossref_ds"/>-->
|
||||||
</fs>
|
<!-- </fs>-->
|
||||||
<ok to="ResetMagWorkingPath"/>
|
<!-- <ok to="ResetMagWorkingPath"/>-->
|
||||||
<error to="Kill"/>
|
<!-- <error to="Kill"/>-->
|
||||||
</action>
|
<!-- </action>-->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -94,14 +94,22 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
|
||||||
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
|
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
|
||||||
.toArray(size -> new String[size]);
|
.toArray(size -> new String[size]);
|
||||||
|
|
||||||
spark
|
if (validPaths.length > 0) {
|
||||||
.read()
|
spark
|
||||||
.parquet(validPaths)
|
.read()
|
||||||
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
|
.parquet(validPaths)
|
||||||
.toJavaRDD()
|
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
|
||||||
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
|
.toJavaRDD()
|
||||||
// .coalesce(1)
|
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
// .coalesce(1)
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
} else {
|
||||||
|
spark
|
||||||
|
.emptyDataFrame()
|
||||||
|
.toJavaRDD()
|
||||||
|
.mapToPair(xml -> new Tuple2<>(new Text(), new Text()))
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String enrichRecord(final Row r) {
|
private static String enrichRecord(final Row r) {
|
||||||
|
|
|
@ -41,7 +41,8 @@ SELECT p.id,
|
||||||
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.daysForlastPub END AS daysforlastpub,
|
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.daysForlastPub END AS daysforlastpub,
|
||||||
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
|
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
|
||||||
p.callidentifier,
|
p.callidentifier,
|
||||||
p.code
|
p.code,
|
||||||
|
p.totalcost
|
||||||
FROM ${stats_db_name}.project_tmp p
|
FROM ${stats_db_name}.project_tmp p
|
||||||
LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
|
LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
|
||||||
FROM ${stats_db_name}.project_results pr
|
FROM ${stats_db_name}.project_results pr
|
||||||
|
|
|
@ -30,10 +30,21 @@ from rcount
|
||||||
group by rcount.pid;
|
group by rcount.pid;
|
||||||
|
|
||||||
create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture;
|
create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture;
|
||||||
--
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS;
|
create table ${stats_db_name}.result_instance stored as parquet as
|
||||||
-- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS FOR COLUMNS;
|
select distinct r.*
|
||||||
-- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS;
|
from (
|
||||||
-- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS FOR COLUMNS;
|
select substr(r.id, 4) as id, inst.accessright.classname as accessright, substr(inst.collectedfrom.key, 4) as collectedfrom,
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS;
|
substr(inst.hostedby.key, 4) as hostedby, inst.dateofacceptance.value as dateofacceptance, inst.license.value as license, p.qualifier.classname as pidtype, p.value as pid
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS FOR COLUMNS;
|
from ${openaire_db_name}.result r lateral view explode(r.instance) instances as inst lateral view explode(inst.pid) pids as p) r
|
||||||
|
join ${stats_db_name}.result res on res.id=r.id;
|
||||||
|
|
||||||
|
create table ${stats_db_name}.result_apc as
|
||||||
|
select r.id, r.amount, r.currency
|
||||||
|
from (
|
||||||
|
select substr(r.id, 4) as id, inst.processingchargeamount.value as amount, inst.processingchargecurrency.value as currency
|
||||||
|
from ${openaire_db_name}.result r lateral view explode(r.instance) instances as inst) r
|
||||||
|
join ${stats_db_name}.result res on res.id=r.id
|
||||||
|
where r.amount is not null;
|
||||||
|
|
||||||
|
create view ${stats_db_name}.issn_gold_oa_dataset as select * from stats_ext.issn_gold_oa_dataset;
|
|
@ -16,7 +16,13 @@ create table TARGET.result as
|
||||||
select distinct * from (
|
select distinct * from (
|
||||||
select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on rp.project=p.id where rp.id=r.id)
|
select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on rp.project=p.id where rp.id=r.id)
|
||||||
union all
|
union all
|
||||||
select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id) ) foo;
|
select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id)
|
||||||
|
union all
|
||||||
|
select * from SOURCE.result r where exists (select 1 from SOURCE.result_project rp join SOURCE.project p on p.id=rp.project join SOURCE.project_organizations po on po.id=p.id join SOURCE.organization o on o.id=po.organization where ro.id=r.id and o.name in (
|
||||||
|
'GEORG-AUGUST-UNIVERSITAT GOTTINGEN STIFTUNG OFFENTLICHEN RECHTS',
|
||||||
|
'ATHINA-EREVNITIKO KENTRO KAINOTOMIAS STIS TECHNOLOGIES TIS PLIROFORIAS, TON EPIKOINONION KAI TIS GNOSIS',
|
||||||
|
'Consiglio Nazionale delle Ricerche',
|
||||||
|
'Universidade do Minho') )) foo;
|
||||||
compute stats TARGET.result;
|
compute stats TARGET.result;
|
||||||
|
|
||||||
create table TARGET.result_citations as select * from SOURCE.result_citations orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
create table TARGET.result_citations as select * from SOURCE.result_citations orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||||
|
|
|
@ -39,7 +39,8 @@ CREATE TABLE ${stats_db_name}.project_tmp
|
||||||
daysforlastpub INT,
|
daysforlastpub INT,
|
||||||
delayedpubs INT,
|
delayedpubs INT,
|
||||||
callidentifier STRING,
|
callidentifier STRING,
|
||||||
code STRING
|
code STRING,
|
||||||
|
totalcost FLOAT
|
||||||
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
|
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
|
||||||
|
|
||||||
INSERT INTO ${stats_db_name}.project_tmp
|
INSERT INTO ${stats_db_name}.project_tmp
|
||||||
|
@ -62,7 +63,8 @@ SELECT substr(p.id, 4) AS id,
|
||||||
0 AS daysforlastpub,
|
0 AS daysforlastpub,
|
||||||
0 AS delayedpubs,
|
0 AS delayedpubs,
|
||||||
p.callidentifier.value AS callidentifier,
|
p.callidentifier.value AS callidentifier,
|
||||||
p.code.value AS code
|
p.code.value AS code,
|
||||||
|
p.totalcost AS totalcost
|
||||||
FROM ${openaire_db_name}.project p
|
FROM ${openaire_db_name}.project p
|
||||||
WHERE p.datainfo.deletedbyinference = false;
|
WHERE p.datainfo.deletedbyinference = false;
|
||||||
|
|
||||||
|
@ -70,15 +72,4 @@ create table ${stats_db_name}.funder as
|
||||||
select distinct xpath_string(fund, '//funder/id') as id,
|
select distinct xpath_string(fund, '//funder/id') as id,
|
||||||
xpath_string(fund, '//funder/name') as name,
|
xpath_string(fund, '//funder/name') as name,
|
||||||
xpath_string(fund, '//funder/shortname') as shortname
|
xpath_string(fund, '//funder/shortname') as shortname
|
||||||
from ${openaire_db_name}.project p lateral view explode(p.fundingtree.value) fundingtree as fund;
|
from ${openaire_db_name}.project p lateral view explode(p.fundingtree.value) fundingtree as fund;
|
||||||
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS FOR COLUMNS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS FOR COLUMNS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS FOR COLUMNS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS FOR COLUMNS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS;
|
|
||||||
-- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS FOR COLUMNS;
|
|
Loading…
Reference in New Issue