diff options
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r-- | fs/ocfs2/cluster/heartbeat.c | 158 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp.c | 35 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp.h | 6 | ||||
-rw-r--r-- | fs/ocfs2/cluster/tcp_internal.h | 12 |
4 files changed, 74 insertions, 137 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c index 277ca67a2ad6..5a9779bb9236 100644 --- a/fs/ocfs2/cluster/heartbeat.c +++ b/fs/ocfs2/cluster/heartbeat.c @@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg) flush_scheduled_work(); } -static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, - unsigned int num_ios) +static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) { - atomic_set(&wc->wc_num_reqs, num_ios); + atomic_set(&wc->wc_num_reqs, 1); init_completion(&wc->wc_io_complete); wc->wc_error = 0; } @@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg, struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; blk_run_address_space(mapping); + o2hb_bio_wait_dec(wc, 1); wait_for_completion(&wc->wc_io_complete); } @@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio, return 1; o2hb_bio_wait_dec(wc, 1); + bio_put(bio); return 0; } @@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio, * start_slot. */ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, struct o2hb_bio_wait_ctxt *wc, - unsigned int start_slot, - unsigned int num_slots) + unsigned int *current_slot, + unsigned int max_slots) { - int i, nr_vecs, len, first_page, last_page; + int len, current_page; unsigned int vec_len, vec_start; unsigned int bits = reg->hr_block_bits; unsigned int spp = reg->hr_slots_per_page; + unsigned int cs = *current_slot; struct bio *bio; struct page *page; - nr_vecs = (num_slots + spp - 1) / spp; - /* Testing has shown this allocation to take long enough under * GFP_KERNEL that the local node can get fenced. It would be * nicest if we could pre-allocate these bios and avoid this * all together. */ - bio = bio_alloc(GFP_ATOMIC, nr_vecs); + bio = bio_alloc(GFP_ATOMIC, 16); if (!bio) { mlog(ML_ERROR, "Could not alloc slots BIO!\n"); bio = ERR_PTR(-ENOMEM); @@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, } /* Must put everything in 512 byte sectors for the bio... */ - bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9); + bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); bio->bi_bdev = reg->hr_bdev; bio->bi_private = wc; bio->bi_end_io = o2hb_bio_end_io; - first_page = start_slot / spp; - last_page = first_page + nr_vecs; - vec_start = (start_slot << bits) % PAGE_CACHE_SIZE; - for(i = first_page; i < last_page; i++) { - page = reg->hr_slot_data[i]; + vec_start = (cs << bits) % PAGE_CACHE_SIZE; + while(cs < max_slots) { + current_page = cs / spp; + page = reg->hr_slot_data[current_page]; - vec_len = PAGE_CACHE_SIZE; - /* last page might be short */ - if (((i + 1) * spp) > (start_slot + num_slots)) - vec_len = ((num_slots + start_slot) % spp) << bits; - vec_len -= vec_start; + vec_len = min(PAGE_CACHE_SIZE, + (max_slots-cs) * (PAGE_CACHE_SIZE/spp) ); mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", - i, vec_len, vec_start); + current_page, vec_len, vec_start); len = bio_add_page(bio, page, vec_len, vec_start); - if (len != vec_len) { - bio_put(bio); - bio = ERR_PTR(-EIO); - - mlog(ML_ERROR, "Error adding page to bio i = %d, " - "vec_len = %u, len = %d\n, start = %u\n", - i, vec_len, len, vec_start); - goto bail; - } + if (len != vec_len) break; + cs += vec_len / (PAGE_CACHE_SIZE/spp); vec_start = 0; } bail: + *current_slot = cs; return bio; } -/* - * Compute the maximum number of sectors the bdev can handle in one bio, - * as a power of two. - * - * Stolen from oracleasm, thanks Joel! - */ -static int compute_max_sectors(struct block_device *bdev) -{ - int max_pages, max_sectors, pow_two_sectors; - - struct request_queue *q; - - q = bdev_get_queue(bdev); - max_pages = q->max_sectors >> (PAGE_SHIFT - 9); - if (max_pages > BIO_MAX_PAGES) - max_pages = BIO_MAX_PAGES; - if (max_pages > q->max_phys_segments) - max_pages = q->max_phys_segments; - if (max_pages > q->max_hw_segments) - max_pages = q->max_hw_segments; - max_pages--; /* Handle I/Os that straddle a page */ - - if (max_pages) { - max_sectors = max_pages << (PAGE_SHIFT - 9); - } else { - /* If BIO contains 1 or less than 1 page. */ - max_sectors = q->max_sectors; - } - /* Why is fls() 1-based???? */ - pow_two_sectors = 1 << (fls(max_sectors) - 1); - - return pow_two_sectors; -} - -static inline void o2hb_compute_request_limits(struct o2hb_region *reg, - unsigned int num_slots, - unsigned int *num_bios, - unsigned int *slots_per_bio) -{ - unsigned int max_sectors, io_sectors; - - max_sectors = compute_max_sectors(reg->hr_bdev); - - io_sectors = num_slots << (reg->hr_block_bits - 9); - - *num_bios = (io_sectors + max_sectors - 1) / max_sectors; - *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9); - - mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This " - "device can handle %u sectors of I/O\n", io_sectors, num_slots, - max_sectors); - mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n", - *num_bios, *slots_per_bio); -} - static int o2hb_read_slots(struct o2hb_region *reg, unsigned int max_slots) { - unsigned int num_bios, slots_per_bio, start_slot, num_slots; - int i, status; + unsigned int current_slot=0; + int status; struct o2hb_bio_wait_ctxt wc; - struct bio **bios; struct bio *bio; - o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); + o2hb_bio_wait_init(&wc); - bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); - if (!bios) { - status = -ENOMEM; - mlog_errno(status); - return status; - } - - o2hb_bio_wait_init(&wc, num_bios); - - num_slots = slots_per_bio; - for(i = 0; i < num_bios; i++) { - start_slot = i * slots_per_bio; - - /* adjust num_slots at last bio */ - if (max_slots < (start_slot + num_slots)) - num_slots = max_slots - start_slot; - - bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots); + while(current_slot < max_slots) { + bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); if (IS_ERR(bio)) { - o2hb_bio_wait_dec(&wc, num_bios - i); - status = PTR_ERR(bio); mlog_errno(status); goto bail_and_wait; } - bios[i] = bio; + atomic_inc(&wc.wc_num_reqs); submit_bio(READ, bio); } @@ -403,38 +319,30 @@ bail_and_wait: if (wc.wc_error && !status) status = wc.wc_error; - if (bios) { - for(i = 0; i < num_bios; i++) - if (bios[i]) - bio_put(bios[i]); - kfree(bios); - } - return status; } static int o2hb_issue_node_write(struct o2hb_region *reg, - struct bio **write_bio, struct o2hb_bio_wait_ctxt *write_wc) { int status; unsigned int slot; struct bio *bio; - o2hb_bio_wait_init(write_wc, 1); + o2hb_bio_wait_init(write_wc); slot = o2nm_this_node(); - bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); + bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); if (IS_ERR(bio)) { status = PTR_ERR(bio); mlog_errno(status); goto bail; } + atomic_inc(&write_wc->wc_num_reqs); submit_bio(WRITE, bio); - *write_bio = bio; status = 0; bail: return status; @@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) { int i, ret, highest_node, change = 0; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; - struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; ret = o2nm_configured_node_map(configured_nodes, @@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) /* And fire off the write. Note that we don't wait on this I/O * until later. */ - ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); + ret = o2hb_issue_node_write(reg, &write_wc); if (ret < 0) { mlog_errno(ret); return ret; @@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) * people we find in our steady state have seen us. */ o2hb_wait_on_io(reg, &write_wc); - bio_put(write_bio); if (write_wc.wc_error) { /* Do not re-arm the write timeout on I/O error - we * can't be sure that the new block ever made it to @@ -943,7 +849,6 @@ static int o2hb_thread(void *data) { int i, ret; struct o2hb_region *reg = data; - struct bio *write_bio; struct o2hb_bio_wait_ctxt write_wc; struct timeval before_hb, after_hb; unsigned int elapsed_msec; @@ -993,10 +898,9 @@ static int o2hb_thread(void *data) * * XXX: Should we skip this on unclean_stop? */ o2hb_prepare_block(reg, 0); - ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); + ret = o2hb_issue_node_write(reg, &write_wc); if (ret == 0) { o2hb_wait_on_io(reg, &write_wc); - bio_put(write_bio); } else { mlog_errno(ret); } diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index ae4ff4a6636b..1718215fc018 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk, sk->sk_data_ready = o2net_data_ready; sk->sk_state_change = o2net_state_change; + mutex_init(&sc->sc_send_lock); + write_unlock_bh(&sk->sk_callback_lock); } @@ -688,6 +690,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh) * be given to the handler if their payload is longer than the max. */ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, o2net_msg_handler_func *func, void *data, + o2net_post_msg_handler_func *post_func, struct list_head *unreg_list) { struct o2net_msg_handler *nmh = NULL; @@ -722,6 +725,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, nmh->nh_func = func; nmh->nh_func_data = data; + nmh->nh_post_func = post_func; nmh->nh_msg_type = msg_type; nmh->nh_max_len = max_len; nmh->nh_key = key; @@ -856,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc, ssize_t ret; + mutex_lock(&sc->sc_send_lock); ret = sc->sc_sock->ops->sendpage(sc->sc_sock, virt_to_page(kmalloced_virt), (long)kmalloced_virt & ~PAGE_MASK, size, MSG_DONTWAIT); + mutex_unlock(&sc->sc_send_lock); if (ret != size) { mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret); @@ -974,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, /* finally, convert the message header to network byte-order * and send */ + mutex_lock(&sc->sc_send_lock); ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen, sizeof(struct o2net_msg) + caller_bytes); + mutex_unlock(&sc->sc_send_lock); msglog(msg, "sending returned %d\n", ret); if (ret < 0) { mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret); @@ -1049,6 +1057,7 @@ static int o2net_process_message(struct o2net_sock_container *sc, int ret = 0, handler_status; enum o2net_system_error syserr; struct o2net_msg_handler *nmh = NULL; + void *ret_data = NULL; msglog(hdr, "processing message\n"); @@ -1101,17 +1110,26 @@ static int o2net_process_message(struct o2net_sock_container *sc, sc->sc_msg_type = be16_to_cpu(hdr->msg_type); handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len), - nmh->nh_func_data); + nmh->nh_func_data, &ret_data); do_gettimeofday(&sc->sc_tv_func_stop); out_respond: /* this destroys the hdr, so don't use it after this */ + mutex_lock(&sc->sc_send_lock); ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, handler_status); + mutex_unlock(&sc->sc_send_lock); hdr = NULL; mlog(0, "sending handler status %d, syserr %d returned %d\n", handler_status, syserr, ret); + if (nmh) { + BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL); + if (nmh->nh_post_func) + (nmh->nh_post_func)(handler_status, nmh->nh_func_data, + ret_data); + } + out: if (nmh) o2net_handler_put(nmh); @@ -1795,13 +1813,13 @@ out: ready(sk, bytes); } -static int o2net_open_listening_sock(__be16 port) +static int o2net_open_listening_sock(__be32 addr, __be16 port) { struct socket *sock = NULL; int ret; struct sockaddr_in sin = { .sin_family = PF_INET, - .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, + .sin_addr = { .s_addr = (__force u32)addr }, .sin_port = (__force u16)port, }; @@ -1824,15 +1842,15 @@ static int o2net_open_listening_sock(__be16 port) sock->sk->sk_reuse = 1; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); if (ret < 0) { - mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", - ntohs(port), ret); + mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, " + "ret=%d\n", NIPQUAD(addr), ntohs(port), ret); goto out; } ret = sock->ops->listen(sock, 64); if (ret < 0) { - mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", - ntohs(port), ret); + mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n", + NIPQUAD(addr), ntohs(port), ret); } out: @@ -1865,7 +1883,8 @@ int o2net_start_listening(struct o2nm_node *node) return -ENOMEM; /* ? */ } - ret = o2net_open_listening_sock(node->nd_ipv4_port); + ret = o2net_open_listening_sock(node->nd_ipv4_address, + node->nd_ipv4_port); if (ret) { destroy_workqueue(o2net_wq); o2net_wq = NULL; diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h index 21a4e43df836..da880fc215f0 100644 --- a/fs/ocfs2/cluster/tcp.h +++ b/fs/ocfs2/cluster/tcp.h @@ -50,7 +50,10 @@ struct o2net_msg __u8 buf[0]; }; -typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data); +typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data, + void **ret_data); +typedef void (o2net_post_msg_handler_func)(int status, void *data, + void *ret_data); #define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) @@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec, int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, o2net_msg_handler_func *func, void *data, + o2net_post_msg_handler_func *post_func, struct list_head *unreg_list); void o2net_unregister_handler_list(struct list_head *list); diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h index b700dc9624d1..4dae5df5e467 100644 --- a/fs/ocfs2/cluster/tcp_internal.h +++ b/fs/ocfs2/cluster/tcp_internal.h @@ -38,6 +38,12 @@ * locking semantics of the file system using the protocol. It should * be somewhere else, I'm sure, but right now it isn't. * + * New in version 7: + * - DLM join domain includes the live nodemap + * + * New in version 6: + * - DLM lockres remote refcount fixes. + * * New in version 5: * - Network timeout checking protocol * @@ -51,7 +57,7 @@ * - full 64 bit i_size in the metadata lock lvbs * - introduction of "rw" lock and pushing meta/data locking down */ -#define O2NET_PROTOCOL_VERSION 5ULL +#define O2NET_PROTOCOL_VERSION 7ULL struct o2net_handshake { __be64 protocol_version; __be64 connector_id; @@ -149,6 +155,8 @@ struct o2net_sock_container { struct timeval sc_tv_func_stop; u32 sc_msg_key; u16 sc_msg_type; + + struct mutex sc_send_lock; }; struct o2net_msg_handler { @@ -158,6 +166,8 @@ struct o2net_msg_handler { u32 nh_key; o2net_msg_handler_func *nh_func; o2net_msg_handler_func *nh_func_data; + o2net_post_msg_handler_func + *nh_post_func; struct kref nh_kref; struct list_head nh_unregister_item; }; |