Skip to content

Commit 9290950

Browse files
committed
fix notify
1 parent bc62346 commit 9290950

4 files changed

Lines changed: 503 additions & 423 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package tech.artcoded.websitev2.changelogs;
2+
3+
import io.mongock.api.annotations.ChangeUnit;
4+
import io.mongock.api.annotations.Execution;
5+
import io.mongock.api.annotations.RollbackExecution;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.data.mongodb.core.MongoTemplate;
8+
import org.springframework.data.mongodb.core.query.Query;
9+
import org.springframework.data.mongodb.core.query.Update;
10+
11+
import java.io.IOException;
12+
import java.util.Date;
13+
14+
import static org.springframework.data.mongodb.core.query.Criteria.where;
15+
16+
@ChangeUnit(id = "fix-notify-date-migration", order = "61", author = "Nordine Bittich")
17+
@Slf4j
18+
public class CHANGE_LOG_61_FixNotify {
19+
20+
@RollbackExecution
21+
public void rollbackExecution() {
22+
log.info("Rollback not implemented for notify date migration");
23+
}
24+
25+
@Execution
26+
public void execute(MongoTemplate template) throws IOException {
27+
log.info("Starting migration: removing notifyDate from Channel and adding to Message");
28+
29+
var channels = template.findAll(org.bson.Document.class, "channel");
30+
var now = new Date();
31+
32+
for (var channel : channels) {
33+
var channelId = channel.get("_id");
34+
log.debug("Processing channel: {}", channelId);
35+
36+
template.updateFirst(
37+
Query.query(where("_id").is(channelId)),
38+
new Update().unset("notifyDate"),
39+
"channel");
40+
41+
var messages = (java.util.List<?>) channel.get("messages");
42+
if (messages != null && !messages.isEmpty()) {
43+
for (int i = 0; i < messages.size(); i++) {
44+
var message = (org.bson.Document) messages.get(i);
45+
var read = message.getBoolean("read", true);
46+
if (read) {
47+
template.updateFirst(
48+
Query.query(where("_id").is(channelId)
49+
.and("messages." + i + ".id").is(message.get("id"))),
50+
new Update().set("messages." + i + ".notifyDate", now),
51+
"channel");
52+
} else {
53+
template.updateFirst(
54+
Query.query(where("_id").is(channelId)
55+
.and("messages." + i + ".id").is(message.get("id"))),
56+
new Update().set("messages." + i + ".notifyDate", null),
57+
"channel");
58+
}
59+
}
60+
}
61+
62+
}
63+
64+
}
65+
}

artcoded/src/main/java/tech/artcoded/websitev2/pages/report/Channel.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,24 @@
2020
@Builder(toBuilder = true)
2121
@Document
2222
public class Channel {
23-
@Id
24-
@Builder.Default
25-
private String id = IdGenerators.get();
23+
@Id
24+
@Builder.Default
25+
private String id = IdGenerators.get();
2626

27-
@Builder.Default
28-
private List<String> subscribers = new ArrayList<>();
27+
@Builder.Default
28+
private List<String> subscribers = new ArrayList<>();
2929

30-
@Builder.Default
31-
private List<Message> messages = new ArrayList<>();
30+
@Builder.Default
31+
private List<Message> messages = new ArrayList<>();
3232

33-
@Builder.Default
34-
private Date creationDate = new Date();
33+
@Builder.Default
34+
private Date creationDate = new Date();
3535

36-
private String correlationId;
36+
private String correlationId;
3737

38-
private Date updatedDate;
39-
private Date notifyDate;
38+
private Date updatedDate;
4039

41-
public record Message(String id, Date creationDate, String emailFrom, String content, List<String> attachmentIds,
42-
boolean read) {
43-
}
40+
public record Message(String id, Date creationDate, String emailFrom, String content, List<String> attachmentIds,
41+
boolean read, Date notifyDate) {
42+
}
4443
}

artcoded/src/main/java/tech/artcoded/websitev2/pages/report/ChannelService.java

