资源管理器还真难写。
accept出来的句柄一close就会出问题,具体出的是什么问题我也看不懂。
现象是:客户端再连一个就会被挂起。
如果不把accept出来的句柄close掉,还能多接受几个客户端,但超过十几个就会崩溃。
我都被搞晕了。
不知道这个多线程资源管理器到底有什么问题。
/*
 * Socket domain passed to my_open().
 * NOTE(review): the original call had an empty first argument (a syntax
 * error). The value 4 is taken from the "/dev/socket/4" path this manager
 * attaches to in main(); confirm that this is what my_open() expects.
 */
enum { MY_SOCKET_DOMAIN = 4 };

/*
 * Connect handler for open requests on the socket manager.
 *
 * Runs the default iofunc open processing, then creates a backend socket
 * with my_open() and records it against the client's (pid, coid) pair via
 * my_fd_map().
 *
 * ctp  resource-manager context; ctp->info identifies the client.
 * io   the open message, forwarded to iofunc_open_default().
 * h    attribute handle, forwarded to iofunc_open_default().
 * v    extra connect data, interpreted as io_socket_extra_t (type/protocol).
 *
 * Returns EOK on success, otherwise an errno value.
 */
int my_io_open(resmgr_context_t *ctp, io_open_t *io,
               RESMGR_HANDLE_T *h, void *v)
{
    io_socket_extra_t *ext = v;
    int ret;
    int fd;
    int err;

    /* Without the socket extra data there is no type/protocol to open. */
    if (ext == NULL) {
        return _RESMGR_ERRNO(EINVAL);
    }

    ret = iofunc_open_default(ctp, io, h, v);
    if (ret != EOK) {
        /* The failure code is the return value, not errno. */
        printf("fail default open for pid:%d,coid:%d, strerror:%s\n",
               ctp->info.pid, ctp->info.coid, strerror(ret));
        return ret;
    }

    /*
     * Assumes my_open() returns a non-negative descriptor on success and a
     * negative value on failure, matching how my_lookup_fd() is checked
     * elsewhere in this file. The original test (fd != 0) rejected every
     * valid non-zero descriptor.
     */
    fd = my_open(MY_SOCKET_DOMAIN, ext->type, ext->protocol);
    if (fd < 0) {
        /* Capture errno before printf() can overwrite it. */
        err = (errno != 0) ? errno : EIO;
        printf("fail my open for pid:%d,coid:%d, strerror:%s\n",
               ctp->info.pid, ctp->info.coid, strerror(err));
        /*
         * Report the failure to the client; the original returned `ret`,
         * which is EOK at this point.
         * NOTE(review): the OCB bound by iofunc_open_default() is not
         * released on this path - confirm whether it needs unbinding.
         */
        return _RESMGR_ERRNO(err);
    }

    my_fd_map(fd, ctp->info.pid, ctp->info.coid);
    return _RESMGR_ERRNO(EOK);
}
/*
 * _IO_OPENFD handler; only the _IO_OPENFD_ACCEPT subtype is supported.
 *
 * Looks up the backend socket mapped to the client's (pid, coid), runs the
 * default iofunc_openfd() processing, then calls my_accept() on that socket
 * with the attribute lock released. On success the accepted descriptor is
 * recorded with my_fd_map() and the address length is set as the reply
 * status.
 *
 * ctp  resource-manager context; ctp->info identifies the client.
 * msg  the openfd message; its buffer is reused for the address output.
 * ocb  open control block associated with the request.
 *
 * Returns EOK on success, EBADF if no socket is mapped for the client,
 * EINVAL for unsupported subtypes, or the errno of the failing step.
 *
 * NOTE(review): the listening socket is looked up, and the accepted socket
 * is stored, under the same (ctp->info.pid, ctp->info.coid) key. Verify
 * that this is the intended key for both and that my_fd_map() does not
 * overwrite the listening entry.
 */
