'Web Socket send private notification to a specific user from database

This is my first time working with websockets , I am trying to create an application that sends global notifications to everyone and also private notifications for specific users from Database. I have implemented a WebSockets Configuration and sending notifications to everyone works fine but sending private notifications messages to specific user from database does not work, when I open two browsers windows I am unable to see the private notifications coming but the global notifications are visible.

Here My WebSocket Configuration file:

@Configuration @EnableWebSocketMessageBroker public class
WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //web sockets starts communication with /notification
        registry.enableSimpleBroker("/topic");
        //web socket prefix
        registry.setApplicationDestinationPrefixes("/ws");
    }


    //to use in front end
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //front end  starts with this url to communicate with web socket
        registry.addEndpoint("/ws-notifications")
                .withSockJS();
    } 
}

NotificationsController:

@Controller 
public class NotificationsController {

    @Autowired
    private NotificationService notificationService;

    @Autowired
    NotificationRepository notificationRepository;


    @MessageMapping("/message")
    @SendTo("/topic/messages")
    public ResponseNotification getMessage(final NotificationDAO notificationDAO) throws InterruptedException {
      Thread.sleep(1000);
      notificationService.sendGlobalNotification();
      return new ResponseNotification("Hello, " + HtmlUtils.htmlEscape(notificationDAO.getMessageContent()) + "!");
    }


    @MessageMapping("/private-message")
    @SendTo("/topic/private-messages")
    public ResponseNotification getPrivateMessage(final NotificationDAO notificationDAO, final Principal principal) throws
InterruptedException {
        Thread.sleep(1000);
        notificationService.sendPrivateNotification(principal.getName());
        return new ResponseNotification("Sending private message to user  "+principal.getName()+ " :  " +
HtmlUtils.htmlEscape(notificationDAO.getMessageContent()) + "!");
    }

}

NotificationsService:

@Service 
public class NotificationService implements INotificationService {

    String apiBaseUrl = "https://api.github.com";

    private final SimpMessagingTemplate messagingTemplate;

    @Autowired  public NotificationService(SimpMessagingTemplate messagingTemplate) {       
        this.messagingTemplate = messagingTemplate;
    }

    public void sendGlobalNotification() {      
        ResponseNotification message = new ResponseNotification("Global Notification");

        messagingTemplate.convertAndSend("/topic/global-notifications", message);   
    }

    public void sendPrivateNotification(final String userId) {
        ResponseNotification message = new ResponseNotification("Private Notification");

        messagingTemplate.convertAndSendToUser(userId,"/topic/private-notifications", message);     }    
}

WebSocketController

@RestController 
@RequestMapping("api/websocket") public class
WebSocketController {

    @Autowired
    WebSocketService webSocketService;


    @PostMapping("/send-message")
    public void sendMessage(@RequestBody final NotificationDAO notificationDAO) {
    webSocketService.notifyFrontend(notificationDAO.getMessageContent());
    }

    @PostMapping("/send-private-message/{id}")
    public void sendPrivateMessage(@PathVariable final String id, @RequestBody final 
    NotificationDAO notificationDAO) {
        webSocketService.notifyUser(id,notificationDAO.getMessageContent());
    }

  My WebSocketService:

  @Service
  public class WebSocketService {

    private final SimpMessagingTemplate messagingTemplate;
    private final NotificationService notificationService;

    @Autowired
    public WebSocketService(SimpMessagingTemplate messagingTemplate, NotificationService notificationService) {
        this.messagingTemplate = messagingTemplate;
        this.notificationService = notificationService;
    }


    public void notifyFrontend(final String message) {
        ResponseNotification responseNotification = new ResponseNotification(message);
        notificationService.sendGlobalNotification();
        messagingTemplate.convertAndSend("topic/messages",responseNotification);
    }


    public void notifyUser(final String id, final String message) {
        ResponseNotification responseNotification = new ResponseNotification(message);
        notificationService.sendPrivateNotification(id);
        messagingTemplate.convertAndSendToUser(id,"topic/private-messages",responseNotification);
    }

}

My WebSocketService

@Service 
public class WebSocketService {
    
    private final SimpMessagingTemplate messagingTemplate;
    private final NotificationService notificationService;

    @Autowired
    public WebSocketService(SimpMessagingTemplate messagingTemplate, NotificationService notificationService) {
        this.messagingTemplate = messagingTemplate;
        this.notificationService = notificationService;
    }


    public void notifyFrontend(final String message) {
        ResponseNotification responseNotification = new ResponseNotification(message);
        notificationService.sendGlobalNotification();
        messagingTemplate.convertAndSend("topic/messages",responseNotification);
    }


    public void notifyUser(final String id, final String message) {
        ResponseNotification responseNotification = new ResponseNotification(message);
        notificationService.sendPrivateNotification(id);
        messagingTemplate.convertAndSendToUser(id,"topic/private-messages",responseNotification);
    }

}

my js file:

var stompClient = null; var notificationCount = 0;
    
    
$(document).ready( function () {

    console.log("document is ready")
    $("form").on('submit', function (e) {
        e.preventDefault();
    });
    $( "#connect" ).click(function() { connect(); });
    $( "#disconnect" ).click(function() { disconnect(); });
    $( "#send" ).click(function() { sendMessage(); });
    $( "#send-private" ).click(function() { sendPrivateMessage(); });
    $( "#notifications" ).click(function() { resetNotificationCount(); });
});


function setConnected(connected) {
    $("#connect").prop("disabled", connected);
    $("#disconnect").prop("disabled", !connected);
    if (connected) {
        $("#conversation").show();
    }
    else {
        $("#conversation").hide();
    }
    $("#greetings").html(""); }

function connect() {
    var socket = new SockJS('/ws-notifications');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        console.log('Connected: ' + frame);
        updateNotificationDisplay();
        stompClient.subscribe('/topic/messages', function (message) {
            showMessage(JSON.parse(message.body).content);
        });

        stompClient.subscribe('/topic/private-messages', function (message) {
            showMessage(JSON.parse(message.body).content);
        });

        stompClient.subscribe('/topic/global-notifications', function (message) {
            notificationCount = notificationCount + 1;
            updateNotificationDisplay();
        });

        stompClient.subscribe('/topic/private-notifications', function (message) {
            notificationCount = notificationCount + 1;
            updateNotificationDisplay();
        });

    });

}

