params to choose sql queries for beta or production

This commit is contained in:
Michele Artini 2020-06-25 09:28:13 +02:00
parent 202f6e62ff
commit 77d2a1b1c4
14 changed files with 79 additions and 96 deletions

View File

@ -45,6 +45,7 @@ public abstract class UpdateMatcher<T> {
final Topic topic = getTopicFunction().apply(hl); final Topic topic = getTopicFunction().apply(hl);
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(), final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(),
getHighlightToStringFunction(), dedupConfig); getHighlightToStringFunction(), dedupConfig);
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else { } else {

View File

@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBroke
super(true, super(true,
rel -> topic, rel -> topic,
(p, rel) -> p.getDatasets().add(rel), (p, rel) -> p.getDatasets().add(rel),
rel -> rel.getOriginalId()); rel -> rel.getOpenaireId());
} }
protected abstract boolean filterByType(String relType); protected abstract boolean filterByType(String relType);
@ -29,14 +29,14 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBroke
.getDatasets() .getDatasets()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.map(OaBrokerRelatedDataset::getOriginalId) .map(OaBrokerRelatedDataset::getOpenaireId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return source return source
.getDatasets() .getDatasets()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.filter(d -> !existingDatasets.contains(d.getOriginalId())) .filter(d -> !existingDatasets.contains(d.getOpenaireId()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaB
super(true, super(true,
rel -> topic, rel -> topic,
(p, rel) -> p.getPublications().add(rel), (p, rel) -> p.getPublications().add(rel),
rel -> rel.getOriginalId()); rel -> rel.getOpenaireId());
} }
@ -31,14 +31,14 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaB
.getPublications() .getPublications()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.map(OaBrokerRelatedPublication::getOriginalId) .map(OaBrokerRelatedPublication::getOpenaireId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return source return source
.getPublications() .getPublications()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.filter(p -> !existingPublications.contains(p.getOriginalId())) .filter(p -> !existingPublications.contains(p.getOpenaireId()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -16,7 +16,7 @@ public class EnrichMissingSoftware
super(true, super(true,
s -> Topic.ENRICH_MISSING_SOFTWARE, s -> Topic.ENRICH_MISSING_SOFTWARE,
(p, s) -> p.getSoftwares().add(s), (p, s) -> p.getSoftwares().add(s),
s -> s.getName()); s -> s.getOpenaireId());
} }
@Override @Override

View File

@ -16,7 +16,7 @@ public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
super(true, super(true,
s -> Topic.ENRICH_MORE_SOFTWARE, s -> Topic.ENRICH_MORE_SOFTWARE,
(p, s) -> p.getSoftwares().add(s), (p, s) -> p.getSoftwares().add(s),
s -> s.getName()); s -> s.getOpenaireId());
} }
@Override @Override

View File

