Skip to content

Commit

Permalink
Use POSIX message queue for our IPC
Browse files Browse the repository at this point in the history
Drop socket-based communication if favor of the POSIX message queue. The
main advantage of using this approach is code simplification and reduction
of the communication overhead - the network stack. Note, that we are using
a non-portable implementation which allows multiplexing, which means that
this code will not work in the Cygwin environment...
  • Loading branch information
arkq committed Dec 31, 2015
1 parent 205af71 commit 5ac95e4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 65 deletions.
12 changes: 10 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# cmusfm - configure.ac
# Copyright (c) 2014 Arkadiusz Bokowy
# Copyright (c) 2014-2015 Arkadiusz Bokowy

AC_INIT([cmusfm], [0.2.0], [[email protected]])
AC_INIT([cmusfm], [0.2.5], [[email protected]])
AC_CONFIG_HEADERS([config.h])
AM_INIT_AUTOMAKE([foreign subdir-objects -Wall -Werror])

Expand All @@ -24,6 +24,14 @@ AC_CHECK_LIB(
[crypto], [MD5],
[], [AC_MSG_ERROR([crypto library not found])]
)
AC_CHECK_HEADERS(
[mqueue.h],
[], [AC_MSG_ERROR([mqueue.h header not found])]
)
AC_CHECK_LIB(
[rt], [mq_open],
[], [AC_MSG_ERROR([rt library not found])]
)
AC_CHECK_HEADERS(
[poll.h],
[], [AC_MSG_ERROR([poll.h header not found])]
Expand Down
3 changes: 1 addition & 2 deletions src/cmusfm.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@


#define CONFIG_FNAME "cmusfm.conf"
#define SOCKET_FNAME "cmusfm.socket"
#define CACHE_FNAME "cmusfm.cache"
#define MQUEUE_NAME "cmusfm"


/* time delay (in seconds) between login attempts to the Last.fm
Expand All @@ -40,7 +40,6 @@ extern unsigned char SC_api_key[16];
extern unsigned char SC_secret[16];
extern const char *cmusfm_cache_file;
extern const char *cmusfm_config_file;
extern const char *cmusfm_socket_file;
extern struct cmusfm_config config;


Expand Down
1 change: 0 additions & 1 deletion src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ int main(int argc, char *argv[]) {
/* setup global variables - file locations */
cmusfm_cache_file = get_cmusfm_cache_file();
cmusfm_config_file = get_cmusfm_config_file();
cmusfm_socket_file = get_cmusfm_socket_file();

if (argc == 2 && strcmp(argv[1], "init") == 0) {
cmusfm_initialization();
Expand Down
94 changes: 37 additions & 57 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

#include "server.h"

#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <mqueue.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#if HAVE_SYS_INOTIFY_H
#include <sys/inotify.h>
#endif
Expand All @@ -47,6 +47,11 @@
#endif


/* Helper function for retrieving message queue name. */
static char *get_mqueue_name(void) {
return "/" MQUEUE_NAME;
}

/* Helper function for artist name retrieval. */
static char *get_record_artist(const struct cmusfm_data_record *r) {
return (char *)(r + 1);
Expand Down Expand Up @@ -96,7 +101,7 @@ static void set_trackinfo(scrobbler_trackinfo_t *sbt,
static void cmusfm_server_process_data(scrobbler_session_t *sbs,
const struct cmusfm_data_record *record) {

static char saved_data[CMSOCKET_BUFFER_SIZE];
static char saved_data[MQUEUE_BUFFER_SIZE];
static char saved_is_radio = 0;
struct cmusfm_data_record *saved_record = (struct cmusfm_data_record *)saved_data;

Expand Down Expand Up @@ -256,23 +261,15 @@ static void cmusfm_server_process_data(scrobbler_session_t *sbs,
* appropriately. */
int cmusfm_server_check(void) {

struct sockaddr_un saddr;
int fd;
mqd_t mq;

memset(&saddr, 0, sizeof(saddr));
saddr.sun_family = AF_UNIX;
strcpy(saddr.sun_path, cmusfm_socket_file);

if ((fd = socket(PF_UNIX, SOCK_STREAM, 0)) == -1)
return -1;

/* check if behind the socket there is an active server instance */
if (connect(fd, (struct sockaddr *)(&saddr), sizeof(saddr)) == 0) {
close(fd);
if ((mq = mq_open(get_mqueue_name(), O_WRONLY)) != -1) {
mq_close(mq);
return 1;
}

return 0;
/* missing message queue is not an error here */
return errno == ENOENT ? 0 : -1;
}

/* server shutdown stuff */
Expand All @@ -287,10 +284,17 @@ static void cmusfm_server_stop(int sig) {
* Upon error -1 is returned. */
int cmusfm_server_start(void) {

char buffer[CMSOCKET_BUFFER_SIZE];
const char *mqueue_name = get_mqueue_name();
const struct mq_attr mqueue_attr = {
.mq_flags = O_NONBLOCK,
.mq_maxmsg = 10,
.mq_msgsize = MQUEUE_BUFFER_SIZE,
.mq_curmsgs = 0,
};

char buffer[mqueue_attr.mq_msgsize];
scrobbler_session_t *sbs;
struct sigaction sigact;
struct sockaddr_un saddr;
struct pollfd pfds[3];
#if HAVE_SYS_INOTIFY_H
struct inotify_event inot_even;
Expand All @@ -301,16 +305,16 @@ int cmusfm_server_start(void) {

/* setup poll structure for data reading */
pfds[0].events = POLLIN; /* server */
pfds[1].events = POLLIN; /* client */
pfds[1].events = POLLIN; /* not used */
pfds[2].events = POLLIN; /* inotify */
pfds[1].fd = -1;
pfds[2].fd = -1;

memset(&saddr, 0, sizeof(saddr));
saddr.sun_family = AF_UNIX;
strcpy(saddr.sun_path, cmusfm_socket_file);
if ((pfds[0].fd = socket(PF_UNIX, SOCK_STREAM, 0)) == -1)
if ((pfds[0].fd = mq_open(mqueue_name,
O_CREAT | O_EXCL | O_RDONLY, 0600, &mqueue_attr)) == -1) {
perror("error: create message queue");
return -1;
}

/* initialize scrobbling library */
sbs = scrobbler_initialize(SC_api_key, SC_secret);
Expand All @@ -328,12 +332,6 @@ int cmusfm_server_start(void) {
sigaction(SIGTERM, &sigact, NULL);
sigaction(SIGINT, &sigact, NULL);

/* create server communication socket */
unlink(saddr.sun_path);
if (bind(pfds[0].fd, (struct sockaddr *)(&saddr), sizeof(saddr)) == -1 ||
listen(pfds[0].fd, 2) == -1)
goto return_failure;

#if HAVE_SYS_INOTIFY_H
/* initialize inode notification to watch changes in the config file */
pfds[2].fd = inotify_init();
Expand All @@ -347,14 +345,7 @@ int cmusfm_server_start(void) {
break; /* signal interruption */

if (pfds[0].revents & POLLIN) {
pfds[1].fd = accept(pfds[0].fd, NULL, NULL);
debug("new client accepted: %d", pfds[1].fd);
}

if (pfds[1].revents & POLLIN && pfds[1].fd != -1) {
read(pfds[1].fd, buffer, sizeof(buffer));
close(pfds[1].fd);
pfds[1].fd = -1;
mq_receive(pfds[0].fd, buffer, sizeof(buffer), NULL);
cmusfm_server_process_data(sbs, (struct cmusfm_data_record *)buffer);
}

Expand Down Expand Up @@ -385,20 +376,19 @@ int cmusfm_server_start(void) {
cmusfm_notify_free();
#endif
scrobbler_free(sbs);
close(pfds[0].fd);
unlink(saddr.sun_path);
mq_close(pfds[0].fd);
mq_unlink(mqueue_name);

return retval;
}

/* Send track info to server instance. */
int cmusfm_server_send_track(struct cmtrack_info *tinfo) {

char buffer[CMSOCKET_BUFFER_SIZE];
char buffer[MQUEUE_BUFFER_SIZE];
struct cmusfm_data_record *record = (struct cmusfm_data_record *)buffer;
struct format_match *match, *matches;
struct sockaddr_un saddr;
int sock;
mqd_t mq;

/* helper accessors for dynamic fields */
char *artist = &buffer[sizeof(*record)];
Expand Down Expand Up @@ -482,25 +472,15 @@ int cmusfm_server_send_track(struct cmtrack_info *tinfo) {
record->checksum1 = make_record_checksum1(record);
record->checksum2 = make_record_checksum2(record);

/* connect to the communication socket */
memset(&saddr, 0, sizeof(saddr));
strcpy(saddr.sun_path, cmusfm_socket_file);
saddr.sun_family = AF_UNIX;
sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (connect(sock, (struct sockaddr *)(&saddr), sizeof(saddr)) == -1) {
close(sock);
if ((mq = mq_open(get_mqueue_name(), O_WRONLY)) == -1) {
perror("error: open message queue");
return -1;
}

debug("record length: %ld", sizeof(struct cmusfm_data_record) +
record->off_title + record->off_location + strlen(location) + 1);
write(sock, buffer, sizeof(struct cmusfm_data_record) +
record->off_title + record->off_location + strlen(location) + 1);

return close(sock);
}
mq_send(mq, buffer, sizeof(struct cmusfm_data_record) +
record->off_title + record->off_location + strlen(location) + 1, 0);

/* Helper function for retrieving cmusfm server socket file. */
char *get_cmusfm_socket_file(void) {
return get_cmus_home_file(SOCKET_FNAME);
return mq_close(mq);
}
5 changes: 2 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include "cmusfm.h"


/* communication socket buffer size */
#define CMSOCKET_BUFFER_SIZE 1024
/* message size for message queue */
#define MQUEUE_BUFFER_SIZE 512

/* shoutcast/stream flag for the status field */
#define CMSTATUS_SHOUTCASTMASK 0xF0
Expand Down Expand Up @@ -58,7 +58,6 @@ struct cmusfm_data_record {
};


char *get_cmusfm_socket_file(void);
int cmusfm_server_check(void);
int cmusfm_server_start(void);
int cmusfm_server_send_track(struct cmtrack_info *tinfo);
Expand Down

0 comments on commit 5ac95e4

Please sign in to comment.