Skip to content

Instantly share code, notes, and snippets.

@unautre
Created June 10, 2014 12:32
Show Gist options
  • Select an option

  • Save unautre/3cf4e9924ee81b762f82 to your computer and use it in GitHub Desktop.

Select an option

Save unautre/3cf4e9924ee81b762f82 to your computer and use it in GitHub Desktop.
Simple IPC Message Queue class for Python3, using ctypes and libc.
import os
from ctypes import CDLL, get_errno, create_string_buffer
from ctypes import c_ushort, c_int, c_uint, c_long, c_ulong
from ctypes import c_char_p, c_void_p, POINTER, Structure, cast
from ctypes import sizeof
# Load the C library. use_errno=True makes ctypes capture the C errno after
# every foreign call so that ctypes.get_errno() can report it; without the
# flag get_errno() always returns 0 and a failed call cannot be diagnosed.
# NOTE: "libc.so.6" is the glibc soname, so this only loads on Linux/glibc.
libc = CDLL("libc.so.6", use_errno=True)

# Flags OR-ed into the flag argument of msgget()/msgsnd()/msgrcv().
IPC_CREAT = 0o1000   # create the entry if the key does not exist
IPC_EXCL = 0o2000    # together with IPC_CREAT: fail if the key exists
IPC_NOWAIT = 0o4000  # return an error instead of blocking

# Command codes for msgctl().
IPC_RMID = 0  # remove the identifier
IPC_SET = 1   # set ipc_perm options
IPC_STAT = 2  # fetch the msqid_ds structure
IPC_INFO = 3  # fetch system-wide IPC information
# ctypes mirrors of the C types used by the System V message-queue calls.
key_t = c_int


class ipc_perm(Structure):
    """ctypes mirror of C ``struct ipc_perm`` (ownership and permissions).

    The trailing sequence/padding/reserved members are not useful to callers
    but must be declared: they are part of the C structure, and leaving them
    out makes this type too small and shifts every member that follows it
    inside ``msqid_ds``.

    NOTE(review): the layout follows glibc on Linux LP64 targets (for example
    x86_64, where the structure is 48 bytes) -- confirm before relying on it
    on another ABI.
    """

    _fields_ = [
        ("__key", key_t),         # key passed to msgget()
        ("uid", c_uint),          # effective UID of the owner
        ("gid", c_uint),          # effective GID of the owner
        ("cuid", c_uint),         # effective UID of the creator
        ("cgid", c_uint),         # effective GID of the creator
        ("mode", c_ushort),       # permission bits
        ("__pad1", c_ushort),
        ("__seq", c_ushort),      # sequence number
        ("__pad2", c_ushort),
        ("__glibc_reserved1", c_ulong),
        ("__glibc_reserved2", c_ulong),
    ]


class msqid_ds(Structure):
    """ctypes mirror of C ``struct msqid_ds`` as filled in by msgctl(IPC_STAT).

    The two trailing reserved words are required so that the buffer handed to
    msgctl() is as large as the structure the kernel writes; a shorter buffer
    would be overrun.

    NOTE(review): the layout follows glibc on Linux LP64 targets (120 bytes on
    x86_64); 32-bit ABIs lay out the time members differently -- confirm
    before use there.
    """

    _fields_ = [
        ("msg_perm", ipc_perm),       # ownership and permissions
        ("msg_stime", c_ulong),       # time of the last msgsnd()
        ("msg_rtime", c_ulong),       # time of the last msgrcv()
        ("msg_ctime", c_ulong),       # time of the last change
        ("__msg_cbytes", c_ulong),    # bytes currently in the queue
        ("msg_qnum", c_ulong),        # messages currently in the queue
        ("msg_qbytes", c_ulong),      # maximum bytes allowed in the queue
        ("msg_lspid", c_int),         # PID of the last msgsnd()
        ("msg_lrpid", c_int),         # PID of the last msgrcv()
        ("__glibc_reserved4", c_ulong),
        ("__glibc_reserved5", c_ulong),
    ]
