minot changes

This commit is contained in:
Michele Artini 2020-06-24 09:24:45 +02:00
parent 8b9933b934
commit e53dd62e87
5 changed files with 21 additions and 13 deletions

View File

@ -53,7 +53,7 @@ public class GenerateEventsJob {
final String dedupConfigProfileId = parser.get("dedupConfProfile"); final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId); log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final String eventsPath = workingPath + "/eventsPath"; final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -29,15 +29,17 @@ public class RelatedDatasetAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
res.getDatasets().add(t._2.getRelDataset()); if (t._2 != null) {
res.getDatasets().add(t._2.getRelDataset());
}
return res; return res;
} }
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOriginalId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getDatasets().addAll(g2.getDatasets()); g1.getDatasets().addAll(g2.getDatasets());
return g1; return g1;
} else { } else {

View File

@ -29,15 +29,17 @@ public class RelatedProjectAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
res.getProjects().add(t._2.getRelProject()); if (t._2 != null) {
res.getProjects().add(t._2.getRelProject());
}
return res; return res;
} }
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOriginalId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getProjects().addAll(g2.getProjects()); g1.getProjects().addAll(g2.getProjects());
return g1; return g1;
} else { } else {

View File

@ -30,15 +30,17 @@ public class RelatedPublicationAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, RelatedPublication> t) { final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
res.getPublications().add(t._2.getRelPublication()); if (t._2 != null) {
res.getPublications().add(t._2.getRelPublication());
}
return res; return res;
} }
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOriginalId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getPublications().addAll(g2.getPublications()); g1.getPublications().addAll(g2.getPublications());
return g1; return g1;
} else { } else {

View File

@ -29,15 +29,17 @@ public class RelatedSoftwareAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) { public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOriginalId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
res.getSoftwares().add(t._2.getRelSoftware()); if (t._2 != null) {
res.getSoftwares().add(t._2.getRelSoftware());
}
return res; return res;
} }
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOriginalId())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getSoftwares().addAll(g2.getSoftwares()); g1.getSoftwares().addAll(g2.getSoftwares());
return g1; return g1;
} else { } else {