function disconnect() {
    if (stompClient !== null) {
        stompClient.disconnect();
    }
    setConnected(false);
    console.log("Disconnected"); 
}

function showMessage(message) {
    $("#messages").append("<tr><td>" + message + "</td></tr>"); 
}

function sendMessage() {
    console.log("sending message");
    stompClient.send("/ws/message", {}, JSON.stringify({'messageContent': $("#message").val()})); 
}

function sendPrivateMessage() {
    console.log("sending private message");
    stompClient.send("/ws/private-message", {}, JSON.stringify({'messageContent': $("#private-message").val()})); 
}

function updateNotificationDisplay() {
    if (notificationCount == 0) {
        $('#notifications').hide();
    } else {
        $('#notifications').show();
        $('#notifications').text(notificationCount);
    } 
}

function resetNotificationCount() {
    notificationCount = 0;
    updateNotificationDisplay(); 
}

My question is how to send a private message to a user with username for example david20 and how david20 can see his private notifications and on what url?

I have spring security implemented on the application and I am not sure if authentication is required or not to send/get private messages. I am fully lost and I almost understand a little about WebSocket. Any help will be so appreciated.



Solution 1:[1]

powerful code, actually you need to use auth token or any other identifier to handle the private notification. first of all you need to change your js file to listen on the following :

 stompClient.subscribe('/topic/private-messages/{TOKEN_OR_IDENTIFIER}', function (message) {
            showMessage(JSON.parse(message.body).content);
        });

Then you should custom you back to send for each user as following :

@MessageMapping("/private-message/{TOKEN_OR_IDENTIFIER}")
    @SendTo("/topic/private-messages/{TOKEN_OR_IDENTIFIER}")
    public ResponseNotification getPrivateMessage(final NotificationDAO notificationDAO, final Principal principal, @PathVariable String TOKEN_OR_IDENTIFIER) throws
InterruptedException {
        Thread.sleep(1000);
        notificationService.sendPrivateNotification(principal.getName());
        return new ResponseNotification("Sending private message to user  "+principal.getName()+ " :  " +
HtmlUtils.htmlEscape(notificationDAO.getMessageContent()) + "!");
    }

this two changes show makes the following :

  1. each client will open a new (private) channel with the broker.
  2. the broker will handle a custom uri's and publish to custom clients.

Solution 2:[2]

Using @RSale's input:

import pandas as pd
df = pd.DataFrame({'data': [30, 60, 120, 150, 30, 60, 90, 30, 60]})

d = dict(tuple(df.groupby(df['data'].eq(30).cumsum())))

d is a dictionary of three dataframes:

d[1]:

   data
0    30
1    60
2   120
3   150

d[2]:

   data
4    30
5    60
6    90

And d[3}:

   data
7    30
8    60

Solution 3:[3]

Not very elegant but it get's the job done.

Loop through an array. Add array to a list when a number is smaller than the one before. Remove the saved array from the list and repeat until no change is detected.

numpy & recursive

import numpy as np
a = np.array([30, 60, 120, 150, 30, 60, 90, 30, 60])
y = []

def split(a,y):
    for count,val in enumerate(a):
        if count == 0:
            pass
        elif val < a[count-1]:
            y.append(a[:count])
            a =  a[count:]
            if len(a)> 0 and sorted(a) != list(a):
                split(a,y)
            else:
                y.append(a)
                a = []
                return(y)
        
            return(y)

y = split(a,y)
print(y)
>>[array([ 30,  60, 120, 150]), array([30, 60, 90]), array([30, 60])]
print([max(lis) for lis in y])
>>[150,90,60]

This will not consider 30 as a starting point but the samllest number after the reset.

Or using diff to find the change points.

numpy & diff version

import numpy as np

a = np.array([30, 60, 120, 150, 30, 60, 90, 30, 60])

y = []

def split(a,y):
    
    a_diff = np.asarray(np.where(np.diff(a)<0))[0]
    
    while len(a_diff)>1:
        
        a_diff = np.asarray(np.where(np.diff(a)<0))[0] 
        
        y.append(a[:a_diff[0]+1])
        
        a = a[a_diff[0]+1:]
    
    y.append(a)
    
    return(y)    

y = split(a,y)

print(y)

print([max(lis) for lis in y])

>>[array([ 30,  60, 120, 150]), array([30, 60, 90]), array([30, 60])]

>>[150, 90, 60]

pandas & DataFrame version

import pandas as pd
df = pd.DataFrame({'data': [30, 60, 120, 150, 30, 60, 90, 30, 60]})
y = []
def split(df,y):
    a = df['data']
    a_diff = [count for count,val in enumerate(a.diff()[1:])  if val < 0 ]
    while len(a_diff)>1:
        a_diff = [count for count,val in enumerate(a.diff()[1:])  if val < 0 ]
        y.append(a[:a_diff[0]+1])
        a = a[a_diff[0]+1:]
    y.append(a)
    return(y)  

y = split(df,y)
print(y)
print([max(lis) for lis in y])

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Anas Altarazi
Solution 2 Scott Boston
Solution 3