class MSQueue:
    """Small wrapper around one System V message queue, driven through libc.

    The raw libc entry points (``msgget``, ``msgsnd``, ``msgrcv``, ``msgctl``,
    ``perror``, ``ftok``) are exposed as class attributes with their argument
    types declared. Instances hold the queue identifier in ``self.id``.

    Every libc call that fails raises ``OSError`` instead of being ignored.
    """

    msgget = libc.msgget
    msgget.argtypes = [key_t, c_int]
    msgsnd = libc.msgsnd
    msgsnd.argtypes = [c_int, c_char_p, c_ulong, c_int]
    msgctl = libc.msgctl
    msgctl.argtypes = [c_int, c_int, POINTER(msqid_ds)]
    msgrcv = libc.msgrcv
    # msgtyp is a signed long in C: negative values are a valid selector.
    msgrcv.argtypes = [c_int, c_char_p, c_ulong, c_long, c_int]
    perror = libc.perror
    perror.argtypes = [c_char_p]
    ftok = libc.ftok
    ftok.argtypes = [c_char_p, c_int]

    # A message buffer is a C ``long`` message type followed by the payload.
    _HEADER = sizeof(c_long)

    @staticmethod
    def _check(result, call):
        """Return *result*, or raise OSError if it is the -1 failure marker.

        ctypes.get_errno() only reflects the C errno when the library handle
        was created with ``use_errno=True``; it reports 0 otherwise, in which
        case an OSError without an errno is raised.
        """
        if result == -1:
            err = get_errno()
            if err:
                raise OSError(err, "%s failed: %s" % (call, os.strerror(err)))
            raise OSError("%s failed" % call)
        return result

    def __init__(self, key, mode=IPC_CREAT | 0o644):
        """Open the queue identified by *key* via msgget().

        Args:
            key: queue key passed to msgget().
            mode: msgget() flags; the default creates the queue if needed
                with permission bits 0o644.

        Raises:
            OSError: if msgget() fails.
        """
        self.id = MSQueue._check(MSQueue.msgget(key, mode), "msgget")

    def send(self, msg, type=1, enc="utf-8", flag=0):
        """Send one message.

        Args:
            msg: text to send (encoded with *enc*); bytes-like input is sent
                unchanged.
            type: message type stored in the leading ``long`` of the buffer.
            enc: encoding applied to a ``str`` message.
            flag: msgsnd() flags, e.g. ``IPC_NOWAIT``.

        Raises:
            OSError: if msgsnd() fails.
        """
        # Encode first: the buffer must be sized from the byte length, which
        # exceeds the character count for non-ASCII text.
        payload = msg.encode(enc) if isinstance(msg, str) else bytes(msg)
        header = MSQueue._HEADER
        buf = create_string_buffer(header + len(payload))
        cast(buf, POINTER(c_long))[0] = type
        buf[header:] = payload
        # msgsz counts the payload only, not the leading message type.
        MSQueue._check(
            MSQueue.msgsnd(self.id, buf, len(payload), flag), "msgsnd"
        )

    def recv(self, type=0, size=64, enc="utf-8", flag=0):
        """Receive one message.

        Args:
            type: msgrcv() message-type selector (0 takes the first message).
            size: maximum payload size in bytes to accept.
            enc: encoding used to decode the payload.
            flag: msgrcv() flags, e.g. ``IPC_NOWAIT``.

        Returns:
            A ``(message_type, text)`` tuple.

        Raises:
            OSError: if msgrcv() fails.
        """
        header = MSQueue._HEADER
        # msgrcv() may write up to `size` payload bytes *after* the leading
        # long, so the buffer has to hold both.
        buf = create_string_buffer(header + size)
        received = MSQueue._check(
            MSQueue.msgrcv(self.id, buf, size, type, flag), "msgrcv"
        )
        mtype = cast(buf, POINTER(c_long)).contents.value
        return mtype, buf[header:header + received].decode(enc)

    def clear(self):
        """Remove the queue with msgctl(IPC_RMID).

        Raises:
            OSError: if msgctl() fails.
        """
        MSQueue._check(MSQueue.msgctl(self.id, IPC_RMID, None), "msgctl")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment