向大家请教,一个驱动(或者说一个设备)被多个进程或应用程序调用的问题 Hot! Hot!

大家好,我向大家请教一个问题,我写一个驱动程序给应用程序提供读写设备的功能。但是只有一个设备,有多个应用程序调用,该怎么实现?我看了以前xtang老大给别人回复的帖子,参考mqueue实现的代码。帖子链接如下:
http://www.openqnx.com/chinese/viewtopic.php?t=473&highlight=一个设备

我看了下mqueue代码,没有发现供多个应用程序调用的机制。
我想问下是不是每个应用程序调用该设备时都有1个ctp_per,msg_per,ocb_per,
即每个应用程序打开设备时都给应该在驱动中创建一个 ctp,msg,ocb ?
还是有其他方式来实现?
请大家指点。如果问题成功解决后,我将在本论坛上发布一个
供多个应用程序调用的qnx资源管理器的驱动模板,以后大家碰到该问题时就不用纠结了。
请大家不吝指点!

先顺便贴上mqueue的代码。
io_open.c
static int
create_device(char *name, enum _file_type type, MQDEV *dev) {
char path[PATH_MAX];
unsigned *count, limit;
int id;

if (type == _FTYPE_MQUEUE)
count = &num_mq, limit = max_num_mq;
else
count = &num_sem, limit = max_num_sem;
if (*count >= limit)
errno = ENFILE, id = -1;
else if ((id = resmgr_attach(dpp, NULL, strcat(strcpy(path, “/”), name), type, 0, &mq_connect_funcs, &mq_io_funcs, dev)) != -1)
++*count;
return(id);
}

static MQDEV *
check_duplicate(char *name, MQDEV *head) {
while(head != NULL && strcmp(name, head->name)) {
head = head->link;
}
return head;
}

int
io_open(resmgr_context_t *ctp, io_open_t *msg, MQDEV *dev, void *extra) {
struct mq_attr *mqp = extra;
uint32_t *smp = extra;
iofunc_attr_t *attr = &dev->attr;
MQDEV **head;
struct mq_attr mq_attr;
int status;
dev_t rdev;

if(S_ISDIR(dev->attr.mode)) {
// Open on a new/non-existent queue.
if((msg->connect.ioflag & O_CREAT) == 0) {/*打开一个不存在的路径,且打开模式不是可创建 */
return ENOENT;
}

// It must have a file_type of _FTYPE_MQUEUE or _FTYPE_SEM
memset(&mq_attr, 0, sizeof(mq_attr));
switch(msg->connect.file_type) {
case _FTYPE_MQUEUE:
rdev = S_INMQ;
head = &mq_dir_attr.link;
if(msg->connect.extra_type == _IO_CONNECT_EXTRA_MQUEUE) {
if (msg->connect.extra_len != sizeof(struct mq_attr))
return(ENOSYS);
if (ctp->info.flags & _NTO_MI_ENDIAN_DIFF) {
ENDIAN_SWAP32(&mqp->mq_maxmsg);
ENDIAN_SWAP32(&mqp->mq_msgsize);
}
if((mq_attr.mq_maxmsg = mqp->mq_maxmsg) <= 0 ||
(mq_attr.mq_msgsize = mqp->mq_msgsize) <= 0) {
return EINVAL;
}
} else {
mq_attr.mq_maxmsg = 1024;
mq_attr.mq_msgsize = 4096;
}
break;

case _FTYPE_SEM:
rdev = S_INSEM;
head = &sem_dir_attr.link;
mq_attr.mq_maxmsg = _POSIX_SEM_VALUE_MAX;
mq_attr.mq_flags = MQ_SEMAPHORE;
if(msg->connect.extra_type == _IO_CONNECT_EXTRA_SEM) {
if (msg->connect.extra_len != sizeof(uint32_t))
return(ENOSYS);
if (ctp->info.flags & _NTO_MI_ENDIAN_DIFF) {
ENDIAN_SWAP32(smp);
}
mq_attr.mq_curmsgs = *smp;
}
break;

default:
return ENOSYS;
}

// Check for O_CREAT race condition (PR-11060)
if ((dev = check_duplicate(msg->connect.path, *head)) != NULL) {
// Re-target open to the already created device.
ctp->id = dev->id;
goto race; // In case non-trivial open verification code
}

// Get a device entry and the input/output buffers for it.
if((dev = MemchunkCalloc(memchunk, 1, sizeof(*dev) + msg->connect.path_len - sizeof(char))) == NULL) {
return ENOSPC;
}

msg->connect.mode = (msg->connect.mode & ~S_IFMT) | S_IFNAM;
if((status = iofunc_open(ctp, msg, &dev->attr, attr, 0)) != EOK) {
MemchunkFree(memchunk, dev);
return status;
}

dev->mq_attr = mq_attr;
dev->attr.rdev = rdev;
IOFUNC_NOTIFY_INIT(dev->notify);

// Add the new queue to the pathname space
if((dev->id = create_device(msg->connect.path, msg->connect.file_type, dev)) == -1) {
if ((status = errno) == EMFILE) { //We have created too many connections, this is the system limit
status = ENFILE; //Tell the client the system is full.
}
MemchunkFree(memchunk, dev);
return status;
}
strcpy(dev->name, msg->connect.path);
dev->link = *head, *head = dev;

// Re-target open to the newly created device.
ctp->id = dev->id;
} else {
race:
// Open on an existing queue.
if((status = iofunc_open(ctp, msg, &dev->attr, 0, 0)) != EOK) {
return status;
}
}

// Attach the ocb to the device
if((status = iofunc_ocb_attach(ctp, msg, NULL, &dev->attr, NULL)) == -1) {
return status;
}

return EOK;
}


io_read.c


int
io_read(resmgr_context_t *ctp, io_read_t *msg, struct ocb *ocb) {
MQDEV *dev = ocb->ocb.attr;
MQMSG *mp;
static MQMSG dummy;
MQWAIT *wp;
int nonblock, status, n, rcvid;

// Is queue open for read?
if((status = iofunc_read_verify(ctp, msg, &ocb->ocb, &nonblock)) != EOK) {
return status;
}

// If an xtype is specified make sure it is a mqueue.
if((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_NONE && (msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_MQUEUE) {
return ENOSYS;
}

// Is the msg buffer too small for the queue?
if(msg->i.nbytes < dev->mq_attr.mq_msgsize) {
return EMSGSIZE;
}

// Are there waiting msgs.
if(dev->mq_attr.mq_curmsgs == 0) {
if(nonblock & O_NONBLOCK) {
return EAGAIN;
}

if((wp = MemchunkMalloc(memchunk, sizeof(*wp))) == NULL) {
return ENOMEM;
}

wp->rcvid = ctp->rcvid;
wp->scoid = ctp->info.scoid;
wp->coid = ctp->info.coid;
wp->priority = 0; // Must get real priority from ctp->info
wp->xtype = msg->i.xtype;
LINK_PRI_CLIENT(&dev->waiting_read, wp);
++dev->mq_attr.mq_recvwait;

return _RESMGR_NOREPLY;
}

// Reply with the data
mp = (dev->mq_attr.mq_flags & MQ_SEMAPHORE) ? &dummy : dev->waiting_msg[0];
if(mp->nbytes) {
dev->attr.flags |= (IOFUNC_ATTR_ATIME | IOFUNC_ATTR_DIRTY_TIME);
}
resmgr_endian_context(ctp, _IO_READ, S_IFNAM, msg->i.xtype);
_IO_SET_READ_NBYTES(ctp, mp->nbytes);
if((msg->i.xtype & _IO_XTYPE_MASK) == _IO_XTYPE_MQUEUE) {
uint32_t prio;

prio = mp->priority;
SETIOV(&ctp->iov[0], &prio, sizeof(prio));
SETIOV(&ctp->iov[1], mp->data, mp->nbytes);
if(resmgr_msgreplyv(ctp, ctp->iov, 2) == -1) {
return errno;
}
} else {
SETIOV(&ctp->iov[0], mp->data, mp->nbytes);
if(resmgr_msgreplyv(ctp, ctp->iov, 1) == -1) {
return errno;
}
}

// Remove the msg
if(mp != &dummy) {
if ((dev->waiting_msg[0] = mp->next) == NULL)
dev->waiting_msg[1] = NULL;
MemchunkFree(memchunk, mp);
}
–dev->mq_attr.mq_curmsgs;

// Keep stat info up-to-date. We overload st_size to be messages waiting.
dev->attr.nbytes = dev->mq_attr.mq_curmsgs;

// Since we removed a msg we may need to wake someone waiting for a msg.
if(wp = dev->waiting_write) {

// Unlink and free wait entry
rcvid = wp->rcvid;
dev->waiting_write = wp->next;
MemchunkFree(memchunk, wp);
–dev->mq_attr.mq_sendwait;

// Process the message
resmgr_msg_again(ctp, rcvid);
}

if((n = dev->mq_attr.mq_maxmsg - dev->mq_attr.mq_curmsgs) != 0
&& IOFUNC_NOTIFY_OUTPUT_CHECK(dev->notify, n)) {
iofunc_notify_trigger(dev->notify, n, IOFUNC_NOTIFY_OUTPUT);
}

return _RESMGR_NOREPLY;
}


io_write.c

int
io_write(resmgr_context_t *ctp, io_write_t *msg, struct ocb *ocb) {
MQDEV *dev = ocb->ocb.attr;
MQMSG *mp;
MQWAIT *wp;
void *data;
unsigned priority = 0;
unsigned client_prio;
int nonblock, status, nbytes, rcvid, preread;

// Will be NULL if called from io_closeocb with a closemsg.
if(msg != NULL) {
// Is queue open for write?
if((status = iofunc_write_verify(ctp, msg, &ocb->ocb, &nonblock)) != EOK) {
return status;
}

// If an xtype is specified make sure it is an mqueue.
if((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_NONE) {
if((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_MQUEUE) {
return ENOSYS;
}
priority = msg->i.xtype >> 16;
if(priority < 0 || priority >= MQ_PRIO_MAX) {
return EINVAL;
}
}

// Is the msg too big for the queue?
if(msg->i.nbytes > dev->mq_attr.mq_msgsize) {
return EMSGSIZE;
}

client_prio = ctp->info.priority;

// If there is not enough room for another msg we must block.
if(dev->mq_attr.mq_curmsgs >= dev->mq_attr.mq_maxmsg) {
if(nonblock & O_NONBLOCK) {
return EAGAIN;
}

if((wp = MemchunkMalloc(memchunk, sizeof(*wp))) == NULL) {
return ENOSPC;
}

wp->rcvid = ctp->rcvid;
wp->scoid = ctp->info.scoid;
wp->coid = ctp->info.coid;
wp->priority = client_prio;
wp->xtype = msg->i.xtype;
LINK_PRI_CLIENT(&dev->waiting_write, wp);
++dev->mq_attr.mq_sendwait;

return _RESMGR_NOREPLY;
}
data = (char *)msg + sizeof(msg->i), nbytes = msg->i.nbytes, preread = ctp->size - sizeof(msg->i);
} else {
data = ocb->closemsg->data, nbytes = preread = ocb->closemsg->nbytes;
}

if((dev->mq_attr.mq_flags & MQ_SEMAPHORE) == 0) {

// Try fast process-to-process flip (without queuing msg)
if((wp = dev->waiting_read) != NULL && (msg == NULL || ctp->size >= sizeof(msg->i) + nbytes)) {
if(nbytes) {
dev->attr.flags |= (IOFUNC_ATTR_CTIME | IOFUNC_ATTR_MTIME | IOFUNC_ATTR_ATIME | IOFUNC_ATTR_DIRTY_TIME);
}
resmgr_endian_context(ctp, _IO_READ, S_IFNAM, wp->xtype);
_IO_SET_READ_NBYTES(ctp, nbytes);
rcvid = ctp->rcvid, ctp->rcvid = wp->rcvid;
if((wp->xtype & _IO_XTYPE_MASK) == _IO_XTYPE_MQUEUE) {
uint32_t prio = priority;
SETIOV(&ctp->iov[0], &prio, sizeof(prio));
SETIOV(&ctp->iov[1], data, nbytes);
resmgr_msgreplyv(ctp, ctp->iov, 2);
} else {
SETIOV(&ctp->iov[0], data, nbytes);
resmgr_msgreplyv(ctp, ctp->iov, 1);
}
ctp->rcvid = rcvid;
dev->waiting_read = wp->next;
MemchunkFree(memchunk, wp);
–dev->mq_attr.mq_recvwait;
return EOK;
}

// Get a msg buffer.
if((mp = MemchunkMalloc(memchunk, MQ_DATAOFF + nbytes)) == NULL) {
return EAGAIN;
}

mp->next = NULL;
mp->priority = priority;
if(mp->nbytes = nbytes) {
dev->attr.flags |= (IOFUNC_ATTR_CTIME | IOFUNC_ATTR_MTIME | IOFUNC_ATTR_DIRTY_TIME);
}

// Save/Get the data into msg buffer
if(msg == NULL || preread >= nbytes) {
memcpy(mp->data, data, nbytes);
} else {
memcpy(&mp->data[0], data, preread);
if(MsgRead(ctp->rcvid, &mp->data[preread], nbytes - preread, ctp->size) != nbytes - preread) {
MemchunkFree(memchunk, mp);
return EIO;
}
}

// Queue the msg
LINK_PRI_MSG(dev->waiting_msg, mp);
}

// Reply with status
if(msg != NULL)
MsgError(ctp->rcvid, EOK);

++dev->mq_attr.mq_curmsgs;

// Keep stat info up-to-date. We overload st_size to be messages waiting.
dev->attr.nbytes = dev->mq_attr.mq_curmsgs;

// Since we added a msg we may need to wake someone waiting for a msg.
if(wp = dev->waiting_read) {

// Unlink and free wait entry
rcvid = wp->rcvid;
dev->waiting_read = wp->next;
MemchunkFree(memchunk, wp);
–dev->mq_attr.mq_recvwait;

// Process the message
resmgr_msg_again(ctp, rcvid);
} else {
// Check for notify conditions
if (IOFUNC_NOTIFY_INPUT_CHECK(dev->notify, dev->mq_attr.mq_curmsgs, dev->mq_attr.mq_curmsgs == 1)) {
iofunc_notify_trigger(dev->notify, dev->mq_attr.mq_curmsgs, IOFUNC_NOTIFY_INPUT);
}
}

return _RESMGR_NOREPLY;
}


main.c

static const size_t memchunks[] = {
sizeof(struct ocb), sizeof(MQWAIT),
sizeof(MQDEV) + 32, sizeof(MQDEV) + 68,
MQ_DATAOFF + 564, MQ_DATAOFF + 1344,
MQ_DATAOFF + 2024, MQ_DATAOFF + 4064,
MQ_DATAOFF + 8158, MQ_DATAOFF + 12256,
MQ_DATAOFF + 16352, MQ_DATAOFF + 20448,
MQ_DATAOFF + 24544, MQ_DATAOFF + 28638,
};

int
main(int argc, char *argv[]) {
struct rlimit fdlimit;
resmgr_attr_t res_attr;
resmgr_context_t *ctp;
extern char *__progname;
int semaphores;

/*

  • Attempt to co-exist with 6.3 procnto which provides named semaphores
  • itself. If _SC_SEMAPHORES set already, then either duplicate mqueue
  • or new procnto; either way don’t put up _FTYPE_SEM services ourself.
    */
    semaphores = !((semaphores = sysconf(_SC_SEMAPHORES)) != -1 && semaphores != 0);

func_init();

if(MemchunkInit(&memchunk, sizeof(memchunks) / sizeof(memchunks[0]), memchunks) != EOK) {
fprintf(stderr, “%s: Unable to initialise memory allocator\n”, __progname);
return EXIT_FAILURE;
}

if((dpp = dispatch_create()) == NULL) {
fprintf(stderr, “%s: Unable to allocate dispatch context\n”, __progname);
return EXIT_FAILURE;
}

options(argc, argv);

// Init resmgr attributes
memset(&res_attr, 0, sizeof res_attr);
res_attr.flags = RESMGR_FLAG_CROSS_ENDIAN;
res_attr.nparts_max = 2;
res_attr.msg_max_size = sizeof(io_write_t) + 2560;

// Initialize mountpoint attributes
iofunc_attr_init(&mq_dir_attr.attr, S_IFDIR | S_ISVTX | S_IPERMS, 0, 0);
mq_dir_attr.attr.mount = &mq_mount;
mq_mount.conf = IOFUNC_PC_CHOWN_RESTRICTED | IOFUNC_PC_NO_TRUNC;
mq_mount.funcs = &ocb_funcs;
mq_dir_attr.link = NULL;

// Initialize mountpoint attributes
if (semaphores) {
iofunc_attr_init(&sem_dir_attr.attr, S_IFDIR | S_ISVTX | S_IPERMS, 0, 0);
sem_dir_attr.attr.mount = &sem_mount;
sem_mount.conf = IOFUNC_PC_CHOWN_RESTRICTED | IOFUNC_PC_NO_TRUNC;
sem_mount.funcs = &ocb_funcs;
sem_dir_attr.link = NULL;
}

// Create the mount points for all message queues and named semaphores
if(resmgr_attach(dpp, &res_attr, NULL, _FTYPE_MQUEUE, _RESMGR_FLAG_DIR | _RESMGR_FLAG_FTYPEONLY,
&mq_connect_funcs, &mq_io_dir_funcs, &mq_dir_attr) == -1) {
fprintf(stderr, “%s: Unable to allocate mqueue (%s)\n”, __progname, strerror(errno));
return EXIT_FAILURE;
}
if (semaphores) {
if(resmgr_attach(dpp, &res_attr, NULL, _FTYPE_SEM, _RESMGR_FLAG_DIR | _RESMGR_FLAG_FTYPEONLY,
&mq_connect_funcs, &mq_io_dir_funcs, &sem_dir_attr) == -1) {
fprintf(stderr, “%s: Unable to allocate sem (%s)\n”, __progname, strerror(errno));
return EXIT_FAILURE;
}
}

/*
The maximum number of semaphores/mqueues that the system can support
is actually limited by the number of fd connections it can establish
to proc (each resmgr_attach() is equivalent to an open()). At some
future point we will be able to boost our limit to match if it is
lower. Note that it must be the sum of the MAX_NUM_SEM and MAX_NUM_MQ
that we boost to since a user might want to use all of those together.

For now we ignore this fact and let the limits float.
*/
if (getrlimit(RLIMIT_NOFILE, &fdlimit) == -1) {
fdlimit.rlim_cur = semaphores ? __max(MAX_NUM_SEM, MAX_NUM_MQ) : MAX_NUM_MQ;
}
max_num_mq = __min(fdlimit.rlim_cur, MAX_NUM_MQ);
max_num_sem = semaphores ? __min(fdlimit.rlim_cur, MAX_NUM_SEM) : 0;

//Set the system configuration parameters
sysmgr_sysconf_set(0, _SC_MESSAGE_PASSING, _POSIX_MESSAGE_PASSING);
sysmgr_sysconf_set(0, _SC_MQ_OPEN_MAX, max_num_mq);
sysmgr_sysconf_set(0, _SC_MQ_PRIO_MAX, MQ_PRIO_MAX);
if (semaphores) {
sysmgr_sysconf_set(0, _SC_SEMAPHORES, _POSIX_SEMAPHORES);
sysmgr_sysconf_set(0, _SC_SEM_NSEMS_MAX, max_num_sem);
sysmgr_sysconf_set(0, _SC_SEM_VALUE_MAX, SEM_VALUE_MAX);
}

if(!nodaemon) {
procmgr_daemon(EXIT_SUCCESS, PROCMGR_DAEMON_NODEVNULL);
}

// mqueue is single-threaded (define MQUEUE_1_THREAD for optimisation)
ctp = resmgr_context_alloc(dpp);

for(;:wink: {
if((ctp = resmgr_block(ctp)) == NULL) {
return EXIT_FAILURE;
}
resmgr_handler(ctp);
}

return EXIT_SUCCESS;
}

有几个应用程序调用并没有什么区别啊。基本上,你的驱动提供一个文件名(路径名: /dev/mydevice什么的),应用程序来open()就行了。

像mqueue这样,程序的实现部份只有一个线程 (最后的那个 for 循环),那样就更简单了,不需要考虑同步问题(一个应用在write,另一个同时read的情形)。

回到你的问题,ctp只需要在for循环外创建一次就够了。不需要你关心。ocb是每次open()时自动创建的(如果你有io_open,你也可以接管这部份),ocb是用来对应不同的open的,对同一个fd的操作,你会收到同一个ocb.

原来是这样,谢谢唐工的解惑。