forked from D-Net/dnet-hadoop
import openorgs
This commit is contained in:
parent
574870b09b
commit
334eb2b927
|
@ -85,10 +85,17 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication
|
|||
final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword,
|
||||
final String isLookupUrl)
|
||||
throws Exception {
|
||||
|
||||
super(hdfsPath);
|
||||
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
|
||||
this.lastUpdateTimestamp = new Date().getTime();
|
||||
this.vocs = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
|
||||
this.lastUpdateTimestamp = new Date().getTime();
|
||||
}
|
||||
|
||||
protected AbstractDbApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS
|
||||
super();
|
||||
this.dbClient = dbClient;
|
||||
this.vocs = vocs;
|
||||
this.lastUpdateTimestamp = new Date().getTime();
|
||||
}
|
||||
|
||||
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer)
|
||||
|
@ -164,6 +171,7 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication
|
|||
}
|
||||
|
||||
public List<Oaf> processProject(final ResultSet rs) {
|
||||
|
||||
try {
|
||||
final DataInfo info = prepareDataInfo(rs);
|
||||
|
||||
|
@ -219,6 +227,7 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication
|
|||
}
|
||||
|
||||
public List<Oaf> processOrganization(final ResultSet rs) {
|
||||
|
||||
try {
|
||||
final DataInfo info = prepareDataInfo(rs);
|
||||
|
||||
|
@ -238,7 +247,7 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication
|
|||
o.setOaiprovenance(null); // Values not present in the DB
|
||||
o.setLegalshortname(field(rs.getString("legalshortname"), info));
|
||||
o.setLegalname(field(rs.getString("legalname"), info));
|
||||
o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query
|
||||
o.setAlternativeNames(prepareListFields(rs.getArray("alternativenames"), info));
|
||||
o.setWebsiteurl(field(rs.getString("websiteurl"), info));
|
||||
o.setLogourl(field(rs.getString("logourl"), info));
|
||||
o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
|
||||
|
|
|
@ -6,15 +6,11 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
|
||||
public class MigrateDbEntitiesApplication extends AbstractDbApplication {
|
||||
|
||||
public MigrateDbEntitiesApplication(final String hdfsPath, final String dbUrl, final String dbUser,
|
||||
final String dbPassword, final String isLookupUrl)
|
||||
throws Exception {
|
||||
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
|
||||
}
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MigrateDbEntitiesApplication.class);
|
||||
|
||||
public static final String SOURCE_TYPE = "source_type";
|
||||
|
@ -80,4 +76,14 @@ public class MigrateDbEntitiesApplication extends AbstractDbApplication {
|
|||
}
|
||||
}
|
||||
|
||||
public MigrateDbEntitiesApplication(final String hdfsPath, final String dbUrl, final String dbUser,
|
||||
final String dbPassword, final String isLookupUrl)
|
||||
throws Exception {
|
||||
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
|
||||
}
|
||||
|
||||
protected MigrateDbEntitiesApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS
|
||||
super(dbClient, vocs);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,12 +9,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
|
||||
public class MigrateOpenOrgsApplication extends AbstractDbApplication {
|
||||
|
||||
public MigrateOpenOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
|
||||
final String dbPassword, final String isLookupUrl)
|
||||
throws Exception {
|
||||
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
|
||||
}
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MigrateOpenOrgsApplication.class);
|
||||
|
||||
public static final String SOURCE_TYPE = "source_type";
|
||||
|
@ -47,7 +41,7 @@ public class MigrateOpenOrgsApplication extends AbstractDbApplication {
|
|||
try (final MigrateOpenOrgsApplication mapper = new MigrateOpenOrgsApplication(hdfsPath, dbUrl, dbUser,
|
||||
dbPassword, isLookupUrl)) {
|
||||
|
||||
log.info("Processing orgs...");
|
||||
log.info("Processing open orgs...");
|
||||
mapper.execute("queryOrganizationsFromOpenOrgsDB.sql", mapper::processOrganization);
|
||||
|
||||
log.info("Processing simrels...");
|
||||
|
@ -55,6 +49,13 @@ public class MigrateOpenOrgsApplication extends AbstractDbApplication {
|
|||
|
||||
log.info("All done.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public MigrateOpenOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
|
||||
final String dbPassword, final String isLookupUrl)
|
||||
throws Exception {
|
||||
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,18 @@
|
|||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsURL</name>
|
||||
<description>the postgres URL to access to the OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsUser</name>
|
||||
<description>the user of OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsPassword</name>
|
||||
<description>the password of OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbSchema</name>
|
||||
|
@ -116,8 +128,25 @@
|
|||
<fork name="start_import">
|
||||
<path start="ImportDB"/>
|
||||
<path start="ImportDB_claims"/>
|
||||
<path start="ImportDB_openorgs"/>
|
||||
</fork>
|
||||
|
||||
<action name="ImportDB_openorgs">
|
||||
<java>
|
||||
<prepare>
|
||||
<delete path="${contentPath}/db_openorgs"/>
|
||||
</prepare>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
</java>
|
||||
<ok to="wait_import"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportDB_claims">
|
||||
<java>
|
||||
<prepare>
|
||||
|
@ -308,7 +337,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
|
||||
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
|
||||
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
</spark>
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
<workflow-app name="import OpenOrgs" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>contentPath</name>
|
||||
<description>path location to store (or reuse) content from the aggregator</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsURL</name>
|
||||
<description>the postgres URL to access to the OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsUser</name>
|
||||
<description>the user of OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresOpenOrgsPassword</name>
|
||||
<description>the password of OpenOrgs database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="ImportDB_openorgs"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ImportDB_openorgs">
|
||||
<java>
|
||||
<prepare>
|
||||
<delete path="${contentPath}/db_openorgs"/>
|
||||
</prepare>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateOpenOrgsApplication</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,108 +0,0 @@
|
|||
<workflow-app name="import regular entities as Graph (step 1)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep1}'/>
|
||||
<mkdir path='${migrationPathStep1}'/>
|
||||
</fs>
|
||||
<ok to="ImportDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
|
||||
</java>
|
||||
<ok to="ImportODF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
</java>
|
||||
<ok to="ImportOAF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,18 +0,0 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -1,65 +0,0 @@
|
|||
<workflow-app name="import regular entities as Graph (step 2)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetEntities"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetEntities">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep2}'/>
|
||||
<mkdir path='${migrationPathStep2}'/>
|
||||
</fs>
|
||||
<ok to="GenerateEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-t</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>--islookup</arg><arg>${isLookupUrl}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,18 +0,0 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -1,60 +0,0 @@
|
|||
<workflow-app name="import regular entities as Graph (step 3)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
|
||||
<property>
|
||||
<name>migrationPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationPathStep3</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetGraph"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetGraph">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep3}'/>
|
||||
<mkdir path='${migrationPathStep3}'/>
|
||||
</fs>
|
||||
<ok to="GenerateGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateGraph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateGraph</name>
|
||||
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>-g</arg><arg>${migrationPathStep3}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -2,6 +2,7 @@ SELECT
|
|||
o.id AS organizationid,
|
||||
o.legalshortname AS legalshortname,
|
||||
o.legalname AS legalname,
|
||||
ARRAY[]::text[] AS alternativenames,
|
||||
o.websiteurl AS websiteurl,
|
||||
o.logourl AS logourl,
|
||||
o.ec_legalbody AS eclegalbody,
|
||||
|
|
|
@ -2,12 +2,24 @@ SELECT
|
|||
o.id AS organizationid,
|
||||
coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
|
||||
o.name AS legalname,
|
||||
array_agg(DISTINCT n.name) AS "alternativeNames",
|
||||
array_agg(DISTINCT n.name) AS alternativenames,
|
||||
(array_agg(u.url))[1] AS websiteurl,
|
||||
o.modification_date AS dateoftransformation,
|
||||
'' AS logourl,
|
||||
DATE(o.creation_date) AS dateofcollection,
|
||||
DATE(o.modification_date) AS dateoftransformation,
|
||||
false AS ecenterprise,
|
||||
false AS echighereducation,
|
||||
false AS ecinternationalorganization,
|
||||
false AS ecinternationalorganizationeurinterests,
|
||||
false AS eclegalbody,
|
||||
false AS eclegalperson,
|
||||
false AS ecnonprofit,
|
||||
false AS ecnutscode,
|
||||
false AS ecresearchorganization,
|
||||
false AS ecsmevalidated,
|
||||
false AS inferred,
|
||||
false AS deletedbyinference,
|
||||
0.95 AS trust,
|
||||
0.99 AS trust,
|
||||
'' AS inferenceprovenance,
|
||||
'openaire____::openorgs' AS collectedfromid,
|
||||
'OpenOrgs Database' AS collectedfromname,
|
||||
|
@ -31,9 +43,21 @@ SELECT
|
|||
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
|
||||
n.name AS legalshortname,
|
||||
n.name AS legalname,
|
||||
ARRAY[]::text[] AS "alternativeNames",
|
||||
ARRAY[]::text[] AS alternativenames,
|
||||
(array_agg(u.url))[1] AS websiteurl,
|
||||
o.modification_date AS dateoftransformation,
|
||||
'' AS logourl,
|
||||
DATE(o.creation_date) AS dateofcollection,
|
||||
DATE(o.modification_date) AS dateoftransformation,
|
||||
false AS ecenterprise,
|
||||
false AS echighereducation,
|
||||
false AS ecinternationalorganization,
|
||||
false AS ecinternationalorganizationeurinterests,
|
||||
false AS eclegalbody,
|
||||
false AS eclegalperson,
|
||||
false AS ecnonprofit,
|
||||
false AS ecnutscode,
|
||||
false AS ecresearchorganization,
|
||||
false AS ecsmevalidated,
|
||||
false AS inferred,
|
||||
false AS deletedbyinference,
|
||||
0.88 AS trust,
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
|
@ -47,6 +48,9 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
@Mock
|
||||
private VocabularyGroup vocs;
|
||||
|
||||
@Mock
|
||||
private DbClient dbClient;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
lenient()
|
||||
|
@ -59,7 +63,7 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
|
||||
lenient().when(vocs.termExists(anyString(), anyString())).thenReturn(true);
|
||||
|
||||
this.app = new MigrateDbEntitiesApplication(vocs);
|
||||
this.app = new MigrateDbEntitiesApplication(dbClient, vocs);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue