WIP: Affiliation matching #474

Draft
schatz wants to merge 5 commits from affiliation-matching into beta
4 changed files with 288 additions and 0 deletions

View File

@ -6,7 +6,18 @@
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version> <version>1.2.5-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-aggregation</artifactId> <artifactId>dhp-aggregation</artifactId>
<properties>
<affro.release.version>1.0.0</affro.release.version>
</properties>
<scm>
<url>https://code-repo.d4science.org/mkallipo/affRo</url>
<connection>scm:git:https://code-repo.d4science.org/mkallipo/affRo.git</connection>
</scm>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
@ -43,6 +54,32 @@
<scalaVersion>${scala.version}</scalaVersion> <scalaVersion>${scala.version}</scalaVersion>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-scm-plugin</artifactId>
<version>1.8.1</version>
<configuration>
<connectionType>connection</connectionType>
<!--
<scmVersionType>tag</scmVersionType>--><!-- 'branch' can also be provided here -->
<!-- <scmVersion>${affro.release.version}</scmVersion>--><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
<scmVersionType>branch</scmVersionType><!-- 'branch' can also be provided here -->
<scmVersion>openaire-workflow-ready</scmVersion><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
<checkoutDirectory>${project.build.directory}/${oozie.package.file.name}/${oozieAppDir}/affRo</checkoutDirectory>
</configuration>
<executions>
<execution>
<id>checkout-affro</id>
<phase>prepare-package</phase>
<goals>
<goal>checkout</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>

View File

@ -0,0 +1,45 @@
# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
# dhp.hadoop.frontend.user.name=ilias.kanellos
# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
# dhp.hadoop.frontend.port.ssh=22
# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
# jobTracker=yarnRM
# nameNode=hdfs://nameservice1
# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
# maven.executable=mvn
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory
sparkSqlWarehouseDir=/user/hive/warehouse
#hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
# This MAY avoid the no library used error
oozie.use.system.libpath=true
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# The following is needed as a property of a workflow
wfAppPath=${oozieTopWfApplicationPath}
resumeFrom=Crossref
#OpenAlex input/output
#resultFolder=/tmp/affro-results/oalex
#inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
#Crossref input/output
resultFolder=/tmp/affro-results/crossref
inputFolder=/data/doiboost/crossref/crossref_unpack
#
#crossrefInputPath=/data/bip-affiliations/crossref-data.json
#pubmedInputPath=/data/bip-affiliations/pubmed-data.json
#openapcInputPath=/data/bip-affiliations/openapc-data.json
#dataciteInputPath=/data/bip-affiliations/datacite-data.json
#
#outputPath=/tmp/crossref-affiliations-output-v5

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,176 @@
<workflow-app name="AffroAffiliations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resumeFrom"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="resumeFrom">
<switch>
<case to="run-affro-on-iisdata">${wf:conf('resumeFrom') eq 'IIS'}</case>
<case to="run-affro-on-crossref">${wf:conf('resumeFrom') eq 'Crossref'}</case>
<default to="run-affro-on-oalexstrings"/>
</switch>
</decision>
<action name="run-affro-on-iisdata">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Affiliations inference (Affro)</name>
<jar>update_records.py</jar>
<spark-opts>
--executor-cores=4
--executor-memory=6G
--driver-memory=15G
--conf spark.executor.memoryOverhead=6G
--conf spark.sql.shuffle.partitions=20000
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
--conf spark.executorEnv.PYSPARK_PYTHON=python3
--py-files ${wfAppPath}/affRo/affro_cluster.py,${wfAppPath}/affRo/affro_test_example.py,${wfAppPath}/affRo/create_input_cluster.py,${wfAppPath}/affRo/functions_cluster.py,${wfAppPath}/affRo/matching_cluster.py
--files ${wfAppPath}/affRo/dictionaries/dix_acad.json,${wfAppPath}/affRo/dictionaries/dix_categ.json,${wfAppPath}/affRo/dictionaries/dix_city.json,${wfAppPath}/affRo/dictionaries/dix_country.json,${wfAppPath}/affRo/dictionaries/dix_mult.json,${wfAppPath}/affRo/txt_files/city_names.txt,${wfAppPath}/affRo/txt_files/remove_list.txt,${wfAppPath}/affRo/txt_files/stop_words.txt,${wfAppPath}/affRo/txt_files/university_terms.txt
</spark-opts>
<arg>${resultFolder}</arg>
<file>${wfAppPath}/affRo/update_records.py#update_records.py</file>
</spark>
<ok to="End" />
<error to="Kill" />
</action>
<action name="run-affro-on-oalexstrings">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Affiliations inference (Affro)</name>
<jar>strings.py</jar>
<spark-opts>
--executor-cores=4
--executor-memory=6G
--driver-memory=15G
--conf spark.executor.memoryOverhead=6G
--conf spark.sql.shuffle.partitions=20000
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
--conf spark.executorEnv.PYSPARK_PYTHON=python3
--py-files ${wfAppPath}/affRo/affro_cluster.py,${wfAppPath}/affRo/create_input_cluster.py,${wfAppPath}/affRo/functions_cluster.py,${wfAppPath}/affRo/matching_cluster.py
--files ${wfAppPath}/affRo/dictionaries/dix_acad.json,${wfAppPath}/affRo/dictionaries/dix_categ.json,${wfAppPath}/affRo/dictionaries/dix_city.json,${wfAppPath}/affRo/dictionaries/dix_country.json,${wfAppPath}/affRo/dictionaries/dix_mult.json,${wfAppPath}/affRo/dictionaries/dix_status.json,${wfAppPath}/affRo/txt_files/city_names.txt,${wfAppPath}/affRo/txt_files/remove_list.txt,${wfAppPath}/affRo/txt_files/stop_words.txt,${wfAppPath}/affRo/txt_files/university_terms.txt
</spark-opts>
<arg>${inputFolder}</arg>
<arg>${resultFolder}</arg>
<file>${wfAppPath}/affRo/strings.py#strings.py</file>
</spark>
<ok to="End" />
<error to="Kill" />
</action>
<action name="run-affro-on-crossref">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Affiliations inference (Affro)</name>
<jar>crossref.py</jar>
<spark-opts>
--executor-cores=4
--executor-memory=6G
--driver-memory=15G
--conf spark.executor.memoryOverhead=6G
--conf spark.sql.shuffle.partitions=20000
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
--conf spark.executorEnv.PYSPARK_PYTHON=python3
--py-files ${wfAppPath}/affRo/affro_cluster.py,${wfAppPath}/affRo/create_input_cluster.py,${wfAppPath}/affRo/functions_cluster.py,${wfAppPath}/affRo/matching_cluster.py
--files ${wfAppPath}/affRo/dictionaries/dix_acad.json,${wfAppPath}/affRo/dictionaries/dix_categ.json,${wfAppPath}/affRo/dictionaries/dix_city.json,${wfAppPath}/affRo/dictionaries/dix_country.json,${wfAppPath}/affRo/dictionaries/dix_mult.json,${wfAppPath}/affRo/dictionaries/dix_status.json,${wfAppPath}/affRo/txt_files/city_names.txt,${wfAppPath}/affRo/txt_files/remove_list.txt,${wfAppPath}/affRo/txt_files/stop_words.txt,${wfAppPath}/affRo/txt_files/university_terms.txt
</spark-opts>
<arg>${inputFolder}</arg>
<arg>${resultFolder}</arg>
<file>${wfAppPath}/affRo/crossref.py#crossref.py</file>
</spark>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>