Memory copies in SPDK writes

Posted: March 21, 2023

Receiving data from a socket

SPDK reads data from the socket with POSIX readv. The readv() system call reads data from the file associated with the file descriptor fd into the iovcnt buffers described by iov ("scatter input"). Here it reads into two buffers.
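As a minimal standalone illustration (my example, not SPDK code), a single readv() call below scatters incoming bytes across two buffers in order:

#include <stdio.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void)
{
    char header[8], payload[64];
    struct iovec iov[2] = {
        { .iov_base = header,  .iov_len = sizeof(header)  },
        { .iov_base = payload, .iov_len = sizeof(payload) },
    };

    /* One system call fills header first, then payload ("scatter input").
     * fd 0 (stdin) stands in for a connected socket here. */
    ssize_t n = readv(0, iov, 2);
    if (n < 0) {
        perror("readv");
        return 1;
    }
    printf("read %zd bytes\n", n);
    return 0;
}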

static inline ssize_t
posix_sock_read(struct spdk_posix_sock *sock)
{
    struct iovec iov[2];
    int bytes_avail, bytes_recvd;
    struct spdk_posix_sock_group_impl *group;

    bytes_avail = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);

    if (bytes_avail <= 0) {
        return bytes_avail;
    }

    if (sock->ssl) {
        bytes_recvd = SSL_readv(sock->ssl, iov, 2);
    } else {
        bytes_recvd = readv(sock->fd, iov, 2);★
    }

    assert(sock->pipe_has_data == false);

    if (bytes_recvd <= 0) {
        /* Errors count as draining the socket data */
        if (sock->base.group_impl && sock->socket_has_data) {
            group = __posix_group_impl(sock->base.group_impl);
            TAILQ_REMOVE(&group->socks_with_data, sock, link);
        }

        sock->socket_has_data = false;

        return bytes_recvd;
    }

    spdk_pipe_writer_advance(sock->recv_pipe, bytes_recvd);

#if DEBUG
    if (sock->base.group_impl) {
        assert(sock->socket_has_data == true);
    }
#endif

    sock->pipe_has_data = true;
    if (bytes_recvd < bytes_avail) {
        /* We drained the kernel socket entirely. */
        sock->socket_has_data = false;
    }

    return bytes_recvd;
}

There is a great deal of public information about the network software stack. No single site explains all of it well, so here are two that each explain their part well.

Packet queueing in the Linux network stack. | Download Scientific Diagram (researchgate.net)

When the NIC receives a TCP packet, it DMA-transfers it into the RX ring buffer allocated in memory by the NIC driver. The upper-layer application, SPDK, then reads the received data with the read system call.

Linux network ring buffers. Trying to cover what I don’t know about… | by Tungdam | Coccoc Engineering Blog | Medium

RX buffer

The RX ring buffer is a FIFO circular buffer that holds the packets received by the NIC. The Linux kernel processes packets arriving in the RX ring buffer (rx ring) via ksoftirqd's net_rx_action handler, which calls a driver-specific poll function (e.g. igb_poll).

  1. The NIC stores arriving packets into the rx ring in memory via DMA (in general, without involving the host CPU). Once in memory, the received data is managed per packet as sk_buffs.
  2. After the data transfer to memory completes, the NIC sends a hardware interrupt to the CPU.
  3. The CPU that takes the hardware interrupt clears it (top half) and raises a software interrupt (NET_RX_SOFTIRQ) for ksoftirqd (bottom half). This lets hardware-interrupt handling for further packets from the NIC proceed in parallel with software-interrupt processing on the CPU. Meanwhile, ksoftirqd starts consuming data from the RX buffer in the background, so the process as a whole is never blocked (see the NAPI sketch below).
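The top-half/bottom-half split above is what the Linux NAPI interface expresses. Below is a minimal sketch of that pattern, assuming the standard NAPI API (napi_schedule, napi_complete_done); struct my_adapter, my_irq_handler, my_poll and my_rx_clean are hypothetical names, not code from a real driver such as igb.

#include <linux/interrupt.h>
#include <linux/netdevice.h>

struct my_adapter {                     /* hypothetical driver state */
    struct napi_struct napi;
    /* rx ring pointers, device registers, ... */
};

static inline void my_disable_device_irq(struct my_adapter *ad) { /* device register write */ }
static inline void my_enable_device_irq(struct my_adapter *ad) { /* device register write */ }
static int my_rx_clean(struct my_adapter *ad, int budget); /* builds sk_buffs, hands them up the stack */

/* Top half: do almost nothing in hard-IRQ context; mask the device
 * interrupt and schedule NAPI, which raises NET_RX_SOFTIRQ. */
static irqreturn_t my_irq_handler(int irq, void *data)
{
    struct my_adapter *ad = data;

    my_disable_device_irq(ad);
    napi_schedule(&ad->napi);
    return IRQ_HANDLED;
}

/* Bottom half: the driver-specific poll function (the igb_poll analogue),
 * called from softirq/ksoftirqd context, drains at most `budget` packets per call. */
static int my_poll(struct napi_struct *napi, int budget)
{
    struct my_adapter *ad = container_of(napi, struct my_adapter, napi);
    int done = my_rx_clean(ad, budget);

    if (done < budget && napi_complete_done(napi, done))
        my_enable_device_irq(ad);       /* ring drained: re-arm the device IRQ */
    return done;
}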

TCPlinux.pdf (sn0rt.github.io)

readv


Receiving data on a socket goes through sockfs with the same readv used for regular files. The implementation behind readv is sock_read_iter, defined in socket_file_ops; internally the path is sock_read_iter -> sock_recvmsg -> tcp_recvmsg.

hiboma/sockfs.md at master · hiboma/hiboma · GitHub

static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
    struct file *file = iocb->ki_filp;
    struct socket *sock = file->private_data;
    struct msghdr msg = {.msg_iter = *to,
                 .msg_iocb = iocb};
    ssize_t res;

    if (file->f_flags & O_NONBLOCK || (iocb->ki_flags & IOCB_NOWAIT))
        msg.msg_flags = MSG_DONTWAIT;

    if (iocb->ki_pos != 0)
        return -ESPIPE;

    if (!iov_iter_count(to))    /* Match SYS5 behaviour */
        return 0;

    res = sock_recvmsg(sock, &msg, msg.msg_flags);★
    *to = msg.msg_iter;
    return res;
}
/**
 *  sock_recvmsg - receive a message from @sock
 *  @sock: socket
 *  @msg: message to receive
 *  @flags: message flags
 *
 *  Receives @msg from @sock, passing through LSM. Returns the total number
 *  of bytes received, or an error.
 */
int sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags)
{
    int err = security_socket_recvmsg(sock, msg, msg_data_left(msg), flags);

    return err ?: sock_recvmsg_nosec(sock, msg, flags);
}
EXPORT_SYMBOL(sock_recvmsg);

static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
                     int flags)
{
    int ret = INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
                     inet_recvmsg, sock, msg,
                     msg_data_left(msg), flags);
    if (trace_sock_recv_length_enabled())
        call_trace_sock_recv_length(sock->sk, ret, flags);
    return ret;
}

sk_buff

Inside tcp_recvmsg, the data is copied from the sock structure to user space.

A Linux socket is backed by a struct socket and a struct sock. Each socket structure holds BSD-socket-level information (state, type) and a reference to a struct sock. struct sock holds the protocol-specific information for the BSD socket; for an INET (Internet Address Domain) socket, struct sock holds TCP/IP- and UDP/IP-specific state. The sk_buff queue structure (sk_buff_head) is also attached to struct sock: sk_receive_queue, which stores the sk_buff structures pointing at received packets, is held by struct sock.
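As a small sketch (mine, not kernel code), the hop from the BSD-level struct socket to the protocol-level struct sock and its sk_receive_queue looks like this:

#include <net/sock.h>

/* From the BSD socket to the protocol socket and its queue of
 * received sk_buffs (illustrative only). */
static bool rx_queue_is_empty(struct socket *sock)
{
    struct sock *sk = sock->sk;    /* protocol-specific state (TCP here) */

    return skb_queue_empty(&sk->sk_receive_queue);
}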

An sk_buff is the buffer that holds packet data; each packet is stored in its own sk_buff and handled as such by the Linux network layers. struct sk_buff itself is the management structure for the buffer: the area that actually stores the packet data is separate, and the head, data, tail and end pointers of the sk_buff track positions within it.

sk_buff avoids memory copies when data is handed between layers, letting multiple stages share the same data. A packet received from the NIC travels up through the Ethernet, IP and TCP layers while its headers are parsed; struct sk_buff keeps pointers to each layer's header so that each layer can access its own header.
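A short sketch of how those per-layer pointers are used (the helpers skb_reset_network_header, ip_hdr, skb_pull and tcp_hdr are real kernel APIs; the function itself is illustrative, not the kernel's actual RX path):

#include <linux/skbuff.h>
#include <linux/ip.h>
#include <linux/tcp.h>

static void parse_headers(struct sk_buff *skb)
{
    struct iphdr *iph;
    struct tcphdr *th;

    /* [head ... data ... tail ... end]: parsing moves only the pointers,
     * the payload itself is never copied. */
    skb_reset_network_header(skb);   /* the IP header starts at skb->data */
    iph = ip_hdr(skb);

    skb_pull(skb, iph->ihl * 4);     /* advance skb->data past the IP header */
    skb_reset_transport_header(skb); /* the TCP header starts here */
    th = tcp_hdr(skb);

    skb_pull(skb, th->doff * 4);     /* skb->data now points at the TCP payload */
}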

sk_buff – Linuxカーネルメモ (bit-hive.com)

tcp_recvmsg

An application's receive request reaches inet_recvmsg via the socket layer (through the struct proto_ops inet_stream_ops interface table). inet_recvmsg immediately calls tcp_recvmsg (through the struct proto tcp_prot interface table).

tcp_recvmsg copies the data of the packets linked on the socket's receive_queue to user space and frees the packets. If there is no data on the receive_queue, it waits for packets to arrive.

internal22-282-受信処理アルゴリズム – Linux Kernel Documents Wiki – Linux Kernel Documents – OSDN

int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
        int *addr_len)
{
    int cmsg_flags = 0, ret;
    struct scm_timestamping_internal tss;

    if (unlikely(flags & MSG_ERRQUEUE))
        return inet_recv_error(sk, msg, len, addr_len);

    if (sk_can_busy_loop(sk) &&
        skb_queue_empty_lockless(&sk->sk_receive_queue) &&
        sk->sk_state == TCP_ESTABLISHED)
        sk_busy_loop(sk, flags & MSG_DONTWAIT);

    lock_sock(sk);
    ret = tcp_recvmsg_locked(sk, msg, len, flags, &tss, &cmsg_flags); ★ the core of recvmsg
    release_sock(sk);

    if ((cmsg_flags || msg->msg_get_inq) && ret >= 0) {
        if (cmsg_flags & TCP_CMSG_TS)
            tcp_recv_timestamp(msg, sk, &tss);
        if (msg->msg_get_inq) {
            msg->msg_inq = tcp_inq_hint(sk);
            if (cmsg_flags & TCP_CMSG_INQ)
                put_cmsg(msg, SOL_TCP, TCP_CM_INQ,
                     sizeof(msg->msg_inq), &msg->msg_inq);
        }
    }
    return ret;
}
EXPORT_SYMBOL(tcp_recvmsg);

/*
 *  This routine copies from a sock struct into the user buffer.
 *
 *  Technical note: in 2.3 we work on _locked_ socket, so that
 *  tricks with *seq access order and skb->users are not required.
 *  Probably, code can be easily improved even more.
 */

static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
                  int flags, struct scm_timestamping_internal *tss,
                  int *cmsg_flags)
{
    struct tcp_sock *tp = tcp_sk(sk);
    int copied = 0;
    u32 peek_seq;
    u32 *seq;
    unsigned long used;
    int err;
    int target;     /* Read at least this many bytes */
    long timeo;
    struct sk_buff *skb, *last;
    u32 urg_hole = 0;

    err = -ENOTCONN;
    if (sk->sk_state == TCP_LISTEN)
        goto out;

    if (tp->recvmsg_inq) {
        *cmsg_flags = TCP_CMSG_INQ;
        msg->msg_get_inq = 1;
    }
    timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

    /* Urgent data needs to be handled specially. */
    if (flags & MSG_OOB)
        goto recv_urg;

    if (unlikely(tp->repair)) {
        err = -EPERM;
        if (!(flags & MSG_PEEK))
            goto out;

        if (tp->repair_queue == TCP_SEND_QUEUE)
            goto recv_sndq;

        err = -EINVAL;
        if (tp->repair_queue == TCP_NO_QUEUE)
            goto out;

        /* 'common' recv queue MSG_PEEK-ing */
    }

    seq = &tp->copied_seq;
    if (flags & MSG_PEEK) {
        peek_seq = tp->copied_seq;
        seq = &peek_seq;
    }

    target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

    do {
        u32 offset;

        /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
        if (unlikely(tp->urg_data) && tp->urg_seq == *seq) {
            if (copied)
                break;
            if (signal_pending(current)) {
                copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
                break;
            }
        }

        /* Next get a buffer. */

        last = skb_peek_tail(&sk->sk_receive_queue); ★ walk the packets on the socket's receive_queue in order from the head
        skb_queue_walk(&sk->sk_receive_queue, skb) {
            last = skb;
            /* Now that we have two receive queues this
             * shouldn't happen.
             */
            if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
                 "TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
                 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
                 flags))
                break;

            offset = *seq - TCP_SKB_CB(skb)->seq;
            if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
                pr_err_once("%s: found a SYN, please report !\n", __func__);
                offset--;
            }
            if (offset < skb->len)
                goto found_ok_skb;
            if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
                goto found_fin_ok;
            WARN(!(flags & MSG_PEEK),
                 "TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
                 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
        }

        /* Well, if we have backlog, try to process it now yet. */

        if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
            break;

        if (copied) {
            if (!timeo ||
                sk->sk_err ||
                sk->sk_state == TCP_CLOSE ||
                (sk->sk_shutdown & RCV_SHUTDOWN) ||
                signal_pending(current))
                break;
        } else {
            if (sock_flag(sk, SOCK_DONE))
                break;

            if (sk->sk_err) {
                copied = sock_error(sk);
                break;
            }

            if (sk->sk_shutdown & RCV_SHUTDOWN)
                break;

            if (sk->sk_state == TCP_CLOSE) {
                /* This occurs when user tries to read
                 * from never connected socket.
                 */
                copied = -ENOTCONN;
                break;
            }

            if (!timeo) {
                copied = -EAGAIN;
                break;
            }

            if (signal_pending(current)) {
                copied = sock_intr_errno(timeo);
                break;
            }
        }

        if (copied >= target) {
            /* Do not sleep, just process backlog. */
            __sk_flush_backlog(sk);
        } else {
            tcp_cleanup_rbuf(sk, copied);
            sk_wait_data(sk, &timeo, last);
        }

        if ((flags & MSG_PEEK) &&
            (peek_seq - copied - urg_hole != tp->copied_seq)) {
            net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
                        current->comm,
                        task_pid_nr(current));
            peek_seq = tp->copied_seq;
        }
        continue;

found_ok_skb:
        /* Ok so how much can we use? */
        used = skb->len - offset;
        if (len < used)
            used = len;

        /* Do we have urgent data here? */
        if (unlikely(tp->urg_data)) {
            u32 urg_offset = tp->urg_seq - *seq;
            if (urg_offset < used) {
                if (!urg_offset) {
                    if (!sock_flag(sk, SOCK_URGINLINE)) {
                        WRITE_ONCE(*seq, *seq + 1);
                        urg_hole++;
                        offset++;
                        used--;
                        if (!used)
                            goto skip_copy;
                    }
                } else
                    used = urg_offset;
            }
        }

        if (!(flags & MSG_TRUNC)) {
            err = skb_copy_datagram_msg(skb, offset, msg, used); ★ copy the data in the packet to user space
            if (err) {
                /* Exception. Bailout! */
                if (!copied)
                    copied = -EFAULT;
                break;
            }
        }

        WRITE_ONCE(*seq, *seq + used);
        copied += used;
        len -= used;

        tcp_rcv_space_adjust(sk);

skip_copy:
        if (unlikely(tp->urg_data) && after(tp->copied_seq, tp->urg_seq)) {
            WRITE_ONCE(tp->urg_data, 0);
            tcp_fast_path_check(sk);
        }

        if (TCP_SKB_CB(skb)->has_rxtstamp) {
            tcp_update_recv_tstamps(skb, tss);
            *cmsg_flags |= TCP_CMSG_TS;
        }

        if (used + offset < skb->len)
            continue;

        if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
            goto found_fin_ok;
        if (!(flags & MSG_PEEK))
            tcp_eat_recv_skb(sk, skb);
        continue;

found_fin_ok:
        /* Process the FIN. */
        WRITE_ONCE(*seq, *seq + 1);
        if (!(flags & MSG_PEEK))
            tcp_eat_recv_skb(sk, skb);
        break;
    } while (len > 0);

    /* According to UNIX98, msg_name/msg_namelen are ignored
     * on connected socket. I was just happy when found this 8) --ANK
     */

    /* Clean up data we have read: This will do ACK frames. */
    tcp_cleanup_rbuf(sk, copied);
    return copied;

out:
    return err;

recv_urg:
    err = tcp_recv_urg(sk, msg, len, flags);
    goto out;

recv_sndq:
    err = tcp_peek_sndq(sk, msg, len);
    goto out;
}

/**
 *  struct socket - general BSD socket
 *  @state: socket state (%SS_CONNECTED, etc)
 *  @type: socket type (%SOCK_STREAM, etc)
 *  @flags: socket flags (%SOCK_NOSPACE, etc)
 *  @ops: protocol specific socket operations
 *  @file: File back pointer for gc
 *  @sk: internal networking protocol agnostic socket representation
 *  @wq: wait queue for several uses
 */
struct socket {
    socket_state        state;

    short           type;

    unsigned long       flags;

    struct file     *file;
    struct sock     *sk;
    const struct proto_ops  *ops;

    struct socket_wq    wq;
};

struct sock {
	/*
	 * Now struct inet_timewait_sock also uses sock_common, so please just
	 * don't add nothing before this first member (__sk_common) --acme
	 */
	struct sock_common	__sk_common;
#define sk_node			__sk_common.skc_node
#define sk_nulls_node		__sk_common.skc_nulls_node
#define sk_refcnt		__sk_common.skc_refcnt
#define sk_tx_queue_mapping	__sk_common.skc_tx_queue_mapping
#ifdef CONFIG_SOCK_RX_QUEUE_MAPPING
#define sk_rx_queue_mapping	__sk_common.skc_rx_queue_mapping
#endif

#define sk_dontcopy_begin	__sk_common.skc_dontcopy_begin
#define sk_dontcopy_end		__sk_common.skc_dontcopy_end
#define sk_hash			__sk_common.skc_hash
#define sk_portpair		__sk_common.skc_portpair
#define sk_num			__sk_common.skc_num
#define sk_dport		__sk_common.skc_dport
#define sk_addrpair		__sk_common.skc_addrpair
#define sk_daddr		__sk_common.skc_daddr
#define sk_rcv_saddr		__sk_common.skc_rcv_saddr
#define sk_family		__sk_common.skc_family
#define sk_state		__sk_common.skc_state
#define sk_reuse		__sk_common.skc_reuse
#define sk_reuseport		__sk_common.skc_reuseport
#define sk_ipv6only		__sk_common.skc_ipv6only
#define sk_net_refcnt		__sk_common.skc_net_refcnt
#define sk_bound_dev_if		__sk_common.skc_bound_dev_if
#define sk_bind_node		__sk_common.skc_bind_node
#define sk_prot			__sk_common.skc_prot
#define sk_net			__sk_common.skc_net
#define sk_v6_daddr		__sk_common.skc_v6_daddr
#define sk_v6_rcv_saddr	__sk_common.skc_v6_rcv_saddr
#define sk_cookie		__sk_common.skc_cookie
#define sk_incoming_cpu		__sk_common.skc_incoming_cpu
#define sk_flags		__sk_common.skc_flags
#define sk_rxhash		__sk_common.skc_rxhash

	/* early demux fields */
	struct dst_entry __rcu	*sk_rx_dst;
	int			sk_rx_dst_ifindex;
	u32			sk_rx_dst_cookie;

	socket_lock_t		sk_lock;
	atomic_t		sk_drops;
	int			sk_rcvlowat;
	struct sk_buff_head	sk_error_queue;
	struct sk_buff_head	sk_receive_queue;
	/*
	 * The backlog queue is special, it is always used with
	 * the per-socket spinlock held and requires low latency
	 * access. Therefore we special case it's implementation.
	 * Note : rmem_alloc is in this structure to fill a hole
	 * on 64bit arches, not because its logically part of
	 * backlog.
	 */
	struct {
		atomic_t	rmem_alloc;
		int		len;
		struct sk_buff	*head;
		struct sk_buff	*tail;
	} sk_backlog;

#define sk_rmem_alloc sk_backlog.rmem_alloc

	int			sk_forward_alloc;
	u32			sk_reserved_mem;
#ifdef CONFIG_NET_RX_BUSY_POLL
	unsigned int		sk_ll_usec;
	/* ===== mostly read cache line ===== */
	unsigned int		sk_napi_id;
#endif
	int			sk_rcvbuf;

	struct sk_filter __rcu	*sk_filter;
	union {
		struct socket_wq __rcu	*sk_wq;
		/* private: */
		struct socket_wq	*sk_wq_raw;
		/* public: */
	};
#ifdef CONFIG_XFRM
	struct xfrm_policy __rcu *sk_policy[2];
#endif

	struct dst_entry __rcu	*sk_dst_cache;
	atomic_t		sk_omem_alloc;
	int			sk_sndbuf;

	/* ===== cache line for TX ===== */
	int			sk_wmem_queued;
	refcount_t		sk_wmem_alloc;
	unsigned long		sk_tsq_flags;
	union {
		struct sk_buff	*sk_send_head;
		struct rb_root	tcp_rtx_queue;
	};
	struct sk_buff_head	sk_write_queue;
	__s32			sk_peek_off;
	int			sk_write_pending;
	__u32			sk_dst_pending_confirm;
	u32			sk_pacing_status; /* see enum sk_pacing */
	long			sk_sndtimeo;
	struct timer_list	sk_timer;
	__u32			sk_priority;
	__u32			sk_mark;
	unsigned long		sk_pacing_rate; /* bytes per second */
	unsigned long		sk_max_pacing_rate;
	struct page_frag	sk_frag;
	netdev_features_t	sk_route_caps;
	int			sk_gso_type;
	unsigned int		sk_gso_max_size;
	gfp_t			sk_allocation;
	__u32			sk_txhash;

	/*
	 * Because of non atomicity rules, all
	 * changes are protected by socket lock.
	 */
	u8			sk_gso_disabled : 1,
				sk_kern_sock : 1,
				sk_no_check_tx : 1,
				sk_no_check_rx : 1,
				sk_userlocks : 4;
	u8			sk_pacing_shift;
	u16			sk_type;
	u16			sk_protocol;
	u16			sk_gso_max_segs;
	unsigned long	        sk_lingertime;
	struct proto		*sk_prot_creator;
	rwlock_t		sk_callback_lock;
	int			sk_err,
				sk_err_soft;
	u32			sk_ack_backlog;
	u32			sk_max_ack_backlog;
	kuid_t			sk_uid;
	u8			sk_txrehash;
#ifdef CONFIG_NET_RX_BUSY_POLL
	u8			sk_prefer_busy_poll;
	u16			sk_busy_poll_budget;
#endif
	spinlock_t		sk_peer_lock;
	int			sk_bind_phc;
	struct pid		*sk_peer_pid;
	const struct cred	*sk_peer_cred;

	long			sk_rcvtimeo;
	ktime_t			sk_stamp;
#if BITS_PER_LONG==32
	seqlock_t		sk_stamp_seq;
#endif
	atomic_t		sk_tskey;
	atomic_t		sk_zckey;
	u32			sk_tsflags;
	u8			sk_shutdown;

	u8			sk_clockid;
	u8			sk_txtime_deadline_mode : 1,
				sk_txtime_report_errors : 1,
				sk_txtime_unused : 6;
	bool			sk_use_task_frag;

	struct socket		*sk_socket;
	void			*sk_user_data;
#ifdef CONFIG_SECURITY
	void			*sk_security;
#endif
	struct sock_cgroup_data	sk_cgrp_data;
	struct mem_cgroup	*sk_memcg;
	void			(*sk_state_change)(struct sock *sk);
	void			(*sk_data_ready)(struct sock *sk);
	void			(*sk_write_space)(struct sock *sk);
	void			(*sk_error_report)(struct sock *sk);
	int			(*sk_backlog_rcv)(struct sock *sk,
						  struct sk_buff *skb);
#ifdef CONFIG_SOCK_VALIDATE_XMIT
	struct sk_buff*		(*sk_validate_xmit_skb)(struct sock *sk,
							struct net_device *dev,
							struct sk_buff *skb);
#endif
	void                    (*sk_destruct)(struct sock *sk);
	struct sock_reuseport __rcu	*sk_reuseport_cb;
#ifdef CONFIG_BPF_SYSCALL
	struct bpf_local_storage __rcu	*sk_bpf_storage;
#endif
	struct rcu_head		sk_rcu;
	netns_tracker		ns_tracker;
	struct hlist_node	sk_bind2_node;
};

/**
 *  struct sock_common - minimal network layer representation of sockets
 *  @skc_daddr: Foreign IPv4 addr
 *  @skc_rcv_saddr: Bound local IPv4 addr
 *  @skc_addrpair: 8-byte-aligned __u64 union of @skc_daddr & @skc_rcv_saddr
 *  @skc_hash: hash value used with various protocol lookup tables
 *  @skc_u16hashes: two u16 hash values used by UDP lookup tables
 *  @skc_dport: placeholder for inet_dport/tw_dport
 *  @skc_num: placeholder for inet_num/tw_num
 *  @skc_portpair: __u32 union of @skc_dport & @skc_num
 *  @skc_family: network address family
 *  @skc_state: Connection state
 *  @skc_reuse: %SO_REUSEADDR setting
 *  @skc_reuseport: %SO_REUSEPORT setting
 *  @skc_ipv6only: socket is IPV6 only
 *  @skc_net_refcnt: socket is using net ref counting
 *  @skc_bound_dev_if: bound device index if != 0
 *  @skc_bind_node: bind hash linkage for various protocol lookup tables
 *  @skc_portaddr_node: second hash linkage for UDP/UDP-Lite protocol
 *  @skc_prot: protocol handlers inside a network family
 *  @skc_net: reference to the network namespace of this socket
 *  @skc_v6_daddr: IPV6 destination address
 *  @skc_v6_rcv_saddr: IPV6 source address
 *  @skc_cookie: socket's cookie value
 *  @skc_node: main hash linkage for various protocol lookup tables
 *  @skc_nulls_node: main hash linkage for TCP/UDP/UDP-Lite protocol
 *  @skc_tx_queue_mapping: tx queue number for this connection
 *  @skc_rx_queue_mapping: rx queue number for this connection
 *  @skc_flags: place holder for sk_flags
 *      %SO_LINGER (l_onoff), %SO_BROADCAST, %SO_KEEPALIVE,
 *      %SO_OOBINLINE settings, %SO_TIMESTAMPING settings
 *  @skc_listener: connection request listener socket (aka rsk_listener)
 *      [union with @skc_flags]
 *  @skc_tw_dr: (aka tw_dr) ptr to &struct inet_timewait_death_row
 *      [union with @skc_flags]
 *  @skc_incoming_cpu: record/match cpu processing incoming packets
 *  @skc_rcv_wnd: (aka rsk_rcv_wnd) TCP receive window size (possibly scaled)
 *      [union with @skc_incoming_cpu]
 *  @skc_tw_rcv_nxt: (aka tw_rcv_nxt) TCP window next expected seq number
 *      [union with @skc_incoming_cpu]
 *  @skc_refcnt: reference count
 *
 *  This is the minimal network layer representation of sockets, the header
 *  for struct sock and struct inet_timewait_sock.
 */
struct sock_common {
    union {
        __addrpair  skc_addrpair;
        struct {
            __be32  skc_daddr;
            __be32  skc_rcv_saddr;
        };
    };
    union  {
        unsigned int    skc_hash;
        __u16       skc_u16hashes[2];
    };
    /* skc_dport && skc_num must be grouped as well */
    union {
        __portpair  skc_portpair;
        struct {
            __be16  skc_dport;
            __u16   skc_num;
        };
    };

    unsigned short      skc_family;
    volatile unsigned char  skc_state;
    unsigned char       skc_reuse:4;
    unsigned char       skc_reuseport:1;
    unsigned char       skc_ipv6only:1;
    unsigned char       skc_net_refcnt:1;
    int         skc_bound_dev_if;
    union {
        struct hlist_node   skc_bind_node;
        struct hlist_node   skc_portaddr_node;
    };
    struct proto        *skc_prot;
    possible_net_t      skc_net;

#if IS_ENABLED(CONFIG_IPV6)
    struct in6_addr     skc_v6_daddr;
    struct in6_addr     skc_v6_rcv_saddr;
#endif

    atomic64_t      skc_cookie;

    /* following fields are padding to force
     * offset(struct sock, sk_refcnt) == 128 on 64bit arches
     * assuming IPV6 is enabled. We use this padding differently
     * for different kind of 'sockets'
     */
    union {
        unsigned long   skc_flags;
        struct sock *skc_listener; /* request_sock */
        struct inet_timewait_death_row *skc_tw_dr; /* inet_timewait_sock */
    };
    /*
     * fields between dontcopy_begin/dontcopy_end
     * are not copied in sock_copy()
     */
    /* private: */
    int         skc_dontcopy_begin[0];
    /* public: */
    union {
        struct hlist_node   skc_node;
        struct hlist_nulls_node skc_nulls_node;
    };
    unsigned short      skc_tx_queue_mapping;
#ifdef CONFIG_SOCK_RX_QUEUE_MAPPING
    unsigned short      skc_rx_queue_mapping;
#endif
    union {
        int     skc_incoming_cpu;
        u32     skc_rcv_wnd;
        u32     skc_tw_rcv_nxt; /* struct tcp_timewait_sock  */
    };

    refcount_t      skc_refcnt;
    /* private: */
    int                     skc_dontcopy_end[0];
    union {
        u32     skc_rxhash;
        u32     skc_window_clamp;
        u32     skc_tw_snd_nxt; /* struct tcp_timewait_sock */
    };
    /* public: */
};

SPDK behavior

Backtrace

Thread 1 "reactor_0" hit Breakpoint 2, __GI___readv (fd=73, iov=0x7fffffffda70, iovcnt=2) at ../sysdeps/unix/sysv/linux/readv.c:25
25      ../sysdeps/unix/sysv/linux/readv.c: No such file or directory.
(gdb) bt
#0  __GI___readv (fd=73, iov=0x7fffffffda70, iovcnt=2) at ../sysdeps/unix/sysv/linux/readv.c:25
#1  0x0000555555626d84 in posix_sock_read (sock=0x555555ed5090) at posix.c:1401
#2  0x00005555556270d6 in posix_sock_readv (_sock=0x555555ed5090, iov=0x7fffffffdb10, iovcnt=1) at posix.c:1476
#3  0x000055555562713e in posix_sock_recv (sock=0x555555ed5090, buf=0x2000366f43c8, len=8) at posix.c:1493
#4  0x00005555556e58b0 in spdk_sock_recv (sock=0x555555ed5090, buf=0x2000366f43c8, len=8) at sock.c:458
#5  0x000055555564e294 in nvme_tcp_read_data (sock=0x555555ed5090, bytes=8, buf=0x2000366f43c8)
    at /home/takayuki/repos/spdk/include/spdk_internal/nvme_tcp.h:382
#6  0x0000555555654074 in nvmf_tcp_sock_process (tqpair=0x555555ed5e70) at tcp.c:2191
#7  0x0000555555656bd3 in nvmf_tcp_sock_cb (arg=0x555555ed5e70, group=0x555555cbbb60, sock=0x555555ed5090) at tcp.c:3056
#8  0x00005555556e60d2 in sock_group_impl_poll_count (group_impl=0x555555ec22a0, group=0x555555cbbb60, max_events=32) at sock.c:691
#9  0x00005555556e616c in spdk_sock_group_poll_count (group=0x555555cbbb60, max_events=32) at sock.c:717
#10 0x00005555556e5fcb in spdk_sock_group_poll (group=0x555555cbbb60) at sock.c:668
#11 0x0000555555657200 in nvmf_tcp_poll_group_poll (group=0x555555d3acf0) at tcp.c:3209
#12 0x000055555564cd40 in nvmf_transport_poll_group_poll (group=0x555555d3acf0) at transport.c:666
#13 0x0000555555641a6d in nvmf_poll_group_poll (ctx=0x555555d21550) at nvmf.c:71
#14 0x00005555556f00ba in thread_execute_poller (thread=0x555555d21160, poller=0x555555d21600) at thread.c:946
#15 0x00005555556f0640 in thread_poll (thread=0x555555d21160, max_msgs=0, now=4449678885613) at thread.c:1072
#16 0x00005555556f08ef in spdk_thread_poll (thread=0x555555d21160, max_msgs=0, now=4449678885613) at thread.c:1156
#17 0x00005555556b2542 in _reactor_run (reactor=0x555555cfb1c0) at reactor.c:903
#18 0x00005555556b2634 in reactor_run (arg=0x555555cfb1c0) at reactor.c:941
#19 0x00005555556b2a8f in spdk_reactors_start () at reactor.c:1053
#20 0x00005555556aee7a in spdk_app_start (opts_user=0x7fffffffe340, start_fn=0x555555570d7a <nvmf_tgt_started>, arg1=0x0) at app.c:771
#21 0x0000555555570ed4 in main (argc=3, argv=0x7fffffffe518) at nvmf_main.c:47

nvmf_tcp_sock_process

In SPDK, nvmf_tcp_sock_process processes the PDU data.

static int
nvmf_tcp_sock_process(struct spdk_nvmf_tcp_qpair *tqpair)
{
    int rc = 0;
    struct nvme_tcp_pdu *pdu;
    enum nvme_tcp_pdu_recv_state prev_state;
    uint32_t data_len;
    struct spdk_nvmf_tcp_transport *ttransport = SPDK_CONTAINEROF(tqpair->qpair.transport,
            struct spdk_nvmf_tcp_transport, transport);

    /* The loop here is to allow for several back-to-back state changes. */
    do {
        prev_state = tqpair->recv_state;
        SPDK_DEBUGLOG(nvmf_tcp, "tqpair(%p) recv pdu entering state %d\n", tqpair, prev_state);

        pdu = tqpair->pdu_in_progress;
        assert(pdu || tqpair->recv_state == NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_READY);
        switch (tqpair->recv_state) {
        /* Wait for the common header  */
        case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_READY:
            if (!pdu) {
                pdu = SLIST_FIRST(&tqpair->tcp_pdu_free_queue);
                if (spdk_unlikely(!pdu)) {
                    return NVME_TCP_PDU_IN_PROGRESS;
                }
                SLIST_REMOVE_HEAD(&tqpair->tcp_pdu_free_queue, slist);
                tqpair->pdu_in_progress = pdu;
            }
            memset(pdu, 0, offsetof(struct nvme_tcp_pdu, qpair));
            nvmf_tcp_qpair_set_recv_state(tqpair, NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_CH);
        /* FALLTHROUGH */ 
        case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_CH:
            if (spdk_unlikely(tqpair->state == NVME_TCP_QPAIR_STATE_INITIALIZING)) {
                return rc;
            }

            rc = nvme_tcp_read_data(tqpair->sock,
                        sizeof(struct spdk_nvme_tcp_common_pdu_hdr) - pdu->ch_valid_bytes,
                        (void *)&pdu->hdr.common + pdu->ch_valid_bytes); ★ read the not-yet-received part of the CH (common header) into pdu->hdr.common
            if (rc < 0) {
                SPDK_DEBUGLOG(nvmf_tcp, "will disconnect tqpair=%p\n", tqpair);
                return NVME_TCP_PDU_FATAL;
            } else if (rc > 0) {
                pdu->ch_valid_bytes += rc;
                spdk_trace_record(TRACE_TCP_READ_FROM_SOCKET_DONE, tqpair->qpair.qid, rc, 0, tqpair);
            }

            if (pdu->ch_valid_bytes < sizeof(struct spdk_nvme_tcp_common_pdu_hdr)) {
                return NVME_TCP_PDU_IN_PROGRESS;
            }

            /* The command header of this PDU has now been read from the socket. */
            nvmf_tcp_pdu_ch_handle(tqpair);
            break;
        /* Wait for the pdu specific header  */
        case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PSH:
            rc = nvme_tcp_read_data(tqpair->sock,
                        pdu->psh_len - pdu->psh_valid_bytes,
                        (void *)&pdu->hdr.raw + sizeof(struct spdk_nvme_tcp_common_pdu_hdr) + pdu->psh_valid_bytes);
            if (rc < 0) {
                return NVME_TCP_PDU_FATAL;
            } else if (rc > 0) {
                spdk_trace_record(TRACE_TCP_READ_FROM_SOCKET_DONE, tqpair->qpair.qid, rc, 0, tqpair);
                pdu->psh_valid_bytes += rc;
            }

            if (pdu->psh_valid_bytes < pdu->psh_len) {
                return NVME_TCP_PDU_IN_PROGRESS;
            }

            /* All header(ch, psh, head digist) of this PDU has now been read from the socket. */
            nvmf_tcp_pdu_psh_handle(tqpair, ttransport);
            break;
        /* Wait for the req slot */
        case NVME_TCP_PDU_RECV_STATE_AWAIT_REQ:
            nvmf_tcp_capsule_cmd_hdr_handle(ttransport, tqpair, pdu);
            break;
        case NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PAYLOAD:
            /* check whether the data is valid, if not we just return */
            if (!pdu->data_len) {
                return NVME_TCP_PDU_IN_PROGRESS;
            }

            data_len = pdu->data_len;
            /* data digest */
            if (spdk_unlikely((pdu->hdr.common.pdu_type != SPDK_NVME_TCP_PDU_TYPE_H2C_TERM_REQ) &&
                      tqpair->host_ddgst_enable)) {
                data_len += SPDK_NVME_TCP_DIGEST_LEN;
                pdu->ddgst_enable = true;
            }

            rc = nvme_tcp_read_payload_data(tqpair->sock, pdu); ★ receive the payload
            if (rc < 0) {
                return NVME_TCP_PDU_FATAL;
            }
            pdu->rw_offset += rc;

            if (pdu->rw_offset < data_len) {
                return NVME_TCP_PDU_IN_PROGRESS;
            }

            /* Generate and insert DIF to whole data block received if DIF is enabled */
            if (spdk_unlikely(pdu->dif_ctx != NULL) &&
                spdk_dif_generate_stream(pdu->data_iov, pdu->data_iovcnt, 0, data_len,
                             pdu->dif_ctx) != 0) {
                SPDK_ERRLOG("DIF generate failed\n");
                return NVME_TCP_PDU_FATAL;
            }

            /* All of this PDU has now been read from the socket. */
            nvmf_tcp_pdu_payload_handle(tqpair, pdu);
            break;
        case NVME_TCP_PDU_RECV_STATE_ERROR:
            if (!spdk_sock_is_connected(tqpair->sock)) {
                return NVME_TCP_PDU_FATAL;
            }
            break;
        default:
            SPDK_ERRLOG("The state(%d) is invalid\n", tqpair->recv_state);
            abort();
            break;
        }
    } while (tqpair->recv_state != prev_state);

    return rc;
}

nvme_tcp_read_payload_data

nvme_tcp_read_payload_data reads the SGL structures in the received PDU and builds the iov.
The iov entries point at the tcp_req buffer area set up by nvme_tcp_pdu_set_data_buf.

static int
nvme_tcp_read_payload_data(struct spdk_sock *sock, struct nvme_tcp_pdu *pdu)
{
    struct iovec iov[NVME_TCP_MAX_SGL_DESCRIPTORS + 1]; ★ NVME_TCP_MAX_SGL_DESCRIPTORS is 16
    int iovcnt;

    iovcnt = nvme_tcp_build_payload_iovs(iov, NVME_TCP_MAX_SGL_DESCRIPTORS + 1, pdu,
                         pdu->ddgst_enable, NULL); ★ build the iov for the payload
    assert(iovcnt >= 0);

    return nvme_tcp_readv_data(sock, iov, iovcnt); ★ copy data from the socket into the iov; this is where the memory copy happens
}
static int
nvme_tcp_build_payload_iovs(struct iovec *iov, int iovcnt, struct nvme_tcp_pdu *pdu,
                bool ddgst_enable, uint32_t *_mapped_length)
{
    struct spdk_iov_sgl sgl;

    if (iovcnt == 0) {
        return 0;
    }

    spdk_iov_sgl_init(&sgl, iov, iovcnt, pdu->rw_offset);

    if (spdk_likely(!pdu->dif_ctx)) {
        if (!_nvme_tcp_sgl_append_multi(&sgl, pdu->data_iov, pdu->data_iovcnt)) { ★ append the PDU data to the SGL
            goto end;
        }
    } else {
        if (!_nvme_tcp_sgl_append_multi_with_md(&sgl, pdu->data_iov, pdu->data_iovcnt,
                            pdu->data_len, pdu->dif_ctx)) {
            goto end;
        }
    }

    /* Data Digest */
    if (ddgst_enable) {
        spdk_iov_sgl_append(&sgl, pdu->data_digest, SPDK_NVME_TCP_DIGEST_LEN);
    }

end:
    if (_mapped_length != NULL) {
        *_mapped_length = sgl.total_size;
    }
    return iovcnt - sgl.iovcnt;
}
static inline bool
_nvme_tcp_sgl_append_multi(struct spdk_iov_sgl *s, struct iovec *iov, int iovcnt)
{
    int i;

    for (i = 0; i < iovcnt; i++) { ★ read the SGL structures in the PDU and make the iov contents referenced through spdk_iov_sgl
        if (!spdk_iov_sgl_append(s, iov[i].iov_base, iov[i].iov_len)) {
            return false;
        }
    }

    return true;
}
/** 
 * Append the data to the struct spdk_iov_sgl pointed by s
 *  
 * \param s the address of the struct spdk_iov_sgl
 * \param data the data buffer to be appended
 * \param data_len the length of the data.
 *
 * \return true if all the data is appended.
 */ 

static inline bool
spdk_iov_sgl_append(struct spdk_iov_sgl *s, uint8_t *data, uint32_t data_len)
{
    if (s->iov_offset >= data_len) {
        s->iov_offset -= data_len;
    } else {
        assert(s->iovcnt > 0);
        s->iov->iov_base = data + s->iov_offset;
        s->iov->iov_len = data_len - s->iov_offset;
        s->total_size += data_len - s->iov_offset;
        s->iov_offset = 0;
        s->iov++;
        s->iovcnt--;
        if (s->iovcnt == 0) {
            return false;
        }
    }

    return true;
}

nvme_tcp_readv_data

nvme_tcp_readv_data receives data from the socket with recv() when there is a single iov and with readv() when there are several. At this point the data is copied from the kernel receive buffer into the iov.

static int
nvme_tcp_readv_data(struct spdk_sock *sock, struct iovec *iov, int iovcnt)
{
    int ret;

    assert(sock != NULL);
    if (iov == NULL || iovcnt == 0) {
        return 0;
    }

    if (iovcnt == 1) {
        return nvme_tcp_read_data(sock, iov->iov_len, iov->iov_base);
    }

    ret = spdk_sock_readv(sock, iov, iovcnt);

    if (ret > 0) {
        return ret;
    }

    if (ret < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }

        /* For connect reset issue, do not output error log */
        if (errno != ECONNRESET) {
            SPDK_ERRLOG("spdk_sock_readv() failed, errno %d: %s\n",
                    errno, spdk_strerror(errno));
        }
    }

    /* connection closed */
    return NVME_TCP_CONNECTION_FATAL;
}

struct nvme_tcp_pdu

The buffers SPDK manages are tracked in the data_iov of struct nvme_tcp_pdu.

struct nvme_tcp_pdu {
    union {
        /* to hold error pdu data */
        uint8_t                 raw[SPDK_NVME_TCP_TERM_REQ_PDU_MAX_SIZE];
        struct spdk_nvme_tcp_common_pdu_hdr common;
        struct spdk_nvme_tcp_ic_req     ic_req;
        struct spdk_nvme_tcp_term_req_hdr   term_req;
        struct spdk_nvme_tcp_cmd        capsule_cmd;
        struct spdk_nvme_tcp_h2c_data_hdr   h2c_data;
        struct spdk_nvme_tcp_ic_resp        ic_resp;
        struct spdk_nvme_tcp_rsp        capsule_resp;
        struct spdk_nvme_tcp_c2h_data_hdr   c2h_data;
        struct spdk_nvme_tcp_r2t_hdr        r2t;

    } hdr;

    bool                        has_hdgst;
    bool                        ddgst_enable;
    uint32_t                    data_digest_crc32;
    uint8_t                     data_digest[SPDK_NVME_TCP_DIGEST_LEN];

    uint8_t                     ch_valid_bytes;
    uint8_t                     psh_valid_bytes;
    uint8_t                     psh_len;

    nvme_tcp_qpair_xfer_complete_cb         cb_fn;
    void                        *cb_arg;

    /* The sock request ends with a 0 length iovec. Place the actual iovec immediately
     * after it. There is a static assert below to check if the compiler inserted
     * any unwanted padding */
    struct spdk_sock_request            sock_req;
    struct iovec                    iov[NVME_TCP_MAX_SGL_DESCRIPTORS * 2];


    struct iovec                    data_iov[NVME_TCP_MAX_SGL_DESCRIPTORS];
    uint32_t                    data_iovcnt;
    uint32_t                    data_len;

    uint32_t                    rw_offset;
    TAILQ_ENTRY(nvme_tcp_pdu)           tailq;
    uint32_t                    remaining;
    uint32_t                    padding_len;

    struct spdk_dif_ctx             *dif_ctx;

    void                        *req; /* data tied to a tcp request */
    void                        *qpair;
    SLIST_ENTRY(nvme_tcp_pdu)           slist;
};
SPDK_STATIC_ASSERT(offsetof(struct nvme_tcp_pdu,
                sock_req) + sizeof(struct spdk_sock_request) == offsetof(struct nvme_tcp_pdu, iov),
           "Compiler inserted padding between iov and sock_req");

nvme_tcp_pdu_set_data_buf

data_iov of struct nvme_tcp_pdu is set by nvme_tcp_pdu_set_data_buf. nvmf_tcp_h2c_data_hdr_handle sets tcp_req->buf, allocated at I/O-queue initialization time, into data_iov. tcp_req->buf is actually DPDK hugepage memory allocated with spdk_zmalloc (a DMA-capable region).
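For reference, a minimal sketch of allocating such a buffer with the real spdk_zmalloc API (the 4 KiB size and alignment here are illustrative values, not the ones SPDK's TCP transport uses):

#include "spdk/env.h"

static void *
alloc_dma_buf(void)
{
    /* Zeroed, 4 KiB-aligned buffer from DPDK hugepage memory.
     * SPDK_MALLOC_DMA requests memory a device can DMA to/from;
     * SPDK_ENV_SOCKET_ID_ANY lets SPDK choose the NUMA node.
     * The buffer is released with spdk_free(). */
    return spdk_zmalloc(0x1000, 0x1000, NULL,
                        SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
}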

static void
nvmf_tcp_h2c_data_hdr_handle(struct spdk_nvmf_tcp_transport *ttransport,
                 struct spdk_nvmf_tcp_qpair *tqpair,
                 struct nvme_tcp_pdu *pdu)
{
    struct spdk_nvmf_tcp_req *tcp_req;
    uint32_t error_offset = 0;
    enum spdk_nvme_tcp_term_req_fes fes = 0;
    struct spdk_nvme_tcp_h2c_data_hdr *h2c_data;

    h2c_data = &pdu->hdr.h2c_data;

    SPDK_DEBUGLOG(nvmf_tcp, "tqpair=%p, r2t_info: datao=%u, datal=%u, cccid=%u, ttag=%u\n",
              tqpair, h2c_data->datao, h2c_data->datal, h2c_data->cccid, h2c_data->ttag);

    if (h2c_data->ttag > tqpair->resource_count) {
        SPDK_DEBUGLOG(nvmf_tcp, "ttag %u is larger than allowed %u.\n", h2c_data->ttag,
                  tqpair->resource_count);
        fes = SPDK_NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR;
        error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, ttag);
        goto err;
    }

    tcp_req = &tqpair->reqs[h2c_data->ttag - 1];

    if (spdk_unlikely(tcp_req->state != TCP_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER &&
              tcp_req->state != TCP_REQUEST_STATE_AWAITING_R2T_ACK)) {
        SPDK_DEBUGLOG(nvmf_tcp, "tcp_req(%p), tqpair=%p, has error state in %d\n", tcp_req, tqpair,
                  tcp_req->state);
        fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD;
        error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, ttag);
        goto err;
    }

    if (spdk_unlikely(tcp_req->req.cmd->nvme_cmd.cid != h2c_data->cccid)) {
        SPDK_DEBUGLOG(nvmf_tcp, "tcp_req(%p), tqpair=%p, expected %u but %u for cccid.\n", tcp_req, tqpair,
                  tcp_req->req.cmd->nvme_cmd.cid, h2c_data->cccid);
        fes = SPDK_NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR;
        error_offset = offsetof(struct spdk_nvme_tcp_h2c_data_hdr, cccid);
        goto err;
    }

    if (tcp_req->h2c_offset != h2c_data->datao) {
        SPDK_DEBUGLOG(nvmf_tcp,
                  "tcp_req(%p), tqpair=%p, expected data offset %u, but data offset is %u\n",
                  tcp_req, tqpair, tcp_req->h2c_offset, h2c_data->datao);
        fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE;
        goto err;
    }

    if ((h2c_data->datao + h2c_data->datal) > tcp_req->req.length) {
        SPDK_DEBUGLOG(nvmf_tcp,
                  "tcp_req(%p), tqpair=%p,  (datao=%u + datal=%u) exceeds requested length=%u\n",
                  tcp_req, tqpair, h2c_data->datao, h2c_data->datal, tcp_req->req.length);
        fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE;
        goto err;
    }

    pdu->req = tcp_req;

    if (spdk_unlikely(tcp_req->req.dif_enabled)) {
        pdu->dif_ctx = &tcp_req->req.dif.dif_ctx;
    }

    nvme_tcp_pdu_set_data_buf(pdu, tcp_req->req.iov, tcp_req->req.iovcnt,
                  h2c_data->datao, h2c_data->datal);
    nvmf_tcp_qpair_set_recv_state(tqpair, NVME_TCP_PDU_RECV_STATE_AWAIT_PDU_PAYLOAD);
    return;

err:
    nvmf_tcp_send_c2h_term_req(tqpair, pdu, fes, error_offset);
}

nvmf_tcp_req_parse_sgl

When SPDK processes a command that needs to receive data, it enters the TCP_REQUEST_STATE_NEED_BUFFER state and calls nvmf_tcp_req_parse_sgl. For non-in-capsule data, nvmf_tcp_req_parse_sgl calls spdk_nvmf_request_get_buffers when the PDU is received and takes the data area from the pool; for in-capsule data, it sets tcp_req->buf, allocated when the I/O queue was connected, into data_iov of struct nvme_tcp_pdu.

static int
nvmf_tcp_req_parse_sgl(struct spdk_nvmf_tcp_req *tcp_req,
               struct spdk_nvmf_transport *transport,
               struct spdk_nvmf_transport_poll_group *group)
{
    struct spdk_nvmf_request        *req = &tcp_req->req;
    struct spdk_nvme_cmd            *cmd;
    struct spdk_nvme_sgl_descriptor     *sgl;
    struct spdk_nvmf_tcp_poll_group     *tgroup;
    enum spdk_nvme_tcp_term_req_fes     fes;
    struct nvme_tcp_pdu         *pdu;
    struct spdk_nvmf_tcp_qpair      *tqpair;
    uint32_t                length, error_offset = 0;

    cmd = &req->cmd->nvme_cmd;
    sgl = &cmd->dptr.sgl1;

    if (sgl->generic.type == SPDK_NVME_SGL_TYPE_TRANSPORT_DATA_BLOCK &&
        sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_TRANSPORT) { ★ non-in-capsule data: buffers come from the pool
        /* get request length from sgl */
        length = sgl->unkeyed.length;
        if (spdk_unlikely(length > transport->opts.max_io_size)) {
            SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
                    length, transport->opts.max_io_size);
            fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_LIMIT_EXCEEDED;
            goto fatal_err;
        }

        /* fill request length and populate iovs */
        req->length = length;

        SPDK_DEBUGLOG(nvmf_tcp, "Data requested length= 0x%x\n", length);

        if (spdk_unlikely(req->dif_enabled)) {
            req->dif.orig_length = length;
            length = spdk_dif_get_length_with_md(length, &req->dif.dif_ctx);
            req->dif.elba_length = length;
        }

        if (nvmf_ctrlr_use_zcopy(req)) {
            SPDK_DEBUGLOG(nvmf_tcp, "Using zero-copy to execute request %p\n", tcp_req);
            req->data_from_pool = false;
            return 0;
        }

        if (spdk_nvmf_request_get_buffers(req, group, transport, length)) {
            /* No available buffers. Queue this request up. */
            SPDK_DEBUGLOG(nvmf_tcp, "No available large data buffers. Queueing request %p\n",
                      tcp_req);
            return 0;
        }

        /* backward compatible */
        req->data = req->iov[0].iov_base;

        SPDK_DEBUGLOG(nvmf_tcp, "Request %p took %d buffer/s from central pool, and data=%p\n",
                  tcp_req, req->iovcnt, req->iov[0].iov_base);

        return 0;
    } else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
           sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
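        /* In-capsule data: the payload arrives inside the capsule PDU itself. */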
        uint64_t offset = sgl->address;
        uint32_t max_len = transport->opts.in_capsule_data_size;

        assert(tcp_req->has_in_capsule_data);
        /* Capsule Cmd with In-capsule Data should get data length from pdu header */
        tqpair = tcp_req->pdu->qpair;
        /* receiving pdu is not same with the pdu in tcp_req */
        pdu = tqpair->pdu_in_progress;
        length = pdu->hdr.common.plen - pdu->psh_len - sizeof(struct spdk_nvme_tcp_common_pdu_hdr);
        if (tqpair->host_ddgst_enable) {
            length -= SPDK_NVME_TCP_DIGEST_LEN;
        }
        /* This error is not defined in NVMe/TCP spec, take this error as fatal error */
        if (spdk_unlikely(length != sgl->unkeyed.length)) {
            SPDK_ERRLOG("In-Capsule Data length 0x%x is not equal to SGL data length 0x%x\n",
                    length, sgl->unkeyed.length);
            fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD;
            error_offset = offsetof(struct spdk_nvme_tcp_common_pdu_hdr, plen);
            goto fatal_err;
        }

        SPDK_DEBUGLOG(nvmf_tcp, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
                  offset, length);

        /* The NVMe/TCP transport does not use ICDOFF to control the in-capsule data offset. ICDOFF should be '0' */
        if (spdk_unlikely(offset != 0)) {
            /* Not defined fatal error in NVMe/TCP spec, handle this error as a fatal error */
            SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " should be ZERO in NVMe/TCP\n", offset);
            fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_DATA_UNSUPPORTED_PARAMETER;
            error_offset = offsetof(struct spdk_nvme_tcp_cmd, ccsqe.dptr.sgl1.address);
            goto fatal_err;
        }

        if (spdk_unlikely(length > max_len)) {
            /* According to the SPEC we should support ICD up to 8192 bytes for admin and fabric commands */
            if (length <= SPDK_NVME_TCP_IN_CAPSULE_DATA_MAX_SIZE &&
                (cmd->opc == SPDK_NVME_OPC_FABRIC || req->qpair->qid == 0)) {

                /* Get a buffer from dedicated list */
                SPDK_DEBUGLOG(nvmf_tcp, "Getting a buffer from control msg list\n");
                tgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_tcp_poll_group, group);
                assert(tgroup->control_msg_list);
                req->iov[0].iov_base = nvmf_tcp_control_msg_get(tgroup->control_msg_list);
                if (!req->iov[0].iov_base) {
                    /* No available buffers. Queue this request up. */
                    SPDK_DEBUGLOG(nvmf_tcp, "No available ICD buffers. Queueing request %p\n", tcp_req);
                    return 0;
                }
            } else {
                SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
                        length, max_len);
                fes = SPDK_NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_LIMIT_EXCEEDED;
                goto fatal_err;
            }
        } else {
            req->iov[0].iov_base = tcp_req->buf;
        }

        req->length = length;
        req->data_from_pool = false;
        req->data = req->iov[0].iov_base;

        if (spdk_unlikely(req->dif_enabled)) {
            length = spdk_dif_get_length_with_md(length, &req->dif.dif_ctx);
            req->dif.elba_length = length;
        }

        req->iov[0].iov_len = length;
        req->iovcnt = 1;

        return 0;
    }
    /* If we want to handle the problem here, then we can't skip the following data segment.
     * Because this function runs before reading data part, now handle all errors as fatal errors. */
    SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
            sgl->generic.type, sgl->generic.subtype);
    fes = SPDK_NVME_TCP_TERM_REQ_FES_INVALID_DATA_UNSUPPORTED_PARAMETER;
    error_offset = offsetof(struct spdk_nvme_tcp_cmd, ccsqe.dptr.sgl1.generic);
fatal_err:
    nvmf_tcp_send_c2h_term_req(tcp_req->pdu->qpair, tcp_req->pdu, fes, error_offset);
    return -1;
}

References

sockfs explanation: hiboma/sockfs.md at master · hiboma/hiboma · GitHub

sk_buff – Linuxカーネルメモ (bit-hive.com)

TCPlinux.pdf (sn0rt.github.io)

internal22-282-受信処理アルゴリズム – Linux Kernel Documents Wiki – Linux Kernel Documents – OSDN