@ -17,6 +17,8 @@ public class BrokerConstants {
public static final float MIN_TRUST = 0.25f; public static final float MIN_TRUST = 0.25f;
public static final float MAX_TRUST = 1.00f; public static final float MAX_TRUST = 1.00f;
public static final int MAX_NUMBER_OF_RELS = 20;
public static Class<?>[] getModelClasses() { public static Class<?>[] getModelClasses() {
final Set<Class<?>> list = new HashSet<>(); final Set<Class<?>> list = new HashSet<>();
list.addAll(Arrays.asList(ModelSupport.getOafModelClasses())); list.addAll(Arrays.asList(ModelSupport.getOafModelClasses()));

View File

@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class RelatedDatasetAggregator public class RelatedDatasetAggregator
@ -30,7 +31,7 @@ public class RelatedDatasetAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) { if (t._2 != null && res.getDatasets().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.getDatasets().add(t._2.getRelDataset()); res.getDatasets().add(t._2.getRelDataset());
} }
return res; return res;
@ -40,7 +41,14 @@ public class RelatedDatasetAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getDatasets().addAll(g2.getDatasets()); final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasets().size();
if (availables > 0) {
if (g2.getDatasets().size() <= availables) {
g1.getDatasets().addAll(g2.getDatasets());
} else {
g1.getDatasets().addAll(g2.getDatasets().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;

View File

@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class RelatedProjectAggregator public class RelatedProjectAggregator
@ -30,7 +31,7 @@ public class RelatedProjectAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) { if (t._2 != null && res.getProjects().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.getProjects().add(t._2.getRelProject()); res.getProjects().add(t._2.getRelProject());
} }
return res; return res;
@ -40,7 +41,14 @@ public class RelatedProjectAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getProjects().addAll(g2.getProjects()); final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getProjects().size();
if (availables > 0) {
if (g2.getProjects().size() <= availables) {
g1.getProjects().addAll(g2.getProjects());
} else {
g1.getProjects().addAll(g2.getProjects().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;

View File

@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class RelatedPublicationAggregator public class RelatedPublicationAggregator
@ -31,7 +32,7 @@ public class RelatedPublicationAggregator
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, RelatedPublication> t) { final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) { if (t._2 != null && res.getPublications().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.getPublications().add(t._2.getRelPublication()); res.getPublications().add(t._2.getRelPublication());
} }
return res; return res;
@ -41,8 +42,16 @@ public class RelatedPublicationAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getPublications().addAll(g2.getPublications()); final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getPublications().size();
if (availables > 0) {
if (g2.getPublications().size() <= availables) {
g1.getPublications().addAll(g2.getPublications());
} else {
g1.getPublications().addAll(g2.getPublications().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;
} }

View File

@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class RelatedSoftwareAggregator public class RelatedSoftwareAggregator
@ -30,7 +31,7 @@ public class RelatedSoftwareAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) { if (t._2 != null && res.getSoftwares().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.getSoftwares().add(t._2.getRelSoftware()); res.getSoftwares().add(t._2.getRelSoftware());
} }
return res; return res;
@ -40,7 +41,14 @@ public class RelatedSoftwareAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getSoftwares().addAll(g2.getSoftwares()); final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getSoftwares().size();
if (availables > 0) {
if (g2.getSoftwares().size() <= availables) {
g1.getSoftwares().addAll(g2.getSoftwares());
} else {
g1.getSoftwares().addAll(g2.getSoftwares().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;

View File

@ -73,19 +73,19 @@
</configuration> </configuration>
</global> </global>
<start to="join_entities_step1"/> <start to="generate_events"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="join_entities_step1"> <action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>JoinStep1</name> <name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class> <class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -97,80 +97,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
</action>
<action name="join_entities_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep2</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
</action>
<action name="join_entities_step3">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep3</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
</action>
<action name="join_entities_step4">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep4</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -50,8 +50,6 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
@ -106,6 +104,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final String dbPassword = parser.get("postgresPassword"); final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx"); log.info("postgresPassword: xxx");
final String dbSchema = parser.get("dbschema");
log.info("dbSchema {}: " + dbSchema);
final String isLookupUrl = parser.get("isLookupUrl"); final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl); log.info("isLookupUrl: {}", isLookupUrl);
@ -125,7 +126,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
smdbe.execute("queryDatasources.sql", smdbe::processDatasource); smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
log.info("Processing projects..."); log.info("Processing projects...");
smdbe.execute("queryProjects.sql", smdbe::processProject); if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
}
log.info("Processing orgs..."); log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);

View File

@ -34,5 +34,11 @@
"paramLongName": "isLookupUrl", "paramLongName": "isLookupUrl",
"paramDescription": "the url of the ISLookupService", "paramDescription": "the url of the ISLookupService",
"paramRequired": true "paramRequired": true
},
{
"paramName": "dbschema",
"paramLongName": "dbschema",
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
"paramRequired": true
} }
] ]

View File

@ -25,6 +25,11 @@
<property> <property>
<name>postgresPassword</name> <name>postgresPassword</name>
<description>the password postgres</description> <description>the password postgres</description>
</property>
<property>
<name>dbSchema</name>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
<value>beta</value>
</property> </property>
<property> <property>
<name>mongoURL</name> <name>mongoURL</name>
@ -125,6 +130,7 @@
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>claims</arg> <arg>--action</arg><arg>claims</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
</java> </java>
<ok to="ImportODF_claims"/> <ok to="ImportODF_claims"/>
<error to="Kill"/> <error to="Kill"/>
@ -175,6 +181,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
</java> </java>
<ok to="ImportODF"/> <ok to="ImportODF"/>
<error to="Kill"/> <error to="Kill"/>