int my_io_openfd(resmgr_context_t *ctp, io_openfd_t *msg, RESMGR_OCB_T *ocb)
{
    struct sockaddr_in *addr;
    int listen_fd;
    int child_fd;
    int len;
    int ret;
    int err;

    if (msg->i.xtype != _IO_OPENFD_ACCEPT) {
        return _RESMGR_ERRNO(EINVAL);
    }

    listen_fd = my_lookup_fd(ctp->info.pid, ctp->info.coid);
    if (listen_fd < 0) {
        printf("Can't find socket for pid:%d,coid:%d\n",
               ctp->info.pid, ctp->info.coid);
        return _RESMGR_ERRNO(EBADF);
    }

    ret = iofunc_openfd(ctp, msg, ocb, ocb->attr);
    if (ret != EOK) {
        /* The failure code is the return value, not errno. */
        printf("iofunc_openfd ret:%d,str:%s\n", ret, strerror(ret));
        return _RESMGR_ERRNO(ret);
    }

    /* Space left in the receive buffer, counted from the start of msg. */
    len = ctp->msg_max_size - ((char *)msg - (char *)ctp->msg);
    /* The address output area overlays the request message itself. */
    addr = (struct sockaddr_in *)msg;

    /* Release the attribute lock for the duration of my_accept(). */
    iofunc_attr_unlock(ocb->attr);
    child_fd = my_accept(listen_fd, addr, &len);
    /* Capture errno before printf() can overwrite it. */
    err = errno;
    printf("my_accept return child_fd:%d, strerror:%s.\n",
           child_fd, strerror(err));
    iofunc_attr_lock(ocb->attr);

    /*
     * Assumes my_accept() returns a negative value on failure. The original
     * mapped the descriptor unconditionally and returned whatever errno
     * happened to hold, even after a successful accept.
     */
    if (child_fd < 0) {
        return _RESMGR_ERRNO((err != 0) ? err : EIO);
    }

    _RESMGR_STATUS(ctp, len);
    my_fd_map(child_fd, ctp->info.pid, ctp->info.coid);
    return _RESMGR_ERRNO(EOK);
}
/*
 * close_ocb handler: tears down the backend socket mapped to the client's
 * (pid, coid) and releases the OCB.
 *
 * ctp  resource-manager context; ctp->info identifies the client.
 * msg  unused.
 * ocb  open control block being closed.
 *
 * Returns EOK on success, or EBADF when no socket is mapped for the client.
 */
int
my_io_close(resmgr_context_t *ctp, void *msg, RESMGR_OCB_T *ocb)
{
    int fd;

    (void)msg;

    /*
     * The original assigned to an undeclared `fd` and then tested an
     * uninitialised `ret`; the lookup result itself must be checked.
     */
    fd = my_lookup_fd(ctp->info.pid, ctp->info.coid);
    if (fd < 0) {
        printf("Can't find socket for pid:%d,coid:%d\n",
               ctp->info.pid, ctp->info.coid);
        /* Release the OCB even without a backend socket so it is not leaked. */
        (void)iofunc_close_ocb(ctp, ocb, ocb->attr);
        return _RESMGR_ERRNO(EBADF);
    }

    my_notify_remove(ctp, fd);
    my_close_fd(fd);
    (void)iofunc_close_ocb(ctp, ocb, ocb->attr);
    return _RESMGR_ERRNO(EOK);
}
/*
 * Entry point: creates the dispatch handle, attaches the resource manager
 * at /dev/socket/4 with the custom connect and I/O handlers, then starts a
 * thread pool that services incoming messages.
 *
 * Returns EXIT_FAILURE if any setup step fails.
 */
int main( int argc, char **argv )
{
    resmgr_connect_funcs_t ConnectFuncs;
    resmgr_io_funcs_t IoFuncs;
    iofunc_attr_t IoFuncAttr;
    thread_pool_attr_t pool_attr;
    thread_pool_t *tpp;
    resmgr_attr_t resmgr_attr;
    dispatch_t *dpp;
    int resmgr_id;

    (void)argc;
    (void)argv;

    dpp = dispatch_create();
    if( dpp == NULL )
    {
        fprintf( stderr, "dispatch_create() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );
    resmgr_attr.nparts_max = 5;
    resmgr_attr.msg_max_size = 2048;

    /* Start from the default handlers, then override the socket-specific ones. */
    iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
                      _RESMGR_IO_NFUNCS, &IoFuncs );
    ConnectFuncs.open = my_io_open;
    IoFuncs.read = my_io_read;
    IoFuncs.write = my_io_write;
    IoFuncs.close_ocb = my_io_close;
    IoFuncs.devctl = my_io_devctl;
    IoFuncs.openfd = my_io_openfd;
    IoFuncs.notify = my_io_notify;

    /* Set up the attribute for the entry in the filesystem. */
    iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );
    printf("hello\n");

    resmgr_id = resmgr_attach( dpp, &resmgr_attr, "/dev/socket/4", _FTYPE_SOCKET, _RESMGR_FLAG_SELF,
                               &ConnectFuncs, &IoFuncs, &IoFuncAttr );
    if( resmgr_id == -1 )
    {
        fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Initialize thread pool attributes. */
    memset(&pool_attr, 0, sizeof pool_attr);
    pool_attr.handle = dpp;
    pool_attr.context_alloc = dispatch_context_alloc;
    pool_attr.block_func = dispatch_block;
    pool_attr.unblock_func = dispatch_unblock;
    pool_attr.handler_func = dispatch_handler;
    pool_attr.context_free = dispatch_context_free;
    pool_attr.lo_water = 2;
    pool_attr.hi_water = 10;
    pool_attr.increment = 1;
    pool_attr.maximum = 50;

    /* Allocate a thread pool handle; report the reason if that fails. */
    tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF);
    if( tpp == NULL )
    {
        fprintf( stderr, "thread_pool_create() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /*
     * Start the threads. With POOL_FLAG_EXIT_SELF this is not expected to
     * return, so getting control back is treated as a failure.
     */
    if( thread_pool_start(tpp) == -1 )
    {
        fprintf( stderr, "thread_pool_start() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}