Lines changed: 143 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -31,136 +31,152 @@
3131
@RequiredArgsConstructor
3232
public class ChannelService {
3333

34-
private final ChannelRepository channelRepository;
35-
private final MongoTemplate mongoTemplate;
36-
private final MailJobRepository mailJobRepository;
37-
private final PostRepository postRepository;
38-
39-
public Channel createChannel(String correlationId) {
40-
return channelRepository.save(Channel.builder().correlationId(correlationId).build());
41-
}
42-
43-
public Optional<Channel> getChannel(String id) {
44-
return channelRepository.findById(id);
45-
}
46-
47-
public record UnreadMeassgesCounter(PostStatus status, String subscriber, long counter) implements Serializable {
48-
private static final long serialVersionUID = 1L;
49-
}
50-
51-
@CachePut(cacheNames = "single_channel_counter_unread_messages", key = "#channelId + '_' + #subscriber")
52-
public long getCountForCorrelationId(String correlationId, String subscriber) {
53-
return channelRepository.findByCorrelationId(correlationId)
54-
.map(c -> c.getMessages().stream().filter(m -> !m.read() && !m.emailFrom().equals(subscriber)).count())
55-
.orElse(0L);
56-
}
57-
58-
@CachePut(cacheNames = "channel_counter_unread_messages", key = "'channel_counter_unread_messages'")
59-
public List<UnreadMeassgesCounter> countUnreadMessage() {
60-
List<UnreadMeassgesCounter> result = new ArrayList<>();
61-
62-
for (var status : PostStatus.values()) {
63-
var posts = this.postRepository.findByStatusIsOrderByUpdatedDateDesc(status, Pageable.unpaged());
64-
65-
Map<String, Long> counterPerSubscriber = new HashMap<>();
66-
67-
for (var post : posts.getContent()) {
68-
var chanOpt = channelRepository.findByCorrelationId(post.getId());
69-
if (chanOpt.isPresent()) {
70-
var channel = chanOpt.get();
71-
channel.getMessages().stream().filter(m -> !m.read()).forEach(m -> {
72-
channel.getSubscribers().stream().filter(s -> !m.emailFrom().equals(s))
73-
.forEach(s -> counterPerSubscriber.merge(s, 1L, Long::sum));
74-
});
75-
}
76-
}
77-
78-
counterPerSubscriber
79-
.forEach((subscriber, count) -> result.add(new UnreadMeassgesCounter(status, subscriber, count)));
34+
private final ChannelRepository channelRepository;
35+
private final MongoTemplate mongoTemplate;
36+
private final MailJobRepository mailJobRepository;
37+
private final PostRepository postRepository;
38+
39+
public Channel createChannel(String correlationId) {
40+
return channelRepository.save(Channel.builder().correlationId(correlationId).build());
41+
}
42+
43+
public Optional<Channel> getChannel(String id) {
44+
return channelRepository.findById(id);
45+
}
46+
47+
public record UnreadMeassgesCounter(PostStatus status, String subscriber, long counter) implements Serializable {
48+
private static final long serialVersionUID = 1L;
49+
}
50+
51+
@CachePut(cacheNames = "single_channel_counter_unread_messages", key = "#channelId + '_' + #subscriber")
52+
public long getCountForCorrelationId(String correlationId, String subscriber) {
53+
return channelRepository.findByCorrelationId(correlationId)
54+
.map(c -> c.getMessages().stream().filter(m -> !m.read() && !m.emailFrom().equals(subscriber)).count())
55+
.orElse(0L);
56+
}
57+
58+
@CachePut(cacheNames = "channel_counter_unread_messages", key = "'channel_counter_unread_messages'")
59+
public List<UnreadMeassgesCounter> countUnreadMessage() {
60+
List<UnreadMeassgesCounter> result = new ArrayList<>();
61+
62+
for (var status : PostStatus.values()) {
63+
var posts = this.postRepository.findByStatusIsOrderByUpdatedDateDesc(status, Pageable.unpaged());
64+
65+
Map<String, Long> counterPerSubscriber = new HashMap<>();
66+
67+
for (var post : posts.getContent()) {
68+
var chanOpt = channelRepository.findByCorrelationId(post.getId());
69+
if (chanOpt.isPresent()) {
70+
var channel = chanOpt.get();
71+
channel.getMessages().stream().filter(m -> !m.read()).forEach(m -> {
72+
channel.getSubscribers().stream().filter(s -> !m.emailFrom().equals(s))
73+
.forEach(s -> counterPerSubscriber.merge(s, 1L, Long::sum));
74+
});
8075
}
76+
}
8177

82-
return result;
83-
}
84-
85-
@Scheduled(cron = "0 30 9-18 * * MON,WED,FRI")
86-
public void checkUnreadMessages() {
87-
var fiveMinutesAgo = DateHelper.toDate(LocalDateTime.now().minusMinutes(5));
88-
89-
channelRepository.findAll().forEach(channel -> {
90-
log.info("sending channel message email...");
91-
Map<String, List<Channel.Message>> messages = channel.getMessages().stream()
92-
.filter(msg -> !msg.read() && (channel.getNotifyDate() == null)
93-
&& msg.creationDate().before(fiveMinutesAgo))
94-
.collect(Collectors.groupingBy(Channel.Message::emailFrom));
95-
if (!messages.isEmpty()) {
96-
log.info("found {} messages", messages.size());
97-
var post = this.postRepository.findById(channel.getCorrelationId())
98-
.orElseThrow(() -> new RuntimeException("post doesnt exist for channel " + channel.getId()));
99-
for (var e : messages.entrySet()) {
100-
mailJobRepository
101-
.sendDelayedMail(
102-
channel.getSubscribers().stream().filter(s -> !e.getKey().equals(s)).toList(),
103-
"New messages: " + post.getTitle(),
104-
"<p>New messages: <br>" + e.getValue().stream().map(m -> m.content())
105-
.collect(Collectors.joining("<br>")) + "</p>",
106-
false,
107-
e.getValue().stream().flatMap(m -> m.attachmentIds().stream()).distinct().toList(),
108-
LocalDateTime.now());
109-
}
110-
channelRepository.save(channel.toBuilder().notifyDate(new Date()).build());
111-
} else {
112-
log.warn("no unread message");
113-
}
114-
});
78+
counterPerSubscriber
79+
.forEach((subscriber, count) -> result.add(new UnreadMeassgesCounter(status, subscriber, count)));
11580
}
11681

117-
public Optional<Channel> getChannelByCorrelationId(String id) {
118-
return channelRepository.findByCorrelationId(id);
119-
}
120-
121-
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
122-
"channel_counter_unread_messages" }, allEntries = true)
123-
public Optional<Channel> updateChannel(Channel updatedChannel) {
124-
return channelRepository
125-
.findById(updatedChannel.getId()).map(existing -> existing.toBuilder().updatedDate(new Date())
126-
.messages(updatedChannel.getMessages()).subscribers(updatedChannel.getSubscribers()).build())
127-
.map(channelRepository::save);
128-
129-
}
130-
131-
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
132-
"channel_counter_unread_messages" }, allEntries = true)
133-
public void addMessage(String channelId, Channel.Message message) {
134-
Query query = new Query(Criteria.where("_id").is(channelId));
135-
Update update = new Update().push("messages", message).set("updatedDate", new Date());
136-
mongoTemplate.updateFirst(query, update, Channel.class);
137-
}
138-
139-
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
140-
"channel_counter_unread_messages" }, allEntries = true)
141-
public void deleteMessage(String channelId, String messageId) {
142-
Query query = new Query(Criteria.where("_id").is(channelId));
143-
Update update = new Update().pull("messages", Query.query(Criteria.where("id").is(messageId)))
144-
.set("updatedDate", new Date());
145-
mongoTemplate.updateFirst(query, update, Channel.class);
146-
}
147-
148-
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
149-
"channel_counter_unread_messages" }, allEntries = true)
150-
public void updateCorrelationId(String channelId, String correlationId) {
151-
Query query = new Query(Criteria.where("_id").is(channelId));
152-
Update update = new Update().set("correlationId", correlationId).set("updatedDate", new Date());
153-
mongoTemplate.updateFirst(query, update, Channel.class);
154-
}
155-
156-
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
157-
"channel_counter_unread_messages" }, allEntries = true)
158-
public void deleteChannel(String channelId) {
159-
channelRepository.deleteById(channelId);
160-
}
161-
162-
public List<Channel> findChannelsBySubscriber(String email) {
163-
return channelRepository.findBySubscribersContaining(email);
164-
}
82+
return result;
83+
}
84+
85+
@Scheduled(cron = "0 30 9-18 * * MON,WED,FRI")
86+
public void checkUnreadMessages() {
87+
var fiveMinutesAgo = DateHelper.toDate(LocalDateTime.now().minusMinutes(5));
88+
89+
log.info("sending channel message email...");
90+
channelRepository.findAll().forEach(channel -> {
91+
Map<String, List<Channel.Message>> messages = channel.getMessages().stream()
92+
.filter(msg -> !msg.read() && (msg.notifyDate() == null)
93+
&& msg.creationDate().before(fiveMinutesAgo))
94+
.collect(Collectors.groupingBy(Channel.Message::emailFrom));
95+
if (!messages.isEmpty()) {
96+
log.info("found {} messages in channel {}", messages.size(), channel.getId());
97+
var post = this.postRepository.findById(channel.getCorrelationId())
98+
.orElseThrow(() -> new RuntimeException("post doesnt exist for channel " + channel.getId()));
99+
for (var e : messages.entrySet()) {
100+
mailJobRepository
101+
.sendDelayedMail(
102+
channel.getSubscribers().stream().filter(s -> !e.getKey().equals(s)).toList(),
103+
"New messages: " + post.getTitle(),
104+
"<p>New messages: <br>" + e.getValue().stream().map(m -> m.content())
105+
.collect(Collectors.joining("<br>")) + "</p>",
106+
false,
107+
e.getValue().stream().flatMap(m -> m.attachmentIds().stream()).distinct().toList(),
108+
LocalDateTime.now());
109+
}
110+
var updatedMessages = channel.getMessages().stream()
111+
.map(msg -> {
112+
if (!msg.read() && msg.notifyDate() == null && msg.creationDate().before(fiveMinutesAgo)) {
113+
return new Channel.Message(
114+
msg.id(),
115+
msg.creationDate(),
116+
msg.emailFrom(),
117+
msg.content(),
118+
msg.attachmentIds(),
119+
msg.read(),
120+
new Date());
121+
}
122+
return msg;
123+
})
124+
.toList();
125+
126+
channelRepository.save(channel.toBuilder().messages(updatedMessages).build());
127+
} else {
128+
log.warn("no unread message");
129+
}
130+
});
131+
}
132+
133+
public Optional<Channel> getChannelByCorrelationId(String id) {
134+
return channelRepository.findByCorrelationId(id);
135+
}
136+
137+
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
138+
"channel_counter_unread_messages" }, allEntries = true)
139+
public Optional<Channel> updateChannel(Channel updatedChannel) {
140+
return channelRepository
141+
.findById(updatedChannel.getId()).map(existing -> existing.toBuilder().updatedDate(new Date())
142+
.messages(updatedChannel.getMessages()).subscribers(updatedChannel.getSubscribers()).build())
143+
.map(channelRepository::save);
144+
145+
}
146+
147+
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
148+
"channel_counter_unread_messages" }, allEntries = true)
149+
public void addMessage(String channelId, Channel.Message message) {
150+
Query query = new Query(Criteria.where("_id").is(channelId));
151+
Update update = new Update().push("messages", message).set("updatedDate", new Date());
152+
mongoTemplate.updateFirst(query, update, Channel.class);
153+
}
154+
155+
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
156+
"channel_counter_unread_messages" }, allEntries = true)
157+
public void deleteMessage(String channelId, String messageId) {
158+
Query query = new Query(Criteria.where("_id").is(channelId));
159+
Update update = new Update().pull("messages", Query.query(Criteria.where("id").is(messageId)))
160+
.set("updatedDate", new Date());
161+
mongoTemplate.updateFirst(query, update, Channel.class);
162+
}
163+
164+
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
165+
"channel_counter_unread_messages" }, allEntries = true)
166+
public void updateCorrelationId(String channelId, String correlationId) {
167+
Query query = new Query(Criteria.where("_id").is(channelId));
168+
Update update = new Update().set("correlationId", correlationId).set("updatedDate", new Date());
169+
mongoTemplate.updateFirst(query, update, Channel.class);
170+
}
171+
172+
@CacheEvict(cacheNames = { "single_channel_counter_unread_messages",
173+
"channel_counter_unread_messages" }, allEntries = true)
174+
public void deleteChannel(String channelId) {
175+
channelRepository.deleteById(channelId);
176+
}
177+
178+
public List<Channel> findChannelsBySubscriber(String email) {
179+
return channelRepository.findBySubscribersContaining(email);
180+
}
165181

166182
}

0 commit comments

Comments
 (0)